Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(api) Enrich incoming statements #410

Merged
merged 1 commit into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ have an authority field matching that of the user
- Backends: `LRSHTTP` methods must not be used in `asyncio` events loop (BC)
- Add variable to override PVC name in arnold deployment
- Backends: add `max_statements` option to `AsyncLRSHTTP`
- API: Incoming statements are enriched with `id`, `timestamp`, `stored`
and `authority`

## [3.9.0] - 2023-07-21

Expand Down
80 changes: 57 additions & 23 deletions src/ralph/api/routers/statements.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from ralph.api.auth import get_authenticated_user
from ralph.api.auth.user import AuthenticatedUser
from ralph.api.forwarding import forward_xapi_statements, get_active_xapi_forwardings
from ralph.api.models import ErrorDetail, LaxStatement
from ralph.backends.database.base import (
AgentParameters,
BaseDatabase,
Expand All @@ -40,8 +41,7 @@
BaseXapiAgentWithOpenId,
)
from ralph.models.xapi.base.common import IRI

from ..models import ErrorDetail, LaxStatement
from ralph.utils import now, statements_are_equivalent

logger = logging.getLogger(__name__)

Expand All @@ -67,6 +67,31 @@
}


def _enrich_statement_with_id(statement: dict):
# id: Statement UUID identifier.
# https://github.com/adlnet/xAPI-Spec/blob/master/xAPI-Data.md#24-statement-properties
statement["id"] = str(statement.get("id", uuid4()))
return statement["id"]


def _enrich_statement_with_stored(statement: dict):
# stored: The time at which a Statement is stored by the LRS.
# https://github.com/adlnet/xAPI-Spec/blob/1.0.3/xAPI-Data.md#248-stored
statement["stored"] = now()


def _enrich_statement_with_timestamp(statement: dict):
# timestamp: Time of the action. If not provided, it take the same value as stored.
# https://github.com/adlnet/xAPI-Spec/blob/master/xAPI-Data.md#247-timestamp
statement["timestamp"] = statement.get("timestamp", statement["stored"])


def _enrich_statement_with_authority(statement: dict, current_user: AuthenticatedUser):
# authority: Information about whom or what has asserted the statement is true.
# https://github.com/adlnet/xAPI-Spec/blob/master/xAPI-Data.md#249-authority
statement["authority"] = current_user.agent


