Skip to content

Commit

Permalink
✨(project) add multitenancy for user-specific index/database
Browse files Browse the repository at this point in the history
This adds multi-tenancy support: we can now specify a target index/database at
user creation, allowing for data storage separation. This is optional: if no
target is given at user creation, the default target (specified in .env) will
be used.
  • Loading branch information
wilbrdt committed Feb 12, 2024
1 parent 683aeac commit 75fbeea
Show file tree
Hide file tree
Showing 31 changed files with 862 additions and 113 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,23 @@ and this project adheres to

## [Unreleased]

### Added

- Add LRS multitenancy support for user-specific target storage

### Changed

- `query_statements` and `query_statements_by_ids` methods can now take an
optional user-specific target

### Fixed

- Backends: switch LRSStatementsQuery since/until field types to ISO 8601 string

### Removed

- Removed `event_table_name` attribute of the ClickHouse data backend

## [4.0.0] - 2024-01-23

### Added
Expand Down
6 changes: 5 additions & 1 deletion src/ralph/api/auth/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ def get_basic_auth_user(
headers={"WWW-Authenticate": "Basic"},
)

user = AuthenticatedUser(scopes=user.scopes, agent=dict(user.agent))
user = AuthenticatedUser(
scopes=user.scopes,
agent=dict(user.agent),
target=user.target,
)

return user
4 changes: 3 additions & 1 deletion src/ralph/api/auth/oidc.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class IDToken(BaseModel):
accepted for processing.
iat (int): Time at which the JWT was issued.
scope (str): Scope(s) for resource authorization.
target (str): Target for storing the statements.
"""

iss: str
Expand All @@ -48,6 +49,7 @@ class IDToken(BaseModel):
exp: int
iat: int
scope: Optional[str]
target: Optional[str]

class Config: # noqa: D106
extra = Extra.ignore
Expand Down Expand Up @@ -147,6 +149,6 @@ def get_oidc_user(
user = AuthenticatedUser(
agent={"openid": f"{id_token.iss}/{id_token.sub}"},
scopes=UserScopes(id_token.scope.split(" ") if id_token.scope else []),
target=id_token.target,
)

return user
4 changes: 3 additions & 1 deletion src/ralph/api/auth/user.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Authenticated user for the Ralph API."""

from typing import Dict, FrozenSet, Literal
from typing import Dict, FrozenSet, Literal, Optional

from pydantic import BaseModel

Expand Down Expand Up @@ -67,7 +67,9 @@ class AuthenticatedUser(BaseModel):
Attributes:
agent (dict): The agent representing the current user.
scopes (list): The scopes the user has access to.
target (str or None): The target index or database to store statements into.
"""

agent: Dict
scopes: UserScopes
target: Optional[str]
32 changes: 25 additions & 7 deletions src/ralph/api/routers/statements.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,10 @@ async def get( # noqa: PLR0913
try:
query_result = await await_if_coroutine(
BACKEND_CLIENT.query_statements(
RalphStatementsQuery.construct(**{**query_params, "limit": limit})
params=RalphStatementsQuery.construct(
**{**query_params, "limit": limit}
),
target=current_user.target,
)
)
except BackendException as error:
Expand Down Expand Up @@ -440,11 +443,16 @@ async def put(
try:
if isinstance(BACKEND_CLIENT, BaseLRSBackend):
existing_statements = list(
BACKEND_CLIENT.query_statements_by_ids([statement_id])
BACKEND_CLIENT.query_statements_by_ids(
ids=[statement_id], target=current_user.target
)
)
else:
existing_statements = [
x async for x in BACKEND_CLIENT.query_statements_by_ids([statement_id])
x
async for x in BACKEND_CLIENT.query_statements_by_ids(
ids=[statement_id], target=current_user.target
)
]
except BackendException as error:
raise HTTPException(
Expand All @@ -467,7 +475,11 @@ async def put(
# For valid requests, perform the bulk indexing of all incoming statements
try:
success_count = await await_if_coroutine(
BACKEND_CLIENT.write(data=[statement_as_dict], ignore_errors=False)
BACKEND_CLIENT.write(
data=[statement_as_dict],
target=current_user.target,
ignore_errors=False,
)
)
except (BackendException, BadFormatException) as exc:
logger.error("Failed to index submitted statement")
Expand Down Expand Up @@ -526,13 +538,15 @@ async def post( # noqa: PLR0912
try:
if isinstance(BACKEND_CLIENT, BaseLRSBackend):
existing_statements = list(
BACKEND_CLIENT.query_statements_by_ids(list(statements_dict))
BACKEND_CLIENT.query_statements_by_ids(
ids=list(statements_dict), target=current_user.target
)
)
else:
existing_statements = [
x
async for x in BACKEND_CLIENT.query_statements_by_ids(
list(statements_dict)
ids=list(statements_dict), target=current_user.target
)
]
except BackendException as error:
Expand Down Expand Up @@ -573,7 +587,11 @@ async def post( # noqa: PLR0912
# For valid requests, perform the bulk indexing of all incoming statements
try:
success_count = await await_if_coroutine(
BACKEND_CLIENT.write(data=statements_dict.values(), ignore_errors=False)
BACKEND_CLIENT.write(
data=statements_dict.values(),
target=current_user.target,
ignore_errors=False,
)
)
except (BackendException, BadFormatException) as exc:
logger.error("Failed to index submitted statements")
Expand Down
6 changes: 2 additions & 4 deletions src/ralph/backends/data/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ def __init__(self, settings: Optional[Settings] = None):
"""
super().__init__(settings)
self.database = self.settings.DATABASE
self.event_table_name = self.settings.EVENT_TABLE_NAME
self._client = None

