Skip to content

Commit

Permalink
Python Wrapper: Simple readline (#7126)
Browse files Browse the repository at this point in the history
* Python Wrapper: Simple readline

* Fix tests

---------

Co-authored-by: guyhardonag <[email protected]>
  • Loading branch information
N-o-Z and guy-har authored Dec 8, 2023
1 parent 96dc40b commit 79c5316
Show file tree
Hide file tree
Showing 10 changed files with 10,682 additions and 28 deletions.
61 changes: 42 additions & 19 deletions clients/python-wrapper/lakefs/object.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
ForbiddenException,
PermissionException,
ObjectExistsException,
InvalidRangeException,
)
from lakefs.models import ObjectInfo

Expand Down Expand Up @@ -110,13 +111,13 @@ def readable(self) -> bool:

def readline(self, limit: int = -1):
"""
Currently unsupported
Must be explicitly implemented by inheriting class
"""
raise io.UnsupportedOperation

def readlines(self, hint: int = -1):
"""
Currently unsupported
Must be explicitly implemented by inheriting class
"""
raise io.UnsupportedOperation

Expand Down Expand Up @@ -145,16 +146,10 @@ def writelines(self, lines: List[AnyStr]) -> None:
raise io.UnsupportedOperation

def __next__(self) -> AnyStr:
"""
Unsupported by lakeFS implementation
"""
raise io.UnsupportedOperation
return self.readline()

def __iter__(self) -> Iterator[AnyStr]:
"""
Unsupported by lakeFS implementation
"""
raise io.UnsupportedOperation
return self

def __enter__(self) -> LakeFSIOBase:
return self
Expand Down Expand Up @@ -184,11 +179,15 @@ class ObjectReader(LakeFSIOBase):
This Object is instantiated and returned for immutable reference types (Commit, Tag...)
"""

_readlines_buf: io.BytesIO

def __init__(self, obj: StoredObject, mode: ReadModes, pre_sign: Optional[bool] = None,
client: Optional[Client] = None) -> None:
if mode not in get_args(ReadModes):
raise ValueError(f"invalid read mode: '{mode}'. ReadModes: {ReadModes}")

self._readlines_buf = io.BytesIO(b"")

super().__init__(obj, mode, pre_sign, client)

@property
Expand Down Expand Up @@ -248,6 +247,24 @@ def seek(self, offset: int, whence: int = 0) -> int:
self._pos = pos
return pos

def _cast_by_mode(self, retval):
if 'b' not in self.mode:
return retval.decode('utf-8')
return retval

def _read(self, read_range: str) -> str | bytes:
try:
with api_exception_handler(_io_exception_handler):
return self._client.sdk_client.objects_api.get_object(self._obj.repo,
self._obj.ref,
self._obj.path,
range=read_range,
presign=self.pre_sign)

except InvalidRangeException:
# This is done in order to behave like the built-in open() function
return b''

def read(self, n: int = None) -> str | bytes:
"""
Read object data
Expand All @@ -264,17 +281,23 @@ def read(self, n: int = None) -> str | bytes:
raise OSError("read_bytes must be a positive integer")

read_range = self._get_range_string(start=self._pos, read_bytes=n)
with api_exception_handler(_io_exception_handler):
contents = self._client.sdk_client.objects_api.get_object(self._obj.repo,
self._obj.ref,
self._obj.path,
range=read_range,
presign=self.pre_sign)
contents = self._read(read_range)
self._pos += len(contents) # Update pointer position
if 'b' not in self._mode:
return contents.decode('utf-8')

return contents
return self._cast_by_mode(contents)

def readline(self, limit: int = -1):
"""
Read and return a line from the stream.
:param limit: If limit > -1 returns at most limit bytes
"""
if self._readlines_buf.getbuffer().nbytes == 0:
self._readlines_buf = io.BytesIO(self._read(self._get_range_string(0)))
self._readlines_buf.seek(self._pos)
line = self._readlines_buf.readline(limit)
self._pos = self._readlines_buf.tell()
return self._cast_by_mode(line)

