-
Notifications
You must be signed in to change notification settings - Fork 7
/
critique.py
executable file
·279 lines (231 loc) · 9.75 KB
/
critique.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
#!/usr/bin/env python
'''
tests all files in this repo for basic compliance with the NeXus standard (per 2016 NIAC)
USAGE
To update the compliance report in this repo::
./critique.py | tee critique.md
Notes:
* use h5py 2.10 or higher
(https://github.com/nexusformat/exampledata/pull/14#issuecomment-577305522)
* This code is compatible with both Python 2 and Python 3
* This code does not perform a full validation of NeXus data files.
It only checks that a given file can be opened by h5py
and has at least one *NXentry* group.
Tests for this structure
(exact name of *entry01* is not required)::
<file_root>:
entry01 (NXentry)
... # content not checked at this level
... # additional content not checked at this level
:see: http://wiki.nexusformat.org/2014_How_to_find_default_data
:see: https://www.nexusformat.org/NIAC2016Minutes.html#nxdata
'''
import datetime
import h5py
import numpy
import pyRestTable
import os
import sys
import xml.etree.ElementTree as ET
def isNeXusGroup(obj, NXtype):
    """
    Report whether `obj` is an HDF5 group whose NX_class attribute equals `NXtype`.

    :param obj obj: object to be examined (only h5py.Group instances are inspected)
    :param str NXtype: NeXus class name to be matched, such as "NXentry"
    :return: result of comparing the NX_class value (None when `obj` is
        not a group) with `NXtype`; False when the attribute is absent or unreadable
    """
    class_name = None
    if isinstance(obj, h5py.Group):
        if "NX_class" not in obj.attrs:
            return False
        try:
            class_name = obj.attrs['NX_class']
        except Exception:
            # the attribute exists but h5py could not read its value
            return False
        # the attribute may be stored as an array; use its first item
        if isinstance(class_name, numpy.ndarray):
            class_name = class_name[0]
        # python 3: convert bytes to text (in python 2, bytes *is* str)
        needs_decoding = isinstance(class_name, bytes) and not isinstance(class_name, str)
        if needs_decoding:
            class_name = class_name.decode()
    return class_name == NXtype
def readString(data):
    """
    Return the text held by `data` as a plain ``str``.

    There are many ways to encode strings in HDF5 and a variety of code is
    required for different versions of h5py.  This function unwraps and
    decodes the data until it succeeds.

    :param obj data: str, bytes, or a container of them (list, numpy
        array, scalar or array-like dataset); only the first item of a
        container is used
    :return str: the decoded text
    :raises IndexError, TypeError, ValueError: if no text can be extracted
    """
    # isinstance (not an exact type test): str subclasses such as numpy.str_
    # must be returned whole, not indexed down to their first character
    if isinstance(data, str):
        return str(data)
    if hasattr(data, 'decode'):
        return data.decode("utf-8")
    try:
        item = data[0]
    except (ValueError, IndexError):
        # scalar (0-dimensional) containers cannot be indexed with 0
        if isinstance(data, numpy.generic):
            # a numpy scalar that is neither text nor bytes: data[()] would
            # only give back an equal scalar and recurse without end
            raise
        item = data[()]
    return readString(item)