@property
Expand Down Expand Up @@ -255,8 +254,7 @@ def _read_dicts(
ignore_errors: bool,
) -> Iterator[dict]:
"""Method called by `self.read` yielding dictionaries. See `self.read`."""
if target is None:
target = self.event_table_name
target = target if target else self.settings.EVENT_TABLE_NAME

if isinstance(query.select, str):
query.select = [query.select]
Expand Down Expand Up @@ -341,7 +339,7 @@ def _write_dicts( # noqa: PLR0913
) -> int:
"""Method called by `self.write` writing dictionaries. See `self.write`."""
count = 0
target = target if target else self.event_table_name
target = target if target else self.settings.EVENT_TABLE_NAME
msg = "Start writing to the %s table of the %s database (chunk size: %d)"
logger.debug(msg, target, self.database, chunk_size)
insert_tuples = self._to_insert_tuples(data, ignore_errors)
Expand Down
14 changes: 9 additions & 5 deletions src/ralph/backends/lrs/async_es.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Asynchronous Elasticsearch LRS backend for Ralph."""

import logging
from typing import AsyncIterator, List
from typing import AsyncIterator, List, Optional

from ralph.backends.data.async_es import AsyncESDataBackend
from ralph.backends.lrs.base import (
Expand All @@ -19,14 +19,16 @@ class AsyncESLRSBackend(BaseAsyncLRSBackend[ESLRSBackendSettings], AsyncESDataBa
"""Asynchronous Elasticsearch LRS backend implementation."""

async def query_statements(
self, params: RalphStatementsQuery
self, params: RalphStatementsQuery, target: Optional[str] = None
) -> StatementQueryResult:
"""Return the statements query payload using xAPI parameters."""
query = ESLRSBackend.get_query(params=params)
try:
statements = [
document["_source"]
async for document in self.read(query=query, chunk_size=params.limit)
async for document in self.read(
query=query, target=target, chunk_size=params.limit
)
]
except (BackendException, BackendParameterException) as error:
logger.error("Failed to read from Elasticsearch")
Expand All @@ -38,11 +40,13 @@ async def query_statements(
search_after="|".join(query.search_after) if query.search_after else "",
)

async def query_statements_by_ids(self, ids: List[str]) -> AsyncIterator[dict]:
async def query_statements_by_ids(
self, ids: List[str], target: Optional[str] = None
) -> AsyncIterator[dict]:
"""Yield statements with matching ids from the backend."""
query = self.query_class(query={"terms": {"_id": ids}})
try:
async for document in self.read(query=query):
async for document in self.read(query=query, target=target):
yield document["_source"]
except (BackendException, BackendParameterException) as error:
logger.error("Failed to read from Elasticsearch")
Expand Down
14 changes: 9 additions & 5 deletions src/ralph/backends/lrs/async_mongo.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Async MongoDB LRS backend for Ralph."""

import logging
from typing import AsyncIterator, List
from typing import AsyncIterator, List, Optional

