Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deadlock and DELETE/UPDATE unindexed datasets #3620

Merged
merged 5 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 70 additions & 36 deletions lib/pbench/server/api/resources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@
MetadataBadKey,
MetadataError,
OperationName,
OperationState,
)
from pbench.server.database.models.server_settings import ServerSetting
from pbench.server.database.models.users import User
from pbench.server.sync import Sync


class APIAbort(Exception):
Expand Down Expand Up @@ -1419,6 +1421,51 @@ class UriBase:
host_value: str


@dataclass
class AuditContext:
"""Manage API audit context"""

audit: Optional[Audit] = None
webbnh marked this conversation as resolved.
Show resolved Hide resolved
finalize: bool = True
status: AuditStatus = AuditStatus.SUCCESS
reason: Optional[AuditReason] = None
attributes: Optional[JSONOBJECT] = None

def add_attribute(self, key: str, value: Any):
"""Add a single audit attribute

Args:
key: key name
value: key value
"""
if self.attributes is None:
self.attributes = {key: value}
else:
self.attributes[key] = value
webbnh marked this conversation as resolved.
Show resolved Hide resolved

def add_attributes(self, attr: JSONOBJECT):
"""Add multiple audit attributes as a JSON dict

Args:
attr: a JSON dict
"""
if self.attributes is None:
self.attributes = attr
else:
self.attributes.update(attr)

def set_error(self, error: str, reason: Optional[AuditReason] = None):
"""Set an audit error

Args:
error: error string
reason: audit failure reason
"""
self.add_attribute("error", error)
self.status = AuditStatus.FAILURE
self.reason = reason


class ApiBase(Resource):
"""A base class for Pbench queries that provides common parameter handling
behavior for specialized subclasses.
Expand Down Expand Up @@ -2031,67 +2078,54 @@ def _dispatch(
# wants to emit a special audit sequence it can disable "finalize"
# in the context. It can also pass "attributes" by setting that
# field.
auditing = {
"audit": audit,
"finalize": bool(audit),
webbnh marked this conversation as resolved.
Show resolved Hide resolved
"status": AuditStatus.SUCCESS,
"reason": None,
"attributes": None,
}

auditing = AuditContext(audit=audit)
context = {
"auditing": auditing,
"attributes": schema.attributes,
"raw_params": raw_params,
"sync": None,
}

response = None
sync_message = None
try:
response = execute(params, request, context)
except APIInternalError as e:
current_app.logger.exception("{} {}", api_name, e.details)
auditing.set_error(str(e), AuditReason.INTERNAL)
sync_message = str(e)
abort(e.http_status, message=str(e))
except APIAbort as e:
current_app.logger.warning(
"{} client error {}: '{}'", api_name, e.http_status, e
)
if auditing["finalize"]:
attr = auditing.get("attributes", {"message": str(e)})
try:
Audit.create(
root=auditing["audit"],
status=AuditStatus.FAILURE,
reason=auditing["reason"],
attributes=attr,
)
except Exception:
current_app.logger.error(
"Unexpected exception on audit: {}", auditing
)
auditing.set_error(str(e))
sync_message = str(e)
abort(e.http_status, message=str(e), **e.kwargs)
except Exception as e:
x = APIInternalError("Unexpected exception")
x.__cause__ = e
current_app.logger.exception(
"Exception {} API error: {}: {!r}", api_name, x, auditing
)
if auditing["finalize"]:
attr = auditing.get("attributes", {})
attr["message"] = str(e)
auditing.set_error(str(e), AuditReason.INTERNAL)
sync_message = str(e)
abort(x.http_status, message=x.message)
webbnh marked this conversation as resolved.
Show resolved Hide resolved
finally:
# If the operation created a Sync object, it will have been updated
# and removed unless the operation failed. This means we're here
# because of an exception, and one of the handlers has set an
# appropriate message to record in the operations table.
sync: Optional[Sync] = context.get("sync")
if sync:
sync.update(dataset, OperationState.FAILED, message=sync_message)
if auditing.audit and auditing.finalize:
Audit.create(
root=auditing["audit"],
status=AuditStatus.FAILURE,
reason=AuditReason.INTERNAL,
attributes=attr,
root=auditing.audit,
status=auditing.status,
reason=auditing.reason,
attributes=auditing.attributes,
)
abort(x.http_status, message=x.message)
if auditing["finalize"]:
Audit.create(
root=auditing["audit"],
status=auditing["status"],
reason=auditing["reason"],
attributes=auditing["attributes"],
)
return response

def _get(self, args: ApiParams, req: Request, context: ApiContext) -> Response:
Expand Down
7 changes: 4 additions & 3 deletions lib/pbench/server/api/resources/api_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,9 @@ def _post(self, params: ApiParams, req: Request, context: ApiContext) -> Respons
status = HTTPStatus.OK
except Exception as e:
raise APIInternalError(str(e)) from e
context["auditing"]["attributes"] = key.as_json()
response = jsonify(key.as_json())
result = key.as_json()
context["auditing"].add_attributes(result)
response = jsonify(result)
response.status_code = status
return response

Expand Down Expand Up @@ -162,7 +163,7 @@ def _delete(self, params: ApiParams, req: Request, context: ApiContext) -> Respo
raise APIAbort(HTTPStatus.NOT_FOUND, "Requested key not found")
key = keys[0]
try:
context["auditing"]["attributes"] = key.as_json()
context["auditing"].add_attributes(key.as_json())
key.delete()
return "deleted", HTTPStatus.OK
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion lib/pbench/server/api/resources/datasets_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def _put(self, params: ApiParams, req: Request, context: ApiContext) -> Response
dataset = params.uri["dataset"]
metadata = params.body["metadata"]

context["auditing"]["attributes"] = {"updated": metadata}
context["auditing"].add_attribute("updated", metadata)
webbnh marked this conversation as resolved.
Show resolved Hide resolved

# Validate the authenticated user's authorization for the combination
# of "owner" and "access".
Expand Down
43 changes: 22 additions & 21 deletions lib/pbench/server/api/resources/query_apis/datasets/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
ApiMethod,
ApiParams,
ApiSchema,
AuditContext,
MissingParameters,
Parameter,
ParamType,
Expand Down Expand Up @@ -121,9 +122,9 @@ def assemble(self, params: ApiParams, context: ApiContext) -> JSONOBJECT:
"""

