Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store.connect: fix force_reset kwarg implementations #879

Merged
merged 5 commits into from
Nov 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 18 additions & 17 deletions src/maggma/stores/compound_stores.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,26 +66,27 @@ def name(self) -> str:

def connect(self, force_reset: bool = False):
"""
Connects the underlying Mongo database and
all collection connections
Connects the underlying Mongo database and all collection connections

Args:
force_reset: whether to forcibly reset the connection
"""
conn: MongoClient = (
MongoClient(
host=self.host,
port=self.port,
username=self.username,
password=self.password,
**self.mongoclient_kwargs,
force_reset: whether to reset the connection or not when the Store is
already connected.
"""
if not self._coll or force_reset:
conn: MongoClient = (
MongoClient(
host=self.host,
port=self.port,
username=self.username,
password=self.password,
**self.mongoclient_kwargs,
)
if self.username != ""
else MongoClient(self.host, self.port, **self.mongoclient_kwargs)
)
if self.username != ""
else MongoClient(self.host, self.port, **self.mongoclient_kwargs)
)
db = conn[self.database]
self._coll = db[self.main]
self._has_merge_objects = self._collection.database.client.server_info()["version"] > "3.6"
db = conn[self.database]
self._coll = db[self.main]
self._has_merge_objects = self._collection.database.client.server_info()["version"] > "3.6"

def close(self):
"""
Expand Down
10 changes: 7 additions & 3 deletions src/maggma/stores/file_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ def _create_record_from_file(self, f: Path) -> Dict:
self.key: file_id,
}

def connect(self):
def connect(self, force_reset: bool = False):
"""
Connect to the source data

Expand All @@ -272,16 +272,20 @@ def connect(self):

If there is a metadata .json file in the directory, read its
contents into the MemoryStore

Args:
force_reset: whether to reset the connection or not when the Store is
already connected.
"""
# read all files and place them in the MemoryStore
# use super.update to bypass the read_only guard statement
# because we want the file data to be populated in memory
super().connect()
super().connect(force_reset=force_reset)
super().update(self.read())

# now read any metadata from the .json file
try:
self.metadata_store.connect()
self.metadata_store.connect(force_reset=force_reset)
metadata = list(self.metadata_store.query())
except FileNotFoundError:
metadata = []
Expand Down
47 changes: 27 additions & 20 deletions src/maggma/stores/gridfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,29 +127,32 @@
def connect(self, force_reset: bool = False):
"""
Connect to the source data

Args:
force_reset: whether to reset the connection or not when the Store is
already connected.
"""
if self.ssh_tunnel is None:
host = self.host
port = self.port
else:
self.ssh_tunnel.start()
host, port = self.ssh_tunnel.local_address

conn: MongoClient = (
MongoClient(
host=host,
port=port,
username=self.username,
password=self.password,
authSource=self.auth_source,
**self.mongoclient_kwargs,
)
if self.username != ""
else MongoClient(host, port, **self.mongoclient_kwargs)
)
if not self._coll or force_reset:
if self.ssh_tunnel is None:
host = self.host
port = self.port
else:
self.ssh_tunnel.start()
host, port = self.ssh_tunnel.local_address

Check warning on line 141 in src/maggma/stores/gridfs.py

View check run for this annotation

Codecov / codecov/patch

src/maggma/stores/gridfs.py#L140-L141

Added lines #L140 - L141 were not covered by tests

conn: MongoClient = (
MongoClient(
host=host,
port=port,
username=self.username,
password=self.password,
authSource=self.auth_source,
**self.mongoclient_kwargs,
)
if self.username != ""
else MongoClient(host, port, **self.mongoclient_kwargs)
)
db = conn[self.database]

self._coll = gridfs.GridFS(db, self.collection_name)
self._files_collection = db[f"{self.collection_name}.files"]
self._files_store = MongoStore.from_collection(self._files_collection)
Expand Down Expand Up @@ -483,6 +486,10 @@
def connect(self, force_reset: bool = False):
"""
Connect to the source data

Args:
force_reset: whether to reset the connection or not when the Store is
already connected.
"""
if not self._coll or force_reset: # pragma: no cover
conn: MongoClient = MongoClient(self.uri, **self.mongoclient_kwargs)
Expand Down
73 changes: 45 additions & 28 deletions src/maggma/stores/mongolike.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ def name(self) -> str:
def connect(self, force_reset: bool = False):
"""
Connect to the source data

Args:
force_reset: whether to reset the connection or not when the Store is
already connected.
"""
if self._coll is None or force_reset:
if self.ssh_tunnel is None:
Expand Down Expand Up @@ -566,6 +570,10 @@ def name(self) -> str:
def connect(self, force_reset: bool = False):
"""
Connect to the source data

Args:
force_reset: whether to reset the connection or not when the Store is
already connected.
"""
if self._coll is None or force_reset: # pragma: no cover
conn: MongoClient = MongoClient(self.uri, **self.mongoclient_kwargs)
Expand Down Expand Up @@ -594,8 +602,11 @@ def __init__(self, collection_name: str = "memory_db", **kwargs):
def connect(self, force_reset: bool = False):
"""
Connect to the source data
"""

Args:
force_reset: whether to reset the connection or not when the Store is
already connected.
"""
if self._coll is None or force_reset:
self._coll = mongomock.MongoClient().db[self.name] # type: ignore

Expand Down Expand Up @@ -731,32 +742,37 @@ def __init__(

super().__init__(**kwargs)

def connect(self, force_reset=False):
def connect(self, force_reset: bool = False):
"""
Loads the files into the collection in memory
"""
super().connect(force_reset=force_reset)

# create the .json file if it does not exist
if not self.read_only and not Path(self.paths[0]).exists():
with zopen(self.paths[0], "w") as f:
data: List[dict] = []
bytesdata = orjson.dumps(data)
f.write(bytesdata.decode("utf-8"))
Args:
force_reset: whether to reset the connection or not. If False (default) and
.connect() has already been called, the .json file will not be read in again.
This can improve performance on systems with slow storage when the Store is
connected and disconnected repeatedly.
"""
if self._coll is None or force_reset:
self._coll = mongomock.MongoClient().db[self.name] # type: ignore

for path in self.paths:
objects = self.read_json_file(path)
try:
self.update(objects)
except KeyError:
raise KeyError(
f"""
Key field '{self.key}' not found in {path.name}. This
could mean that this JSONStore was initially created with a different key field.
The keys found in the .json file are {list(objects[0].keys())}. Try
re-initializing your JSONStore using one of these as the key arguments.
"""
)
# create the .json file if it does not exist
if not self.read_only and not Path(self.paths[0]).exists():
with zopen(self.paths[0], "w") as f:
data: List[dict] = []
bytesdata = orjson.dumps(data)
f.write(bytesdata.decode("utf-8"))

for path in self.paths:
objects = self.read_json_file(path)
try:
self.update(objects)
except KeyError:
raise KeyError(
f"""
Key field '{self.key}' not found in {path.name}. This
could mean that this JSONStore was initially created with a different key field.
The keys found in the .json file are {list(objects[0].keys())}. Try
re-initializing your JSONStore using one of these as the key arguments.
"""
)

def read_json_file(self, path) -> List:
"""
Expand Down Expand Up @@ -919,13 +935,14 @@ def connect(self, force_reset: bool = False):
Connect to the database store.

Args:
force_reset: Force connection reset.
force_reset: whether to reset the connection or not when the Store is
already connected.
"""
# TODO - workaround, may be obviated by a future montydb update
if self.database_path != ":memory:":
set_storage(self.database_path, storage=self.storage, **self.storage_kwargs)
client = MontyClient(self.database_path, **self.client_kwargs)
if not self._coll or force_reset:
# TODO - workaround, may be obviated by a future montydb update
if self.database_path != ":memory:":
set_storage(self.database_path, storage=self.storage, **self.storage_kwargs)
client = MontyClient(self.database_path, **self.client_kwargs)
Fixed Show fixed Hide fixed
self._coll = client[self.database_name][self.collection_name]

@property
Expand Down
9 changes: 6 additions & 3 deletions src/maggma/stores/shared_stores.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ def connect(self, force_reset: bool = False):
Connect to the source data

Args:
force_reset: whether to reset the connection or not
force_reset: whether to reset the connection or not when the Store is
already connected.
"""
self.multistore.connect(self.store, force_reset=force_reset)

Expand Down Expand Up @@ -352,7 +353,8 @@ def connect(self, store, force_reset: bool = False):

Args:
store: the store to connect to the source data
force_reset: whether to reset the connection or not
force_reset: whether to reset the connection or not when the Store is
already connected.
"""
with self._multistore_lock:
store_id = self.get_store_index(store)
Expand All @@ -374,7 +376,8 @@ def connect_all(self, force_reset: bool = False):
Connects to all stores

Args:
force_reset: whether to reset the connection or not
force_reset: whether to reset the connection or not when the Store is
already connected.
"""
with self._multistore_lock:
for store in self._stores:
Expand Down