-
Notifications
You must be signed in to change notification settings - Fork 0
/
odfit.py
executable file
·364 lines (291 loc) · 11.5 KB
/
odfit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
#!/usr/bin/env python
"""Creates textual dumps of OpenDocument Format files and other zip archives.
This includes .odt's, .odb's, or really any file which is an archive.
Normalizes XML files by parsing and pretty-printing them.
"""
# Licensed under the FreeBSD license.
# See the file COPYING for details.
from functools import partial
import logging
from sys import stderr
# logging
def make_logger(name, level=logging.DEBUG, strm=stderr):
    """Return the logger called ``name`` with a new stream handler attached.

    ``level`` is the threshold set on the logger.
    ``strm`` is the stream that the new handler writes to.

    The handler is also stored on the logger as ``logger.handler``
    so that callers can get hold of it later.
    """
    logger = logging.getLogger(name)
    logger.setLevel(level)
    # The stream is passed positionally: the keyword was spelled ``strm``
    # in Python 2.6 and ``stream`` from 2.7 on, so neither name is portable.
    handler = logging.StreamHandler(strm)
    logger.addHandler(handler)
    logger.handler = handler
    return logger


warnings = make_logger('warnings')
# Filetype detection
def is_in_charset(string, charset='ascii'):
    """Report whether ``string`` can be represented in ``charset``.

    A byte string qualifies if it decodes cleanly from ``charset``;
    unicode text qualifies if it encodes cleanly to ``charset``.
    Returns ``True`` or ``False``.
    """
    if isinstance(string, bytes):
        try:
            string.decode(charset)
        except UnicodeDecodeError:
            return False
        return True
    # Text input: calling ``decode`` on it would either not exist or would
    # first encode implicitly and raise an uncaught UnicodeEncodeError.
    try:
        string.encode(charset)
    except UnicodeEncodeError:
        return False
    return True


is_utf8 = partial(is_in_charset, charset='utf-8')
class FiletypeDetector(object):
    """Scans into a file to determine its nature.

    A call on an instantiated FiletypeDetector object
    returns 'binary', 'utf-8', or 'unknown' depending on the file contents:
    'binary' if a NUL character is found, 'unknown' if the scanned data
    is not valid UTF-8, and 'utf-8' otherwise (including for empty files).

    ``how_far`` is used as a key into ``self.thoroughness``
    to determine how many bytes into the file should be scanned.
    """

    @property
    def thoroughness(self):
        """How far into the file to scan for stuff."""
        return {'abide': 800,
                'mellow': 8000,  # what ``git diff`` does
                'strict': None,  # scan the whole thing
                }

    def __call__(self, file_, how_far='mellow'):
        """Classify the contents of ``file_``.

        ``file_`` must support ``seek(0)`` and ``read(size)``;
        it is rewound before scanning.
        Raises ``KeyError`` if ``how_far`` is not a key of
        ``self.thoroughness``.
        """
        from codecs import getincrementaldecoder

        chunk_size = 8192
        left_to_scan = self.thoroughness[how_far]
        scan_to_the_end = left_to_scan is None
        # An incremental decoder carries a multi-byte sequence that straddles
        # two reads over to the next chunk instead of rejecting it.
        decoder = getincrementaldecoder('utf-8')()

        file_.seek(0)
        while scan_to_the_end or left_to_scan > 0:
            if scan_to_the_end:
                chunk = file_.read(chunk_size)
            else:
                chunk = file_.read(min(chunk_size, left_to_scan))
            if not chunk:
                # End of file: a sequence that is still incomplete here
                # is genuinely truncated.
                try:
                    decoder.decode(b'', final=True)
                except UnicodeDecodeError:
                    return 'unknown'
                break
            if isinstance(chunk, bytes):
                if b'\0' in chunk:
                    return 'binary'
                try:
                    decoder.decode(chunk)
                except UnicodeDecodeError:
                    return 'unknown'
            else:
                # Already-decoded text: only check that it can be encoded.
                if u'\0' in chunk:
                    return 'binary'
                try:
                    chunk.encode('utf-8')
                except UnicodeEncodeError:
                    return 'unknown'
            if not scan_to_the_end:
                left_to_scan -= len(chunk)
        return 'utf-8'