dataset = context["dataset"]
auditing: AuditContext = context["auditing"]
action = context["attributes"].action
context["action"] = action
audit_attributes = {}
access = None
owner = None
elastic_options = {"ignore_unavailable": "true"}
Expand Down Expand Up @@ -152,7 +153,6 @@ def assemble(self, params: ApiParams, context: ApiContext) -> JSONOBJECT:
f"can't set {OperationState.WORKING.name} on {dataset.name}: {str(e)!r} "
)
context["sync"] = sync
context["auditing"]["attributes"] = audit_attributes
if action == "update":
access = params.query.get("access")
owner = params.query.get("owner")
Expand All @@ -168,12 +168,12 @@ def assemble(self, params: ApiParams, context: ApiContext) -> JSONOBJECT:
)

if access:
audit_attributes["access"] = access
auditing.add_attribute("access", access)
context["access"] = access
else:
access = dataset.access
if owner:
audit_attributes["owner"] = owner
auditing.add_attribute("owner", owner)
context["owner"] = owner
else:
owner = dataset.owner_id
Expand Down Expand Up @@ -245,6 +245,7 @@ def postprocess(self, es_json: JSONOBJECT, context: ApiContext) -> Response:
"""
action = context["action"]
dataset = context["dataset"]
auditing: AuditContext = context["auditing"]
current_app.logger.info("POSTPROCESS {}: {}", dataset.name, es_json)
failures = 0
if action == "get":
Expand Down Expand Up @@ -283,7 +284,7 @@ def postprocess(self, es_json: JSONOBJECT, context: ApiContext) -> Response:
"version_conflicts": 0,
"failures": 0,
}
context["auditing"]["attributes"]["results"] = results
auditing.add_attribute("results", results)

if failures == 0:
if action == "update":
Expand Down Expand Up @@ -318,15 +319,19 @@ def postprocess(self, es_json: JSONOBJECT, context: ApiContext) -> Response:
# it's still set then our `sync` local is valid and we want to get
# it out of "WORKING" state.
sync = context.get("sync")
auditing = context["auditing"]
if sync:
state = OperationState.OK if not failures else OperationState.FAILED
if failures:
state = OperationState.FAILED
message = f"Unable to {action} some indexed documents"
else:
state = OperationState.OK
message = None
try:
sync.update(dataset=dataset, state=state)
sync.update(dataset=dataset, state=state, message=message)
del context["sync"]
except Exception as e:
auditing["attributes"] = {"message": str(e)}
auditing["status"] = AuditStatus.WARNING
auditing["reason"] = AuditReason.INTERNAL
auditing.set_error(str(e), reason=AuditReason.INTERNAL)
auditing.status = AuditStatus.WARNING
raise APIInternalError(
f"Unexpected sync error {dataset.name} {str(e)!r}"
) from e
Expand All @@ -336,19 +341,15 @@ def postprocess(self, es_json: JSONOBJECT, context: ApiContext) -> Response:
# experienced total failure. Either way, the operation can be retried
# if some documents failed to update.
if results["failures"] and results["failures"] == results["total"]:
auditing["status"] = AuditStatus.WARNING
auditing["reason"] = AuditReason.INTERNAL
auditing["attributes"][
"message"
] = f"Unable to {action} some indexed documents"
auditing.status = AuditStatus.WARNING
auditing.reason = AuditReason.INTERNAL
auditing.add_attribute(
"message", f"Unable to {action} some indexed documents"
)
raise APIInternalError(
f"Failed to {action} any of {results['total']} "
f"Elasticsearch documents: {es_json}"
)
elif sync:
sync.error(
dataset=dataset,
message=f"Unable to {action} some indexed documents",
)

# construct response object
return jsonify(results)
4 changes: 2 additions & 2 deletions lib/pbench/server/api/resources/server_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def _put_key(self, params: ApiParams, context: ApiContext) -> Response:
f"No value found for server settings key {key!r}",
)

context["auditing"]["attributes"] = {"updated": {key: value}}
context["auditing"].add_attribute("updated", {key: value})

try:
ServerSetting.set(key=key, value=value)
Expand Down Expand Up @@ -186,7 +186,7 @@ def _put_body(self, params: ApiParams, context: ApiContext) -> Response:
f"Unrecognized server settings {sorted(badkeys)!r} specified: valid settings are {sorted(ServerSetting.KEYS)!r}",
)

context["auditing"]["attributes"] = {"updated": params.body}
context["auditing"].add_attribute("updated", params.body)

failures = []
response = {}
Expand Down
3 changes: 2 additions & 1 deletion lib/pbench/test/unit/server/query_apis/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ def query_api(
The Pbench API response object
"""
base_uri = server_config.get("Indexing", "uri")
es_url = f"{base_uri}{expected_index}{es_uri}"
idx = expected_index if expected_index is not None else "/"
es_url = f"{base_uri}{idx}{es_uri}"
client_method = getattr(client, request_method.name.lower())
if request_method == ApiMethod.GET:
es_method = responses.GET
Expand Down
42 changes: 29 additions & 13 deletions lib/pbench/test/unit/server/query_apis/test_datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,16 @@ def _setup(self, client):
)

