Skip to content

Commit

Permalink
feat: FeedAggregationStore
Browse files Browse the repository at this point in the history
  • Loading branch information
ukstv committed Mar 28, 2024
1 parent 5533292 commit ca88c7c
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 0 deletions.
4 changes: 4 additions & 0 deletions packages/core/src/state-management/repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import { IReconApi, ReconEventFeedResponse } from '../recon.js'
import { Utils } from '../utils.js'
import { ModelInstanceDocument } from '@ceramicnetwork/stream-model-instance'
import { Model } from '@ceramicnetwork/stream-model'
import { FeedAggregationStore } from '../store/feed-aggregation-store.js'

const DEFAULT_LOAD_OPTS = { sync: SyncOptions.PREFER_CACHE, syncTimeoutSeconds: 3 }
const APPLY_ANCHOR_COMMIT_ATTEMPTS = 3
Expand Down Expand Up @@ -121,6 +122,7 @@ export class Repository {
readonly inmemory: StateCache<RunningState>

private readonly feed: Feed
private readonly feedAggregationStore: FeedAggregationStore

private reconEventFeedSubscription: Subscription | undefined

Expand Down Expand Up @@ -163,6 +165,7 @@ export class Repository {
state$.complete()
})
this.updates$ = this.updates$.bind(this)
this.feedAggregationStore = new FeedAggregationStore()
}

/**
Expand All @@ -177,6 +180,7 @@ export class Repository {
}

async init(): Promise<void> {
await this.feedAggregationStore.open(this.#deps.kvFactory)
await this.pinStore.open(this.#deps.kvFactory)
await this.anchorRequestStore.open(this.#deps.kvFactory) // Initialization hell
await this.index.init()
Expand Down
21 changes: 21 additions & 0 deletions packages/core/src/store/feed-aggregation-store.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { ObjectStore } from './object-store.js'
import { StreamID } from '@ceramicnetwork/streamid'

function serializeStreamID(streamID: StreamID): string {
return streamID.toString()
}

function deserializeStreamID(input: string): StreamID {
return StreamID.fromString(input)
}

/**
* A storage for feed aggregation queue: key is a timestamp, value is StreamID.
*/
export class FeedAggregationStore extends ObjectStore<number, StreamID> {
protected useCaseName = 'feed-aggregation'

constructor() {
super(String, serializeStreamID, deserializeStreamID)
}
}

0 comments on commit ca88c7c

Please sign in to comment.