class Critic(object):
    '''
    describe a file in terms of NeXus compliance.

    Each method starting with "test_" will contribute a column to the results table.
    The tests are run from the constructor and their return values are
    collected, in alphabetical order of the method names, in ``self.test_results``.
    A test that raises an exception contributes the text "error".
    Calling ``Critic()`` with no arguments makes each test return its column label.

    :param str path: absolute or relative path to the file directory
    :param str fname: (absolute or relative path and) name of file
    '''

    def __init__(self, path=None, fname=None):
        self.path = path
        self.fname = fname
        self.NXentry_nodes = []
        self.filetype = "unrecognised"
        self.test_results = []
        # dir() returns sorted names, so the tests run in alphabetical order
        test_bank = [
            func
            for func in dir(Critic)
            if callable(getattr(Critic, func)) and func.startswith("test_")
        ]
        for t in test_bank:
            try:
                self.test_results.append(getattr(self, t)(path, fname))
            except Exception:
                # one failed test must not stop the critique of other files,
                # but KeyboardInterrupt and SystemExit are allowed to propagate
                self.test_results.append("error")

    def _full_name(self):
        '''return the file name joined to its directory'''
        return os.path.join(self.path, self.fname)

    @staticmethod
    def _xml_namespace_prefix(root):
        '''return the "{uri}" prefix of the root element's tag, "" if it has no namespace'''
        if root.tag.startswith('{'):
            return root.tag.split('}')[0] + '}'
        return ''

    @staticmethod
    def _join_definitions(ad_list):
        '''format a set of application definition names as one table cell'''
        if len(ad_list) == 0:
            return "None found"
        # sorted: the iteration order of a set of strings varies between runs
        return ",".join(sorted(ad_list))

    def find_NX_class_nodes(self, parent, nx_class = 'NXentry'):
        '''
        identify the NXentry (or as specified) nodes

        :param obj parent: group whose direct children are examined
        :param str nx_class: NeXus class name to be matched
        :return [obj]: the children of `parent` accepted by isNeXusGroup()
        '''
        return [node for node in parent.values() if isNeXusGroup(node, nx_class)]

    ### Each method starting with "test_" will contribute a column to the results table.
    ### The tests are conducted in alphabetical order.
    ### Note that each test can write attributes onto self for later tests to use.

    def test_01_FileType(self, path, fname):
        '''
        identify the file as "HDF5", "XML", "HDF4" or "unrecognised"

        The result is also stored in ``self.filetype`` for the later tests.
        '''
        if path is None and fname is None:
            return "File Type"

        full_name = self._full_name()
        try:
            with h5py.File(full_name, mode="r"):
                self.filetype = "HDF5"
        except IOError:
            pass    # cannot open with HDF5

        if self.filetype == "unrecognised":     # try to ID as XML
            try:
                ET.parse(full_name)
                self.filetype = "XML"
            except ET.ParseError:
                pass    # cannot open as XML

        if self.filetype == "unrecognised":     # try to ID as HDF4
            # NOTE(review): presumably only the first four bytes are the HDF4
            # magic number and the rest belong to the header that follows
            # it -- confirm against the HDF4 specification before relaxing
            MAGIC_HDF4 = b'\x0e\x03\x13\x01\x00\xc8\x00\x00'
            with open(full_name, "rb") as file:
                sig = file.read(8)
            if sig == MAGIC_HDF4:
                self.filetype = "HDF4"

        return self.filetype

    # def test_01a_FileHeader(self, path, fname):
    #     if path is None and fname is None:
    #         return "Header"
    #     with open(os.path.join(self.path, self.fname), "rb") as file:
    #         file.seek(0)
    #         sig = file.read(8)
    #     return ":".join("{:02x}".format(x) for x in bytearray(sig))

    def test_02_NXentryCount(self, path, fname):
        '''
        count the NXentry groups at the root of the file

        The count is stored in ``self.nNXentry``: a number for HDF5 and XML
        files, "*" for HDF4 (not implemented), "-" for unrecognised files.

        :return: "not NeXus" when the count is zero, otherwise ``self.nNXentry``
        '''
        if path is None and fname is None:
            return "NXentry Count"

        if self.filetype == "HDF5":
            with h5py.File(self._full_name(), mode="r") as root:
                self.nNXentry = len(self.find_NX_class_nodes(root, "NXentry"))
        elif self.filetype == "XML":
            root = ET.parse(os.path.join(path, fname)).getroot()
            # the prefix is empty when the XML file declares no namespace
            prefix = self._xml_namespace_prefix(root)
            self.nNXentry = len(root.findall(prefix + 'NXentry'))
        elif self.filetype == "HDF4":
            self.nNXentry = "*"
        else:
            self.nNXentry = "-"

        if self.nNXentry == 0:
            return "not NeXus"
        return self.nNXentry

    def test_03_ApplicationDefinition(self, path, fname):
        '''
        list the application definitions named in the file

        Relies on ``self.filetype`` and ``self.nNXentry`` from the earlier tests.

        :return str: comma-separated, sorted, unique definition names,
            "None found", "*" for HDF4 (not implemented),
            or "-" when there is no NXentry to look into
        '''
        if path is None and fname is None:
            return "Application Def's"

        if self.filetype == "HDF5":
            if self.nNXentry < 1:
                return "-"
            ad_list = set()     # like a list, but only keep unique strings
            with h5py.File(self._full_name(), mode="r") as root:
                for entry in self.find_NX_class_nodes(root, "NXentry"):
                    # when NXsubentry groups exist, they are searched
                    # instead of the NXentry group itself
                    holders = self.find_NX_class_nodes(entry, "NXsubentry") or [entry]
                    for group in holders:
                        if 'definition' in group:
                            ad_list.add(readString(group['definition']))
            return self._join_definitions(ad_list)

        if self.filetype == "XML":
            if self.nNXentry < 1:
                return "-"
            ad_list = set()     # like a list, but only keep unique strings
            root = ET.parse(os.path.join(path, fname)).getroot()
            prefix = self._xml_namespace_prefix(root)
            entry_path = './' + prefix + 'NXentry/'
            def_list = root.findall(entry_path + prefix + 'definition')
            def_list += root.findall(
                entry_path + prefix + 'NXsubentry/' + prefix + 'definition'
            )
            for def_tag in def_list:
                if def_tag.text is not None:    # skip empty <definition/> tags
                    ad_list.add(def_tag.text)
            return self._join_definitions(ad_list)

        if self.filetype == "HDF4":
            return "*"
        return "-"
