-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing 5 changed files with 130 additions and 123 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
import itertools | ||
from collections.abc import Iterator | ||
from typing import List | ||
import re | ||
|
||
import ftfy | ||
|
||
# Characters in the \x80-\x9f range that `clean` deletes outright.
# NOTE(review): presumably the code points left undefined by Windows-1252 - confirm.
UNDEFINED = re.compile("[\x8d\x81\x8f\x90\x9d]")

# Every character from \x00 through \x1f except tab (\x09) and newline (\x0a).
CONTROL_CHARACTERS = re.compile(
    "[\x00\x01\x02\x03\x04\x05\x06\x07\x08\x0b\x0c\x0d\x0e\x0f\x10\x11\x12\x13\x14\x15\x16"
    "\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f]"
)

# If any of these characters occurs in a string, `clean` runs ftfy on it.
WARNING_CHARS = "À˜â¿"
|
||
|
||
def clean(s: str) -> str:
    """Normalise one CSV cell.

    Steps, in order: turn the SimaPro in-cell line break (``\\x7f``) into a
    real newline, delete undefined and control characters, repair the encoding
    with ftfy when a character from ``WARNING_CHARS`` is present, and strip
    surrounding whitespace.
    """
    # \x7f is the ASCII delete control character (https://www.ascii-code.com/grid),
    # yet SimaPro uses it as a line break inside a CSV line.
    # olca-simapro-csv treats it the same way:
    # https://github.com/GreenDelta/olca-simapro-csv/blob/c11e40e7722f2ecaf62e813eebcc8d0793c8c3ff/src/test/java/org/openlca/simapro/csv/CsvLineTest.java#L53
    with_newlines = s.replace("\x7f", "\n")
    text = CONTROL_CHARACTERS.sub("", UNDEFINED.sub("", with_newlines))

    # Only run ftfy when one of the warning characters is present.
    looks_garbled = any(marker in text for marker in WARNING_CHARS)
    repaired = ftfy.fix_text(text) if looks_garbled else text
    return repaired.strip()
|
||
|
||
class BeKindRewind(Iterator):
    """CSV reader which acts as a line by line iterator but which allows for one step backwards.

    Needed because the file we are consuming will sometimes indicate that a logical block is
    finished by using the control word `End`, but other times won't. In that case, our iterator
    is already in a new block. To make it simple to pass the iterator to the next function
    consuming the new block, we rewind it one line.

    Internally this is implemented by caching the last line read, and using `itertools.chain`
    when needed to prepend the cached line to the iterator.

    Parameters
    ----------
    data_iterable : collections.abc.Iterator
        Iterator which returns lists of strings.
    clean_elements : bool, optional
        Do `[clean(elem) for elem in line]` when returning a new line
    offset : int, optional
        Added to the starting value of `line_no`, which begins at `offset + 1`.

    Attributes
    ----------
    current
        The line most recently returned by `__next__`, or `None` when nothing
        has been read yet or the last line has just been rewound.
    line_no : int
        Starts at `offset + 1`, grows by one per line read and shrinks by one
        per effective rewind.
    """

    def __init__(self, data_iterable: Iterator, clean_elements: bool = True, offset: int = 0):
        self.data_iterable = data_iterable
        self.current = None
        self.clean_elements = clean_elements
        # Line numbers are 1-indexed
        self.line_no = offset + 1

    def __next__(self) -> List[str]:
        """Return the next line, cleaned element-wise if `clean_elements` is set.

        Raises
        ------
        StopIteration
            When the underlying iterator is exhausted. `line_no` is left
            untouched in that case.
        """
        self.current = next(self.data_iterable)
        self.line_no += 1
        if self.clean_elements:
            self.current = [clean(elem) for elem in self.current]
        return self.current

    def rewind(self) -> None:
        """Rewinds the iterator by one step, retrieving the element that was
        just returned by the previous call to `__next__`.

        Calling this before anything was read, or several times in a row, is a
        no-op after the first effective rewind: only one line is cached.
        """
        if self.current is None:
            # Nothing to push back, so the line counter must not move either;
            # otherwise repeated rewinds make `line_no` drift from the data.
            return
        self.line_no -= 1
        self.data_iterable = itertools.chain((self.current,), self.data_iterable)
        self.current = None
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
import pytest | ||
|
||
from bw_simapro_csv.csv_reader import ( | ||
BeKindRewind, | ||
clean, | ||
) | ||
|
||
|
||
def test_rewindable_generator():
    """A single rewind replays the last item, then iteration runs to exhaustion."""
    rewinder = BeKindRewind(iter((1, 2, 3, 4, 5)), clean_elements=False)

    for expected in (1, 2, 3):
        assert next(rewinder) == expected

    rewinder.rewind()

    # The rewound item comes back first, followed by the untouched remainder.
    for expected in (3, 4, 5):
        assert next(rewinder) == expected

    with pytest.raises(StopIteration):
        next(rewinder)
|
||
|
||
def test_rewindable_generator_idempotent():
    """Several rewinds in a row step back by one item only."""
    rewinder = BeKindRewind(iter((1, 2, 3, 4, 5)), clean_elements=False)
    assert next(rewinder) == 1

    for _ in range(3):
        rewinder.rewind()

    for expected in (1, 2):
        assert next(rewinder) == expected
|
||
|
||
def test_rewindable_generator_rewind_before_iteration():
    """Rewinding before anything was read leaves the data stream untouched."""
    rewinder = BeKindRewind(iter((1, 2, 3, 4, 5)), clean_elements=False)
    rewinder.rewind()

    for expected in (1, 2):
        assert next(rewinder) == expected
|
||
|
||
def test_rewindable_generator_strip():
    """With the default settings every element of a line comes back stripped, as a list."""
    rows = [(" a ", "\tb ", "c"), (" 2", "1 ", "3")]
    reader = BeKindRewind(iter(rows))

    first_line = next(reader)
    assert first_line == ["a", "b", "c"]

    second_line = next(reader)
    assert second_line == ["2", "1", "3"]
|
||
|
||
def test_clean():
    """Check `clean` on mis-encoded text, surrounding whitespace and control characters."""
    assert clean("ï¾µg") == "ᄉg"
    assert clean(" \t foo") == "foo"
    # NOTE(review): identical to the previous case as it appears here; the two
    # inputs may originally have differed in their whitespace - confirm.
    assert clean(" \t foo") == "foo"
    assert clean("Â\x8dg") == "Âg"
    assert clean("CO2\x1a") == "CO2"
    # The DEL character is written as an explicit escape so the input does not
    # depend on an invisible character surviving editors and copy/paste.
    assert clean("CO\x7f2") == "CO\n2"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters