diff --git a/example/index.js b/example/index.js index a0c47f03..9350677e 100644 --- a/example/index.js +++ b/example/index.js @@ -9,7 +9,11 @@ const config = { yamcsProcessor: "realtime", yamcsFolder: "myproject", throttleRate: 1000, - maxBatchSize: 20 + // Batch size is specified in characers as there is no performant way of calculating true + // memory usage of a string buffer in real-time. + // String characters can be 8 or 16 bits in JavaScript, depending on the code page used. + // Thus 500,000 characters requires up to 16MB of memory (1,000,000 * 16). + maxBufferSize: 1000000 }; const STATUS_STYLES = { NO_STATUS: { @@ -41,7 +45,9 @@ const STATUS_STYLES = { const openmct = window.openmct; (() => { - const THIRTY_MINUTES = 30 * 60 * 1000; + const ONE_SECOND = 1000; + const ONE_MINUTE = ONE_SECOND * 60; + const THIRTY_MINUTES = ONE_MINUTE * 30; openmct.setAssetPath("/node_modules/openmct/dist"); @@ -54,7 +60,6 @@ const openmct = window.openmct; document.addEventListener("DOMContentLoaded", function () { openmct.start(); }); - openmct.install( openmct.plugins.Conductor({ menuOptions: [ diff --git a/src/openmct-yamcs.js b/src/openmct-yamcs.js index c7c44956..4d0cd741 100644 --- a/src/openmct-yamcs.js +++ b/src/openmct-yamcs.js @@ -69,7 +69,7 @@ export default function install( configuration.yamcsInstance, configuration.yamcsProcessor, configuration.throttleRate, - configuration.maxBatchSize + configuration.maxBufferSize ); openmct.telemetry.addProvider(realtimeTelemetryProvider); realtimeTelemetryProvider.connect(); diff --git a/src/providers/realtime-provider.js b/src/providers/realtime-provider.js index 7889b577..6820740e 100644 --- a/src/providers/realtime-provider.js +++ b/src/providers/realtime-provider.js @@ -39,11 +39,17 @@ import { import { commandToTelemetryDatum } from './commands.js'; import { eventToTelemetryDatum, eventShouldBeFiltered } from './events.js'; +const ONE_SECOND = 1000; +const ONE_MILLION_CHARACTERS = 1000000; + +//Everything except parameter messages are housekeeping and if they're dropped bad things can happen. +const PARAMETER_MESSAGES = '^{[\\s]*"type":\\s"parameters'; + export default class RealtimeProvider { #socketWorker = null; #openmct; - constructor(openmct, url, instance, processor = 'realtime', maxBatchWait = 1000, maxBatchSize = 15) { + constructor(openmct, url, instance, processor = 'realtime', throttleRate = ONE_SECOND, maxBufferSize = ONE_MILLION_CHARACTERS) { this.url = url; this.instance = instance; this.processor = processor; @@ -59,8 +65,10 @@ export default class RealtimeProvider { this.subscriptionsByCall = new Map(); this.subscriptionsById = {}; this.#socketWorker = new openmct.telemetry.BatchingWebSocket(openmct); + this.#socketWorker.setThrottleMessagePattern(PARAMETER_MESSAGES); this.#openmct = openmct; - this.#setBatchingStrategy(maxBatchWait, maxBatchSize); + this.#socketWorker.setThrottleRate(throttleRate); + this.#socketWorker.setMaxBufferSize(maxBufferSize); this.addSupportedObjectTypes(Object.values(OBJECT_TYPES)); this.addSupportedDataTypes(Object.values(DATA_TYPES)); @@ -82,33 +90,6 @@ export default class RealtimeProvider { this.#setCallFromClock(clock); } } - #setBatchingStrategy(maxBatchWait, maxBatchSize) { - // This strategy batches parameter value messages - this.#socketWorker.setBatchingStrategy({ - /* istanbul ignore next */ - shouldBatchMessage: /* istanbul ignore next */ (message) => { - // If a parameter value message, the message type will be "parameters" - // The type field is always located at a character offset of 13 and - // if it is "parameters" will be 10 characters long. - const type = message.substring(13, 23); - - return type === 'parameters'; - }, - /* istanbul ignore next */ - getBatchIdFromMessage: /* istanbul ignore next */ (message) => { - // Only dealing with "parameters" messages at this point. The call number - // identifies the parameter, and is used for batching. Will be located - // at a character offset of 36. Because it is of indeterminate length - // (we don't know the number) we have to do a sequential search forward - // from the 37th character for a terminating ",". - const callNumber = message.substring(36, message.indexOf(",", 37)); - - return callNumber; - } - }); - this.#socketWorker.setMaxBatchWait(maxBatchWait); - this.#socketWorker.setMaxBatchSize(maxBatchSize); - } addSupportedObjectTypes(types) { types.forEach(type => this.supportedObjectTypes[type] = type); @@ -158,7 +139,7 @@ export default class RealtimeProvider { if (subscriptionDetails) { this.sendUnsubscribeMessage(subscriptionDetails); - this.subscriptionsByCall.delete(subscriptionDetails.call.toString()); + this.subscriptionsByCall.delete(subscriptionDetails.call); delete this.subscriptionsById[id]; } }; @@ -183,12 +164,16 @@ export default class RealtimeProvider { this.sendUnsubscribeMessage(subscriptionDetails); if (this.subscriptionsById[id]) { - this.subscriptionsByCall.delete(this.subscriptionsById[id].call.toString()); + this.subscriptionsByCall.delete(this.subscriptionsById[id].call); delete this.subscriptionsById[id]; } }; } + getSubscriptionByObjectIdentifier(identifier) { + return Object.values(this.subscriptionsById).find(subscription => this.#openmct.objects.areIdsEqual(subscription.domainObject.identifier, identifier)); + } + buildSubscriptionDetails(domainObject, callback, options) { let subscriptionId = this.lastSubscriptionId++; let subscriptionDetails = { @@ -224,52 +209,78 @@ export default class RealtimeProvider { }); if (correspondingSubscription !== undefined) { - this.remoteClockCallNumber = correspondingSubscription.call.toString(); + this.remoteClockCallNumber = correspondingSubscription.call; } else { delete this.remoteClockCallNumber; } } - #processBatchQueue(batchQueue, call) { - let subscriptionDetails = this.subscriptionsByCall.get(call); - let telemetryData = []; + #processParameterUpdates(parameterValuesByCall) { + //If remote clock active, process its value before any telemetry values to ensure the bounds are always up to date. + if (this.remoteClockCallNumber !== undefined) { + const remoteClockValues = parameterValuesByCall.get(this.remoteClockCallNumber); + const subscriptionDetails = this.subscriptionsByCall.get(this.remoteClockCallNumber); - // possibly cancelled - if (!subscriptionDetails) { - return; - } + if (remoteClockValues !== undefined && remoteClockValues.length > 0) { + const allClockValues = []; - batchQueue.forEach((rawMessage) => { - const message = JSON.parse(rawMessage); - const values = message.data.values || []; - const parentName = subscriptionDetails.domainObject.name; + remoteClockValues.forEach((parameterValue) => { + this.#convertMessageToDatumAndReportStaleness(parameterValue, subscriptionDetails, allClockValues); + }); - values.forEach(parameter => { - const datum = convertYamcsToOpenMctDatum(parameter, parentName); + if (allClockValues.length > 0) { + subscriptionDetails.callback(allClockValues); + } + + // Delete so we don't process it twice. + parameterValuesByCall.delete(this.remoteClockCallNumber); + } + } - if (this.observingStaleness[subscriptionDetails.name] !== undefined) { - const status = STALENESS_STATUS_MAP[parameter.acquisitionStatus]; + // Now process all non-clock parameter updates + for (const [call, parameterValues] of parameterValuesByCall.entries()) { + const allTelemetryData = []; + const subscriptionDetails = this.subscriptionsByCall.get(call); - if (this.observingStaleness[subscriptionDetails.name].response.isStale !== status) { - const stalenesResponseObject = buildStalenessResponseObject( - status, - parameter[METADATA_TIME_KEY] - ); - this.observingStaleness[subscriptionDetails.name].response = stalenesResponseObject; - this.observingStaleness[subscriptionDetails.name].callback(stalenesResponseObject); - } - } + // possibly cancelled + if (!subscriptionDetails) { + continue; + } - addLimitInformation(parameter, datum); - telemetryData.push(datum); + parameterValues.forEach((parameterValue) => { + this.#convertMessageToDatumAndReportStaleness(parameterValue, subscriptionDetails, allTelemetryData); }); - }); - if (telemetryData.length > 0) { - subscriptionDetails.callback(telemetryData); + if (allTelemetryData.length > 0) { + subscriptionDetails.callback(allTelemetryData); + } } } + #convertMessageToDatumAndReportStaleness(parameterValue, subscriptionDetails, allTelemetryData) { + const values = parameterValue.data.values || []; + const parentName = subscriptionDetails.domainObject.name; + values.forEach(parameter => { + const datum = convertYamcsToOpenMctDatum(parameter, parentName); + + if (this.observingStaleness[subscriptionDetails.name] !== undefined) { + const status = STALENESS_STATUS_MAP[parameter.acquisitionStatus]; + + if (this.observingStaleness[subscriptionDetails.name].response.isStale !== status) { + const stalenesResponseObject = buildStalenessResponseObject( + status, + parameter[METADATA_TIME_KEY] + ); + this.observingStaleness[subscriptionDetails.name].response = stalenesResponseObject; + this.observingStaleness[subscriptionDetails.name].callback(stalenesResponseObject); + } + } + + addLimitInformation(parameter, datum); + allTelemetryData.push(datum); + }); + } + connect() { if (this.connected) { return; @@ -285,84 +296,87 @@ export default class RealtimeProvider { }); this.#socketWorker.addEventListener('batch', (batchEvent) => { - const batch = batchEvent.detail; - - let remoteClockValue; - // If remote clock active, process its value before any telemetry values to ensure the bounds are always up to date. - if (this.remoteClockCallNumber !== undefined) { - remoteClockValue = batch[this.remoteClockCallNumber]; - if (remoteClockValue !== undefined) { - this.#processBatchQueue(batch[this.remoteClockCallNumber], this.remoteClockCallNumber); - - // Delete so we don't process it twice. - delete batch[this.remoteClockCallNumber]; - } - } + const newBatch = batchEvent.detail; + const parametersByCall = new Map(); + newBatch.forEach(messageString => { + const message = JSON.parse(messageString); + const call = message.call; + if (message.type === 'parameters') { + // First, group parameter updates by call + let arrayOfParametersForCall = parametersByCall.get(call); + + if (arrayOfParametersForCall === undefined) { + arrayOfParametersForCall = []; + parametersByCall.set(call, arrayOfParametersForCall); + } - Object.keys(batch).forEach((call) => { - this.#processBatchQueue(batch[call], call); - }); - }); + arrayOfParametersForCall.push(message); + } else { + if (!this.isSupportedDataType(message.type)) { + return; + } - this.#socketWorker.addEventListener('message', (messageEvent) => { - const message = JSON.parse(messageEvent.detail); - if (!this.isSupportedDataType(message.type)) { - return; - } + const isReply = message.type === DATA_TYPES.DATA_TYPE_REPLY; + let subscriptionDetails; - const isReply = message.type === DATA_TYPES.DATA_TYPE_REPLY; - const call = message.call; - let subscriptionDetails; + if (isReply) { + const id = message.data.replyTo; + subscriptionDetails = this.subscriptionsById[id]; - if (isReply) { - const id = message.data.replyTo; - subscriptionDetails = this.subscriptionsById[id]; - subscriptionDetails.call = call; - // Subsequent retrieval uses a string, so for performance reasons we use a string as a key. - this.subscriptionsByCall.set(call.toString(), subscriptionDetails); + // Susbcriptions can be cancelled before we even get to this stage during tests due to rapid navigation. + if (!subscriptionDetails) { + return; + } - const remoteClockIdentifier = this.#openmct.time.getClock()?.identifier; - const isRemoteClockActive = remoteClockIdentifier !== undefined; + subscriptionDetails.call = call; + // Subsequent retrieval uses a string, so for performance reasons we use a string as a key. + this.subscriptionsByCall.set(call, subscriptionDetails); - if (isRemoteClockActive && subscriptionDetails.domainObject.identifier.key === remoteClockIdentifier.key) { - this.remoteClockCallNumber = call.toString(); - } - } else { - subscriptionDetails = this.subscriptionsByCall.get(message.call.toString()); + const remoteClockIdentifier = this.#openmct.time.getClock()?.identifier; + const isRemoteClockActive = remoteClockIdentifier !== undefined; - // possibly cancelled - if (!subscriptionDetails) { - return; - } - - if (this.isCommandMessage(message)) { - const datum = commandToTelemetryDatum(message.data); - subscriptionDetails.callback(datum); - } else if (this.isEventMessage(message)) { - if (eventShouldBeFiltered(message.data, subscriptionDetails.options)) { - // ignore event + if (isRemoteClockActive && subscriptionDetails.domainObject.identifier.key === remoteClockIdentifier.key) { + this.remoteClockCallNumber = call; + } } else { - const datum = eventToTelemetryDatum(message.data); - subscriptionDetails.callback(datum); - } - } else if (this.isMdbChangesMessage(message)) { - if (!this.isParameterType(message)) { - return; - } - - const parameterName = message.data.parameterOverride.parameter; - if (this.observingLimitChanges[parameterName] !== undefined) { - const alarmRange = message.data.parameterOverride.defaultAlarm?.staticAlarmRange ?? []; - this.observingLimitChanges[parameterName].callback(getLimitFromAlarmRange(alarmRange)); + subscriptionDetails = this.subscriptionsByCall.get(message.call); + + // possibly cancelled + if (!subscriptionDetails) { + return; + } + + if (this.isCommandMessage(message)) { + const datum = commandToTelemetryDatum(message.data); + subscriptionDetails.callback(datum); + } else if (this.isEventMessage(message)) { + if (eventShouldBeFiltered(message.data, subscriptionDetails.options)) { + // ignore event + } else { + const datum = eventToTelemetryDatum(message.data); + subscriptionDetails.callback(datum); + } + } else if (this.isMdbChangesMessage(message)) { + if (!this.isParameterType(message)) { + return; + } + + const parameterName = message.data.parameterOverride.parameter; + if (this.observingLimitChanges[parameterName] !== undefined) { + const alarmRange = message.data.parameterOverride.defaultAlarm?.staticAlarmRange ?? []; + this.observingLimitChanges[parameterName].callback(getLimitFromAlarmRange(alarmRange)); + } + + if (subscriptionDetails.callback) { + subscriptionDetails.callback(message.data); + } + } else { + subscriptionDetails.callback(message.data); + } } - - if (subscriptionDetails.callback) { - subscriptionDetails.callback(message.data); - } - } else { - subscriptionDetails.callback(message.data); } - } + }); + this.#processParameterUpdates(parametersByCall); }); } diff --git a/tests/e2e/yamcs/realtimeData.e2e.spec.mjs b/tests/e2e/yamcs/realtimeData.e2e.spec.mjs index 4aad789a..c7a1cc05 100644 --- a/tests/e2e/yamcs/realtimeData.e2e.spec.mjs +++ b/tests/e2e/yamcs/realtimeData.e2e.spec.mjs @@ -52,19 +52,22 @@ test.describe('Realtime telemetry displays', () => { }); // Go to baseURL - await page.goto('./', { waitUntil: 'domcontentloaded' }); + await page.goto('./', { waitUntil: 'networkidle' }); await page.evaluate((thirtyMinutes) => { - const openmct = window.openmct; - - openmct.install(openmct.plugins.RemoteClock({ - namespace: "taxonomy", - key: "~myproject~Battery1_Temp" - })); - - openmct.time.setClock('remote-clock'); - openmct.time.setClockOffsets({ - start: -thirtyMinutes, - end: 0 + return new Promise((resolve) => { + const openmct = window.openmct; + + openmct.install(openmct.plugins.RemoteClock({ + namespace: "taxonomy", + key: "~myproject~Battery1_Temp" + })); + + openmct.time.setClock('remote-clock'); + openmct.time.setClockOffsets({ + start: -thirtyMinutes, + end: 15000 + }); + setTimeout(resolve, 2000); }); }, THIRTY_MINUTES); yamcsURL = new URL('/yamcs-proxy/', page.url()).toString(); @@ -119,7 +122,9 @@ test.describe('Realtime telemetry displays', () => { test('Correctly shows the latest values', async ({ page }) => { // Wait a reasonable amount of time for new telemetry to come in. - // There is nothing significant about the number chosen. + // There is nothing significant about the number chosen. It's + // long enough to ensure we have new telemetry, short enough that + // it doesn't significantly increase test time. const WAIT_FOR_MORE_TELEMETRY = 3000; const ladTable = await getLadTableByName(page, 'Test LAD Table'); @@ -248,31 +253,58 @@ test.describe('Realtime telemetry displays', () => { expect(notification).toHaveCount(0); } }); - - test('Open MCT does drop telemetry when the UI is under load', async ({ page }) => { - // 1. Make sure the display is done loading, and populated with values (ie. we are in a steady state) - const ladTable = await getLadTableByName(page, 'Test LAD Table'); - await getParameterValuesFromLadTable(ladTable); - - // 2. Block the UI with a loop + /** + * This tests for an edge-case found during testing where throttling occurs during subscription handshaking with the server. + * In this scenario, subscribers never receive telemetry because the subscription was never properly initialized. + * This test confirms that after blocking the UI and inducing throttling, that all subscribed telemetry objects received telemetry. + */ + test('When the UI is blocked during initialization, does not drop subscription housekeeping messages', async ({ page }) => { + // 1. Block the UI await page.evaluate(() => { return new Promise((resolveBlockingLoop) => { - //5s x 10Hz data = 50 telemetry values which should easily overrun the buffer length of 20. let start = Date.now(); let now = Date.now(); - // Block the UI thread for 5s - while (now - start < 5000) { + // Block the UI thread for 6s + while (now - start < 10000) { now = Date.now(); } resolveBlockingLoop(); }); }); - // Check for telemetry dropped notification + + //Confirm that throttling occurred const notification = page.getByRole('alert'); - expect(notification).toHaveCount(1); const text = await notification.innerText(); expect(text).toBe('Telemetry dropped due to client rate limiting.'); + + //Confirm that all subscribed telemetry points receive telemetry. This tests that subscriptions were established successfully and + //tests for a failure mode where housekeeping telemetry was being dropped if the UI was blocked during initialization of telemetry subscriptions + const parametersToSubscribeTo = Object.values(namesToParametersMap).map(parameter => parameter.replaceAll('/', '~')); + const subscriptionsThatTelemetry = await page.evaluate(async (parameters) => { + const openmct = window.openmct; + const telemetryObjects = await Promise.all( + Object.values(parameters).map( + (parameterId) => openmct.objects.get( + { + namespace: 'taxonomy', + key: parameterId + } + ) + )); + const subscriptionsAllReturned = await Promise.all(telemetryObjects.map((telemetryObject) => { + return new Promise(resolve => { + const unsubscribe = openmct.telemetry.subscribe(telemetryObject, () => { + unsubscribe(); + resolve(true); + }); + }); + })); + + return subscriptionsAllReturned; + }, parametersToSubscribeTo); + + expect(subscriptionsThatTelemetry.length).toBe(parametersToSubscribeTo.length); }); test('Open MCT shows the latest telemetry after UI is temporarily blocked', async ({ page }) => { @@ -283,19 +315,24 @@ test.describe('Realtime telemetry displays', () => { return new Promise((resolveBlockingLoop) => { let start = Date.now(); let now = Date.now(); - // Block the UI thread for 5s - while (now - start < 5000) { + // Block the UI thread for 10s + while (now - start < 10000) { now = Date.now(); } - resolveBlockingLoop(); + requestIdleCallback(resolveBlockingLoop); }); }); + //Confirm that throttling occurred + const notification = page.getByRole('alert'); + const text = await notification.innerText(); + expect(text).toBe('Telemetry dropped due to client rate limiting.'); + // Disable playback await disableLink(yamcsURL); - // Wait 1 second for values to propagate to client and render on screen. + // Wait for values to propagate to client and render on screen. await page.waitForTimeout(TELEMETRY_PROPAGATION_TIME); const latestValueObjects = await latestParameterValues(Object.values(namesToParametersMap), yamcsURL); @@ -307,7 +344,7 @@ test.describe('Realtime telemetry displays', () => { test('Open MCT accurately batches telemetry when requested', async ({ page }) => { - // 1. Subscribe to batched telemetry, + // 1. Subscribe to batched telemetry,e const telemetryValues = await page.evaluate(async () => { const openmct = window.openmct; const telemetryObject = await openmct.objects.get({ @@ -342,7 +379,6 @@ test.describe('Realtime telemetry displays', () => { }); const formattedParameterArchiveTelemetry = toOpenMctTelemetryFormat(parameterArchiveTelemetry); sortOpenMctTelemetryAscending(formattedParameterArchiveTelemetry); - telemetryValues.forEach((telemetry, index) => { expect(telemetry.value).toBe(formattedParameterArchiveTelemetry[index].value); expect(telemetry.timestamp).toBe(formattedParameterArchiveTelemetry[index].timestamp); @@ -414,7 +450,7 @@ test.describe('Realtime telemetry displays', () => { } /** - * @param {import('playwright').Page} page + * @param {import('playwright').Page} page * @returns {Promise<{parameterNameText: string, parameterValueText: string}[]>} */ async function getParameterValuesFromAllGauges(page) {