Skip to content

Commit

Permalink
feat: Start up Node Metrics publishing after the Model becomes availa…
Browse files Browse the repository at this point in the history
…ble locally (#3268)
  • Loading branch information
stbrody authored Jul 30, 2024
1 parent a949b77 commit 679b8f8
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 30 deletions.
1 change: 1 addition & 0 deletions packages/base-test-utils/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ export class BaseTestUtils {
throw new Error(baseErrMsg + ': ' + customMsg)
}

// TODO: De-dupe this with `delayOrAbort` in abort-signal-utils.ts
static async delay(ms: number, signal?: AbortSignal): Promise<void> {
return new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => resolve(), ms)
Expand Down
15 changes: 15 additions & 0 deletions packages/common/src/utils/abort-signal-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,18 @@ export async function abortable<T>(
original.removeEventListener('abort', onAbort)
})
}

/**
 * Resolves after `ms` milliseconds, or rejects early if `signal` is aborted.
 *
 * The abort listener is always detached once the promise settles, so calling
 * this repeatedly with the same long-lived signal does not accumulate listeners.
 *
 * @param ms - how long to wait, in milliseconds.
 * @param signal - optional AbortSignal. When it aborts, the pending timer is cancelled
 *   and the returned promise rejects with `signal.reason`.
 * @returns a promise that resolves once the delay has elapsed.
 */
