Skip to content

Commit

Permalink
[core][feat] Add history timeline (#2152)
Browse files Browse the repository at this point in the history
  • Loading branch information
aquamatthias authored Jul 26, 2024
1 parent 0c4ee17 commit 4f24d5b
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 11 deletions.
17 changes: 17 additions & 0 deletions fixcore/fixcore/db/arango_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,23 @@ def history_query(db: Any, query_model: QueryModel) -> Tuple[str, Json]:
return f"""{query_str} FOR result in {cursor}{last_limit} RETURN UNSET(result, {unset_props})""", ctx.bind_vars


def history_query_histogram(db: Any, query_model: QueryModel, granularity: timedelta) -> Tuple[str, Json]:
    """Build an AQL query that buckets history events into a time histogram.

    The events selected by query_model are grouped by change kind and by time
    slot, where a slot is `granularity` wide. Returns the AQL string together
    with the bind variables it references.
    """
    ctx = ArangoQueryContext()
    rewritten = rewrite_query(query_model)
    in_cursor, aql = query_string(
        db, rewritten, query_model, f"`{db.name}_node_history`", False, ctx, id_column="id", use_fulltext_index=False
    )
    doc = ctx.next_crs()
    # slot width in milliseconds, since DATE_TIMESTAMP yields epoch millis
    slot = ctx.add_bind_var(granularity.total_seconds() * 1000)
    bucket = f"DATE_ISO8601(FLOOR(DATE_TIMESTAMP({doc}.changed_at) / @{slot}) * @{slot})"
    aql += (
        f" FOR {doc} IN {in_cursor} "
        f"COLLECT change={doc}.change, at={bucket} "
        f"WITH COUNT INTO v SORT at ASC "
        'RETURN {"at": at, "group": {"change": change}, "v": v}'
    )
    return aql, ctx.bind_vars


def query_view_string(
db: Any,
query: Query,
Expand Down
8 changes: 8 additions & 0 deletions fixcore/fixcore/db/async_arangodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ async def aql_cursor(
stream: Optional[bool] = None,
skip_inaccessible_cols: Optional[bool] = None,
max_runtime: Optional[Number] = None,
fill_block_cache: Optional[bool] = None,
allow_dirty_read: bool = False,
allow_retry: bool = False,
force_one_shard_attribute_value: Optional[str] = None,
) -> AsyncCursorContext:
cursor: Cursor = await run_async(
self.db.aql.execute, # type: ignore
Expand All @@ -224,6 +228,10 @@ async def aql_cursor(
stream,
skip_inaccessible_cols,
max_runtime,
fill_block_cache,
allow_dirty_read,
allow_retry,
force_one_shard_attribute_value,
)
return AsyncCursorContext(
AsyncCursor(
Expand Down
74 changes: 64 additions & 10 deletions fixcore/fixcore/db/graphdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,19 @@ async def search_history(
) -> AsyncCursorContext:
pass

@abstractmethod
async def history_timeline(
    self,
    query: QueryModel,
    before: datetime,
    after: datetime,
    granularity: Optional[timedelta] = None,
    changes: Optional[List[HistoryChange]] = None,
    timeout: Optional[timedelta] = None,
    **kwargs: Any,
) -> AsyncCursorContext:
    """Return a cursor over a histogram of history events in a time range.

    :param query: the query that selects the history events to count.
    :param before: upper bound of the considered time range.
    :param after: lower bound of the considered time range.
    :param granularity: width of one histogram bucket; the implementation
        chooses a default when None.
    :param changes: restrict the result to these change kinds; all kinds when None.
    :param timeout: optional time-to-live for the underlying cursor.
    :return: an async cursor context yielding one entry per (bucket, change) pair.
    """
    pass

@abstractmethod
async def list_possible_values(
self,
Expand Down Expand Up @@ -731,21 +744,21 @@ async def search_list(
ttl=cast(Number, int(timeout.total_seconds())) if timeout else None,
)

async def search_history(
def _history_query_model(
self,
query: QueryModel,
changes: Optional[List[HistoryChange]] = None,
before: Optional[datetime] = None,
after: Optional[datetime] = None,
with_count: bool = False,
timeout: Optional[timedelta] = None,
**kwargs: Any,
) -> AsyncCursorContext:
) -> QueryModel:
more_than_one = len(query.query.parts) > 1
has_invalid_terms = any(query.query.find_terms(lambda t: isinstance(t, (FulltextTerm, MergeTerm))))
has_navigation = any(p.navigation for p in query.query.parts)
if more_than_one or has_invalid_terms or has_navigation:
raise AttributeError("Fulltext, merge terms and navigation is not supported in history queries!")
if before and after and before < after:
raise AttributeError("Before marks the end and should be greater than after!")

# adjust query
term = query.query.current_part.term
if changes:
Expand All @@ -754,16 +767,25 @@ async def search_history(
term = term.and_term(P.single("changed_at").gt(utc_str(after)))
if before:
term = term.and_term(P.single("changed_at").lt(utc_str(before)))
query = QueryModel(evolve(query.query, parts=[evolve(query.query.current_part, term=term)]), query.model)
return QueryModel(evolve(query.query, parts=[evolve(query.query.current_part, term=term)]), query.model)

async def search_history(
self,
query: QueryModel,
changes: Optional[List[HistoryChange]] = None,
before: Optional[datetime] = None,
after: Optional[datetime] = None,
with_count: bool = False,
timeout: Optional[timedelta] = None,
**kwargs: Any,
) -> AsyncCursorContext:
query = self._history_query_model(query, changes, before, after)
q_string, bind = arango_query.history_query(self, query)
trafo = (
None
if query.query.aggregate
else self.document_to_instance_fn(
query.model,
query,
["change", "changed_at", "before", "diff"],
id_column="id",
query.model, query, ["change", "changed_at", "before", "diff"], id_column="id"
)
)
ttl = cast(Number, int(timeout.total_seconds())) if timeout else None
Expand All @@ -777,6 +799,26 @@ async def search_history(
ttl=ttl,
)

async def history_timeline(
    self,
    query: QueryModel,
    before: datetime,
    after: datetime,
    granularity: Optional[timedelta] = None,
    changes: Optional[List[HistoryChange]] = None,
    timeout: Optional[timedelta] = None,
    **kwargs: Any,
) -> AsyncCursorContext:
    """Count history events between after and before, bucketed over time."""
    # aggregate definitions make no sense for a timeline: strip them from the query
    if query.query.aggregate is not None:
        query = evolve(query, query=evolve(query.query, aggregate=None))
    # bucket width defaults to 1/25 of the time range, but never below one hour
    # (note: an explicitly provided granularity below one hour is clamped up as well)
    bucket_width = max(granularity or abs(before - after) / 25, timedelta(hours=1))
    history_query = self._history_query_model(query, changes, before, after)
    aql, bind_vars = arango_query.history_query_histogram(self, history_query, bucket_width)
    cursor_ttl = cast(Number, int(timeout.total_seconds())) if timeout else None
    return await self.db.aql_cursor(query=aql, bind_vars=bind_vars, batch_size=10000, ttl=cursor_ttl)

async def search_graph_gen(
self, query: QueryModel, with_count: bool = False, timeout: Optional[timedelta] = None, **kwargs: Any
) -> AsyncCursorContext:
Expand Down Expand Up @@ -1849,6 +1891,18 @@ async def search_history(
await self.event_sender.core_event(CoreEvent.HistoryQuery, context, **counters)
return await self.real.search_history(query, changes, before, after, with_count, timeout, **kwargs)

async def history_timeline(
    self,
    query: QueryModel,
    before: datetime,
    after: datetime,
    granularity: Optional[timedelta] = None,
    changes: Optional[List[HistoryChange]] = None,
    timeout: Optional[timedelta] = None,
    **kwargs: Any,
) -> AsyncCursorContext:
    """Plain delegation to the wrapped database implementation."""
    return await self.real.history_timeline(
        query, before, after, granularity=granularity, changes=changes, timeout=timeout, **kwargs
    )

async def search_graph_gen(
self, query: QueryModel, with_count: bool = False, timeout: Optional[timedelta] = None, **kwargs: Any
) -> AsyncCursorContext:
Expand Down
61 changes: 61 additions & 0 deletions fixcore/fixcore/static/api-doc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1310,6 +1310,67 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/EstimatedSearchCost"
/graph/{graph_id}/search/history/timeline:
post:
summary: "Search history events and provide the number of events over time."
description: |
Search all history events and return a histogram of their counts over time.
A section can be defined (defaults to `/` == root) to interpret relative property paths.
Example: is(volume) and (reported.age>23d or desired.clean==true or metadata.version==2)
tags:
- graph_search
parameters:
- $ref: "#/components/parameters/graph_id"
- $ref: "#/components/parameters/section"
- name: before
in: query
description: "Count all history events before the given timestamp"
required: true
schema:
type: string
format: date-time
- name: after
in: query
description: "Count all history events after the given timestamp"
required: true
schema:
type: string
format: date-time
- name: granularity
in: query
description: "Optional parameter to define the granularity of the timeline"
required: false
schema:
type: string
format: duration
- name: change
in: query
description: "Optional parameter to get all history events with the given change type"
required: false
schema:
type: string
enum:
- node_created
- node_updated
- node_deleted
requestBody:
description: "The search to perform"
content:
text/plain:
schema:
type: string
example: is(volume) and reported.volume_size>100
responses:
"200":
description: "The result of this search in the defined format"
content:
application/json:
example: |
[
{ "at": "2024-07-14T00:00:00.000Z", "group": { "change": "node_created" }, "v": 170 },
{ "at": "2024-07-15T00:00:00.000Z", "group": { "change": "node_updated" }, "v": 833 },
{ "at": "2024-07-15T00:00:00.000Z", "group": { "change": "node_created" }, "v": 1166 }
]
/graph/{graph_id}/search/history/list:
post:
summary: "Search all history events and return them."
Expand Down
19 changes: 19 additions & 0 deletions fixcore/fixcore/web/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
WorkerTaskInProgress,
)
from fixlib.asynchronous.web.ws_handler import accept_websocket, clean_ws_handler
from fixlib.durations import parse_duration
from fixlib.jwt import encode_jwt
from fixlib.x509 import cert_to_bytes

Expand Down Expand Up @@ -246,6 +247,7 @@ def __add_routes(self, prefix: str) -> None:
web.post(prefix + "/graph/{graph_id}/search/aggregate", require(self.query_aggregation, r)),
web.post(prefix + "/graph/{graph_id}/search/history/list", require(self.query_history, r)),
web.post(prefix + "/graph/{graph_id}/search/history/aggregate", require(self.query_history, r)),
web.post(prefix + "/graph/{graph_id}/search/history/timeline", require(self.history_timeline, r)),
web.post(prefix + "/graph/{graph_id}/property/attributes", require(self.possible_values, r)),
web.post(prefix + "/graph/{graph_id}/property/values", require(self.possible_values, r)),
web.post(prefix + "/graph/{graph_id}/property/path/complete", require(self.property_path_complete, r)),
Expand Down Expand Up @@ -1238,6 +1240,23 @@ async def query_aggregation(self, request: Request, deps: TenantDependencies) ->
request, cursor, count=cursor.count(), total_count=cursor.full_count(), query_stats=cursor.stats()
)

async def history_timeline(self, request: Request, deps: TenantDependencies) -> StreamResponse:
    """Handle /graph/{graph_id}/search/history/timeline: stream a histogram of history events."""
    graph_db, query_model = await self.graph_query_model_from_request(request, deps)
    # "before" and "after" are mandatory: getone raises when the parameter is absent
    raw_before = request.query.getone("before")
    raw_after = request.query.getone("after")
    raw_granularity = request.query.get("granularity")
    raw_changes = if_set(request.query.get("change"), lambda x: x.split(","))
    async with await graph_db.history_timeline(
        query=query_model,
        before=parse_utc(raw_before),
        after=parse_utc(raw_after),
        granularity=parse_duration(raw_granularity) if raw_granularity else None,
        changes=[HistoryChange[change] for change in raw_changes] if raw_changes else None,
    ) as cursor:
        return await self.stream_response_from_gen(
            request, cursor, count=cursor.count(), total_count=cursor.full_count(), query_stats=cursor.stats()
        )

async def query_history(self, request: Request, deps: TenantDependencies) -> StreamResponse:
graph_db, query_model = await self.graph_query_model_from_request(request, deps)
before = request.query.get("before")
Expand Down
18 changes: 17 additions & 1 deletion fixcore/tests/fixcore/db/graphdb_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
from pytest import mark, raises

from fixcore.analytics import CoreEvent, InMemoryEventSender
from fixcore.db.db_access import DbAccess
from fixcore.db.graphdb import ArangoGraphDB, GraphDB, EventGraphDB, HistoryChange
from fixcore.db.model import QueryModel, GraphUpdate
from fixcore.db.db_access import DbAccess
from fixcore.error import ConflictingChangeInProgress, NoSuchChangeError, InvalidBatchUpdate
from fixcore.ids import NodeId, GraphName
from fixcore.model.graph_access import GraphAccess, EdgeTypes, Section
Expand Down Expand Up @@ -434,6 +434,22 @@ async def nodes(query: Query, **args: Any) -> List[Json]:
assert len(await nodes(Query.by("foo"), after=five_min_ago, changes=[HistoryChange.node_deleted])) == 0


@mark.asyncio
async def test_query_history_timeline(filled_graph_db: ArangoGraphDB, foo_model: Model) -> None:
    async def timeline(query: Query, **kwargs: Any) -> List[Json]:
        async with await filled_graph_db.history_timeline(QueryModel(query, foo_model), **kwargs) as cursor:
            return [entry async for entry in cursor]

    range_end = utc() + timedelta(minutes=60)
    range_start = range_end - timedelta(minutes=120)
    # all events of the fixture are expected to land in a single bucket
    foo_buckets = await timeline(Query.by("foo"), after=range_start, before=range_end)
    assert len(foo_buckets) == 1
    assert foo_buckets[0]["v"] == 10
    bla_buckets = await timeline(Query.by("bla"), after=range_start, before=range_end)
    assert len(bla_buckets) == 1
    assert bla_buckets[0]["v"] == 100


@mark.asyncio
async def test_query_graph(filled_graph_db: ArangoGraphDB, foo_model: Model) -> None:
graph = await load_graph(filled_graph_db, foo_model)
Expand Down

0 comments on commit 4f24d5b

Please sign in to comment.