# XML Processing
class XMLParseError(Exception):
    """Signals that an archived XML file could not be parsed."""
def tidy_xml(xml_file):
    """Returns a tidied version of the xml in ``xml_file``.

    ``xml_file`` is handed unchanged to the parser's ``parse`` function.
    Uses ``lxml.etree`` when it can be imported;
    falls back to ``xml.dom.minidom`` from the standard library.
    With either library the result is an encoded byte string.

    Raises ``XMLParseError`` if the parse routine raises any exception.
    """
    try:
        from lxml.etree import parse, tostring
    except ImportError:
        from xml.dom.minidom import parse

        def pretty_print(document):
            # Naming an encoding makes minidom return encoded bytes,
            # like lxml's ``tostring``, rather than unicode text.
            return document.toprettyxml(indent=' ', encoding='utf-8')
    else:
        def pretty_print(tree):
            return tostring(tree, pretty_print=True)

    try:
        tree = parse(xml_file)
    except Exception as e:
        # The xml libraries don't make it clear
        # what exceptions their ``parse`` functions raise,
        # so we catch everything.
        # TODO: re-raise anything that's related to the underlying file.
        raise XMLParseError("XML parse routine raised error '{0}'"
                            .format(e))
    return pretty_print(tree)
def is_xml(archive, info):
    """Tell whether the member identified by ``info`` has a '.xml' filename.

    ``archive`` is part of the signature but is not consulted.
    """
    suffix = '.xml'
    return info.filename[-len(suffix):] == suffix
# metadata generation and formatting
def format_header(info, content):
    """Build the header line for ``content`` about the member ``info``.

    The line is the member's filename, then ': ', then ``content``.
    ``content`` must be a single line (no newline characters).
    """
    assert '\n' not in content
    template = '{0}: {1}'
    return template.format(info.filename, content)
def progressive_hash(hash_, file_, chunk_size=1048576):
    """Feed ``file_`` into ``hash_`` piece by piece; return the hex digest.

    Reads ``chunk_size`` at a time from the file's current position
    until a read comes back empty.
    """
    block = file_.read(chunk_size)
    while block:
        hash_.update(block)
        block = file_.read(chunk_size)
    return hash_.hexdigest()
def sha1_hash(info, member):
    """Hash ``member`` with SHA-1; return the pair ``('sha1', hexdigest)``.

    ``member`` is rewound with ``seek(0)`` before it is read.
    ``info`` is part of the signature but is not consulted.
    """
    import hashlib
    member.seek(0)
    digest = progressive_hash(hashlib.sha1(), member)
    return ('sha1', digest)
class FormattedZipInfo(object):
"""Proxies a ZipInfo object, providing formatted versions of its data."""
def __init__(self, info):
self._info = info
def __getattr__(self, attr):
return getattr(self._info, attr)
@property
def date_time(self):
return ('{0:02}-{1:02}-{2:02}T{3:02}:{4:02}:{5:02}'
.format(*self._info.date_time))
def __dir__(self):
return dir(self._info)
# TODO: The sha1 and filetype should be part of this sequence.
# Actually this should be a class with interdependent properties
metadata_items = ('date_time', 'comment', 'extra', 'file_size', 'CRC')


def iterate_metadata(info, member,
                     metadata_items=metadata_items,
                     hash_=sha1_hash):
    """Produce (name, value) pairs of formatted metadata for a member.

    Each name in ``metadata_items`` is read as an attribute of ``info``
    through a FormattedZipInfo wrapper and converted with ``str``.
    The pair returned by ``hash_(info, member)`` comes last;
    it is computed when this function is called, not during iteration.
    'comment' and 'extra' pairs are left out when their value is empty.
    """
    from itertools import chain

    formatted = FormattedZipInfo(info)
    optional = frozenset(('comment', 'extra'))
    checksum_pair = hash_(formatted, member)
    attribute_pairs = ((name, str(getattr(formatted, name)))
                       for name in metadata_items)
    every_pair = chain(attribute_pairs, (checksum_pair,))
    return (pair for pair in every_pair
            if pair[1] or pair[0] not in optional)
