Skip to content

Commit

Permalink
fix: remove parsing of resume token and add unit tests for recon (#3179)
Browse files Browse the repository at this point in the history
* chore: remove parsing of resume token and add unit tests for recon

* chore: add comment about recon feed limit

* fix: test mocks
  • Loading branch information
stephhuynh18 authored Mar 12, 2024
1 parent c21043f commit e094c48
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 15 deletions.
160 changes: 160 additions & 0 deletions packages/core/src/__tests__/recon.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
import { ReconApi, ReconApiConfig } from '../recon.js'
import { EventID, StreamID } from '@ceramicnetwork/streamid'
import { FetchRequest, LoggerProvider } from '@ceramicnetwork/common'
import { jest } from '@jest/globals'
import { type CAR } from 'cartonne'
import { toArray, take, lastValueFrom, firstValueFrom, race, timer } from 'rxjs'

const RECON_URL = 'http://example.com'
const LOGGER = new LoggerProvider().getDiagnosticsLogger()
const MODEL = StreamID.fromString('kjzl6cwe1jw147ww5d8pswh1hjh686mut8v1br10dar8l9a3n1t8l15l0vrzn88')
const FAKE_EVENT = { id: EventID.createRandom('inmemory', 0), data: {} as CAR }

describe('ReconApi', () => {
let mockSendRequest: jest.Mock<FetchRequest>
let reconApi: ReconApi

beforeEach(async () => {
mockSendRequest = jest.fn((url: URL | string): Promise<any> => {
url = url.toString()
if (url.includes('/ceramic/feed/events')) {
return Promise.resolve({
events: [{ id: EventID.createRandom('inmemory', 0).toString(), data: undefined }],
resumeToken: 'test',
})
}
return Promise.resolve()
})

reconApi = new ReconApi(
{
enabled: true,
url: RECON_URL,
feedEnabled: true,
},
LOGGER,
mockSendRequest
)
await reconApi.init()
mockSendRequest.mockClear()
})

afterEach(async () => {
await reconApi.stop()
})

describe('init', () => {
test('should not init if recon is disabled', async () => {
const mockSendRequest = jest.fn(() => Promise.resolve())
const reconApi = new ReconApi(
{ enabled: false, url: RECON_URL, feedEnabled: true },
LOGGER,
mockSendRequest
)
await reconApi.init()
expect(mockSendRequest).not.toHaveBeenCalled()
})

test('should not init if already initialized', async () => {
// already initialized in beforeEach
await reconApi.init()

expect(mockSendRequest).not.toHaveBeenCalled()
})

test('should not start polling if feed is disabled', async () => {
const mockSendRequest = jest.fn(() => Promise.resolve())
const reconApi = new ReconApi(
{ enabled: true, url: RECON_URL, feedEnabled: false },
LOGGER,
mockSendRequest
)
await reconApi.init()
await firstValueFrom(race(reconApi, timer(1000)))
expect(mockSendRequest).toHaveBeenCalledTimes(1)
})
})

describe('registerInterest', () => {
test('should throw if recon is disabled', async () => {
const reconApi = new ReconApi(
{ enabled: false, url: RECON_URL, feedEnabled: true },
LOGGER,
mockSendRequest
)
await expect(reconApi.registerInterest(MODEL)).rejects.toThrow(
'Recon: disabled, not registering interest in model kjzl6cwe1jw147ww5d8pswh1hjh686mut8v1br10dar8l9a3n1t8l15l0vrzn88'
)
})

test('should be able to register interest in a model', async () => {
await reconApi.registerInterest(MODEL)
expect(mockSendRequest).toHaveBeenCalledWith(
`${RECON_URL}/ceramic/interests/model/${MODEL.toString()}`,
{ method: 'POST' }
)
})
})

describe('put', () => {
test('should do nothing if recon is disabled', async () => {
const mockSendRequest = jest.fn(() => Promise.resolve())
const reconApi = new ReconApi(
{ enabled: false, url: RECON_URL, feedEnabled: true },
LOGGER,
mockSendRequest
)
await expect(reconApi.put(FAKE_EVENT)).resolves
expect(mockSendRequest).not.toHaveBeenCalled()
})

test('put should put an event to the Recon API', async () => {
await reconApi.put(FAKE_EVENT, {})

expect(mockSendRequest).toHaveBeenCalledWith(`${RECON_URL}/ceramic/events`, {
method: 'POST',
body: { id: FAKE_EVENT.id.toString(), data: FAKE_EVENT.data.toString() },
})
})
})

describe('feed', () => {
test('should be able to subscribe to recon api as event feed', async () => {
let resumeToken = 0
mockSendRequest.mockImplementation(async () => {
resumeToken = resumeToken + 1
return {
events: [{ id: EventID.createRandom('inmemory', 0).toString(), data: undefined }],
resumeToken: resumeToken.toString(),
}
})
const received = await lastValueFrom(reconApi.pipe(take(3), toArray()))

expect(received.length).toEqual(3)
expect(received[0].cursor).toEqual('1')
expect(received[1].cursor).toEqual('2')
expect(received[2].cursor).toEqual('3')
})

test('should be resilient to errors', async () => {
let resumeToken = 100
mockSendRequest.mockImplementation(async () => {
resumeToken = resumeToken + 1

if (resumeToken == 102) throw Error('transient error')

return {
events: [{ id: EventID.createRandom('inmemory', 0).toString(), data: undefined }],
resumeToken: resumeToken.toString(),
}
})

const received = await lastValueFrom(reconApi.pipe(take(3), toArray()))

expect(received.length).toEqual(3)
expect(received[0].cursor).toEqual('101')
expect(received[1].cursor).toEqual('103')
expect(received[2].cursor).toEqual('104')
})
})
})
29 changes: 15 additions & 14 deletions packages/core/src/recon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import { EventID, StreamID } from '@ceramicnetwork/streamid'
import { Model } from '@ceramicnetwork/stream-model'

const DEFAULT_POLL_INTERVAL = 1_000 // 1 seconds
// Note this limit is arbitrary. This limit represents the upper bound on being able to recover after being down
const FEED_LIMIT = 1000

/**
* Configuration for the Recon API
Expand All @@ -48,14 +50,14 @@ export interface ReconEvent {
*/
export interface ReconEventFeedResponse {
events: Array<ReconEvent>
cursor: number
cursor: string
}

/**
* Recon API Interface
*/
export interface IReconApi extends Observable<ReconEventFeedResponse> {
init(initialCursor?: number): Promise<void>
init(initialCursor?: string): Promise<void>
registerInterest(model: StreamID): Promise<void>
put(event: ReconEvent, opts?: AbortOptions): Promise<void>
enabled: boolean
Expand All @@ -71,8 +73,7 @@ export class ReconApi extends Observable<ReconEventFeedResponse> implements IRec

readonly #pollInterval: number
#eventsSubscription: Subscription
private readonly feed$: Subject<ReconEventFeedResponse> =
new ReplaySubject<ReconEventFeedResponse>(5)
readonly #feed$: Subject<ReconEventFeedResponse> = new Subject()
readonly #stopSignal: Subject<void> = new Subject<void>()

constructor(
Expand All @@ -82,7 +83,7 @@ export class ReconApi extends Observable<ReconEventFeedResponse> implements IRec
pollInterval: number = DEFAULT_POLL_INTERVAL
) {
super((subscriber: Subscriber<ReconEventFeedResponse>): TeardownLogic => {
return this.feed$.subscribe(subscriber)
return this.#feed$.subscribe(subscriber)
})

this.#config = config
Expand All @@ -96,7 +97,7 @@ export class ReconApi extends Observable<ReconEventFeedResponse> implements IRec
* @param initialCursor
* @returns
*/
async init(initialCursor = 0): Promise<void> {
async init(initialCursor = '0'): Promise<void> {
if (this.#initialized) {
return
}
Expand All @@ -111,7 +112,7 @@ export class ReconApi extends Observable<ReconEventFeedResponse> implements IRec
await this.registerInterest(Model.MODEL)

if (this.#config.feedEnabled) {
this.#eventsSubscription = this.createSubscription(initialCursor).subscribe(this.feed$)
this.#eventsSubscription = this.createSubscription(initialCursor).subscribe(this.#feed$)
}
}

Expand Down Expand Up @@ -143,7 +144,7 @@ export class ReconApi extends Observable<ReconEventFeedResponse> implements IRec
* @param opts Abort options
* @returns
*/
async put(event: ReconEvent, opts: AbortOptions): Promise<void> {
async put(event: ReconEvent, opts: AbortOptions = {}): Promise<void> {
if (!this.enabled) {
this.#logger.imp(`Recon: disabled, not putting event ${event.id}`)
return
Expand Down Expand Up @@ -181,15 +182,15 @@ export class ReconApi extends Observable<ReconEventFeedResponse> implements IRec
if (this.#eventsSubscription) {
this.#eventsSubscription.unsubscribe()
}
this.feed$.complete()
this.#feed$.complete()
}

/**
* Polls the Recon API for new events using the feed endpoint. This is a turned into an observable that emits the events.
* @param initialCursor The cursor to start polling from
* @returns An observable that emits the events and cursor so it can be stored and used to resume polling during restart
*/
private createSubscription(initialCursor: number): Observable<ReconEventFeedResponse> {
private createSubscription(initialCursor: string): Observable<ReconEventFeedResponse> {
// start event
return of({ events: [], cursor: initialCursor, first: true }).pipe(
// projects the starting event to an Observable that emits the next events. Then it recursively projects each event to an Observable that emits the next event
Expand All @@ -201,27 +202,27 @@ export class ReconApi extends Observable<ReconEventFeedResponse> implements IRec
// defer allows lazy creation of the observable
defer(async () => {
const response = await this.#sendRequest(
this.#url + `/ceramic/feed/events?resumeAt=${prev.cursor}`,
this.#url + `/ceramic/feed/events?resumeAt=${prev.cursor}&limit=${FEED_LIMIT}`,
{
method: 'GET',
}
)
return {
events: response.events.map(({ id, data }) => {
events: response.events.map(({ id, _data }) => {
return {
id: EventID.fromString(id),
data: undefined,
}
}),
cursor: Math.max(parseInt(response.resumeToken, 10), prev.cursor),
cursor: response.resumeToken,
first: false,
}
}).pipe(
// if the request fails retry after a certain delay (pollInterval)
retry({
delay: (err) => {
this.#logger.warn(
`Recon: event feed failed, due to connection error ${err}; attempting to retry in ${
`Recon: event feed failed, due to error ${err}; attempting to retry in ${
this.#pollInterval
}ms`
)
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/state-management/repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ export class Repository {
RECON_STORE_USECASE_NAME
))
? await this.#deps.keyValueStore.get(RECON_STORE_CURSOR_KEY, RECON_STORE_USECASE_NAME)
: 0
: '0'
await this.recon.init(cursor)
this.reconEventFeedSubscription = this.recon
.pipe(concatMap(this.handleReconEvents.bind(this)))
Expand Down

0 comments on commit e094c48

Please sign in to comment.