def flush(self) -> None:
"""
Expand Down
2 changes: 1 addition & 1 deletion clients/python-wrapper/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ Sphinx~=7.2.6
sphinx-autodoc-typehints~=1.25.2
myst-parser~=2.0.0
pytest~=7.4.3

pytest-datafiles~=3.0.0

3 changes: 3 additions & 0 deletions clients/python-wrapper/setup.cfg
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
[flake8]
max-line-length=120

[aliases]
test = pytest
2 changes: 1 addition & 1 deletion clients/python-wrapper/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
keywords=["OpenAPI", "OpenAPI-Generator", "lakeFS API", "Python Wrapper"],
python_requires=">=3.9",
install_requires=REQUIRES,
tests_require=["pytest ~= 7.4.3"],
tests_require=["pytest ~= 7.4.3", "pytest-datafiles ~= 3.0.0"],
packages=find_packages(exclude=["tests"]),
include_package_data=True,
license="Apache 2.0",
Expand Down
11 changes: 11 additions & 0 deletions clients/python-wrapper/tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import re
import time
import uuid
from pathlib import Path

import pytest

from lakefs import client
Expand Down Expand Up @@ -60,3 +62,12 @@ def fixture_pre_sign(request):
if request.param and not clt.storage_config.pre_sign_support:
pytest.skip("Storage adapter does not support pre-sign mode")
return request.param


FIXTURE_DIR = Path(__file__).parent.parent.resolve() / 'test_files'
TEST_DATA = pytest.mark.datafiles(
FIXTURE_DIR / 'mock.csv',
FIXTURE_DIR / 'mock.json',
FIXTURE_DIR / 'mock.yaml',
FIXTURE_DIR / 'mock.xml',
)
161 changes: 154 additions & 7 deletions clients/python-wrapper/tests/integration/test_object.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
import io
import csv
import json
import math
from xml.etree import ElementTree
from typing import get_args

import yaml
import pytest

from tests.integration.conftest import TEST_DATA
from tests.utests.common import expect_exception_context
from lakefs.exceptions import ObjectExistsException, InvalidRangeException, NotFoundException
from lakefs.exceptions import ObjectExistsException, NotFoundException
from lakefs.object import WriteableObject, WriteModes, ReadModes


Expand All @@ -23,16 +29,14 @@ def test_object_read_seek(setup_repo, pre_sign):

assert fd.read() == data[7:]

# This should raise an exception
with expect_exception_context(InvalidRangeException):
fd.read(1)
# This should return an empty string (simulates behavior of builtin open())
assert len(fd.read(1)) == 0

fd.seek(0)
for c in data:
assert ord(fd.read(1)) == c
# This should raise an exception
with expect_exception_context(InvalidRangeException):
fd.read(1)
# This should return an empty string (simulates behavior of builtin open())
assert len(fd.read(1)) == 0


def test_object_upload_exists(setup_repo):
Expand Down Expand Up @@ -159,3 +163,146 @@ def test_writer_different_params(setup_repo, w_mode, r_mode, pre_sign):
assert res.decode('utf-8') == expected
else:
assert res == expected


def _upload_file(repo, test_file):
obj = repo.branch("main").object("test_obj")

with open(test_file, "rb") as fd, obj.writer() as writer:
writer.write(fd.read())

return obj


def test_read_byte_by_byte(setup_repo):
clt, repo = setup_repo

data = b'test_data'
obj = WriteableObject(repository=repo.properties.id, reference="main", path="test_obj", client=clt).upload(
data=data, pre_sign=False)
res = b""
reader = obj.reader()
while True:
byte = reader.read(1)
if not byte:
break
res += byte

assert res == data


@TEST_DATA
def test_read_all(setup_repo, datafiles):
_, repo = setup_repo
test_file = datafiles / "mock.csv"
obj = _upload_file(repo, test_file)

with open(test_file, "r", encoding="utf-8") as fd:
data = fd.read()
read_data = obj.reader("r").read()
assert read_data == data


@TEST_DATA
def test_read_csv(setup_repo, datafiles):
_, repo = setup_repo
test_file = datafiles / "mock.csv"
obj = _upload_file(repo, test_file)

uploaded = csv.reader(obj.reader('r'))

with open(test_file, "r", encoding="utf-8") as fd:
source = csv.reader(fd)
for uploaded_row, source_row in zip(uploaded, source):
assert uploaded_row == source_row


@TEST_DATA
def test_read_json(setup_repo, datafiles):
_, repo = setup_repo
test_file = datafiles / "mock.json"
obj = _upload_file(repo, test_file)

with open(test_file, "r", encoding="utf-8") as fd:
source = json.load(fd)
uploaded = json.load(obj.reader(mode="r"))

assert uploaded == source


@TEST_DATA
def test_read_yaml(setup_repo, datafiles):
_, repo = setup_repo
test_file = datafiles / "mock.yaml"
obj = _upload_file(repo, test_file)

with open(test_file, "r", encoding="utf-8") as fd:
source = yaml.load(fd, Loader=yaml.Loader)
uploaded = yaml.load(obj.reader(mode="r"), Loader=yaml.Loader)
assert uploaded == source


@TEST_DATA
def test_read_xml(setup_repo, datafiles):
_, repo = setup_repo
test_file = datafiles / "mock.xml"
obj = _upload_file(repo, test_file)

uploaded = ElementTree.parse(obj.reader(mode="r"))
source = ElementTree.parse(test_file)
uploaded_root = uploaded.getroot()
source_root = source.getroot()
assert uploaded_root.tag == source_root.tag
assert uploaded_root.attrib == source_root.attrib
assert uploaded_root.text == source_root.text
assert uploaded_root.tail == source_root.tail

for uploaded_child, source_child in zip(uploaded_root, source_root):
for uploaded_child_child, source_child_child in zip(uploaded_child, source_child):
assert uploaded_child_child.tag == source_child_child.tag
assert uploaded_child_child.attrib == source_child_child.attrib
assert uploaded_child_child.text == source_child_child.text
assert uploaded_child_child.tail == source_child_child.tail


def test_readline_no_newline(setup_repo):
clt, repo = setup_repo
data = b'test_data'
obj = WriteableObject(repository=repo.properties.id, reference="main", path="test_obj", client=clt).upload(
data=data, pre_sign=False)

assert obj.reader().readline() == data

assert obj.reader().readline() == data


def test_readline_partial_line_buffer(setup_repo):
clt, repo = setup_repo
data = "a" * 15 + "\n" + "b" * 25 + "\n" + "That is all folks! "
obj = WriteableObject(repository=repo.properties.id, reference="main", path="test_obj", client=clt).upload(
data=data, pre_sign=False)

with obj.reader(mode="r") as reader:
reader.seek(5)
assert reader.readline() == "a" * 10 + "\n"
assert reader.readline() == "b" * 25 + "\n"
assert reader.readline() == "That is all folks! "
assert reader.readline() == ""
reader.seek(0)
assert reader.readline() == "a" * 15 + "\n"
assert reader.read() == data[16:]

# Read with limit
with obj.reader(mode="r") as reader:
for _ in range(math.ceil(len(data) / 10)):
end = 10
index = data.find("\n")
if -1 < index < 10:
end = index + 1

expected = data[:end]
data = data[len(expected):]
read = reader.readline(10)
assert read == expected

assert reader.read() == ""
Loading

0 comments on commit 79c5316

Please sign in to comment.