def format_metadata(pair):
    """Render a (name, value) pair as 'name: value' for header inclusion.

    Both elements are converted with ``str``, and every newline in them
    is replaced with a space so that the result stays on one line.
    """
    flattened = [str(element).replace('\n', ' ') for element in pair]
    return '{0}: {1}'.format(*flattened)
# content formatting
def format_content(info, line):
    """Build the dump line for one line of content from the member ``info``.

    The result is the member's filename, then ':: ', then ``line``.
    ``line`` must not contain a newline.
    """
    assert '\n' not in line
    template = '{0}:: {1}'
    return template.format(info.filename, line)
# dumping of each archive member
# Written for use in `detail` to allow a common interface between
# tidied XML files (as StringIO instances) and other files.
class ResettableZipEntry(object):
    """Adds ``seek(0)`` to the ZipExtFile API.

    Call ``ResettableZipEntry(archive, info)``
    instead of ``archive.open(info)``.

    Also provides error handling for ``read``, ``readline`` and iteration
    in case of corrupt zipfile members: when zlib reports an error,
    a warning is logged and the call behaves as if the member had ended.
    Any attribute not defined here is looked up on the open member.
    """

    def __init__(self, archive, info):
        self._archive = archive
        self._info = info
        self._zef = archive.open(info)

    def seek(self, position):
        """Reset the member by closing and reopening it from the archive.

        Raises ``ValueError`` for any ``position`` other than 0.
        """
        if position != 0:
            raise ValueError("ResettableZipEntry only supports seeking"
                             " to position 0.")
        self._zef.close()
        self._zef = self._archive.open(self._info)

    def __iter__(self):
        return self

    def _error_wrap(self, f, *args, **kwargs):
        """Call ``f`` with zlib error handling in place.

        Returns whatever ``f`` returns; if ``f`` raises ``zlib.error``,
        logs a warning and returns an empty byte string instead.
        """
        from zlib import error
        try:
            return f(*args, **kwargs)
        except error as e:
            def indent(string):
                return '\n'.join(' ' + line for line in string.split("\n"))
            error_message = indent(str(e))
            warnings.warning("zlib error for archive member '{0}':{1}"
                             .format(self._info.filename, error_message))
            return b''

    # Error-wrapped methods.
    def read(self, *args, **kwargs):
        return self._error_wrap(self._zef.read, *args, **kwargs)

    def next(self):
        line = self._error_wrap(next, self._zef)
        if not line:
            # Iterating a file never produces an empty line, so an empty
            # result means a zlib error was swallowed: stop here rather
            # than keep handing out empty lines.
            raise StopIteration
        return line

    # Alias so that the Python 3 iterator protocol is supported as well.
    __next__ = next

    def readline(self, *args, **kwargs):
        return self._error_wrap(self._zef.readline, *args, **kwargs)

    def __getattr__(self, attr):
        return getattr(self._zef, attr)
