Skip to content

Commit

Permalink
✨(api) enrich statements on post and put
Browse files Browse the repository at this point in the history
The xAPI specification requires the LRS to infer the fields `authority`, `stored`,
`timestamp` and `id` (and discourages the use of `version`) when receiving statements.
This commit implements this requirement, thus paving the way to proper
permissions management (through `authority`).
  • Loading branch information
Leobouloc committed Aug 29, 2023
1 parent a834290 commit 336b5ba
Show file tree
Hide file tree
Showing 8 changed files with 516 additions and 37 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ have an authority field matching that of the user
- Backends: `LRSHTTP` methods must not be used in `asyncio` events loop (BC)
- Add variable to override PVC name in arnold deployment
- Backends: add `max_statements` option to `AsyncLRSHTTP`
- API: Incoming statements are enriched with `id`, `timestamp`, `stored`
and `authority`

## [3.9.0] - 2023-07-21

Expand Down
91 changes: 70 additions & 21 deletions src/ralph/api/routers/statements.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from ralph.api.auth import get_authenticated_user
from ralph.api.auth.user import AuthenticatedUser
from ralph.api.forwarding import forward_xapi_statements, get_active_xapi_forwardings
from ralph.api.models import ErrorDetail, LaxStatement
from ralph.backends.database.base import (
AgentParameters,
BaseDatabase,
Expand All @@ -40,8 +41,7 @@
BaseXapiAgentWithOpenId,
)
from ralph.models.xapi.base.common import IRI

from ..models import ErrorDetail, LaxStatement
from ralph.utils import now, statements_are_equivalent

logger = logging.getLogger(__name__)

Expand All @@ -67,6 +67,46 @@
}


def _enrich_statement_with_id(statement: dict):
# id: Statement UUID identifier
# https://github.com/adlnet/xAPI-Spec/blob/master/xAPI-Data.md#24-statement-properties
statement["id"] = str(statement.get("id", uuid4()))
return statement["id"]


def _enrich_statement_with_stored(statement: dict):
    """Overwrite the statement's `stored` field with `now()` and return it.

    Any `stored` value already present in the statement is replaced. The
    statement is modified in place.

    stored: The time at which a statement is stored by the LRS
    https://github.com/adlnet/xAPI-Spec/blob/1.0.3/xAPI-Data.md#248-stored
    """
    stored_at = now()
    statement["stored"] = stored_at
    return stored_at


def _enrich_statement_with_timestamp(statement: dict):
# timestamp: Time of the action. If not provided, it takes the same value as stored
# https://github.com/adlnet/xAPI-Spec/blob/master/xAPI-Data.md#247-timestamp
statement["timestamp"] = statement.get("timestamp", statement["stored"])
return statement["timestamp"]


def _enrich_statement_with_authority(statement: dict, current_user: AuthenticatedUser):
    """Set the statement's `authority` to the authenticated user's agent.

    The statement is modified in place and the authority is returned.

    authority: Information about whom or what has asserted that this statement is true
    https://github.com/adlnet/xAPI-Spec/blob/master/xAPI-Data.md#249-authority

    Raises:
        HTTPException: with status 403 when the statement already carries an
            `authority` that differs from `current_user.agent`.
    """
    authority = current_user.agent
    if "authority" in statement and statement["authority"] != authority:
        logger.error(
            "Failed to index submitted statements. Submitted authority does not match."
        )
        # `detail` is a single string: no trailing comma inside the parentheses
        # (which would make it a one-element tuple), and a space separates the
        # two concatenated literals.
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=(
                "Stated `authority` does not match credentials. Change or remove "
                "`authority` field from incoming statement."
            ),
        )
    statement["authority"] = authority
    return authority


def _parse_agent_parameters(agent_obj: dict):
"""Parse a dict and return an AgentParameters object to use in queries."""
# Transform agent to `dict` as FastAPI cannot parse JSON (seen as string)
Expand Down Expand Up @@ -356,8 +396,9 @@ async def get(
@router.put("", responses=POST_PUT_RESPONSES, status_code=status.HTTP_204_NO_CONTENT)
# pylint: disable=unused-argument
async def put(
current_user: Annotated[AuthenticatedUser, Depends(get_authenticated_user)],
# pylint: disable=invalid-name
statementId: str,
statementId: UUID,
statement: LaxStatement,
background_tasks: BackgroundTasks,
_=Depends(strict_query_params),
Expand All @@ -367,23 +408,28 @@ async def put(
LRS Specification:
https://github.com/adlnet/xAPI-Spec/blob/1.0.3/xAPI-Communication.md#211-put-statements
"""
statement_dict = {statementId: statement.dict(exclude_unset=True)}

# Force the UUID id in the statement to string, make sure it matches the
# statementId given in the URL.
statement_dict[statementId]["id"] = str(statement_dict[statementId]["id"])
statement_as_dict = statement.dict(exclude_unset=True)
statementId = str(statementId)

if not statementId == statement_dict[statementId]["id"]:
statement_as_dict.update(id=str(statement_as_dict.get("id", statementId)))
if statementId != statement_as_dict["id"]:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="xAPI statement id does not match given statementId",
)

# Enrich statement before forwarding (NB: id is already set)
_enrich_statement_with_stored(statement_as_dict)
_enrich_statement_with_timestamp(statement_as_dict)

if get_active_xapi_forwardings():
background_tasks.add_task(
forward_xapi_statements, statement_dict[statementId], method="put"
forward_xapi_statements, statement_as_dict, method="put"
)

# Finish enriching statements after forwarding
_enrich_statement_with_authority(statement_as_dict, current_user)

try:
existing_statement = DATABASE_CLIENT.query_statements_by_ids([statementId])
except BackendException as error:
Expand All @@ -397,7 +443,7 @@ async def put(
# In the case that the current statement is not an exact duplicate of the one
# found in the database we return a 409, otherwise the usual 204.
for existing in existing_statement:
if statement_dict != existing["_source"]:
if not statements_are_equivalent(statement_as_dict, existing["_source"]):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="A different statement already exists with the same ID",
Expand All @@ -406,9 +452,7 @@ async def put(

# For valid requests, perform the bulk indexing of all incoming statements
try:
success_count = DATABASE_CLIENT.put(
statement_dict.values(), ignore_errors=False
)
success_count = DATABASE_CLIENT.put([statement_as_dict], ignore_errors=False)
except (BackendException, BadFormatException) as exc:
logger.error("Failed to index submitted statement")
raise HTTPException(
Expand All @@ -422,6 +466,7 @@ async def put(
@router.post("/", responses=POST_PUT_RESPONSES)
@router.post("", responses=POST_PUT_RESPONSES)
async def post(
current_user: Annotated[AuthenticatedUser, Depends(get_authenticated_user)],
statements: Union[LaxStatement, List[LaxStatement]],
background_tasks: BackgroundTasks,
response: Response,
Expand All @@ -438,14 +483,12 @@ async def post(
if not isinstance(statements, list):
statements = [statements]

# The statements dict has multiple functions:
# - generate IDs for statements that are missing them;
# - use the list of keys to perform validations and as a final return value;
# - provide an iterable containing both the statements and generated IDs for bulk.
# Enrich statements before forwarding
statements_dict = {}
for statement in map(lambda x: x.dict(exclude_unset=True), statements):
statement_id = str(statement.get("id", uuid4()))
statement["id"] = statement_id
statement_id = _enrich_statement_with_id(statement)
_enrich_statement_with_stored(statement)
_enrich_statement_with_timestamp(statement)
statements_dict[statement_id] = statement

# Requests with duplicate statement IDs are considered invalid
Expand All @@ -462,6 +505,10 @@ async def post(
forward_xapi_statements, list(statements_dict.values()), method="post"
)

# Finish enriching statements after forwarding
for statement in statements_dict.values():
_enrich_statement_with_authority(statement, current_user)

try:
existing_statements = DATABASE_CLIENT.query_statements_by_ids(statements_ids)
except BackendException as error:
Expand All @@ -482,7 +529,9 @@ async def post(

# The LRS specification calls for deep comparison of duplicates. This
# is done here. If they are not exactly the same, we raise an error.
if statements_dict[existing["_id"]] != existing["_source"]:
if not statements_are_equivalent(
statements_dict[existing["_id"]], existing["_source"]
):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Differing statements already exist with the same ID: "
Expand Down
22 changes: 22 additions & 0 deletions src/ralph/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,25 @@ async def sem_task(task):
except Exception as exception:
group.cancel()
raise exception


def statements_are_equivalent(statement_1: dict, statement_2: dict):
    """Check if statements are equivalent.

    To be equivalent, they must be identical on all fields not modified on input
    by the LRS and identical on other fields, if these fields are present in both
    statements. For example, if an "authority" field is present in only one
    statement, they may still be equivalent.

    Returns:
        bool: `True` when every compared field has the same value in both
        statements (a field absent from both compares equal), `False` otherwise.
    """
    # Immutable fields must always have the same values. A field missing from
    # one statement only is treated as a difference (`None` vs. a value).
    fields = ["actor", "verb", "object", "id", "result", "context", "attachments"]

    # Check that some fields enriched by the LRS are equal when in both statements
    # The LRS specification excludes the fields below from equivalency. It was
    # decided to include them anyway as their value is inherent to the statements.
    other_fields = {"timestamp", "version"}  # "authority" and "stored" remain ignored.
    fields.extend(other_fields & statement_1.keys() & statement_2.keys())

    return not any(
        statement_1.get(field) != statement_2.get(field) for field in fields
    )
Loading

0 comments on commit 336b5ba

Please sign in to comment.