Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Start up Node Metrics publishing after the Model becomes available locally #3268

Merged
merged 3 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/base-test-utils/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ export class BaseTestUtils {
throw new Error(baseErrMsg + ': ' + customMsg)
}

// TODO: De-dupe this with `delayOrAbort` in abort-signal-utils.ts
static async delay(ms: number, signal?: AbortSignal): Promise<void> {
return new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => resolve(), ms)
Expand Down
15 changes: 15 additions & 0 deletions packages/common/src/utils/abort-signal-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,18 @@ export async function abortable<T>(
original.removeEventListener('abort', onAbort)
})
}

export async function delayOrAbort(ms: number, signal?: AbortSignal): Promise<void> {
return new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => resolve(), ms)
if (signal) {
const handleAbort = () => {
clearTimeout(timeout)
signal.removeEventListener('abort', handleAbort)
reject(signal.reason)
}
if (signal.aborted) handleAbort()
signal.addEventListener('abort', handleAbort)
}
})
}
109 changes: 82 additions & 27 deletions packages/core/src/ceramic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
CeramicSigner,
StreamStateLoader,
StreamReaderWriter,
delayOrAbort,
} from '@ceramicnetwork/common'
import {
DEFAULT_TRACE_SAMPLE_RATIO,
Expand Down Expand Up @@ -615,35 +616,21 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader {
// If authenticated into the node, we can start publishing metrics
// publishing metrics is enabled by default, even if no metrics config
if (this._metricsConfig?.metricsPublisherEnabled) {
// First, subscribe the node to the Model used for NodeMetrics
const metricsModel = NodeMetrics.getModel(this._networkOptions.name)
await this.repository.index.indexModels([{ streamID: metricsModel }])
await this.recon.registerInterest(metricsModel, this.did.id)

// Now start the NodeMetrics system.
const ipfsVersion = await this.ipfs.version()
const ipfsId = await this.ipfs.id()

NodeMetrics.start({
ceramic: this,
network: this._networkOptions.name,
ceramicVersion: this._versionInfo.cliPackageVersion,
ipfsVersion: ipfsVersion.version,
intervalMS: this._metricsConfig?.metricsPublishIntervalMS || DEFAULT_PUBLISH_INTERVAL_MS,
nodeId: ipfsId.publicKey, // what makes the best ID for the node?
nodeName: '', // daemon.hostname is not useful
nodeAuthDID: this.did.id,
nodeIPAddr: '', // daemon.hostname is not the external name
nodePeerId: ipfsId.publicKey,
logger: this._logger,
})
this._logger.imp(
`Publishing Node Metrics publicly to the Ceramic Network. To learn more, including how to disable publishing, please see the NODE_METRICS.md file for your branch, e.g. https://github.com/ceramicnetwork/js-ceramic/blob/develop/docs-dev/NODE_METRICS.md`
)
if (EnvironmentUtils.useRustCeramic()) {
// Start a background job that will wait for the Model to be available (synced over Recon)
// and then start publishing to it.
const metricsModel = NodeMetrics.getModel(this._networkOptions.name)
void this._waitForMetricsModel(metricsModel).then(
this._startPublishingNodeMetrics.bind(this, metricsModel)
)
} else {
this._logger.warn(
`Disabling publishing of Node Metrics because we are not connected to a Recon-compatible p2p node`
)
}
}
} else {
// warn that the node does not have an authenticated did
this._logger.imp(
this._logger.warn(
`The ceramic daemon is running without an authenticated DID. This means that this node cannot itself publish streams, including node metrics, and cannot use a DID as the method to authenticate with the Ceramic Anchor Service. See https://developers.ceramic.network/docs/composedb/guides/composedb-server/access-mainnet#updating-to-did-based-authentication for instructions on how to update your node to use DID authentication.`
)
}
Expand All @@ -655,6 +642,74 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader {
)
}

/**
* Starts up the subsystem to periodically publish Node Metrics to a Stream.
* Requires the data for the NodeMetrics Model to already be available locally
* in the ceramic-one blockstore.
* @param metricsModel - the StreamID of the Model that Node Metrics should be published to.
*/
async _startPublishingNodeMetrics(metricsModel: StreamID): Promise<void> {
await this.repository.index.indexModels([{ streamID: metricsModel }])
await this.recon.registerInterest(metricsModel, this.did.id)

// Now start the NodeMetrics system.
const ipfsVersion = await this.ipfs.version()
const ipfsId = await this.ipfs.id()

NodeMetrics.start({
ceramic: this,
network: this._networkOptions.name,
ceramicVersion: this._versionInfo.cliPackageVersion,
ipfsVersion: ipfsVersion.version,
intervalMS: this._metricsConfig?.metricsPublishIntervalMS || DEFAULT_PUBLISH_INTERVAL_MS,
nodeId: ipfsId.publicKey, // what makes the best ID for the node?
nodeName: '', // daemon.hostname is not useful
nodeAuthDID: this.did.id,
nodeIPAddr: '', // daemon.hostname is not the external name
nodePeerId: ipfsId.publicKey,
logger: this._logger,
})
this._logger.imp(
`Publishing Node Metrics publicly to the Ceramic Network. To learn more, including how to disable publishing, please see the NODE_METRICS.md file for your branch, e.g. https://github.com/ceramicnetwork/js-ceramic/blob/develop/docs-dev/NODE_METRICS.md`
)
}

/**
* Waits for Model used to publish NodeMetrics to be available locally.
* Since we subscribe to the metamodel at startup, so long as some connected node on the network
* has the model, it should eventually be available locally.
* @param model
*/
async _waitForMetricsModel(model: StreamID): Promise<void> {
let attemptNum = 0
let backoffMs = 100
const maxBackoffMs = 1000 * 60 // Caps off at checking once per minute

while (!this._shutdownSignal.isShuttingDown()) {
try {
await this.dispatcher.getFromIpfs(model.cid)
if (attemptNum > 0) {
this._logger.imp(`Model ${model} used to publish Node Metrics loaded successfully`)
}
return
} catch (err) {
if (attemptNum == 0) {
this._logger.imp(
`Waiting for Model ${model} used to publish Node Metrics to be available locally`
)
} else if (attemptNum % 5 == 0) {
this._logger.err(`Error loading Model ${model} used to publish Node Metrics: ${err}`)
}

await this._shutdownSignal.abortable((signal) => delayOrAbort(backoffMs, signal))
attemptNum++
if (backoffMs <= maxBackoffMs) {
backoffMs *= 2
}
}
}
}

/**
* Runs some checks at node startup to ensure that the node is healthy and properly configured.
* Throws an Error if any issues are detected
Expand Down
6 changes: 3 additions & 3 deletions packages/core/src/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ export class Dispatcher {
*/
async retrieveCommit(cid: CID | string, streamId?: StreamID): Promise<any> {
try {
return await this._getFromIpfs(cid)
return await this.getFromIpfs(cid)
} catch (e) {
if (streamId) {
this._logger.err(
Expand All @@ -380,7 +380,7 @@ export class Dispatcher {
*/
async retrieveFromIPFS(cid: CID | string, path?: string): Promise<any> {
try {
return await this._getFromIpfs(cid, path)
return await this.getFromIpfs(cid, path)
} catch (e) {
this._logger.err(`Error while loading CID ${cid.toString()} from IPFS: ${e}`)
throw e
Expand Down Expand Up @@ -416,7 +416,7 @@ export class Dispatcher {
/**
* Helper function for loading a CID from IPFS
*/
private async _getFromIpfs(cid: CID | string, path?: string): Promise<any> {
async getFromIpfs(cid: CID | string, path?: string): Promise<any> {
const asCid = typeof cid === 'string' ? CID.parse(cid) : cid

// Lookup CID in cache before looking it up IPFS
Expand Down
6 changes: 6 additions & 0 deletions packages/core/src/shutdown-signal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { Observer, Subject } from 'rxjs'
*/
export class ShutdownSignal {
private subject: Subject<void> = new Subject()
private shuttingDown = false

/**
* Subscribers to the signal.
Expand All @@ -20,6 +21,11 @@ export class ShutdownSignal {
*/
abort(): void {
this.subject.complete()
this.shuttingDown = true
}

isShuttingDown(): boolean {
return this.shuttingDown
}

/**
Expand Down