def detail(archive, info,
           format_header=format_header,
           iterate_metadata=iterate_metadata,
           format_metadata=format_metadata,
           hash_=sha1_hash,
           is_xml=is_xml,
           tidy_xml=tidy_xml,
           FiletypeDetector=FiletypeDetector,
           format_content=format_content,
           ):
    """Yields lines of detail from the file identified by ``info``.

    The first set of lines contains metadata about the file,
    including a checksum and the detected filetype.
    These lines begin with the filename, followed by ': '.

    For binary files, no further lines are yielded.

    For text files, subsequent lines contain lines of the file
    with the filename + ':: ' prepended.

    XML files are tidied before being output;
    if tidying fails, a warning is logged and the untidied
    member is used instead.

    For info on the keyword arguments,
    see the docstrings for the functions that implement their defaults.
    ``hash_`` is only handed on to ``iterate_metadata``
    when it differs from the default.
    """
    from StringIO import StringIO
    from contextlib import closing
    with closing(ResettableZipEntry(archive, info)) as member:
        if hash_ is sha1_hash:
            # Default hash: call with the original two arguments so that
            # replacement ``iterate_metadata`` callables which do not
            # accept ``hash_`` keep working.
            metadata = iterate_metadata(info, member)
        else:
            metadata = iterate_metadata(info, member, hash_=hash_)
        for md in metadata:
            yield format_header(info, format_metadata(md))
        if is_xml(archive, info) and info.file_size > 2:
            member.seek(0)
            try:
                # ``closing`` keeps its own reference to the zip entry,
                # so rebinding ``member`` does not prevent its closure.
                member = StringIO(tidy_xml(member))
            except XMLParseError as e:
                warnings.warning("Error parsing XML in member '{0}': {1}; "
                                 "XML tidying aborted."
                                 .format(info.filename, e))
                # The failed parse may have consumed part of the member.
                member.seek(0)
        filetype = FiletypeDetector()(member)
        yield format_header(info, format_metadata(('filetype', filetype)))
        if filetype == 'utf-8':
            member.seek(0)
            for line in member:
                yield format_content(info, line.rstrip('\n'))
# dumping entire archive
def archive_details(filename, detail=detail, sort_key=None):
    """Yields annotated lines of files and/or metadata from the archive.

    ``filename`` is opened for reading as a zip archive
    and closed again once iteration finishes.

    The function argument `detail` can be provided
    to customize formatting of archive members;
    it is called as ``detail(archive, info)`` for each member
    and must return an iterable of lines.

    If `sort_key` is provided as a truthy value,
    it will be passed as the `key` keyword argument
    to sort the member info list.
    """
    from zipfile import ZipFile
    from contextlib import closing
    with closing(ZipFile(filename, 'r')) as archive:
        infos = archive.infolist()
        if sort_key:
            # Sort a copy: the list from ``infolist()`` may be the archive's
            # own internal member list (it is in CPython), which should
            # not be reordered in place.
            infos = sorted(infos, key=sort_key)
        for info in infos:
            for line in detail(archive, info):
                yield line
# do it!
def main():
    """Parse the command line and print the dump of the named archive.

    Exactly one positional argument, the archive filename, is required;
    otherwise the option parser reports an error.
    Returns ``None``.
    """
    import optparse
    from functools import partial
    parser = optparse.OptionParser(
        "usage: %prog [-h] FILENAME",
        description=__doc__
        )
    parser.add_option('-D', '--no-dump-date',
                      help="Do not dump the date of each archive member.",
                      action='append_const',
                      const='date_time',
                      dest='nodump',
                      default=[],
                      )
    parser.add_option('-s', '--sort',
                      help="Sort the members by filename.",
                      action='store_const',
                      const=lambda info: info.filename,
                      default=None,
                      )
    opts, args = parser.parse_args()
    if len(args) != 1:
        parser.error("The document FILENAME"
                     " must be given as the only argument.")
    filename = args[0]
    # Leave out the metadata items that were switched off on the command line.
    md_items_to_dump = [i for i in metadata_items if i not in opts.nodump]
    iterate_metadata_nodump = partial(iterate_metadata,
                                      metadata_items=md_items_to_dump)
    detail_nodump = partial(detail, iterate_metadata=iterate_metadata_nodump)
    lines = archive_details(filename, detail=detail_nodump, sort_key=opts.sort)
    for line in lines:
        # Parenthesized so that the statement parses under both the
        # print statement and the print function.
        print(line)


if __name__ == '__main__':
    # ``sys.exit`` rather than the bare ``exit`` helper, which is only
    # present when the ``site`` module has been loaded.
    import sys
    sys.exit(main())