from ralph.backends.data.async_mongo import AsyncMongoDataBackend
from ralph.backends.lrs.base import (
Expand All @@ -21,14 +21,16 @@ class AsyncMongoLRSBackend(
"""Async MongoDB LRS backend implementation."""

async def query_statements(
self, params: RalphStatementsQuery
self, params: RalphStatementsQuery, target: Optional[str] = None
) -> StatementQueryResult:
"""Return the statements query payload using xAPI parameters."""
query = MongoLRSBackend.get_query(params)
try:
mongo_response = [
document
async for document in self.read(query=query, chunk_size=params.limit)
async for document in self.read(
query=query, target=target, chunk_size=params.limit
)
]
except (BackendException, BackendParameterException) as error:
logger.error("Failed to read from async MongoDB")
Expand All @@ -44,11 +46,13 @@ async def query_statements(
search_after=search_after,
)

async def query_statements_by_ids(self, ids: List[str]) -> AsyncIterator[dict]:
async def query_statements_by_ids(
self, ids: List[str], target: Optional[str] = None
) -> AsyncIterator[dict]:
"""Yield statements with matching ids from the backend."""
query = self.query_class(filter={"_source.id": {"$in": ids}})
try:
async for document in self.read(query=query):
async for document in self.read(query=query, target=target):
yield document["_source"]
except (BackendException, BackendParameterException) as error:
logger.error("Failed to read from MongoDB")
Expand Down
14 changes: 10 additions & 4 deletions src/ralph/backends/lrs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,15 @@ class BaseLRSBackend(BaseDataBackend[Settings, Any]):
"""Base LRS backend interface."""

@abstractmethod
def query_statements(self, params: RalphStatementsQuery) -> StatementQueryResult:
def query_statements(
self, params: RalphStatementsQuery, target: Optional[str] = None
) -> StatementQueryResult:
"""Return the statements query payload using xAPI parameters."""

@abstractmethod
def query_statements_by_ids(self, ids: List[str]) -> Iterator[dict]:
def query_statements_by_ids(
self, ids: List[str], target: Optional[str] = None
) -> Iterator[dict]:
"""Yield statements with matching ids from the backend."""


Expand All @@ -133,10 +137,12 @@ class BaseAsyncLRSBackend(BaseAsyncDataBackend[Settings, Any]):

@abstractmethod
async def query_statements(
self, params: RalphStatementsQuery
self, params: RalphStatementsQuery, target: Optional[str] = None
) -> StatementQueryResult:
"""Return the statements query payload using xAPI parameters."""

@abstractmethod
async def query_statements_by_ids(self, ids: List[str]) -> AsyncIterator[dict]:
async def query_statements_by_ids(
self, ids: List[str], target: Optional[str] = None
) -> AsyncIterator[dict]:
"""Yield statements with matching ids from the backend."""
14 changes: 9 additions & 5 deletions src/ralph/backends/lrs/clickhouse.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""ClickHouse LRS backend for Ralph."""

import logging
from typing import Generator, Iterator, List
from typing import Generator, Iterator, List, Optional

from ralph.backends.data.clickhouse import (
ClickHouseDataBackend,
Expand Down Expand Up @@ -42,7 +42,9 @@ class ClickHouseLRSBackend(
):
"""ClickHouse LRS backend implementation."""

def query_statements(self, params: RalphStatementsQuery) -> StatementQueryResult:
def query_statements(
self, params: RalphStatementsQuery, target: Optional[str] = None
) -> StatementQueryResult:
"""Return the statements query payload using xAPI parameters."""
ch_params = params.dict(exclude_none=True)
where = []
Expand Down Expand Up @@ -99,7 +101,7 @@ def query_statements(self, params: RalphStatementsQuery) -> StatementQueryResult
clickhouse_response = list(
self.read(
query=query,
target=self.event_table_name,
target=target,
ignore_errors=True,
)
)
Expand All @@ -123,7 +125,9 @@ def query_statements(self, params: RalphStatementsQuery) -> StatementQueryResult
pit_id=new_pit_id,
)

def query_statements_by_ids(self, ids: List[str]) -> Iterator[dict]:
def query_statements_by_ids(
self, ids: List[str], target: Optional[str] = None
) -> Iterator[dict]:
"""Yield statements with matching ids from the backend."""

def chunk_id_list(chunk_size: int = self.settings.IDS_CHUNK_SIZE) -> Generator:
Expand All @@ -141,7 +145,7 @@ def chunk_id_list(chunk_size: int = self.settings.IDS_CHUNK_SIZE) -> Generator:
query.parameters["ids"] = chunk_ids
ch_response = self.read(
query=query,
target=self.event_table_name,
target=target,
ignore_errors=True,
)
yield from (document["event"] for document in ch_response)
Expand Down
16 changes: 11 additions & 5 deletions src/ralph/backends/lrs/es.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Elasticsearch LRS backend for Ralph."""

import logging
from typing import Iterator, List
from typing import Iterator, List, Optional

from ralph.backends.data.es import (
ESDataBackend,
Expand Down Expand Up @@ -34,11 +34,15 @@ class Config(BaseSettingsConfig):
class ESLRSBackend(BaseLRSBackend[ESLRSBackendSettings], ESDataBackend):
"""Elasticsearch LRS backend implementation."""

def query_statements(self, params: RalphStatementsQuery) -> StatementQueryResult:
def query_statements(
self, params: RalphStatementsQuery, target: Optional[str] = None
) -> StatementQueryResult:
"""Return the statements query payload using xAPI parameters."""
query = self.get_query(params=params)
try:
es_documents = self.read(query=query, chunk_size=params.limit)
es_documents = self.read(
query=query, target=target, chunk_size=params.limit
)
statements = [document["_source"] for document in es_documents]
except (BackendException, BackendParameterException) as error:
logger.error("Failed to read from Elasticsearch")
Expand All @@ -50,11 +54,13 @@ def query_statements(self, params: RalphStatementsQuery) -> StatementQueryResult
search_after="|".join(query.search_after) if query.search_after else "",
)

def query_statements_by_ids(self, ids: List[str]) -> Iterator[dict]:
def query_statements_by_ids(
self, ids: List[str], target: Optional[str] = None
) -> Iterator[dict]:
"""Yield statements with matching ids from the backend."""
query = self.query_class(query={"terms": {"_id": ids}})
try:
es_response = self.read(query=query)
es_response = self.read(query=query, target=target)
yield from (document["_source"] for document in es_response)
except (BackendException, BackendParameterException) as error:
logger.error("Failed to read from Elasticsearch")
Expand Down
Loading

0 comments on commit 75fbeea

Please sign in to comment.