Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deadlock and DELETE/UPDATE unindexed datasets #3620

Merged
merged 5 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 18 additions & 22 deletions lib/pbench/server/api/resources/query_apis/datasets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
ParamType,
)
from pbench.server.api.resources.query_apis import ElasticBase
from pbench.server.database.models.datasets import Dataset, Metadata
from pbench.server.database.models.datasets import Dataset
from pbench.server.database.models.index_map import IndexMap
from pbench.server.database.models.templates import Template

Expand Down Expand Up @@ -95,40 +95,36 @@ def get_index(
) -> str:
"""Retrieve ES indices based on a given root_index_name.

Datasets marked "archiveonly" aren't indexed, and can't be referenced
in most APIs that rely on Elasticsearch. Instead, we'll raise a
CONFLICT error.
Datasets without an index can't be referenced in most APIs that rely on
Elasticsearch. Instead, we'll raise a CONFLICT error. However, the
/api/v1/datasets API will specify ok_no_index as they need to operate
on the dataset regardless of whether indexing is enabled.
webbnh marked this conversation as resolved.
Show resolved Hide resolved

All indices are returned if root_index_name is omitted.

Args:
dataset: dataset object
root_index_name: A root index name like "run-data"
ok_no_index: Don't fail on an archiveonly dataset
ok_no_index: Don't fail if dataset has no indices

Raises:
APIAbort(CONFLICT) if indexing was disabled on the target dataset.
APIAbort(NOT_FOUND) if the dataset has no matching index data
APIAbort(NOT_FOUND) if index is required and the dataset has none

Returns:
A string that joins all selected indices with ",", suitable for use
in an Elasticsearch query URI.
"""

archive_only = Metadata.getvalue(dataset, Metadata.SERVER_ARCHIVE)
if archive_only:
if ok_no_index:
return ""
raise APIAbort(HTTPStatus.CONFLICT, "Dataset indexing was disabled")

index_keys = list(IndexMap.indices(dataset, root_index_name))
index_keys = IndexMap.indices(dataset, root_index_name)
if index_keys:
return ",".join(index_keys)
if ok_no_index:
return ""

if not index_keys:
raise APIAbort(
HTTPStatus.NOT_FOUND,
f"Dataset has no {root_index_name if root_index_name else 'indexed'!r} data",
)

indices = ",".join(index_keys)
return indices
raise APIAbort(
HTTPStatus.NOT_FOUND,
f"Dataset has no {root_index_name if root_index_name else 'indexed'!r} data",
)

def get_aggregatable_fields(
self, mappings: JSON, prefix: AnyStr = "", result: Union[List, None] = None
Expand Down
23 changes: 9 additions & 14 deletions lib/pbench/server/api/resources/query_apis/datasets/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def assemble(self, params: ApiParams, context: ApiContext) -> JSONOBJECT:
#
# It's important that all context fields required for postprocessing
# of unindexed datasets have been set before this!
indices = self.get_index(dataset, ok_no_index=(action != "get"))
indices = self.get_index(dataset, ok_no_index=True)
context["indices"] = indices
if not indices:
return {}
Expand Down Expand Up @@ -234,7 +234,7 @@ def postprocess(self, es_json: JSONOBJECT, context: ApiContext) -> Response:
and some other data. We mostly want to determine whether it was
100% successful (before updating or deleting the dataset), but
we also summarize the results for the client.
* For get, we directly return the "hit list".
* For get, we return a count of documents for each index name.

Args:
es_json: the Elasticsearch response document
Expand All @@ -249,22 +249,17 @@ def postprocess(self, es_json: JSONOBJECT, context: ApiContext) -> Response:
current_app.logger.info("POSTPROCESS {}: {}", dataset.name, es_json)
failures = 0
if action == "get":
count = None
hits = []
try:
count = es_json["hits"]["total"]["value"]
hits = es_json["hits"]["hits"]
if int(count) == 0:
if es_json:
try:
hits = es_json["hits"]["hits"]
except KeyError as e:
raise APIInternalError(
f"Elasticsearch returned no matches for {dataset.name}"
f"Can't find search service match data for {dataset.name} ({e}) in {es_json!r}",
)
except KeyError as e:
raise APIInternalError(
f"Can't find Elasticsearch match data for {dataset.name} ({e}) in {es_json!r}",
)
except ValueError as e:
if not isinstance(hits, list):
raise APIInternalError(
f"Elasticsearch bad hit count {count!r} for {dataset.name}: {e}",
f"search service did not return hits list ({type(hits).__name__})"
)
results = defaultdict(int)
for hit in hits:
Expand Down
6 changes: 3 additions & 3 deletions lib/pbench/server/database/models/index_map.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Iterator, NewType, Optional
from typing import NewType, Optional

from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.exc import SQLAlchemyError
Expand Down Expand Up @@ -186,7 +186,7 @@ def merge(cls, dataset: Dataset, merge_map: IndexMapType):
cls.commit(dataset, "merge")

@staticmethod
def indices(dataset: Dataset, root: Optional[str] = None) -> Iterator[str]:
def indices(dataset: Dataset, root: Optional[str] = None) -> list[str]:
"""Return the indices matching the specified root index name.

Args:
Expand All @@ -207,7 +207,7 @@ def indices(dataset: Dataset, root: Optional[str] = None) -> Iterator[str]:
except SQLAlchemyError as e:
raise IndexMapSqlError(e, operation="indices", dataset=dataset, name=root)

return (i.index for i in map)
return [str(i.index) for i in map]

@staticmethod
def exists(dataset: Dataset) -> bool:
Expand Down
2 changes: 1 addition & 1 deletion lib/pbench/test/unit/server/query_apis/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def query_api(
expected_index: str,
expected_status: str,
headers: Optional[dict] = None,
request_method=ApiMethod.POST,
request_method: ApiMethod = ApiMethod.POST,
query_params: Optional[JSONOBJECT] = None,
expect_call: Optional[bool] = None,
**kwargs,
Expand Down
Loading
Loading