Python Wrapper: Make ImportManager single use #7108

Merged (6 commits), Dec 6, 2023
Changes from 1 commit
12 changes: 12 additions & 0 deletions clients/python-wrapper/lakefs/__init__.py
@@ -2,10 +2,22 @@
Allow importing of models from package root
"""

from lakefs.client import Client, DEFAULT_CLIENT
from lakefs.repository import Repository
from lakefs.reference import Reference
from lakefs.models import Commit, Change, RepositoryProperties
from lakefs.tag import Tag
from lakefs.branch import Branch
from lakefs.object import StoredObject, WriteableObject, ObjectReader
from lakefs.object_manager import ObjectManager


def repository(repository_id: str) -> Repository:
Contributor

Creating these is probably out of scope for this PR. But doing so manually is a recipe for a partial implementation; we're bound to miss some top-level class.

Member Author

I'm not sure I understand the comment.
We are doing this only for the Repository class, and just to align the syntax.

Contributor

The comment is twofold:

  1. This change does not belong in this PR: it could be split off entirely. If you did that, you would speed up acceptance of at least one of the two smaller PRs.
  2. This is a partial change that is even more confusing than providing lowercase versions of all constructors. After all, you don't do the same for any of the other imported constructors. If you're going to do it, do it automatically and for all constructors.

Member Author

  1. I agree - overly small PRs are also inefficient.
  2. As stated before, this is not a partial change; this is the change we agreed on. We certainly do not want a lowercase version of all constructors, just one for the root class, which is the repository, so that the code will look like this:
lakefs.repository("repo-name").branch("branch-name")

instead of

lakefs.Repository("repo-name").branch("branch-name")

Other classes are dependent on the context of their "parent" class; we expect them to usually be initialized by a call to the respective method of the parent.
If it were up to me I wouldn't have done even the first - but we got some comments about it from users.

"""
Wrapper for getting a Repository object from the lakefs module.
Enables more fluid syntax (lakefs.repository("x").branch("y") instead of lakefs.Repository("x").branch("y"))

:param repository_id: The repository name
:return: Repository object representing a lakeFS repository with the given repository_id
"""
return Repository(repository_id)
2 changes: 1 addition & 1 deletion clients/python-wrapper/lakefs/branch.py
@@ -153,7 +153,7 @@ def uncommitted(self, max_amount: Optional[int], after: Optional[str] = None, pr
**kwargs):
yield Change(**diff.dict())

def import_data(self, commit_message: str, metadata: Optional[dict] = None) -> ImportManager:
def import_data(self, commit_message: Optional[str] = "", metadata: Optional[dict] = None) -> ImportManager:
Contributor

If it's optional it probably shouldn't be an empty string by default.

Member Author (@N-o-Z, Dec 4, 2023)

It's optional in the SDK; it's a required param in the auto-generated code. An empty string means to use the default server commit message for the import.

Contributor

This comment makes me suddenly uncertain: what happens if I pass an empty commit_message? What happens if I pass commit_message=None? This needs to be in the docstring: people who use the high-level SDK should not need to refer to the API documentation to extract such details.

Member Author

Passing commit_message=None will result in an error that the parameter is missing. Adding this to the docstring.
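
For context, a hedged usage sketch of the behavior described here (repository and branch names are illustrative; the exact default message is chosen by the server):

import lakefs

# Illustrative names; assumes a configured client and an existing repository.
branch = lakefs.repository("example-repo").branch("main")

# Default: commit_message="" asks the server to use its default import commit message.
mgr = branch.import_data()

# An explicit message (and optional metadata) is attached to the import commit.
mgr = branch.import_data(commit_message="my imported data", metadata={"foo": "bar"})

# Passing commit_message=None results in a missing-parameter error from the
# underlying auto-generated SDK, as noted above.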

"""
Import data to lakeFS

2 changes: 2 additions & 0 deletions clients/python-wrapper/lakefs/import_manager.py
@@ -85,6 +85,8 @@ def start(self) -> str:
"""
if self._in_progress:
raise ImportManagerException("Import in progress")
if self._import_id is not None:
Contributor

If it's a single-use object used like this, why isn't it a function?

Member Author

Several reasons:

  1. Allowing async and sync execution requires maintaining state.
  2. For code fluidity, we want to be able to chain additional sources one after the other with a simple interface, instead of requiring the user to build a "structure" of the data they want to import (see the usage sketch below).
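
For illustration, a hedged sketch of the fluent usage this enables, based on the tests in this PR (repository, bucket, and path names are illustrative):

import lakefs
from lakefs.exceptions import ImportManagerException

branch = lakefs.repository("example-repo").branch("import-branch")

# Chain sources one after the other, then run the import synchronously.
mgr = branch.import_data(commit_message="imported data")
mgr.prefix("s3://example-bucket/prefix-1/", "imported/prefix-1/")
mgr.object("s3://example-bucket/file-1", "imported/file-1")
res = mgr.run()  # blocks until the import completes

# With this PR, the same manager cannot be reused:
try:
    mgr.run()
except ImportManagerException:
    pass  # create a new manager via branch.import_data() for another import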

Contributor

AFAIK the Pythonic way (and also in many other cases) of keeping state for async execution is to return a state object that holds the coro, as asyncio.create_task does, for instance. In fact an import could be a Task, and I imagine it will end up at least duck-typing like a Task.

A good way to achieve fluidity (but only before calling a "start" method) is with the builder pattern: you build a descriptor for the executor, and call builder.something(...).add_another_thing(...).start(). That returns a description of whatever is being performed.

AFAICT each important method of your single class can only be called either before execution or after execution. That's why ImportManager needs run-time state. If we split ImportManager into, say, ImportBuilder and ImportTask (names TBD), then each method would go into exactly one of these classes. This is OO "proof" that these are two distinct objects.
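
For illustration only, a minimal sketch of the suggested split; the ImportBuilder/ImportTask names come from the comment above, and all signatures are hypothetical rather than part of this PR:

class ImportTask:
    """Run-time state of an import that has already been started."""

    def __init__(self, import_id: str):
        self.import_id = import_id

    def status(self): ...   # poll the server for progress
    def wait(self): ...     # block until the import completes
    def cancel(self): ...   # abort a running import


class ImportBuilder:
    """Accumulates sources fluently; start() hands off to an ImportTask."""

    def __init__(self, commit_message: str = "", metadata: dict = None):
        self.commit_message = commit_message
        self.metadata = metadata
        self.sources = []

    def prefix(self, src: str, dst: str) -> "ImportBuilder":
        self.sources.append(("prefix", src, dst))
        return self

    def object(self, src: str, dst: str) -> "ImportBuilder":
        self.sources.append(("object", src, dst))
        return self

    def start(self) -> ImportTask:
        # Would submit the import to the lakeFS server here and return the handle.
        return ImportTask(import_id="example-import-id")

Usage would then read ImportBuilder(...).prefix(...).object(...).start(), with everything that happens after start() living on the returned ImportTask.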

Member Author

That's a valid point and we can perhaps refactor this in the future. I'm deferring it because it requires changes to the implementation.

raise ImportManagerException("Import Manager can only be used once")

creation = lakefs_sdk.ImportCreation(paths=self.sources,
commit=lakefs_sdk.CommitCreation(message=self.commit_message,
25 changes: 22 additions & 3 deletions clients/python-wrapper/tests/integration/test_import.py
@@ -1,5 +1,8 @@
from time import sleep

import pytest

from lakefs import Client
from lakefs.exceptions import ImportManagerException, ConflictException
from tests.utests.common import expect_exception_context

@@ -14,8 +17,14 @@
"nested/prefix-7/file000101", ]


def skip_on_unsupported_blockstore(clt: Client, supported_blockstores: [str]):
if clt.storage_config.blockstore_type not in supported_blockstores:
pytest.skip(f"Unsupported blockstore type for test: {clt.storage_config.blockstore_type}")


def test_import_manager(setup_repo):
_, repo = setup_repo
clt, repo = setup_repo
skip_on_unsupported_blockstore(clt, "s3")
branch = repo.branch("import-branch").create("main")
mgr = branch.import_data(commit_message="my imported data", metadata={"foo": "bar"})

@@ -32,17 +41,22 @@ def test_import_manager(setup_repo):
assert res.commit.metadata.get("foo") == "bar"
assert res.ingested_objects == 0

# Expect failure trying to run manager twice
with expect_exception_context(ImportManagerException):
mgr.run()
Comment on lines +45 to +46
Contributor

👍


# Import with objects and prefixes
mgr = branch.import_data()
dest_prefix = "imported/new-prefix/"
mgr.prefix(_IMPORT_PATH + "prefix-1/",
dest_prefix + "prefix-1/").prefix(_IMPORT_PATH + "prefix-2/",
dest_prefix + "prefix-2/")
for o in _FILES_TO_CHECK:
mgr.object(_IMPORT_PATH + o, dest_prefix + o)

mgr.commit_message = "new commit"
mgr.commit_metadata = None
res = mgr.run()

assert res.error is None
assert res.completed
assert res.commit.id == branch.commit_id()
@@ -56,7 +70,8 @@


def test_import_manager_cancel(setup_repo):
_, repo = setup_repo
clt, repo = setup_repo
skip_on_unsupported_blockstore(clt, "s3")
branch = repo.branch("import-branch").create("main")
expected_commit_id = branch.commit_id()
expected_commit_message = branch.commit_message()
Expand All @@ -66,6 +81,10 @@ def test_import_manager_cancel(setup_repo):

mgr.start()
sleep(1)

with expect_exception_context(ImportManagerException):
mgr.start()

mgr.cancel()

status = mgr.status()
13 changes: 7 additions & 6 deletions clients/python-wrapper/tests/integration/test_sanity.py
@@ -2,16 +2,17 @@

from tests.utests.common import expect_exception_context
from lakefs.exceptions import NotFoundException, ConflictException, ObjectNotFoundException
from lakefs import RepositoryProperties, WriteableObject
import lakefs


def test_repository_sanity(storage_namespace, setup_repo):
_, repo = setup_repo
repo = lakefs.repository(repo.properties.id) # test the lakefs.repository function works properly
default_branch = "main"
expected_properties = RepositoryProperties(id=repo.properties.id,
default_branch=default_branch,
storage_namespace=storage_namespace,
creation_date=repo.properties.creation_date)
expected_properties = lakefs.RepositoryProperties(id=repo.properties.id,
default_branch=default_branch,
storage_namespace=storage_namespace,
creation_date=repo.properties.creation_date)
assert repo.properties == expected_properties

# Create with allow exists
@@ -103,7 +104,7 @@ def test_object_sanity(setup_repo):
data = b"test_data"
path = "test_obj"
metadata = {"foo": "bar"}
obj = WriteableObject(repository=repo.properties.id, reference="main", path=path, client=clt).upload(
obj = lakefs.WriteableObject(repository=repo.properties.id, reference="main", path=path, client=clt).upload(
data=data, metadata=metadata)
with obj.reader() as fd:
assert fd.read() == data
4 changes: 2 additions & 2 deletions clients/python-wrapper/tests/utests/common.py
@@ -64,10 +64,10 @@ def env_var_context():
os.environ = old_env


def get_test_repo() -> lakefs.repository.Repository:
def get_test_repo() -> lakefs.Repository:
from lakefs.client import Client
client = Client(username="test_user", password="test_password", host="http://127.0.0.1:8000")
return lakefs.repository.Repository(repository_id=TEST_REPO_ARGS.name, client=client)
return lakefs.Repository(repository_id=TEST_REPO_ARGS.name, client=client)


@contextmanager