diff --git a/playa/image.py b/playa/image.py
index a35b078..320c255 100644
--- a/playa/image.py
+++ b/playa/image.py
@@ -2,13 +2,7 @@
 import os.path
 import struct
 from io import BytesIO
-from typing import BinaryIO, Tuple
-
-try:
-    from typing import Literal
-except ImportError:
-    # Literal was introduced in Python 3.8
-    from typing_extensions import Literal  # type: ignore[assignment]
+from typing import BinaryIO, Tuple, Literal
 
 from playa.exceptions import PDFValueError
 from playa.jbig2 import JBIG2StreamReader, JBIG2StreamWriter
diff --git a/playa/psparser.py b/playa/psparser.py
index ebc72d6..e2c0c9c 100755
--- a/playa/psparser.py
+++ b/playa/psparser.py
@@ -166,16 +166,18 @@ class PSFileParser:
     Parser (actually a lexer) for PDF data from a buffered file object.
     """
 
-    def __init__(self, fp: BinaryIO):
+    def __init__(self, fp: BinaryIO) -> None:
         self.fp = fp
         self._tokens: Deque[Tuple[int, PSBaseParserToken]] = deque()
         self.seek(0)
 
-    def reinit(self, fp: BinaryIO):
+    def reinit(self, fp: BinaryIO) -> None:
+        """Reinitialize parser with a new file."""
         self.fp = fp
         self.seek(0)
 
     def seek(self, pos: int) -> None:
+        """Seek to a position and reinitialize parser state."""
         self.fp.seek(pos)
         self._parse1 = self._parse_main
         self._curtoken = b""
@@ -183,14 +185,18 @@ def seek(self, pos: int) -> None:
         self._tokens.clear()
 
     def tell(self) -> int:
+        """Get the current position in the file."""
         return self.fp.tell()
 
     def read(self, pos: int, objlen: int) -> bytes:
+        """Read data from a specified position, moving the current
+        position to the end of this data."""
         self.fp.seek(pos)
         return self.fp.read(objlen)
 
     def nextline(self) -> Tuple[int, bytes]:
-        r"""Fetches a next line that ends either with \r, \n, or \r\n."""
+        r"""Fetches a next line that ends either with \r, \n, or
+        \r\n."""
         linepos = self.fp.tell()
         # readline() is implemented on BinarIO so just use that
         # (except that it only accepts \n as a separator)
@@ -294,10 +300,13 @@ def get_inline_data(
                 data.append(buf)
         return (pos, b"".join(data))
 
-    def __iter__(self):
+    def __iter__(self) -> Iterator[Tuple[int, PSBaseParserToken]]:
+        """Iterate over tokens."""
         return self
 
     def __next__(self) -> Tuple[int, PSBaseParserToken]:
+        """Get the next token in iteration, raising StopIteration when
+        done."""
         while True:
             c = self._parse1()
             # print(c, self._curtoken, self._parse1)
@@ -308,12 +317,13 @@ def __next__(self) -> Tuple[int, PSBaseParserToken]:
         return self._tokens.popleft()
 
     def nexttoken(self) -> Tuple[int, PSBaseParserToken]:
+        """Get the next token in iteration, raising PSEOF when done."""
         try:
             return self.__next__()
         except StopIteration:
             raise PSEOF
 
-    def _parse_main(self):
+    def _parse_main(self) -> bytes:
         """Initial/default state for the lexer."""
         c = self.fp.read(1)
         # note that b"" (EOF) is in everything, which is fine
@@ -355,7 +365,7 @@ def _add_token(self, obj: PSBaseParserToken) -> None:
         """Add a succesfully parsed token."""
         self._tokens.append((self._curtokenpos, obj))
 
-    def _parse_comment(self):
+    def _parse_comment(self) -> bytes:
         """Comment state for the lexer"""
         c = self.fp.read(1)
         if c in EOL:  # this includes b"", i.e. EOF
@@ -366,7 +376,7 @@ def _parse_comment(self):
         self._curtoken += c
         return c
 
-    def _parse_literal(self):
+    def _parse_literal(self) -> bytes:
         """Literal (keyword) state for the lexer."""
         c = self.fp.read(1)
         if c == b"#":
@@ -384,7 +394,7 @@ def _parse_literal(self):
         self._curtoken += c
         return c
 
-    def _parse_literal_hex(self):
+    def _parse_literal_hex(self) -> bytes:
         """State for escaped hex characters in literal names"""
         # Consume a hex digit only if we can ... consume a hex digit
         c = self.fp.read(1)
@@ -398,7 +408,7 @@ def _parse_literal_hex(self):
         self._parse1 = self._parse_literal
         return c
 
-    def _parse_number(self):
+    def _parse_number(self) -> bytes:
         """State for numeric objects."""
         c = self.fp.read(1)
         if c and c in NUMBER:
@@ -416,7 +426,7 @@ def _parse_number(self):
         self._parse1 = self._parse_main
         return c
 
-    def _parse_float(self):
+    def _parse_float(self) -> bytes:
         """State for fractional part of numeric objects."""
         c = self.fp.read(1)
         # b"" is in everything so we have to add an extra check
@@ -432,7 +442,7 @@ def _parse_float(self):
         self._curtoken += c
         return c
 
-    def _parse_keyword(self):
+    def _parse_keyword(self) -> bytes:
         """State for keywords."""
         c = self.fp.read(1)
         if c in NOTKEYWORD:  # includes EOF
@@ -449,7 +459,7 @@ def _parse_keyword(self):
         self._curtoken += c
         return c
 
-    def _parse_string(self):
+    def _parse_string(self) -> bytes:
         """State for string objects."""
         c = self.fp.read(1)
         if c and c in NOTSTRING:  # does not include EOF
@@ -484,7 +494,7 @@ def _parse_string(self):
         self._curtoken += c
         return c
 
-    def _parse_string_esc(self):
+    def _parse_string_esc(self) -> bytes:
         """State for escapes in literal strings. We have seen a
         backslash and nothing else."""
         c = self.fp.read(1)
@@ -509,7 +519,7 @@ def _parse_string_esc(self):
         self._parse1 = self._parse_string
         return c
 
-    def _parse_string_octal(self):
+    def _parse_string_octal(self) -> bytes:
         """State for an octal escape."""
         c = self.fp.read(1)
         if c and c in OCTAL:  # exclude EOF
@@ -532,7 +542,7 @@ def _parse_string_octal(self):
         self._parse1 = self._parse_string
         return c
 
-    def _parse_wopen(self):
+    def _parse_wopen(self) -> bytes:
         """State for start of dictionary or hex string."""
         c = self.fp.read(1)
         if c == b"<":
@@ -544,7 +554,7 @@ def _parse_wopen(self):
         self._parse1 = self._parse_hexstring
         return c
 
-    def _parse_wclose(self):
+    def _parse_wclose(self) -> bytes:
         """State for end of dictionary (accessed from initial state only)"""
         c = self.fp.read(1)
         if c == b">":
@@ -555,8 +565,9 @@ def _parse_wclose(self):
         if c:
             self.fp.seek(-1, io.SEEK_CUR)
         self._parse1 = self._parse_main
+        return c
 
-    def _parse_hexstring(self):
+    def _parse_hexstring(self) -> bytes:
         """State for parsing hexadecimal literal strings."""
         c = self.fp.read(1)
         if not c:
@@ -613,26 +624,31 @@ class PSInMemoryParser:
     Parser for in-memory data streams.
     """
 
-    def __init__(self, data: bytes):
+    def __init__(self, data: bytes) -> None:
         self.data = data
         self.pos = 0
         self.end = len(data)
         self._tokens: Deque[Tuple[int, PSBaseParserToken]] = deque()
 
-    def reinit(self, data: bytes):
+    def reinit(self, data: bytes) -> None:
+        """Reinitialize parser with a new buffer."""
         self.data = data
         self.seek(0)
 
     def seek(self, pos: int) -> None:
+        """Seek to a position and reinitialize parser state."""
         self.pos = pos
         self._curtoken = b""
         self._curtokenpos = 0
         self._tokens.clear()
 
     def tell(self) -> int:
+        """Get the current position in the buffer."""
         return self.pos
 
     def read(self, pos: int, objlen: int) -> bytes:
+        """Read data from a specified position, moving the current
+        position to the end of this data."""
         self.pos = max(pos + objlen, len(self.data))
         return self.data[pos : self.pos]
@@ -691,30 +707,31 @@ def get_inline_data(
                 return result
         return (-1, b"")
 
-    def __iter__(self):
+    def __iter__(self) -> Iterator[Tuple[int, PSBaseParserToken]]:
+        """Iterate over tokens."""
         return self
 
     def nexttoken(self) -> Tuple[int, PSBaseParserToken]:
+        """Get the next token in iteration, raising PSEOF when done."""
         try:
             return self.__next__()
         except StopIteration:
             raise PSEOF
 
     def __next__(self) -> Tuple[int, PSBaseParserToken]:
-        """Lexer (most of the work is done in regular expressions, but
-        PDF syntax is not entirely regular due to the use of balanced
-        parentheses in strings)."""
+        """Get the next token in iteration, raising StopIteration when
+        done."""
         while True:
             m = LEXER.match(self.data, self.pos)
             if m is None:  # can only happen at EOS
                 raise StopIteration
             self._curtokenpos = m.start()
             self.pos = m.end()
-            if m.lastgroup not in ("whitespace", "comment"):
+            if m.lastgroup not in ("whitespace", "comment"):  # type: ignore
                 # Okay, we got a token or something
                 break
         self._curtoken = m[0]
-        if m.lastgroup == "name":
+        if m.lastgroup == "name":  # type: ignore
             self._curtoken = m[0][1:]
             self._curtoken = HEXDIGIT.sub(
                 lambda x: bytes((int(x[1], 16),)), self._curtoken
@@ -724,18 +741,18 @@ def __next__(self) -> Tuple[int, PSBaseParserToken]:
             except UnicodeDecodeError:
                 tok = LIT(self._curtoken)
             return (self._curtokenpos, tok)
-        if m.lastgroup == "number":
+        if m.lastgroup == "number":  # type: ignore
             if b"." in self._curtoken:
                 return (self._curtokenpos, float(self._curtoken))
             else:
                 return (self._curtokenpos, int(self._curtoken))
-        if m.lastgroup == "startdict":
+        if m.lastgroup == "startdict":  # type: ignore
             return (self._curtokenpos, KEYWORD_DICT_BEGIN)
-        if m.lastgroup == "enddict":
+        if m.lastgroup == "enddict":  # type: ignore
             return (self._curtokenpos, KEYWORD_DICT_END)
-        if m.lastgroup == "startstr":
+        if m.lastgroup == "startstr":  # type: ignore
             return self._parse_endstr(self.data[m.start() + 1 : m.end()], m.end())
-        if m.lastgroup == "hexstr":
+        if m.lastgroup == "hexstr":  # type: ignore
             self._curtoken = SPC.sub(b"", self._curtoken[1:-1])
             if len(self._curtoken) % 2 == 1:
                 self._curtoken += b"0"
@@ -754,23 +771,23 @@ def _parse_endstr(self, start: bytes, pos: int) -> Tuple[int, PSBaseParserToken]
         paren = 1
         for m in STRLEXER.finditer(self.data, pos):
             self.pos = m.end()
-            if m.lastgroup == "parenright":
+            if m.lastgroup == "parenright":  # type: ignore
                 paren -= 1
                 if paren == 0:
                     # By far the most common situation!
                     break
                 parts.append(m[0])
-            elif m.lastgroup == "parenleft":
+            elif m.lastgroup == "parenleft":  # type: ignore
                 parts.append(m[0])
                 paren += 1
-            elif m.lastgroup == "escape":
+            elif m.lastgroup == "escape":  # type: ignore
                 chr = m[0][1:2]
                 if chr not in ESC_STRING:
                     log.warning("Unrecognized escape %r", m[0])
                     parts.append(chr)
                 else:
                     parts.append(bytes((ESC_STRING[chr],)))
-            elif m.lastgroup == "octal":
+            elif m.lastgroup == "octal":  # type: ignore
                 chrcode = int(m[0][1:], 8)
                 if chrcode >= 256:
                     # PDF1.7 p.16: "high-order overflow shall be
@@ -778,7 +795,7 @@ def _parse_endstr(self, start: bytes, pos: int) -> Tuple[int, PSBaseParserToken]
                     log.warning("Invalid octal %r (%d)", m[0][1:], chrcode)
                 else:
                     parts.append(bytes((chrcode,)))
-            elif m.lastgroup == "linebreak":
+            elif m.lastgroup == "linebreak":  # type: ignore
                 pass
             else:
                 parts.append(m[0])
@@ -800,10 +817,14 @@ def _parse_endstr(self, start: bytes, pos: int) -> Tuple[int, PSBaseParserToken]
 
 
 class PSStackParser(Generic[ExtraT]):
+    """Basic parser for PDF objects, can take a file or a `bytes` as
+    input."""
+
     def __init__(self, reader: Union[BinaryIO, bytes]) -> None:
         self.reinit(reader)
 
     def reinit(self, reader: Union[BinaryIO, bytes]) -> None:
+        """Reinitialize parser with a new file or buffer."""
         if isinstance(reader, bytes):
             self._parser: Union[PSInMemoryParser, PSFileParser] = PSInMemoryParser(
                 reader
@@ -813,29 +834,35 @@ def reinit(self, reader: Union[BinaryIO, bytes]) -> None:
         self.reset()
 
     def reset(self) -> None:
+        """Reset parser state."""
         self.context: List[Tuple[int, Optional[str], List[PSStackEntry[ExtraT]]]] = []
         self.curtype: Optional[str] = None
         self.curstack: List[PSStackEntry[ExtraT]] = []
         self.results: List[PSStackEntry[ExtraT]] = []
 
     def seek(self, pos: int) -> None:
+        """Seek to a position and reset parser state."""
         self._parser.seek(pos)
         self.reset()
 
     def push(self, *objs: PSStackEntry[ExtraT]) -> None:
+        """Push some objects onto the stack."""
         self.curstack.extend(objs)
 
     def pop(self, n: int) -> List[PSStackEntry[ExtraT]]:
+        """Pop some objects off the stack."""
         objs = self.curstack[-n:]
         self.curstack[-n:] = []
         return objs
 
     def popall(self) -> List[PSStackEntry[ExtraT]]:
+        """Pop all the things off the stack."""
         objs = self.curstack
         self.curstack = []
         return objs
 
     def add_results(self, *objs: PSStackEntry[ExtraT]) -> None:
+        """Move some objects to the output."""
         try:
             log.debug("add_results: %r", objs)
         except Exception:
@@ -843,11 +870,13 @@ def add_results(self, *objs: PSStackEntry[ExtraT]) -> None:
         self.results.extend(objs)
 
     def start_type(self, pos: int, type: str) -> None:
+        """Start a composite object (array, dict, etc)."""
         self.context.append((pos, self.curtype, self.curstack))
         (self.curtype, self.curstack) = (type, [])
         log.debug("start_type: pos=%r, type=%r", pos, type)
 
     def end_type(self, type: str) -> Tuple[int, List[PSStackType[ExtraT]]]:
+        """End a composite object (array, dict, etc)."""
         if self.curtype != type:
             raise PSTypeError(f"Type mismatch: {self.curtype!r} != {type!r}")
         objs = [obj for (_, obj) in self.curstack]
@@ -856,9 +885,11 @@ def end_type(self, type: str) -> Tuple[int, List[PSStackType[ExtraT]]]:
         return (pos, objs)
 
     def do_keyword(self, pos: int, token: PSKeyword) -> None:
+        """Handle a PDF keyword."""
        pass
 
     def flush(self) -> None:
+        """Get everything off the stack and into the output?"""
         pass
 
     def nextobject(self) -> PSStackEntry[ExtraT]:
@@ -943,25 +974,39 @@ def nextobject(self) -> PSStackEntry[ExtraT]:
 
     # Delegation follows
     def nextline(self) -> Tuple[int, bytes]:
+        r"""Fetches a next line that ends either with \r, \n, or
+        \r\n."""
         return self._parser.nextline()
 
     def revreadlines(self) -> Iterator[bytes]:
+        """Fetches a next line backwards.
+
+        This is used to locate the trailers at the end of a file.
+        """
         return self._parser.revreadlines()
 
     def read(self, pos: int, objlen: int) -> bytes:
+        """Read data from a specified position, moving the current
+        position to the end of this data."""
         return self._parser.read(pos, objlen)
 
     def nexttoken(self) -> Tuple[int, PSBaseParserToken]:
+        """Get the next token in iteration, raising PSEOF when done."""
         try:
             return self.__next__()
         except StopIteration:
             raise PSEOF
 
     def get_inline_data(self, target: bytes = b"EI") -> Tuple[int, bytes]:
+        """Get the data for an inline image up to the target
+        end-of-stream marker."""
         return self._parser.get_inline_data(target)
 
-    def __iter__(self):
+    def __iter__(self) -> Iterator[Tuple[int, PSBaseParserToken]]:
+        """Iterate over tokens."""
         return self
 
-    def __next__(self):
+    def __next__(self) -> Tuple[int, PSBaseParserToken]:
+        """Get the next token in iteration, raising StopIteration when
+        done."""
         return self._parser.__next__()
diff --git a/tests/test_pdfparser.py b/tests/test_pdfparser.py
deleted file mode 100644
index 109b553..0000000
--- a/tests/test_pdfparser.py
+++ /dev/null
@@ -1,249 +0,0 @@
-"""
-Test the PDF parser
-"""
-
-import tempfile
-
-from playa.exceptions import PSEOF
-from playa.psparser import (
-    KEYWORD_DICT_BEGIN,
-    KEYWORD_DICT_END,
-    KWD,
-    LIT,
-    PSFileParser,
-    PSInMemoryParser,
-)
-
-TESTDATA = b"""
-ugh
-foo\r
-bar\rbaz
-quxx
-bog"""
-EXPECTED = [
-    (0, b"\n"),
-    (1, b"ugh\n"),
-    (5, b"foo\r\n"),
-    (10, b"bar\r"),
-    (14, b"baz\n"),
-    (18, b"quxx\n"),
-    (23, b"bog"),
-]
-
-
-def run_parsers(data: bytes, expected: list, makefunc):
-    """Test stuff on both BytesIO and BinaryIO."""
-    bp = PSInMemoryParser(data)
-    output = []
-    func = makefunc(bp)
-    while True:
-        try:
-            output.append(func())
-        except PSEOF:
-            break
-    assert output == expected
-    with tempfile.NamedTemporaryFile() as tf:
-        with open(tf.name, "wb") as outfh:
-            outfh.write(data)
-        with open(tf.name, "rb") as infh:
-            fp = PSFileParser(infh)
-            func = makefunc(fp)
-            output = []
-            while True:
-                try:
-                    output.append(func())
-                except PSEOF:
-                    break
-            assert output == expected
-
-
-def test_nextline():
-    """Verify that we replicate the old nextline method."""
-    run_parsers(TESTDATA, EXPECTED, lambda foo: foo.nextline)
-
-
-def test_revreadlines():
-    """Verify that we replicate the old revreadlines method."""
-    expected = list(reversed([line for pos, line in EXPECTED]))
-
-    def make_next(parser):
-        itor = parser.revreadlines()
-
-        def nextor():
-            try:
-                line = next(itor)
-            except StopIteration:
-                raise PSEOF
-            return line
-
-        return nextor
-
-    run_parsers(TESTDATA, expected, make_next)
-
-
-SIMPLE1 = b"""1 0 obj
-<<
- /Type /Catalog
- /Outlines 2 0 R
- /Pages 3 0 R
->>
-endobj
-"""
-SIMPLETOK = [
-    1,
-    0,
-    KWD(b"obj"),
-    KEYWORD_DICT_BEGIN,
-    LIT("Type"),
-    LIT("Catalog"),
-    LIT("Outlines"),
-    2,
-    0,
-    KWD(b"R"),
-    LIT("Pages"),
-    3,
-    0,
-    KWD(b"R"),
-    KEYWORD_DICT_END,
-    KWD(b"endobj"),
-]
-
-
-def list_parsers(data: bytes, expected: list, discard_pos=False):
-    bp = PSInMemoryParser(data)
-    if discard_pos:
-        tokens = [tok for pos, tok in list(bp)]
-    else:
-        tokens = list(bp)
-    assert tokens == expected
-    with tempfile.NamedTemporaryFile() as tf:
-        with open(tf.name, "wb") as outfh:
-            outfh.write(data)
-        with open(tf.name, "rb") as infh:
-            fp = PSFileParser(infh)
-            if discard_pos:
-                tokens = [tok for pos, tok in list(fp)]
-            else:
-                tokens = list(fp)
-            assert tokens == expected
-
-
-def test_new_parser():
-    # Do a lot of them to make sure buffering works correctly
-    list_parsers(SIMPLE1 * 100, SIMPLETOK * 100, discard_pos=True)
-
-
-def test_new_parser_eof():
-    # Make sure we get a keyword at eof
-    list_parsers(SIMPLE1[:-1], SIMPLETOK, discard_pos=True)
-
-
-PAGE17 = b"""
-    /A;Name_With-Various***Characters?
-    /lime#20Green
-    /paired#28#29parentheses
-"""
-
-
-def test_new_parser1():
-    list_parsers(b"123.456", [(0, 123.456)])
-    list_parsers(b"+.013", [(0, 0.013)])
-    list_parsers(b"123", [(0, 123)])
-    list_parsers(b"true false", [(0, True), (5, False)])
-    list_parsers(b"(foobie bletch)", [(0, b"foobie bletch")])
-    list_parsers(b"(foo", [])
-
-
-def test_new_parser_names():
-    # Examples from PDF 1.7 page 17
-    list_parsers(
-        PAGE17,
-        [
-            (5, LIT("A;Name_With-Various***Characters?")),
-            (44, LIT("lime Green")),
-            (62, LIT("paired()parentheses")),
-        ],
-    )
-
-
-def test_new_parser_strings():
-    list_parsers(
-        rb"( Strings may contain balanced parentheses ( ) and "
-        rb"special characters ( * ! & } ^ % and so on ) . )",
-        [
-            (
-                0,
-                rb" Strings may contain balanced parentheses ( ) and "
-                rb"special characters ( * ! & } ^ % and so on ) . ",
-            )
-        ],
-    )
-    list_parsers(b"()", [(0, b"")])
-    list_parsers(
-        rb"""( These \
-two strings \
-are the same . )
-        """,
-        [(0, b" These two strings are the same . ")],
-    )
-    list_parsers(b"(foo\rbar)", [(0, b"foo\nbar")])
-    list_parsers(b"(foo\r)", [(0, b"foo\n")])
-    list_parsers(b"(foo\r\nbaz)", [(0, b"foo\nbaz")])
-    list_parsers(b"(foo\n)", [(0, b"foo\n")])
-    list_parsers(
-        rb"( This string contains \245two octal characters\307 . )",
-        [(0, b" This string contains \245two octal characters\307 . ")],
-    )
-    list_parsers(rb"(\0053 \053 \53)", [(0, b"\0053 \053 +")])
-    list_parsers(
-        rb"< 4E6F762073686D6F7A206B6120706F702E >", [(0, b"Nov shmoz ka pop.")]
-    )
-    list_parsers(rb"<73 686 D6F7A2>", [(0, b"shmoz ")])
-    list_parsers(rb"(\400)", [(0, b"")])
-
-
-def test_invalid_strings_eof():
-    list_parsers(rb"(\00", [])
-    list_parsers(rb"(abracadab", [])
-
-
-def inline_parsers(
-    data: bytes, expected: tuple, target=b"EI", nexttoken=None, blocksize=16
-):
-    bp = PSInMemoryParser(data)
-    assert bp.get_inline_data(target=target, blocksize=blocksize) == expected
-    if nexttoken is not None:
-        assert bp.nexttoken() == nexttoken
-    with tempfile.NamedTemporaryFile() as tf:
-        with open(tf.name, "wb") as outfh:
-            outfh.write(data)
-        with open(tf.name, "rb") as infh:
-            fp = PSFileParser(infh)
-            assert fp.get_inline_data(target=target, blocksize=blocksize) == expected
-            if nexttoken is not None:
-                assert fp.nexttoken() == nexttoken
-
-
-def test_get_inline_data():
-    kwd_eio = KWD(b"EIO")
-    kwd_omg = KWD(b"OMG")
-    inline_parsers(b"""0123456789""", (-1, b""))
-    inline_parsers(b"""0123456789EI""", (10, b"0123456789EI"))
-    inline_parsers(
-        b"""0123456789EIEIO""", (10, b"0123456789EI"), nexttoken=(12, kwd_eio)
-    )
-    inline_parsers(b"""012EIEIO""", (3, b"012EI"), nexttoken=(5, kwd_eio), blocksize=4)
-    inline_parsers(
-        b"""0123012EIEIO""", (7, b"0123012EI"), nexttoken=(9, kwd_eio), blocksize=4
-    )
-    for blocksize in range(1, 8):
-        inline_parsers(
-            b"""012EIEIOOMG""",
-            (
-                3,
-                b"012EIEIO",
-            ),
-            target=b"EIEIO",
-            nexttoken=(8, kwd_omg),
-            blocksize=blocksize,
-        )
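
Usage note (not part of the patch): the deleted `tests/test_pdfparser.py` above is also the best illustration of the lexer API that the new docstrings describe. A minimal sketch of that API, assuming only the names visible in this diff (`PSInMemoryParser`, `nexttoken()`, and `PSEOF` from `playa.exceptions`):

```python
from playa.exceptions import PSEOF
from playa.psparser import PSInMemoryParser

# Tokenize a small PDF fragment held in memory.  nexttoken() returns a
# (position, token) tuple and raises PSEOF at end of input; iterating
# over the parser yields the same tuples, ending with StopIteration.
parser = PSInMemoryParser(b"1 0 obj << /Type /Catalog >> endobj")
while True:
    try:
        pos, token = parser.nexttoken()
    except PSEOF:
        break
    print(pos, token)
```

The same loop should work against an open binary file via `PSFileParser`, which this patch keeps API-compatible with the in-memory parser.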