@pytest.mark.parametrize(
"user,ao,expected_status",
"user,ao,idx,expected_status",
[
("drb", False, HTTPStatus.OK),
("drb", True, HTTPStatus.OK),
("test_admin", False, HTTPStatus.OK),
("test", False, HTTPStatus.FORBIDDEN),
(None, False, HTTPStatus.UNAUTHORIZED),
("drb", False, True, HTTPStatus.OK),
("drb", False, False, HTTPStatus.NOT_FOUND),
("drb", True, True, HTTPStatus.OK),
("drb", True, False, HTTPStatus.OK),
("test_admin", False, True, HTTPStatus.OK),
("test_admin", False, False, HTTPStatus.NOT_FOUND),
("test", False, False, HTTPStatus.FORBIDDEN),
(None, False, False, HTTPStatus.UNAUTHORIZED),
],
)
def test_empty_delete(
Expand All @@ -73,6 +76,7 @@ def test_empty_delete(
query_api,
user,
ao,
idx,
expected_status,
get_token_func,
):
Expand All @@ -88,13 +92,18 @@ def test_empty_delete(
else:
user_id = None

drb = Dataset.query(name="drb")
index = None
if ao:
# Set archiveonly flag to disable index-map logic
drb = Dataset.query(name="drb")
Metadata.setvalue(drb, Metadata.SERVER_ARCHIVE, True)
index = None
else:

# If we want an index, build the expected path; otherwise make sure
# the dataset doesn't have one.
if idx:
index = self.build_index_from_metadata()
else:
IndexMap.delete(drb)

response = query_api(
self.pbench_endpoint,
Expand Down Expand Up @@ -148,10 +157,17 @@ def test_empty_delete(
Dataset.query(name="drb")
else:
# On failure, the dataset should still exist
assert Dataset.query(name="drb")
assert response.json["message"].endswith(
"is not authorized to DELETE a resource owned by drb with private access"
)
drb = Dataset.query(name="drb")
ops = Metadata.getvalue(drb, "dataset.operations")
if expected_status == HTTPStatus.NOT_FOUND:
assert "FAILED" == ops["DELETE"]["state"]
else:
assert response.json["message"].endswith(
"is not authorized to DELETE a resource owned by drb with private access"
)

# permission errors should be caught before auditing
assert len(Audit.query()) == 0

@pytest.mark.parametrize(
"user,ao,owner,access,expected_status",
Expand Down
Loading