-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
7 changed files
with
292 additions
and
48 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
class ConvertToWindowsNewlines: | ||
def __init__(self, sink): | ||
self.sink = sink | ||
|
||
def append(self, value, start_index=None, end_index=None): | ||
# If value is a single character | ||
if isinstance(value, str) and len(value) == 1: | ||
if value != "\n": | ||
self.sink.write(value) | ||
else: | ||
self.sink.write("\r\n") | ||
# If value is a CharSequence (in Python, a str) | ||
elif isinstance(value, str): | ||
# If start_index and end_index are provided, use the slice of the string | ||
if start_index is not None and end_index is not None: | ||
value_to_append = value[start_index:end_index] | ||
else: | ||
value_to_append = value | ||
self.sink.write(value_to_append.replace("\n", "\r\n")) | ||
return self |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
from .SnapshotValue import SnapshotValue | ||
from collections import OrderedDict | ||
|
||
|
||
class Snapshot: | ||
def __init__(self, subject, facet_data): | ||
self._subject = subject | ||
self._facet_data = facet_data | ||
|
||
@property | ||
def facets(self): | ||
return OrderedDict(sorted(self._facet_data.items())) | ||
|
||
def __eq__(self, other): | ||
if not isinstance(other, Snapshot): | ||
return NotImplemented | ||
return self._subject == other._subject and self._facet_data == other._facet_data | ||
|
||
def __hash__(self): | ||
return hash((self._subject, frozenset(self._facet_data.items()))) | ||
|
||
def plus_facet(self, key, value): | ||
if isinstance(value, bytes): | ||
value = SnapshotValue.of(value) | ||
elif isinstance(value, str): | ||
value = SnapshotValue.of(value) | ||
return self._plus_facet(key, value) | ||
|
||
def _plus_facet(self, key, value): | ||
if not key: | ||
raise ValueError("The empty string is reserved for the subject.") | ||
facet_data = dict(self._facet_data) | ||
facet_data[self._unix_newlines(key)] = value | ||
return Snapshot(self._subject, facet_data) | ||
|
||
def plus_or_replace(self, key, value): | ||
if not key: | ||
return Snapshot(value, self._facet_data) | ||
facet_data = dict(self._facet_data) | ||
facet_data[self._unix_newlines(key)] = value | ||
return Snapshot(self._subject, facet_data) | ||
|
||
def subject_or_facet_maybe(self, key): | ||
if not key: | ||
return self._subject | ||
return self._facet_data.get(key) | ||
|
||
def subject_or_facet(self, key): | ||
value = self.subject_or_facet_maybe(key) | ||
if value is None: | ||
raise KeyError(f"'{key}' not found in {list(self._facet_data.keys())}") | ||
return value | ||
|
||
def all_entries(self): | ||
entries = [("", self._subject)] | ||
entries.extend(self._facet_data.items()) | ||
return entries | ||
|
||
def __bytes__(self): | ||
return f"[{self._subject} {self._facet_data}]" | ||
|
||
@staticmethod | ||
def of(data): | ||
if isinstance(data, bytes): | ||
# Handling binary data | ||
return Snapshot(SnapshotValue.of(data), {}) | ||
elif isinstance(data, str): | ||
# Handling string data | ||
return Snapshot(SnapshotValue.of(data), {}) | ||
elif isinstance(data, SnapshotValue): | ||
return Snapshot(data, {}) | ||
else: | ||
raise TypeError("Data must be either binary or string" + data) | ||
|
||
@staticmethod | ||
def of_entries(entries): | ||
subject = None | ||
facet_data = {} | ||
for key, value in entries: | ||
if not key: | ||
if subject is not None: | ||
raise ValueError( | ||
f"Duplicate root snapshot.\n first: {subject}\nsecond: {value}" | ||
) | ||
subject = value | ||
else: | ||
facet_data[key] = value | ||
if subject is None: | ||
subject = SnapshotValue.of("") | ||
return Snapshot(subject, facet_data) | ||
|
||
@staticmethod | ||
def _unix_newlines(string): | ||
return string.replace("\\r\\n", "\\n") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
from .Snapshot import Snapshot | ||
|
||
|
||
class SnapshotReader: | ||
def __init__(self, value_reader): | ||
self.value_reader = value_reader | ||
|
||
def peek_key(self): | ||
next_key = self.value_reader.peek_key() | ||
if next_key is None or next_key == "[end of file]": | ||
return None | ||
if "[" in next_key: | ||
raise ValueError( | ||
f"Missing root snapshot, square brackets not allowed: '{next_key}'" | ||
) | ||
return next_key | ||
|
||
def next_snapshot(self): | ||
root_name = self.peek_key() | ||
snapshot = Snapshot.of(self.value_reader.next_value()) | ||
while True: | ||
next_key = self.value_reader.peek_key() | ||
if next_key is None: | ||
return snapshot | ||
facet_idx = next_key.find("[") | ||
if facet_idx == -1 or (facet_idx == 0 and next_key == "[end of file]"): | ||
return snapshot | ||
facet_root = next_key[:facet_idx] | ||
if facet_root != root_name: | ||
raise ValueError( | ||
f"Expected '{next_key}' to come after '{facet_root}', not '{root_name}'" | ||
) | ||
facet_end_idx = next_key.find("]", facet_idx + 1) | ||
if facet_end_idx == -1: | ||
raise ValueError(f"Missing ] in {next_key}") | ||
facet_name = next_key[facet_idx + 1 : facet_end_idx] | ||
snapshot = snapshot.plus_facet(facet_name, self.value_reader.next_value()) | ||
|
||
def skip_snapshot(self): | ||
root_name = self.peek_key() | ||
if root_name is None: | ||
raise ValueError("No snapshot to skip") | ||
self.value_reader.skip_value() | ||
while True: | ||
next_key = self.peek_key() | ||
if next_key is None or not next_key.startswith(f"{root_name}["): | ||
break | ||
self.value_reader.skip_value() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
from abc import ABC, abstractmethod | ||
|
||
|
||
def unix_newlines(string: str) -> str: | ||
return string.replace("\r\n", "\n") | ||
|
||
|
||
class SnapshotValue(ABC): | ||
@property | ||
def is_binary(self) -> bool: | ||
return isinstance(self, SnapshotValueBinary) | ||
|
||
@abstractmethod | ||
def value_binary(self) -> bytes: | ||
pass | ||
|
||
@abstractmethod | ||
def value_string(self) -> str: | ||
pass | ||
|
||
@staticmethod | ||
def of(data): | ||
if isinstance(data, bytes): | ||
return SnapshotValueBinary(data) | ||
elif isinstance(data, str): | ||
return SnapshotValueString(data) | ||
elif isinstance(data, SnapshotValue): | ||
return data | ||
else: | ||
raise TypeError("Unsupported type for Snapshot creation") | ||
|
||
|
||
class SnapshotValueBinary(SnapshotValue): | ||
def __init__(self, value: bytes): | ||
self._value = value | ||
|
||
def value_binary(self) -> bytes: | ||
return self._value | ||
|
||
def value_string(self) -> str: | ||
raise NotImplementedError("This is a binary value.") | ||
|
||
def __eq__(self, other): | ||
if isinstance(other, SnapshotValueBinary): | ||
return self.value_binary() == other.value_binary() | ||
return False | ||
|
||
def __hash__(self): | ||
return hash(self._value) | ||
|
||
|
||
class SnapshotValueString(SnapshotValue): | ||
def __init__(self, value: str): | ||
self._value = value | ||
|
||
def value_binary(self) -> bytes: | ||
raise NotImplementedError("This is a string value.") | ||
|
||
def value_string(self) -> str: | ||
return self._value | ||
|
||
def __eq__(self, other): | ||
if isinstance(other, SnapshotValueString): | ||
return self.value_string() == other.value_string() | ||
return False | ||
|
||
def __hash__(self): | ||
return hash(self._value) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
from base64 import b64decode | ||
from selfie_lib import SnapshotValueReader, Snapshot, SnapshotReader | ||
|
||
|
||
class TestSnapshotReader: | ||
def test_facet(self): | ||
reader = SnapshotReader( | ||
SnapshotValueReader.of( | ||
""" | ||
╔═ Apple ═╗ | ||
Apple | ||
╔═ Apple[color] ═╗ | ||
green | ||
╔═ Apple[crisp] ═╗ | ||
yes | ||
╔═ Orange ═╗ | ||
Orange | ||
""".strip() | ||
) | ||
) | ||
assert reader.peek_key() == "Apple" | ||
assert reader.peek_key() == "Apple" | ||
apple_snapshot = ( | ||
Snapshot.of("Apple").plus_facet("color", "green").plus_facet("crisp", "yes") | ||
) | ||
assert reader.next_snapshot() == apple_snapshot | ||
assert reader.peek_key() == "Orange" | ||
assert reader.peek_key() == "Orange" | ||
assert reader.next_snapshot() == Snapshot.of("Orange") | ||
assert reader.peek_key() is None | ||
|
||
def test_binary(self): | ||
reader = SnapshotReader( | ||
SnapshotValueReader.of( | ||
""" | ||
╔═ Apple ═╗ | ||
Apple | ||
╔═ Apple[color] ═╗ base64 length 3 bytes | ||
c2Fk | ||
╔═ Apple[crisp] ═╗ | ||
yes | ||
╔═ Orange ═╗ base64 length 3 bytes | ||
c2Fk | ||
""".strip() | ||
) | ||
) | ||
assert reader.peek_key() == "Apple" | ||
assert reader.peek_key() == "Apple" | ||
apple_snapshot = ( | ||
Snapshot.of("Apple") | ||
.plus_facet("color", b64decode("c2Fk")) | ||
.plus_facet("crisp", "yes") | ||
) | ||
assert reader.next_snapshot() == apple_snapshot | ||
assert reader.peek_key() == "Orange" | ||
assert reader.peek_key() == "Orange" | ||
assert reader.next_snapshot() == Snapshot.of(b64decode("c2Fk")) | ||
assert reader.peek_key() is None |