From ca88c7cccb043c2ad462893f3558f80aa652b082 Mon Sep 17 00:00:00 2001 From: Sergey Ukustov Date: Thu, 28 Mar 2024 17:58:48 +0300 Subject: [PATCH] feat: FeedAggregationStore --- .../core/src/state-management/repository.ts | 4 ++++ .../core/src/store/feed-aggregation-store.ts | 21 +++++++++++++++++++ 2 files changed, 25 insertions(+) create mode 100644 packages/core/src/store/feed-aggregation-store.ts diff --git a/packages/core/src/state-management/repository.ts b/packages/core/src/state-management/repository.ts index c338dbc5c9..998c3b4de7 100644 --- a/packages/core/src/state-management/repository.ts +++ b/packages/core/src/state-management/repository.ts @@ -42,6 +42,7 @@ import { IReconApi, ReconEventFeedResponse } from '../recon.js' import { Utils } from '../utils.js' import { ModelInstanceDocument } from '@ceramicnetwork/stream-model-instance' import { Model } from '@ceramicnetwork/stream-model' +import { FeedAggregationStore } from '../store/feed-aggregation-store.js' const DEFAULT_LOAD_OPTS = { sync: SyncOptions.PREFER_CACHE, syncTimeoutSeconds: 3 } const APPLY_ANCHOR_COMMIT_ATTEMPTS = 3 @@ -121,6 +122,7 @@ export class Repository { readonly inmemory: StateCache private readonly feed: Feed + private readonly feedAggregationStore: FeedAggregationStore private reconEventFeedSubscription: Subscription | undefined @@ -163,6 +165,7 @@ export class Repository { state$.complete() }) this.updates$ = this.updates$.bind(this) + this.feedAggregationStore = new FeedAggregationStore() } /** @@ -177,6 +180,7 @@ export class Repository { } async init(): Promise { + await this.feedAggregationStore.open(this.#deps.kvFactory) await this.pinStore.open(this.#deps.kvFactory) await this.anchorRequestStore.open(this.#deps.kvFactory) // Initialization hell await this.index.init() diff --git a/packages/core/src/store/feed-aggregation-store.ts b/packages/core/src/store/feed-aggregation-store.ts new file mode 100644 index 0000000000..6bc04a0e1d --- /dev/null +++ b/packages/core/src/store/feed-aggregation-store.ts @@ -0,0 +1,21 @@ +import { ObjectStore } from './object-store.js' +import { StreamID } from '@ceramicnetwork/streamid' + +function serializeStreamID(streamID: StreamID): string { + return streamID.toString() +} + +function deserializeStreamID(input: string): StreamID { + return StreamID.fromString(input) +} + +/** + * A storage for feed aggregation queue: key is a timestamp, value is StreamID. + */ +export class FeedAggregationStore extends ObjectStore { + protected useCaseName = 'feed-aggregation' + + constructor() { + super(String, serializeStreamID, deserializeStreamID) + } +}