class Registrar(object):
    '''
    database of critiqued files, kept in a dictionary of dictionaries

    ``self.db[path][fname]`` holds the Critic object of each file added.
    '''

    def __init__(self):
        self.db = {}
        # a Critic created without arguments reports the column labels
        self.table_labels = ["path", "file"] + Critic().test_results

    def add(self, path, critic):
        '''
        store a new critique under its directory and file name

        :param str path: directory of the critiqued file
        :param obj critic: instance of Critic(); ignored when it names no file
        '''
        if critic.fname is None:
            return
        self.db.setdefault(path, {})[critic.fname] = critic

    def report(self):
        '''print a markdown table of all critiques, ordered by path then file name'''
        def quoted(text):
            return "`" + text + "`"

        table = pyRestTable.Table()
        table.labels = self.table_labels
        for path in sorted(self.db):
            critiques = self.db[path]
            for fname in sorted(critiques):
                row = [quoted(path), quoted(fname)]
                table.addRow(row + critiques[fname].test_results)
        print(table.reST(fmt="markdown"))
def walk_function(registrar, path, files):
    '''
    critique the files of one directory and register the results

    Nothing is done for a path containing ".git".  Hidden files (name
    starts with ".") and files with a documentation or source code
    extension are not critiqued.

    :param obj registrar: instance of Registrar(), database of analyzed files
    :param str path: subdirectory name
    :param [str] files: list of files in *path* directory
    '''
    if '.git' in path:     # skip the Git VCS directory
        return

    ignored_suffixes = ('.txt', '.py', '.rst', '.md', '.in')
    for filename in files:
        suffix = os.path.splitext(filename)[1]
        if suffix in ignored_suffixes or filename[0] == '.':
            continue    # skip other types of file
        registrar.add(path, Critic(path, filename))
def main(path=None):
    '''
    traverse a directory and describe how each file conforms to NeXus

    The report is printed to stdout as a markdown document.

    :param str path: top directory to be traversed
        (default: the directory of this file, or "." if that is not known)
    '''
    registrar = Registrar()
    top = path or os.path.dirname(__file__) or '.'
    skip_dirs = (".vscode", ".git")

    for subdir, dir_list, file_list in os.walk(top):
        if os.path.basename(subdir) in skip_dirs:
            dir_list[:] = []    # do not descend into a skipped directory
            continue
        # prune in place so os.walk() never enters the skipped directories;
        # a plain "continue" would still visit all of their subdirectories
        dir_list[:] = [d for d in dir_list if d not in skip_dirs]
        walk_function(registrar, subdir, file_list)

    print("# Critique of *exampledata* files")
    print("")
    print("* date: %s" % datetime.datetime.now())
    print("* h5py version: %s" % h5py.__version__)
    print("* unimplemented test cases are marked in the table with an asterisk")
    print("")
    registrar.report()
if __name__ == "__main__":
    # print the critique only when this file is run as a script
    main()