Skip to content

Commit

Permalink
feat: turn syncing tests on in recon mode (#3169)
Browse files Browse the repository at this point in the history
* Turned on a bunch of tests

* Stray console logs

* Make creating ipfs nicer

* Lint error

* Wait for model only in recon mode

* Wait for model ony in recon mode
  • Loading branch information
stephhuynh18 authored Feb 22, 2024
1 parent 4e52f01 commit 394eebe
Show file tree
Hide file tree
Showing 20 changed files with 224 additions and 153 deletions.
90 changes: 47 additions & 43 deletions packages/cli/src/__tests__/ceramic-daemon.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ const seed = 'SEED'

// Should pass on v4 if updated from TileDocument
const describeIfV3 = process.env.CERAMIC_RECON_MODE ? describe.skip : describe
const testIfV3 = process.env.CERAMIC_RECON_MODE ? test.skip : test

describeIfV3('Ceramic interop: core <> http-client', () => {
describe('Ceramic interop: core <> http-client', () => {
jest.setTimeout(30000)
let ipfs: IpfsApi
let tmpFolder: tmp.DirectoryResult
Expand Down Expand Up @@ -91,14 +92,14 @@ describeIfV3('Ceramic interop: core <> http-client', () => {
await changeHandle
}

it('healthcheck passes', async () => {
test('healthcheck passes', async () => {
const res = await fetch(`http://localhost:${daemon.port}/api/v0/node/healthcheck`)
expect(res.ok).toBeTruthy()
const text = await res.text()
expect(text).toEqual('Alive!')
})

it('healthcheck fails if ipfs unreachable', async () => {
test('healthcheck fails if ipfs unreachable', async () => {
const isOnlineSpy = jest.spyOn(ipfs, 'isOnline')
isOnlineSpy.mockRejectedValue(new Error('ipfs is sad now') as never)
const res = await fetch(`http://localhost:${daemon.port}/api/v0/node/healthcheck`)
Expand All @@ -108,7 +109,7 @@ describeIfV3('Ceramic interop: core <> http-client', () => {
isOnlineSpy.mockReset()
})

it('healthcheck can skip ipfs check', async () => {
test('healthcheck can skip ipfs check', async () => {
const isOnlineSpy = jest.spyOn(ipfs, 'isOnline')
isOnlineSpy.mockRejectedValue(new Error('ipfs is sad now') as never)
const res = await fetch(
Expand All @@ -120,18 +121,18 @@ describeIfV3('Ceramic interop: core <> http-client', () => {
isOnlineSpy.mockReset()
})

it('can store commit if the size is lesser than the maximum size ~256KB', async () => {
testIfV3('can store commit if the size is lesser than the maximum size ~256KB', async () => {
const streamtype = await TileDocument.create(client, { test: randomString(200000) })
expect(streamtype).not.toBeNull()
})

it('cannot store commit if the size is greater than the maximum size ~256KB', async () => {
testIfV3('cannot store commit if the size is greater than the maximum size ~256KB', async () => {
await expect(TileDocument.create(client, { test: randomString(300000) })).rejects.toThrow(
/exceeds the maximum block size of/
)
})

it('properly creates document', async () => {
testIfV3('properly creates document', async () => {
const doc1 = await TileDocument.create(core, { test: 123 }, null, {
anchor: false,
publish: false,
Expand All @@ -157,7 +158,7 @@ describeIfV3('Ceramic interop: core <> http-client', () => {
expect(state1).toEqual(state2)
})

it('gets anchor commit updates', async () => {
testIfV3('gets anchor commit updates', async () => {
const doc1 = await TileDocument.create(core, { test: 123 })
await anchorDoc(doc1)
expect(doc1.state.log.length).toEqual(2)
Expand All @@ -168,7 +169,7 @@ describeIfV3('Ceramic interop: core <> http-client', () => {
expect(doc2.state.anchorStatus).toEqual(AnchorStatus.ANCHORED)
})

it('loads documents correctly', async () => {
testIfV3('loads documents correctly', async () => {
const doc1 = await TileDocument.create(core, { test: 123 })
await anchorDoc(doc1)
const doc2 = await client.loadStream(doc1.id)
Expand All @@ -182,7 +183,7 @@ describeIfV3('Ceramic interop: core <> http-client', () => {
expect(StreamUtils.serializeState(doc3.state)).toEqual(StreamUtils.serializeState(doc4.state))
})

it('loads document commits correctly', async () => {
testIfV3('loads document commits correctly', async () => {
const doc1 = await TileDocument.create(core, { test: 123 })
await anchorDoc(doc1)
const doc2 = await TileDocument.load(client, doc1.id)
Expand All @@ -205,7 +206,7 @@ describeIfV3('Ceramic interop: core <> http-client', () => {
expect(serializeCommits(commits1)).toEqual(serializeCommits(commits2))
})

it('makes and gets updates correctly with subscription', async () => {
testIfV3('makes and gets updates correctly with subscription', async () => {
const initialContent = { a: 'initial' }
const middleContent = { ...initialContent, b: 'middle' }
const finalContent = { ...middleContent, c: 'final' }
Expand Down Expand Up @@ -242,7 +243,7 @@ describeIfV3('Ceramic interop: core <> http-client', () => {
subscription2.unsubscribe()
})

it('makes and gets updates correctly with manual sync', async () => {
testIfV3('makes and gets updates correctly with manual sync', async () => {
const initialContent = { a: 'initial' }
const middleContent = { ...initialContent, b: 'middle' }
const finalContent = { ...middleContent, c: 'final' }
Expand All @@ -269,7 +270,7 @@ describeIfV3('Ceramic interop: core <> http-client', () => {
expect(StreamUtils.serializeState(doc1.state)).toEqual(StreamUtils.serializeState(doc2.state))
})

it('Throw on rejected update', async () => {
testIfV3('Throw on rejected update', async () => {
const contentOg = { test: 123 }
const contentRejected = { test: 'rejected' }

Expand All @@ -292,7 +293,7 @@ describeIfV3('Ceramic interop: core <> http-client', () => {
expect(streamOg.state.log.length).toEqual(2)
})

it('loads commits correctly', async () => {
testIfV3('loads commits correctly', async () => {
// Create multiple commits of the same document
const content1 = { test: 123 }
const content2 = { test: 456 }
Expand Down Expand Up @@ -386,7 +387,7 @@ describeIfV3('Ceramic interop: core <> http-client', () => {
expect(docV5Client.state.log.length).toEqual(6)
})

it('can get stream contents from /streams/contents', async () => {
testIfV3('can get stream contents from /streams/contents', async () => {
const content1 = { test: 123 }
const content2 = { test: 456, test2: 'abc' }
const content3 = { test2: 'def' }
Expand All @@ -400,7 +401,7 @@ describeIfV3('Ceramic interop: core <> http-client', () => {
expect(json).toEqual(content3)
})

it('Aborts fetch if it is taking too long', async () => {
testIfV3('Aborts fetch if it is taking too long', async () => {
const content1 = { test: 123 }
const doc = await TileDocument.create(core, content1, null, { anchor: false })

Expand All @@ -423,7 +424,7 @@ describeIfV3('Ceramic interop: core <> http-client', () => {
clearTimeout(id)
})

it('Aborts fetch through passed in AbortSignal', async () => {
testIfV3('Aborts fetch through passed in AbortSignal', async () => {
const content1 = { test: 123 }
const doc = await TileDocument.create(core, content1, null, { anchor: false })

Expand All @@ -449,33 +450,36 @@ describeIfV3('Ceramic interop: core <> http-client', () => {
clearTimeout(id)
})

it('Aborts fetch if taking too long even if given an AbortSignal that did not get aborted', async () => {
const content1 = { test: 123 }
const doc = await TileDocument.create(core, content1, null, { anchor: false })

const loadStreamMock = jest.spyOn(core, 'loadStream')
let id = null
loadStreamMock.mockImplementation(() => {
return new Promise((resolve) => {
id = setTimeout(() => {
resolve(doc)
}, 4000)
testIfV3(
'Aborts fetch if taking too long even if given an AbortSignal that did not get aborted',
async () => {
const content1 = { test: 123 }
const doc = await TileDocument.create(core, content1, null, { anchor: false })

const loadStreamMock = jest.spyOn(core, 'loadStream')
let id = null
loadStreamMock.mockImplementation(() => {
return new Promise((resolve) => {
id = setTimeout(() => {
resolve(doc)
}, 4000)
})
})
})

const controller = new AbortController()
const controller = new AbortController()

await expect(
fetchJson(`http://localhost:${daemon.port}/api/v0/streams/${doc.id}/content`, {
signal: controller.signal,
timeout: 1000,
})
).rejects.toThrow(/aborted/)
await expect(
fetchJson(`http://localhost:${daemon.port}/api/v0/streams/${doc.id}/content`, {
signal: controller.signal,
timeout: 1000,
})
).rejects.toThrow(/aborted/)

clearTimeout(id)
})
clearTimeout(id)
}
)

it('Aborts fetch if the AbortSignal given has already been aborted', async () => {
testIfV3('Aborts fetch if the AbortSignal given has already been aborted', async () => {
const content1 = { test: 123 }
const doc = await TileDocument.create(core, content1, null, { anchor: false })

Expand All @@ -489,7 +493,7 @@ describeIfV3('Ceramic interop: core <> http-client', () => {
).rejects.toThrow(/aborted/)
})

it('requestAnchor works via http api', async () => {
testIfV3('requestAnchor works via http api', async () => {
const content1 = { test: 123 }

const doc = await TileDocument.create(client, content1, null, { anchor: false })
Expand All @@ -500,7 +504,7 @@ describeIfV3('Ceramic interop: core <> http-client', () => {
expect(doc.state.anchorStatus).toEqual(AnchorStatus.ANCHORED)
})

it.only('feed works via http api', async () => {
test('feed works via http api', async () => {
let messageEvent: any
const Codec = JsonAsString.pipe(AggregationDocument)
const source = new EventSource(
Expand All @@ -526,7 +530,7 @@ describeIfV3('Ceramic interop: core <> http-client', () => {
expect(messageEvent.metadata.model instanceof StreamID).toBeTruthy()
}, 30000)

describe('multiqueries', () => {
describeIfV3('multiqueries', () => {
let docA, docB, docC, docD
beforeEach(async () => {
docD = await TileDocument.create(core, { test: '321d' })
Expand Down Expand Up @@ -605,7 +609,7 @@ describeIfV3('Ceramic interop: core <> http-client', () => {
})
})

describe('pin api', () => {
describeIfV3('pin api', () => {
let docA, docB
let adminClient: CeramicClient

Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/__tests__/ceramic-error.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ function safeRead(filepath: string): string {
return ''
}
}

// TODO: Tests marked with IfV3 should passed in V' if updated from TileDocuments to Models/MIDs
const testIfV3 = process.env.CERAMIC_RECON_MODE ? test.skip : test

beforeAll(async () => {
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/ipfs-connection-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,6 @@ export class IpfsConnectionFactory {

const appliedConfig = mergeOptions(defaultConfig, overrideConfig)

return ipfs.createIPFS(appliedConfig, false)
return ipfs.createIPFS({ go: appliedConfig }, false)
}
}
36 changes: 35 additions & 1 deletion packages/common-test-utils/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { CID } from 'multiformats/cid'
import { BehaviorSubject } from 'rxjs'
import { BehaviorSubject, Observable, filter, timeout, throwError, firstValueFrom } from 'rxjs'
import { StreamID } from '@ceramicnetwork/streamid'
import type { StreamState, Stream } from '@ceramicnetwork/common'
import {
Expand All @@ -14,6 +14,7 @@ import first from 'it-first'
import { BaseTestUtils } from '@ceramicnetwork/base-test-utils'

export const testIfV3 = process.env['CERAMIC_RECON_MODE'] ? test.skip : test
export const describeIfV3 = process.env['CERAMIC_RECON_MODE'] ? describe.skip : describe

class FakeRunningState extends BehaviorSubject<StreamState> implements RunningStateLike {
readonly id: StreamID
Expand Down Expand Up @@ -128,4 +129,37 @@ export class CommonTestUtils {
],
}
}

static async waitFor<T>(
observable: Observable<T>,
predicate: (value: T) => boolean,
timeoutMs = 1000 * 30
): Promise<void> {
await firstValueFrom(
observable.pipe(
filter(predicate),
timeout({
each: timeoutMs,
with: () => throwError(() => new Error(`Timeout after ${timeoutMs}ms`)),
})
)
)
}

static async waitForEvent(
reconFeed: Observable<Events>,
cid: CID,
timeoutMs = 1000 * 30
): Promise<void> {
const hasEventForCID = ({ events }: Events) => {
return events.some((event) => event.id.event.toString() === cid.toString())
}
await this.waitFor(reconFeed, hasEventForCID, timeoutMs).catch((err) => {
throw new Error(`Error while waiting for event for CID ${cid}: ${err}`)
})
}
}

type Events = {
events: Array<{ id: { event: CID } }>
}
27 changes: 20 additions & 7 deletions packages/core/src/__tests__/ceramic-feed.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { expect, describe, test, beforeAll, afterAll } from '@jest/globals'
import { EventType, type IpfsApi } from '@ceramicnetwork/common'
import { EventType, type IpfsApi, Networks } from '@ceramicnetwork/common'
import { Utils as CoreUtils } from '@ceramicnetwork/core'
import { TileDocument } from '@ceramicnetwork/stream-tile'
import { createIPFS, swarmConnect } from '@ceramicnetwork/ipfs-daemon'
Expand All @@ -10,6 +10,7 @@ import { FeedDocument } from '../feed.js'
import { createCeramic } from './create-ceramic.js'
import { CommonTestUtils as TestUtils } from '@ceramicnetwork/common-test-utils'

// Should pass on v4 if updated from TileDocument
const describeIfV3 = process.env.CERAMIC_RECON_MODE ? describe.skip : describe

describe('Ceramic feed', () => {
Expand All @@ -18,8 +19,18 @@ describe('Ceramic feed', () => {
let ceramic1: Ceramic
let ceramic2: Ceramic
beforeAll(async () => {
ipfs1 = await createIPFS()
ipfs2 = await createIPFS()
ipfs1 = await createIPFS({
rust: {
type: 'binary',
network: Networks.INMEMORY,
},
})
ipfs2 = await createIPFS({
rust: {
type: 'binary',
network: Networks.INMEMORY,
},
})
ceramic1 = await createCeramic(ipfs1)
ceramic2 = await createCeramic(ipfs2)
await swarmConnect(ipfs2, ipfs1)
Expand Down Expand Up @@ -125,10 +136,7 @@ describe('Ceramic feed', () => {
})
})

// Should pass once Recon is integrated and cross node-syncing is enabled
const testIfV3ShouldPass = process.env.CERAMIC_RECON_MODE ? test.skip : test

testIfV3ShouldPass('add entry after creating/loading indexed model', async () => {
test('add entry after creating/loading indexed model', async () => {
const MODEL_DEFINITION: ModelDefinition = {
name: 'myModel',
version: '1.0',
Expand All @@ -143,6 +151,11 @@ describe('Ceramic feed', () => {
// create model on different node
const model = await Model.create(ceramic2, MODEL_DEFINITION)

// wait for model to be received
if (process.env.CERAMIC_RECON_MODE) {
await TestUtils.waitForEvent(ceramic1.repository.recon, model.tip)
}

// load model
await Model.load(ceramic1, model.id)

Expand Down
6 changes: 2 additions & 4 deletions packages/core/src/__tests__/ceramic-query-response.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { jest, expect, describe, test, afterEach, beforeEach } from '@jest/globals'
import { type IpfsApi } from '@ceramicnetwork/common'
import { CommonTestUtils as TestUtils } from '@ceramicnetwork/common-test-utils'
import { CommonTestUtils as TestUtils, describeIfV3 } from '@ceramicnetwork/common-test-utils'
import { TileDocument } from '@ceramicnetwork/stream-tile'
import { StreamID } from '@ceramicnetwork/streamid'
import { createIPFS } from '@ceramicnetwork/ipfs-daemon'
Expand Down Expand Up @@ -41,9 +41,7 @@ function getQueryPublishedPromise(
})
}

// These tests will likely pass in a way on v4, but there is no pubsub
const describeIfV3 = process.env.CERAMIC_RECON_MODE ? describe.skip : describe

// Will not pass in V' as there is no pubsub or syncing
describeIfV3('Response to pubsub queries handling', () => {
jest.setTimeout(30 * 1000)

Expand Down
Loading

0 comments on commit 394eebe

Please sign in to comment.