export async function delayOrAbort(ms: number, signal?: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    // Already aborted: reject right away without scheduling a timer or registering a listener.
    if (signal?.aborted) {
      reject(signal.reason)
      return
    }

    const handleAbort = () => {
      clearTimeout(timeout)
      signal?.removeEventListener('abort', handleAbort)
      reject(signal?.reason)
    }

    const timeout = setTimeout(() => {
      // The delay completed normally; detach the listener so it does not linger on the signal.
      signal?.removeEventListener('abort', handleAbort)
      resolve()
    }, ms)

    signal?.addEventListener('abort', handleAbort)
  })
}
109 changes: 82 additions & 27 deletions packages/core/src/ceramic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
CeramicSigner,
StreamStateLoader,
StreamReaderWriter,
delayOrAbort,
} from '@ceramicnetwork/common'
import {
DEFAULT_TRACE_SAMPLE_RATIO,
Expand Down Expand Up @@ -615,35 +616,21 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader {
// If authenticated into the node, we can start publishing metrics
// publishing metrics is enabled by default, even if no metrics config
if (this._metricsConfig?.metricsPublisherEnabled) {
// First, subscribe the node to the Model used for NodeMetrics
const metricsModel = NodeMetrics.getModel(this._networkOptions.name)
await this.repository.index.indexModels([{ streamID: metricsModel }])
await this.recon.registerInterest(metricsModel, this.did.id)

// Now start the NodeMetrics system.
const ipfsVersion = await this.ipfs.version()
const ipfsId = await this.ipfs.id()

NodeMetrics.start({
ceramic: this,
network: this._networkOptions.name,
ceramicVersion: this._versionInfo.cliPackageVersion,
ipfsVersion: ipfsVersion.version,
intervalMS: this._metricsConfig?.metricsPublishIntervalMS || DEFAULT_PUBLISH_INTERVAL_MS,
nodeId: ipfsId.publicKey, // what makes the best ID for the node?
nodeName: '', // daemon.hostname is not useful
nodeAuthDID: this.did.id,
nodeIPAddr: '', // daemon.hostname is not the external name
nodePeerId: ipfsId.publicKey,
logger: this._logger,
})
this._logger.imp(
`Publishing Node Metrics publicly to the Ceramic Network. To learn more, including how to disable publishing, please see the NODE_METRICS.md file for your branch, e.g. https://github.com/ceramicnetwork/js-ceramic/blob/develop/docs-dev/NODE_METRICS.md`
)
if (EnvironmentUtils.useRustCeramic()) {
// Start a background job that will wait for the Model to be available (synced over Recon)
// and then start publishing to it.
const metricsModel = NodeMetrics.getModel(this._networkOptions.name)
void this._waitForMetricsModel(metricsModel).then(
this._startPublishingNodeMetrics.bind(this, metricsModel)
)
} else {
this._logger.warn(
`Disabling publishing of Node Metrics because we are not connected to a Recon-compatible p2p node`
)
}
}
} else {
// warn that the node does not have an authenticated did
this._logger.imp(
this._logger.warn(
`The ceramic daemon is running without an authenticated DID. This means that this node cannot itself publish streams, including node metrics, and cannot use a DID as the method to authenticate with the Ceramic Anchor Service. See https://developers.ceramic.network/docs/composedb/guides/composedb-server/access-mainnet#updating-to-did-based-authentication for instructions on how to update your node to use DID authentication.`
)
}
Expand All @@ -655,6 +642,74 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader {
)
}

/**
 * Starts up the subsystem to periodically publish Node Metrics to a Stream.
 * Requires the data for the NodeMetrics Model to already be available locally
 * in the ceramic-one blockstore.
 *
 * Does nothing if the node has already begun shutting down. This matters because
 * `_waitForMetricsModel` returns normally (without having loaded the Model) when its
 * polling loop exits due to shutdown, and this method is chained directly after it.
 *
 * @param metricsModel - the StreamID of the Model that Node Metrics should be published to.
 */
async _startPublishingNodeMetrics(metricsModel: StreamID): Promise<void> {
  // Never index, register interest, or start publishing while the node is going down.
  if (this._shutdownSignal.isShuttingDown()) {
    return
  }

  // Subscribe the node to the Model used for NodeMetrics.
  await this.repository.index.indexModels([{ streamID: metricsModel }])
  await this.recon.registerInterest(metricsModel, this.did.id)

  // Now start the NodeMetrics system.
  const ipfsVersion = await this.ipfs.version()
  const ipfsId = await this.ipfs.id()

  NodeMetrics.start({
    ceramic: this,
    network: this._networkOptions.name,
    ceramicVersion: this._versionInfo.cliPackageVersion,
    ipfsVersion: ipfsVersion.version,
    // `||` (not `??`) so that a configured interval of 0 also falls back to the default.
    intervalMS: this._metricsConfig?.metricsPublishIntervalMS || DEFAULT_PUBLISH_INTERVAL_MS,
    nodeId: ipfsId.publicKey, // what makes the best ID for the node?
    nodeName: '', // daemon.hostname is not useful
    nodeAuthDID: this.did.id,
    nodeIPAddr: '', // daemon.hostname is not the external name
    nodePeerId: ipfsId.publicKey,
    logger: this._logger,
  })
  this._logger.imp(
    `Publishing Node Metrics publicly to the Ceramic Network. To learn more, including how to disable publishing, please see the NODE_METRICS.md file for your branch, e.g. https://github.com/ceramicnetwork/js-ceramic/blob/develop/docs-dev/NODE_METRICS.md`
  )
}

/**
 * Waits for Model used to publish NodeMetrics to be available locally.
 * Since we subscribe to the metamodel at startup, so long as some connected node on the network
 * has the model, it should eventually be available locally.
 *
 * Polls by loading the Model's CID through the dispatcher, retrying with exponential backoff
 * that starts at 100ms and is capped at one minute between attempts.
 *
 * Note: if shutdown begins between attempts the loop exits and this method returns normally
 * even though the Model was never loaded.
 * NOTE(review): an abort during the backoff delay presumably rejects out of this method
 * (delayOrAbort rejects on abort) - confirm against ShutdownSignal.abortable.
 *
 * @param model - the StreamID of the Model to wait for.
 */
async _waitForMetricsModel(model: StreamID): Promise<void> {
  let attemptNum = 0
  let backoffMs = 100
  const maxBackoffMs = 1000 * 60 // Caps off at checking once per minute

  while (!this._shutdownSignal.isShuttingDown()) {
    try {
      await this.dispatcher.getFromIpfs(model.cid)
      // Only announce success if we previously announced that we were waiting.
      if (attemptNum > 0) {
        this._logger.imp(`Model ${model} used to publish Node Metrics loaded successfully`)
      }
      return
    } catch (err) {
      if (attemptNum === 0) {
        this._logger.imp(
          `Waiting for Model ${model} used to publish Node Metrics to be available locally`
        )
      } else if (attemptNum % 5 === 0) {
        // Log only every fifth failure to avoid flooding the logs while waiting.
        this._logger.err(`Error loading Model ${model} used to publish Node Metrics: ${err}`)
      }

      await this._shutdownSignal.abortable((signal) => delayOrAbort(backoffMs, signal))
      attemptNum++
      // Double the delay but clamp it, so it can never overshoot the one-minute cap.
      backoffMs = Math.min(backoffMs * 2, maxBackoffMs)
    }
  }
}

/**
* Runs some checks at node startup to ensure that the node is healthy and properly configured.
* Throws an Error if any issues are detected
Expand Down
6 changes: 3 additions & 3 deletions packages/core/src/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ export class Dispatcher {
*/
async retrieveCommit(cid: CID | string, streamId?: StreamID): Promise<any> {
try {
return await this._getFromIpfs(cid)
return await this.getFromIpfs(cid)
} catch (e) {
if (streamId) {
this._logger.err(
Expand All @@ -380,7 +380,7 @@ export class Dispatcher {
*/
async retrieveFromIPFS(cid: CID | string, path?: string): Promise<any> {
try {
return await this._getFromIpfs(cid, path)
return await this.getFromIpfs(cid, path)
} catch (e) {
this._logger.err(`Error while loading CID ${cid.toString()} from IPFS: ${e}`)
throw e
Expand Down Expand Up @@ -416,7 +416,7 @@ export class Dispatcher {
/**
* Helper function for loading a CID from IPFS
*/
private async _getFromIpfs(cid: CID | string, path?: string): Promise<any> {
async getFromIpfs(cid: CID | string, path?: string): Promise<any> {
const asCid = typeof cid === 'string' ? CID.parse(cid) : cid

// Lookup CID in cache before looking it up IPFS
Expand Down
6 changes: 6 additions & 0 deletions packages/core/src/shutdown-signal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { Observer, Subject } from 'rxjs'
*/
export class ShutdownSignal {
private subject: Subject<void> = new Subject()
private shuttingDown = false

/**
* Subscribers to the signal.
Expand All @@ -20,6 +21,11 @@ export class ShutdownSignal {
*/
abort(): void {
  // Set the flag before completing the subject, so that any observer notified by
  // `complete()` already sees `isShuttingDown() === true`.
  this.shuttingDown = true
  this.subject.complete()
}

/**
 * Reports whether shutdown has been requested.
 * @returns the `shuttingDown` flag, which starts out `false` and is set to `true` by `abort()`.
 */
isShuttingDown(): boolean {
return this.shuttingDown
}

/**
Expand Down

0 comments on commit 679b8f8

Please sign in to comment.