Skip to content

Commit

Permalink
feat: Add shouldIndex metadata flag (#3122)
Browse files Browse the repository at this point in the history
* wip

* chore: Pass shouldIndex

* chore: Stricter comparison

* chore: Use makeRawCommit

* Update packages/stream-model-instance-handler/src/__tests__/model-instance-document-handler.test.ts

Co-authored-by: Paul Le Cam <[email protected]>

* Update packages/stream-model-instance-handler/src/model-instance-document-handler.ts

Co-authored-by: Paul Le Cam <[email protected]>

* Update packages/indexing/src/database-index-api.ts

Co-authored-by: Paul Le Cam <[email protected]>

* Update packages/stream-model-instance-handler/src/model-instance-document-handler.ts

Co-authored-by: Paul Le Cam <[email protected]>

* Update packages/stream-model-instance/src/model-instance-document.ts

Co-authored-by: Paul Le Cam <[email protected]>

---------

Co-authored-by: Paul Le Cam <[email protected]>
  • Loading branch information
ukstv and PaulLeCam authored Jan 29, 2024
1 parent c31c6f9 commit fed99a6
Show file tree
Hide file tree
Showing 14 changed files with 153 additions and 51 deletions.
2 changes: 1 addition & 1 deletion packages/codecs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
},
"devDependencies": {
"@ceramicnetwork/base-test-utils": "^1.2.0-rc.0",
"ts-essentials": "^9.3.2"
"ts-essentials": "^9.4.1"
},
"publishConfig": {
"access": "public"
Expand Down
4 changes: 3 additions & 1 deletion packages/common/src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export interface CommitHeader {
model?: Uint8Array // StreamID encoded as byte array
schema?: string // deprecated
tags?: Array<string> // deprecated
shouldIndex?: boolean // ModelInstanceDocument indexing

[index: string]: any // allow support for future changes
}
Expand All @@ -45,7 +46,7 @@ export type GenesisCommit = {

export interface RawCommit {
id: CID
header?: CommitHeader
header?: Partial<CommitHeader>
data: any
prev: CID
}
Expand Down Expand Up @@ -79,6 +80,7 @@ export interface StreamMetadata {
schema?: string // deprecated
tags?: Array<string> // deprecated
forbidControllerChange?: boolean // deprecated, only used by TileDocument
shouldIndex?: boolean // ModelInstanceDocument indexing
[index: string]: any // allow arbitrary properties
}

Expand Down
1 change: 1 addition & 0 deletions packages/core/src/state-management/repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,7 @@ export class Repository {
tip: state$.tip,
lastAnchor: lastAnchor,
firstAnchor: firstAnchor,
shouldIndex: state$.value.metadata.shouldIndex,
}

await this.index.indexStream(streamContent)
Expand Down
23 changes: 19 additions & 4 deletions packages/indexing/src/__tests__/database-index-api.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { expect, jest, test } from '@jest/globals'
import { jest, expect, test, beforeEach, afterEach, afterAll, beforeAll } from '@jest/globals'
import { StreamID } from '@ceramicnetwork/streamid'
import knex, { Knex } from 'knex'
import tmp from 'tmp-promise'
Expand All @@ -7,7 +7,7 @@ import pgTeardown from '@databases/pg-test/jest/globalTeardown'
import { asTableName } from '../as-table-name.util.js'
import { IndexQueryNotAvailableError } from '../index-query-not-available.error.js'
import { Model } from '@ceramicnetwork/stream-model'
import { LoggerProvider, Networks, type CeramicCoreApi } from '@ceramicnetwork/common'
import { LoggerProvider, Networks } from '@ceramicnetwork/common'
import { CID } from 'multiformats/cid'
import {
asTimestamp,
Expand Down Expand Up @@ -73,13 +73,13 @@ function modelsToIndexArgs(models: Array<StreamID>): Array<IndexModelArgs> {
}

class CompleteQueryApi implements ISyncQueryApi {
syncComplete(model: string): boolean {
syncComplete(): boolean {
return true
}
}

class IncompleteQueryApi implements ISyncQueryApi {
syncComplete(model: string): boolean {
syncComplete(): boolean {
return false
}
}
Expand Down Expand Up @@ -915,6 +915,21 @@ and indexname in (${expectedIndices});
expect(raw.dark_mode).toEqual(STREAM_TEST_DATA_PROFILE_A.settings.dark_mode)
expect(raw.id).toEqual(STREAM_TEST_DATA_PROFILE_A.id)
})

test('unindex', async () => {
await indexApi.indexStream(STREAM_CONTENT_A)
const count = () =>
dbConnection
.from(`${MODELS_TO_INDEX[0]}`)
.select('*')
.then((r) => r.length)
await expect(count()).resolves.toEqual(1)
const shouldUnindex = { ...STREAM_CONTENT_A, shouldIndex: false }
await indexApi.indexStream(shouldUnindex)
await expect(count()).resolves.toEqual(0)
await indexApi.indexStream(shouldUnindex)
await expect(count()).resolves.toEqual(0)
})
})

describe('page', () => {
Expand Down
12 changes: 10 additions & 2 deletions packages/indexing/src/database-index-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ export interface IndexStreamArgs {
readonly tip: CID
readonly lastAnchor: Date | null
readonly firstAnchor: Date | null
readonly shouldIndex?: boolean
}

/**
Expand Down Expand Up @@ -207,14 +208,21 @@ export abstract class DatabaseIndexApi<DateType = Date | number> {
}

/**
* This method inserts the stream if it is not present in the index, or updates
* the 'content' if the stream already exists in the index.
* This method inserts the stream if it is not present in the index, updates
* the 'content' if the stream already exists in the index, or deletes the
* stream from the index if the 'shouldIndex' arg is set to false.
* @param indexingArgs
*/
async indexStream(
indexingArgs: IndexStreamArgs & { createdAt?: Date; updatedAt?: Date }
): Promise<void> {
const tableName = asTableName(indexingArgs.model)
if (indexingArgs.shouldIndex === false) {
await this.dbConnection(tableName)
.where('stream_id', indexingArgs.streamID.toString())
.delete()
return
}
const indexedData = this.getIndexedData(indexingArgs) as Record<string, unknown>
const relations = this.modelRelations.get(indexingArgs.model.toString()) ?? []
for (const relation of relations) {
Expand Down
16 changes: 8 additions & 8 deletions packages/indexing/src/local-index-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,35 +159,35 @@ export class LocalIndexApi implements IndexApi {
return this.databaseIndexApi?.getIndexedModels() ?? []
}

async convertModelDataToIndexModelsArgs(
convertModelDataToIndexModelsArgs(
modelsNoLongerIndexed: Array<ModelData>,
modelData: ModelData,
loading: LoadingInterfaceImplements = {}
): Promise<IndexModelArgs> {
const modelStreamId = modelData.streamID
this.logger.imp(`Starting indexing for Model ${modelStreamId.toString()}`)

const modelNoLongerIndexed = modelsNoLongerIndexed.some(function (oldIdx) {
return oldIdx.streamID.equals(modelStreamId)
})
const modelNoLongerIndexed = modelsNoLongerIndexed.some((oldIdx) =>
oldIdx.streamID.equals(modelStreamId)
)
// TODO(CDB-2297): Handle a model's historical sync after re-indexing
if (modelNoLongerIndexed) {
throw new Error(
`Cannot re-index model ${modelStreamId.toString()}, data may not be up-to-date`
)
}

return await _getIndexModelArgs(this.reader, modelData, loading)
return _getIndexModelArgs(this.reader, modelData, loading)
}

async indexModels(models: Array<ModelData>): Promise<void> {
const modelsNoLongerIndexed = (await this.databaseIndexApi?.getModelsNoLongerIndexed()) ?? []
const loading = {}

const indexModelsArgs = await Promise.all(
models.map(async (idx) => {
return await this.convertModelDataToIndexModelsArgs(modelsNoLongerIndexed, idx, loading)
})
models.map((idx) =>
this.convertModelDataToIndexModelsArgs(modelsNoLongerIndexed, idx, loading)
)
)

await this.databaseIndexApi?.indexModels(indexModelsArgs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1117,7 +1117,7 @@ describe('ModelInstanceDocumentHandler', () => {
envelope: signedCommit.jws,
}
await expect(handler.applyCommit(signedCommitData, context, state)).rejects.toThrow(
/Updating metadata for ModelInstanceDocument Streams is not allowed/
`Unsupported metadata changes for ModelInstanceDocument Stream ${doc.id}: controllers. Only the shouldIndex argument can be changed.`
)
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,18 @@ export class ModelInstanceDocumentHandler implements StreamHandler<ModelInstance
)

if (payload.header) {
throw new Error(
`Updating metadata for ModelInstanceDocument Streams is not allowed. Tried to change metadata for Stream ${streamId} from ${JSON.stringify(
state.metadata
)} to ${JSON.stringify(payload.header)}\``
)
const { shouldIndex, ...others } = payload.header
const otherKeys = Object.keys(others)
if (otherKeys.length) {
throw new Error(
`Unsupported metadata changes for ModelInstanceDocument Stream ${streamId}: ${otherKeys.join(
','
)}. Only the shouldIndex argument can be changed.`
)
}
if (shouldIndex != null) {
state.metadata.shouldIndex = shouldIndex
}
}

const oldContent = state.content ?? {}
Expand Down Expand Up @@ -240,11 +247,7 @@ export class ModelInstanceDocumentHandler implements StreamHandler<ModelInstance

validateContentLength(content)

await this._schemaValidator.validateSchema(
content,
model.content.schema,
model.commitId.toString()
)
this._schemaValidator.validateSchema(content, model.content.schema, model.commitId.toString())

// Now validate the relations
await this._validateRelationsContent(ceramic, model, content)
Expand Down
6 changes: 5 additions & 1 deletion packages/stream-model-instance-handler/src/schema-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ export class SchemaValidation {
this.validators = new LRUCache(AJV_CACHE_SIZE)
}

public validateSchema(content: Record<string, any>, schema: SchemaObject, schemaId: string) {
public validateSchema(
content: Record<string, any>,
schema: SchemaObject,
schemaId: string
): void {
let validator = this.validators.get(schemaId)
if (!validator) {
validator = buildAjv()
Expand Down
84 changes: 67 additions & 17 deletions packages/stream-model-instance/src/model-instance-document.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import sizeof from 'object-sizeof'
import {
CreateOpts,
LoadOpts,
UpdateOpts,
UpdateOpts as CommonUpdateOpts,
Stream,
StreamConstructor,
StreamStatic,
Expand All @@ -17,13 +17,18 @@ import {
SignedCommitContainer,
CeramicSigner,
GenesisHeader,
CommitHeader,
StreamWriter,
StreamReader,
IntoSigner,
} from '@ceramicnetwork/common'
import { CommitID, StreamID, StreamRef } from '@ceramicnetwork/streamid'
import { fromString } from 'uint8arrays'

export interface UpdateOpts extends CommonUpdateOpts {
shouldIndex?: boolean
}

/**
* Arguments used to generate the metadata for Model Instance Documents
*/
Expand All @@ -48,6 +53,12 @@ export interface ModelInstanceDocumentMetadataArgs {
* ModelInstanceDocuments whose Model has an accountRelation of 'SINGLE'.
*/
deterministic?: boolean

/**
* Whether the stream should be stored by indexers or not. When undefined, indexers could
* index the stream if wanted.
*/
shouldIndex?: boolean
}

/**
Expand All @@ -68,6 +79,11 @@ export interface ModelInstanceDocumentMetadata {
* Unique bytes
*/
unique?: Uint8Array

/**
* Whether the stream should be indexed or not.
*/
shouldIndex?: boolean
}

const DEFAULT_CREATE_OPTS = {
Expand Down Expand Up @@ -107,7 +123,12 @@ export class ModelInstanceDocument<T = Record<string, any>> extends Stream {

get metadata(): ModelInstanceDocumentMetadata {
const metadata = this.state$.value.metadata
return { controller: metadata.controllers[0], model: metadata.model, unique: metadata.unique }
return {
controller: metadata.controllers[0],
model: metadata.model,
unique: metadata.unique,
shouldIndex: metadata.shouldIndex,
}
}

/**
Expand Down Expand Up @@ -218,18 +239,25 @@ export class ModelInstanceDocument<T = Record<string, any>> extends Stream {
* @param opts - Additional options
*/
async replace(content: T | null, opts: UpdateOpts = {}): Promise<void> {
opts = { ...DEFAULT_UPDATE_OPTS, ...opts }
const { shouldIndex, ...options } = { ...DEFAULT_UPDATE_OPTS, ...opts }
validateContentLength(content)
const signer: CeramicSigner = opts.asDID
? CeramicSigner.fromDID(opts.asDID)
: opts.signer || this.api.signer
const updateCommit = await ModelInstanceDocument.makeUpdateCommit(
signer,
const signer: CeramicSigner = options.asDID
? CeramicSigner.fromDID(options.asDID)
: options.signer || this.api.signer
let header: Partial<CommitHeader> | undefined = undefined
if (shouldIndex != null) {
header = {
shouldIndex: shouldIndex,
}
}
const rawCommit = ModelInstanceDocument._makeRawCommit(
this.commitId,
this.content,
content
content,
header
)
const updated = await this.api.applyCommit(this.id, updateCommit, opts)
const updateCommit = await signer.createDagJWS(rawCommit)
const updated = await this.api.applyCommit(this.id, updateCommit, options)
this.state$.next(updated.state)
}

Expand All @@ -240,10 +268,11 @@ export class ModelInstanceDocument<T = Record<string, any>> extends Stream {
* @param opts - Additional options
*/
async patch(jsonPatch: Operation[], opts: UpdateOpts = {}): Promise<void> {
opts = { ...DEFAULT_UPDATE_OPTS, ...opts }
const signer: CeramicSigner = opts.asDID
? CeramicSigner.fromDID(opts.asDID)
: opts.signer || this.api.signer
const { shouldIndex, ...options } = { ...DEFAULT_UPDATE_OPTS, ...opts }

const signer: CeramicSigner = options.asDID
? CeramicSigner.fromDID(options.asDID)
: options.signer || this.api.signer
jsonPatch.forEach((patch) => {
switch (patch.op) {
case 'add': {
Expand All @@ -264,11 +293,26 @@ export class ModelInstanceDocument<T = Record<string, any>> extends Stream {
prev: this.tip,
id: this.id.cid,
}
// Null check is necessary to avoid `undefined` value that can't be encoded with IPLD
if (shouldIndex != null) {
rawCommit.header = {
shouldIndex: shouldIndex,
}
}
const commit = await signer.createDagJWS(rawCommit)
const updated = await this.api.applyCommit(this.id, commit, opts)
const updated = await this.api.applyCommit(this.id, commit, options)
this.state$.next(updated.state)
}

/**
* Set the index metadata field for the stream
* @param shouldIndex - Whether the stream should be indexed or not
* @param opts - Additional options
*/
shouldIndex(shouldIndex: boolean, opts: CommonUpdateOpts = {}): Promise<void> {
return this.patch([], { ...opts, shouldIndex: shouldIndex })
}

/**
* Makes this document read-only. After this has been called any future attempts to call
* mutation methods on the instance will throw.
Expand Down Expand Up @@ -308,14 +352,20 @@ export class ModelInstanceDocument<T = Record<string, any>> extends Stream {
private static _makeRawCommit<T>(
prev: CommitID,
oldContent: T | null,
newContent: T | null
newContent: T | null,
header?: Partial<CommitHeader>
): RawCommit {
const patch = jsonpatch.compare(oldContent ?? {}, newContent ?? {})
return {
const rawCommit: RawCommit = {
data: patch,
prev: prev.commit,
id: prev.baseID.cid,
}
// Null check is necessary to avoid `undefined` value that can't be encoded with IPLD
if (header != null) {
rawCommit.header = header
}
return rawCommit
}

/**
Expand Down
Loading

0 comments on commit fed99a6

Please sign in to comment.