def _parse_agent_parameters(agent_obj: dict):
"""Parse a dict and return an AgentParameters object to use in queries."""
# Transform agent to `dict` as FastAPI cannot parse JSON (seen as string)
Expand Down Expand Up @@ -356,8 +381,9 @@ async def get(
@router.put("", responses=POST_PUT_RESPONSES, status_code=status.HTTP_204_NO_CONTENT)
# pylint: disable=unused-argument
async def put(
current_user: Annotated[AuthenticatedUser, Depends(get_authenticated_user)],
# pylint: disable=invalid-name
statementId: str,
statementId: UUID,
statement: LaxStatement,
background_tasks: BackgroundTasks,
_=Depends(strict_query_params),
Expand All @@ -367,23 +393,28 @@ async def put(
LRS Specification:
https://github.com/adlnet/xAPI-Spec/blob/1.0.3/xAPI-Communication.md#211-put-statements
"""
statement_dict = {statementId: statement.dict(exclude_unset=True)}

# Force the UUID id in the statement to string, make sure it matches the
# statementId given in the URL.
statement_dict[statementId]["id"] = str(statement_dict[statementId]["id"])
statement_as_dict = statement.dict(exclude_unset=True)
statementId = str(statementId)

if not statementId == statement_dict[statementId]["id"]:
statement_as_dict.update(id=str(statement_as_dict.get("id", statementId)))
if statementId != statement_as_dict["id"]:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="xAPI statement id does not match given statementId",
)

# Enrich statement before forwarding (NB: id is already set)
_enrich_statement_with_stored(statement_as_dict)
_enrich_statement_with_timestamp(statement_as_dict)

if get_active_xapi_forwardings():
background_tasks.add_task(
forward_xapi_statements, statement_dict[statementId], method="put"
forward_xapi_statements, statement_as_dict, method="put"
)

# Finish enriching statements after forwarding
_enrich_statement_with_authority(statement_as_dict, current_user)

try:
existing_statement = DATABASE_CLIENT.query_statements_by_ids([statementId])
except BackendException as error:
Expand All @@ -394,10 +425,10 @@ async def put(

if existing_statement:
# The LRS specification calls for deep comparison of duplicate statement ids.
# In the case that the current statement is not an exact duplicate of the one
# found in the database we return a 409, otherwise the usual 204.
# In the case that the current statement is not equivalent to one found
# in the database we return a 409, otherwise the usual 204.
for existing in existing_statement:
if statement_dict != existing["_source"]:
if not statements_are_equivalent(statement_as_dict, existing["_source"]):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="A different statement already exists with the same ID",
Expand All @@ -406,9 +437,7 @@ async def put(

# For valid requests, perform the bulk indexing of all incoming statements
try:
success_count = DATABASE_CLIENT.put(
statement_dict.values(), ignore_errors=False
)
success_count = DATABASE_CLIENT.put([statement_as_dict], ignore_errors=False)
wilbrdt marked this conversation as resolved.
Show resolved Hide resolved
except (BackendException, BadFormatException) as exc:
logger.error("Failed to index submitted statement")
raise HTTPException(
Expand All @@ -422,6 +451,7 @@ async def put(
@router.post("/", responses=POST_PUT_RESPONSES)
@router.post("", responses=POST_PUT_RESPONSES)
async def post(
current_user: Annotated[AuthenticatedUser, Depends(get_authenticated_user)],
statements: Union[LaxStatement, List[LaxStatement]],
background_tasks: BackgroundTasks,
response: Response,
Expand All @@ -438,14 +468,12 @@ async def post(
if not isinstance(statements, list):
statements = [statements]

# The statements dict has multiple functions:
# - generate IDs for statements that are missing them;
# - use the list of keys to perform validations and as a final return value;
# - provide an iterable containing both the statements and generated IDs for bulk.
# Enrich statements before forwarding
statements_dict = {}
for statement in map(lambda x: x.dict(exclude_unset=True), statements):
statement_id = str(statement.get("id", uuid4()))
statement["id"] = statement_id
statement_id = _enrich_statement_with_id(statement)
_enrich_statement_with_stored(statement)
_enrich_statement_with_timestamp(statement)
statements_dict[statement_id] = statement

# Requests with duplicate statement IDs are considered invalid
Expand All @@ -462,6 +490,10 @@ async def post(
forward_xapi_statements, list(statements_dict.values()), method="post"
)

# Finish enriching statements after forwarding
for statement in statements_dict.values():
_enrich_statement_with_authority(statement, current_user)

try:
existing_statements = DATABASE_CLIENT.query_statements_by_ids(statements_ids)
except BackendException as error:
Expand All @@ -482,7 +514,9 @@ async def post(

# The LRS specification calls for deep comparison of duplicates. This
# is done here. If they are not exactly the same, we raise an error.
if statements_dict[existing["_id"]] != existing["_source"]:
if not statements_are_equivalent(
statements_dict[existing["_id"]], existing["_source"]
):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Differing statements already exist with the same ID: "
Expand Down
22 changes: 22 additions & 0 deletions src/ralph/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,25 @@ async def sem_task(task):
except Exception as exception:
group.cancel()
raise exception


def statements_are_equivalent(statement_1: dict, statement_2: dict):
"""Check if statements are equivalent.

To be equivalent, they must be identical on all fields not modified on input by the
LRS and identical on other fields, if these fields are present in both
statements. For example, if an "authority" field is present in only one statement,
they may still be equivalent.
"""
# Check that unmutable fields have the same values
fields = ["actor", "verb", "object", "id", "result", "context", "attachements"]

# Check that some fields enriched by the LRS are equal when in both statements
# The LRS specification excludes the fields below from equivalency. It was
# decided to include them anyway as their value is inherent to the statements.
other_fields = {"timestamp", "version"} # "authority" and "stored" remain ignored.
fields.extend(other_fields & statement_1.keys() & statement_2.keys())

if any(statement_1.get(field) != statement_2.get(field) for field in fields):
return False
return True
Loading