From 9af24cf6a83f4ea06315aa3afe4628adb7c21b50 Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Sun, 8 Sep 2024 14:06:40 -0700 Subject: [PATCH 01/17] Reduce complexity of buffering implementation --- example/index.js | 25 ++- src/providers/realtime-provider.js | 257 +++++++++++++++-------------- 2 files changed, 151 insertions(+), 131 deletions(-) diff --git a/example/index.js b/example/index.js index 449faf90..59c9b4a4 100644 --- a/example/index.js +++ b/example/index.js @@ -9,7 +9,11 @@ const config = { yamcsProcessor: "realtime", yamcsFolder: "myproject", throttleRate: 1000, - maxBatchSize: 20 + // Batch size is specified in characers as there is no performant way of calculating true + // memory usage of a string buffer in real-time. + // String characters can be 8 or 16 bits in JavaScript, depending on the code page used. + // Thus 500,000 characters requires up to 16MB of memory (1,000,000 * 16). + maxBatchSize: 1000000 }; const STATUS_STYLES = { NO_STATUS: { @@ -41,7 +45,11 @@ const STATUS_STYLES = { const openmct = window.openmct; (() => { - const THIRTY_MINUTES = 30 * 60 * 1000; + const ONE_SECOND = 1000; + const FIFTEEN_SECONDS = ONE_SECOND * 15; + const ONE_MINUTE = ONE_SECOND * 60; + const FIFTEEN_MINUTES = ONE_MINUTE * 15; + const THIRTY_MINUTES = ONE_MINUTE * 30; openmct.setAssetPath("/node_modules/openmct/dist"); @@ -54,10 +62,21 @@ const openmct = window.openmct; document.addEventListener("DOMContentLoaded", function () { openmct.start(); }); - + openmct.install(openmct.plugins.RemoteClock({namespace: 'taxonomy', + key: '~myproject~A' + })); openmct.install( openmct.plugins.Conductor({ menuOptions: [ + { + name: "Remote", + timeSystem: 'utc', + clock: 'remote-clock', + clockOffsets: { + start: -FIFTEEN_MINUTES, + end: FIFTEEN_SECONDS + } + }, { name: "Realtime", timeSystem: "utc", diff --git a/src/providers/realtime-provider.js b/src/providers/realtime-provider.js index 7889b577..28da2367 100644 --- a/src/providers/realtime-provider.js +++ b/src/providers/realtime-provider.js @@ -39,11 +39,14 @@ import { import { commandToTelemetryDatum } from './commands.js'; import { eventToTelemetryDatum, eventShouldBeFiltered } from './events.js'; +const ONE_SECOND = 1000; +const ONE_MILLION_CHARACTERS = 1000000; + export default class RealtimeProvider { #socketWorker = null; #openmct; - constructor(openmct, url, instance, processor = 'realtime', maxBatchWait = 1000, maxBatchSize = 15) { + constructor(openmct, url, instance, processor = 'realtime', throttleRate = ONE_SECOND, maxBufferSize = ONE_MILLION_CHARACTERS) { this.url = url; this.instance = instance; this.processor = processor; @@ -60,7 +63,8 @@ export default class RealtimeProvider { this.subscriptionsById = {}; this.#socketWorker = new openmct.telemetry.BatchingWebSocket(openmct); this.#openmct = openmct; - this.#setBatchingStrategy(maxBatchWait, maxBatchSize); + this.#socketWorker.setThrottleRate(throttleRate); + this.#socketWorker.setMaxBufferSize(maxBufferSize); this.addSupportedObjectTypes(Object.values(OBJECT_TYPES)); this.addSupportedDataTypes(Object.values(DATA_TYPES)); @@ -82,33 +86,6 @@ export default class RealtimeProvider { this.#setCallFromClock(clock); } } - #setBatchingStrategy(maxBatchWait, maxBatchSize) { - // This strategy batches parameter value messages - this.#socketWorker.setBatchingStrategy({ - /* istanbul ignore next */ - shouldBatchMessage: /* istanbul ignore next */ (message) => { - // If a parameter value message, the message type will be "parameters" - // The type field is always located at a character offset of 13 and - // if it is "parameters" will be 10 characters long. - const type = message.substring(13, 23); - - return type === 'parameters'; - }, - /* istanbul ignore next */ - getBatchIdFromMessage: /* istanbul ignore next */ (message) => { - // Only dealing with "parameters" messages at this point. The call number - // identifies the parameter, and is used for batching. Will be located - // at a character offset of 36. Because it is of indeterminate length - // (we don't know the number) we have to do a sequential search forward - // from the 37th character for a terminating ",". - const callNumber = message.substring(36, message.indexOf(",", 37)); - - return callNumber; - } - }); - this.#socketWorker.setMaxBatchWait(maxBatchWait); - this.#socketWorker.setMaxBatchSize(maxBatchSize); - } addSupportedObjectTypes(types) { types.forEach(type => this.supportedObjectTypes[type] = type); @@ -230,46 +207,72 @@ export default class RealtimeProvider { } } - #processBatchQueue(batchQueue, call) { - let subscriptionDetails = this.subscriptionsByCall.get(call); - let telemetryData = []; + #processParameterUpdates(parameterValuesByCall) { + //If remote clock active, process its value before any telemetry values to ensure the bounds are always up to date. + if (this.remoteClockCallNumber !== undefined) { + const remoteClockValues = parameterValuesByCall.get(this.remoteClockCallNumber); + const subscriptionDetails = this.subscriptionsByCall.get(this.remoteClockCallNumber); - // possibly cancelled - if (!subscriptionDetails) { - return; - } + if (remoteClockValues !== undefined && remoteClockValues.length > 0) { + const allClockValues = []; - batchQueue.forEach((rawMessage) => { - const message = JSON.parse(rawMessage); - const values = message.data.values || []; - const parentName = subscriptionDetails.domainObject.name; + // We only care about the most recent clock tick message in the batch. + this.#convertMessageToDatumAndReportStaleness(remoteClockValues[remoteClockValues.length - 1], subscriptionDetails, allClockValues); - values.forEach(parameter => { - const datum = convertYamcsToOpenMctDatum(parameter, parentName); + if (allClockValues.length > 0) { + // A single parameter update message can include multiple values, again we only care about the most recent one. + subscriptionDetails.callback(allClockValues[allClockValues.length - 1]); + } - if (this.observingStaleness[subscriptionDetails.name] !== undefined) { - const status = STALENESS_STATUS_MAP[parameter.acquisitionStatus]; + // Delete so we don't process it twice. + parameterValuesByCall.delete(this.remoteClockCallNumber); + } + } - if (this.observingStaleness[subscriptionDetails.name].response.isStale !== status) { - const stalenesResponseObject = buildStalenessResponseObject( - status, - parameter[METADATA_TIME_KEY] - ); - this.observingStaleness[subscriptionDetails.name].response = stalenesResponseObject; - this.observingStaleness[subscriptionDetails.name].callback(stalenesResponseObject); - } - } + // Now process all non-clock parameter updates + for (const [call, parameterValues] of parameterValuesByCall.entries()) { + const allTelemetryData = []; + const subscriptionDetails = this.subscriptionsByCall.get(call); - addLimitInformation(parameter, datum); - telemetryData.push(datum); + // possibly cancelled + if (!subscriptionDetails) { + continue; + } + + parameterValues.forEach((parameterValue) => { + this.#convertMessageToDatumAndReportStaleness(parameterValue, subscriptionDetails, allTelemetryData); }); - }); - if (telemetryData.length > 0) { - subscriptionDetails.callback(telemetryData); + if (allTelemetryData.length > 0) { + subscriptionDetails.callback(allTelemetryData); + } } } + #convertMessageToDatumAndReportStaleness(parameterValue, subscriptionDetails, allTelemetryData) { + const values = parameterValue.data.values || []; + const parentName = subscriptionDetails.domainObject.name; + values.forEach(parameter => { + const datum = convertYamcsToOpenMctDatum(parameter, parentName); + + if (this.observingStaleness[subscriptionDetails.name] !== undefined) { + const status = STALENESS_STATUS_MAP[parameter.acquisitionStatus]; + + if (this.observingStaleness[subscriptionDetails.name].response.isStale !== status) { + const stalenesResponseObject = buildStalenessResponseObject( + status, + parameter[METADATA_TIME_KEY] + ); + this.observingStaleness[subscriptionDetails.name].response = stalenesResponseObject; + this.observingStaleness[subscriptionDetails.name].callback(stalenesResponseObject); + } + } + + addLimitInformation(parameter, datum); + allTelemetryData.push(datum); + }); + } + connect() { if (this.connected) { return; @@ -285,84 +288,82 @@ export default class RealtimeProvider { }); this.#socketWorker.addEventListener('batch', (batchEvent) => { - const batch = batchEvent.detail; - - let remoteClockValue; - // If remote clock active, process its value before any telemetry values to ensure the bounds are always up to date. - if (this.remoteClockCallNumber !== undefined) { - remoteClockValue = batch[this.remoteClockCallNumber]; - if (remoteClockValue !== undefined) { - this.#processBatchQueue(batch[this.remoteClockCallNumber], this.remoteClockCallNumber); - - // Delete so we don't process it twice. - delete batch[this.remoteClockCallNumber]; - } - } - - Object.keys(batch).forEach((call) => { - this.#processBatchQueue(batch[call], call); - }); - }); - - this.#socketWorker.addEventListener('message', (messageEvent) => { - const message = JSON.parse(messageEvent.detail); - if (!this.isSupportedDataType(message.type)) { - return; - } + const newBatch = batchEvent.detail; + const parametersByCall = new Map(); + newBatch.forEach(messageString => { + //const message = JSON.parse(messageString); + const message = JSON.parse(messageString); + const call = message.call; + if (message.type === 'parameters') { + // First, group parameter updates by call + let arrayOfParametersForCall = parametersByCall.get(call); + + if (arrayOfParametersForCall === undefined) { + arrayOfParametersForCall = []; + parametersByCall.set(call, arrayOfParametersForCall); + } - const isReply = message.type === DATA_TYPES.DATA_TYPE_REPLY; - const call = message.call; - let subscriptionDetails; + arrayOfParametersForCall.push(message); + } else { + if (!this.isSupportedDataType(message.type)) { + return; + } - if (isReply) { - const id = message.data.replyTo; - subscriptionDetails = this.subscriptionsById[id]; - subscriptionDetails.call = call; - // Subsequent retrieval uses a string, so for performance reasons we use a string as a key. - this.subscriptionsByCall.set(call.toString(), subscriptionDetails); + const isReply = message.type === DATA_TYPES.DATA_TYPE_REPLY; + let subscriptionDetails; - const remoteClockIdentifier = this.#openmct.time.getClock()?.identifier; - const isRemoteClockActive = remoteClockIdentifier !== undefined; + if (isReply) { + const id = message.data.replyTo; + subscriptionDetails = this.subscriptionsById[id]; + subscriptionDetails.call = call; + // Subsequent retrieval uses a string, so for performance reasons we use a string as a key. + this.subscriptionsByCall.set(call, subscriptionDetails); - if (isRemoteClockActive && subscriptionDetails.domainObject.identifier.key === remoteClockIdentifier.key) { - this.remoteClockCallNumber = call.toString(); - } - } else { - subscriptionDetails = this.subscriptionsByCall.get(message.call.toString()); + const remoteClockIdentifier = this.#openmct.time.getClock()?.identifier; + const isRemoteClockActive = remoteClockIdentifier !== undefined; - // possibly cancelled - if (!subscriptionDetails) { - return; - } - - if (this.isCommandMessage(message)) { - const datum = commandToTelemetryDatum(message.data); - subscriptionDetails.callback(datum); - } else if (this.isEventMessage(message)) { - if (eventShouldBeFiltered(message.data, subscriptionDetails.options)) { - // ignore event + if (isRemoteClockActive && subscriptionDetails.domainObject.identifier.key === remoteClockIdentifier.key) { + this.remoteClockCallNumber = call; + } } else { - const datum = eventToTelemetryDatum(message.data); - subscriptionDetails.callback(datum); - } - } else if (this.isMdbChangesMessage(message)) { - if (!this.isParameterType(message)) { - return; - } - - const parameterName = message.data.parameterOverride.parameter; - if (this.observingLimitChanges[parameterName] !== undefined) { - const alarmRange = message.data.parameterOverride.defaultAlarm?.staticAlarmRange ?? []; - this.observingLimitChanges[parameterName].callback(getLimitFromAlarmRange(alarmRange)); + subscriptionDetails = this.subscriptionsByCall.get(message.call); + + // possibly cancelled + if (!subscriptionDetails) { + return; + } + + if (this.isCommandMessage(message)) { + const datum = commandToTelemetryDatum(message.data); + subscriptionDetails.callback(datum); + } else if (this.isEventMessage(message)) { + if (eventShouldBeFiltered(message.data, subscriptionDetails.options)) { + // ignore event + } else { + const datum = eventToTelemetryDatum(message.data); + subscriptionDetails.callback(datum); + } + } else if (this.isMdbChangesMessage(message)) { + if (!this.isParameterType(message)) { + return; + } + + const parameterName = message.data.parameterOverride.parameter; + if (this.observingLimitChanges[parameterName] !== undefined) { + const alarmRange = message.data.parameterOverride.defaultAlarm?.staticAlarmRange ?? []; + this.observingLimitChanges[parameterName].callback(getLimitFromAlarmRange(alarmRange)); + } + + if (subscriptionDetails.callback) { + subscriptionDetails.callback(message.data); + } + } else { + subscriptionDetails.callback(message.data); + } } - - if (subscriptionDetails.callback) { - subscriptionDetails.callback(message.data); - } - } else { - subscriptionDetails.callback(message.data); } - } + }); + this.#processParameterUpdates(parametersByCall); }); } From cb558a3ebc5f06d6d8ccb06dc3fdcb6fc1f1092c Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Sun, 8 Sep 2024 16:07:03 -0700 Subject: [PATCH 02/17] Rename configuration variable --- example/index.js | 2 +- src/openmct-yamcs.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/example/index.js b/example/index.js index 59c9b4a4..17b20cf0 100644 --- a/example/index.js +++ b/example/index.js @@ -13,7 +13,7 @@ const config = { // memory usage of a string buffer in real-time. // String characters can be 8 or 16 bits in JavaScript, depending on the code page used. // Thus 500,000 characters requires up to 16MB of memory (1,000,000 * 16). - maxBatchSize: 1000000 + maxBufferSize: 1000000 }; const STATUS_STYLES = { NO_STATUS: { diff --git a/src/openmct-yamcs.js b/src/openmct-yamcs.js index c7c44956..4d0cd741 100644 --- a/src/openmct-yamcs.js +++ b/src/openmct-yamcs.js @@ -69,7 +69,7 @@ export default function install( configuration.yamcsInstance, configuration.yamcsProcessor, configuration.throttleRate, - configuration.maxBatchSize + configuration.maxBufferSize ); openmct.telemetry.addProvider(realtimeTelemetryProvider); realtimeTelemetryProvider.connect(); From ea7f6df379545b5a61319154ac2baa2878271a04 Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Tue, 10 Sep 2024 20:38:01 -0700 Subject: [PATCH 03/17] Clean up --- package.json | 4 +- src/providers/realtime-provider.js | 20 ++-- tests/e2e/yamcs/quickstartTools.mjs | 1 + tests/e2e/yamcs/realtimeData.e2e.spec.mjs | 122 ++++++++++++++++++++-- 4 files changed, 128 insertions(+), 19 deletions(-) diff --git a/package.json b/package.json index acf02948..a4087aa3 100644 --- a/package.json +++ b/package.json @@ -19,7 +19,7 @@ "lint": "eslint src example", "lint:fix": "eslint src example --fix", "build:dist": "webpack --config ./.webpack/webpack.prod.mjs", - "build:example": "npm install openmct@unstable --no-save", + "build:example": "npm install nasa/openmct#improved-buffering --no-save", "build:example:master": "npm install nasa/openmct --no-save", "postbuild:example": "node check-optional-dependencies.mjs", "start": "npx webpack serve --config ./.webpack/webpack.dev.mjs", @@ -29,7 +29,7 @@ "posttest:getopensource": "npm install", "test:e2e:smoke": "npm test --workspace tests/e2e/opensource -- --config=../playwright-quickstart.config.js --project=chromium quickstartSmoke", "test:e2e:quickstart": "npm test --workspace tests/e2e/opensource -- --config=../playwright-quickstart.config.js --project=chromium tests/e2e/yamcs/", - "test:e2e:quickstart:local": "npm test --workspace tests/e2e/opensource -- --config=../playwright-quickstart.config.js --project=local-chrome tests/e2e/yamcs/", + "test:e2e:quickstart:local": "npm test --workspace tests/e2e/opensource -- --config=../playwright-quickstart.config.js --project=local-chrome tests/e2e/yamcs/ --debug", "test:e2e:watch": "npm test --workspace tests/e2e/opensource -- --ui --config=../playwright-quickstart.config.js", "wait-for-yamcs": "wait-on http-get://localhost:8090/ -v" }, diff --git a/src/providers/realtime-provider.js b/src/providers/realtime-provider.js index 28da2367..cc194ed0 100644 --- a/src/providers/realtime-provider.js +++ b/src/providers/realtime-provider.js @@ -135,7 +135,7 @@ export default class RealtimeProvider { if (subscriptionDetails) { this.sendUnsubscribeMessage(subscriptionDetails); - this.subscriptionsByCall.delete(subscriptionDetails.call.toString()); + this.subscriptionsByCall.delete(subscriptionDetails.call); delete this.subscriptionsById[id]; } }; @@ -160,12 +160,18 @@ export default class RealtimeProvider { this.sendUnsubscribeMessage(subscriptionDetails); if (this.subscriptionsById[id]) { - this.subscriptionsByCall.delete(this.subscriptionsById[id].call.toString()); + this.subscriptionsByCall.delete(this.subscriptionsById[id].call); delete this.subscriptionsById[id]; } }; } + getSubscriptionByObjectIdentifier(identifier) { + const objectKeystring = this.#openmct.objects.makeKeyString(identifier); + + return Object.values(this.subscriptionsById).find(subscription => this.#openmct.objects.areIdsEqual(subscription.domainObject.identifier, identifier)); + } + buildSubscriptionDetails(domainObject, callback, options) { let subscriptionId = this.lastSubscriptionId++; let subscriptionDetails = { @@ -201,7 +207,7 @@ export default class RealtimeProvider { }); if (correspondingSubscription !== undefined) { - this.remoteClockCallNumber = correspondingSubscription.call.toString(); + this.remoteClockCallNumber = correspondingSubscription.call; } else { delete this.remoteClockCallNumber; } @@ -217,11 +223,12 @@ export default class RealtimeProvider { const allClockValues = []; // We only care about the most recent clock tick message in the batch. - this.#convertMessageToDatumAndReportStaleness(remoteClockValues[remoteClockValues.length - 1], subscriptionDetails, allClockValues); + remoteClockValues.forEach((parameterValue) => { + this.#convertMessageToDatumAndReportStaleness(parameterValue, subscriptionDetails, allClockValues); + }); if (allClockValues.length > 0) { - // A single parameter update message can include multiple values, again we only care about the most recent one. - subscriptionDetails.callback(allClockValues[allClockValues.length - 1]); + subscriptionDetails.callback(allClockValues); } // Delete so we don't process it twice. @@ -291,7 +298,6 @@ export default class RealtimeProvider { const newBatch = batchEvent.detail; const parametersByCall = new Map(); newBatch.forEach(messageString => { - //const message = JSON.parse(messageString); const message = JSON.parse(messageString); const call = message.call; if (message.type === 'parameters') { diff --git a/tests/e2e/yamcs/quickstartTools.mjs b/tests/e2e/yamcs/quickstartTools.mjs index a755d81c..4c4dd080 100644 --- a/tests/e2e/yamcs/quickstartTools.mjs +++ b/tests/e2e/yamcs/quickstartTools.mjs @@ -27,6 +27,7 @@ async function disableLink(yamcsURL) { } async function enableLink(yamcsURL) { + console.log(`YAMCS URL: ${yamcsURL}`); const url = new URL(`api/links/myproject/udp-in:enable`, yamcsURL); await fetch(url.toString(), { method: 'POST' diff --git a/tests/e2e/yamcs/realtimeData.e2e.spec.mjs b/tests/e2e/yamcs/realtimeData.e2e.spec.mjs index 4aad789a..20251766 100644 --- a/tests/e2e/yamcs/realtimeData.e2e.spec.mjs +++ b/tests/e2e/yamcs/realtimeData.e2e.spec.mjs @@ -119,7 +119,9 @@ test.describe('Realtime telemetry displays', () => { test('Correctly shows the latest values', async ({ page }) => { // Wait a reasonable amount of time for new telemetry to come in. - // There is nothing significant about the number chosen. + // There is nothing significant about the number chosen. It's + // long enough to ensure we have new telemetry, short enough that + // it doesn't significantly increase test time. const WAIT_FOR_MORE_TELEMETRY = 3000; const ladTable = await getLadTableByName(page, 'Test LAD Table'); @@ -238,6 +240,102 @@ test.describe('Realtime telemetry displays', () => { }); test('Open MCT does not drop telemetry while app is loading', async ({ page }) => { + //TODO: Upgrade this test to cycle through notifications, don't just use the last visible one. + const notification = page.getByRole('alert'); + const count = await notification.count(); + + if (count > 0) { + const text = await notification.innerText(); + expect(text).not.toBe('Telemetry dropped due to client rate limiting.'); + } else { + expect(notification).toHaveCount(0); + } + }); + + test.only('Open MCT does not drop telemetry when a burst of telemetry arrives', async ({ page }) => { + const PARAMETER_VALUES_COUNT = 60; + /** + * A failure mode of the previous implementation of batching was when bursts of telemetry from a parameter arrived all at once. + * A burst of 60 messages will overwhelm a per-parameter telemetry buffer of 50, but will not overwhelm a larger shared buffer. + */ + + // Disable real playback. We are going to inject our own batch of messages + await disableLink(yamcsURL); + + const callNumber = await page.evaluate(async () => { + const openmct = window.openmct; + const objectIdentifier = { + namespace: 'taxonomy', + key: '~myproject~Battery1_Temp' + }; + const telemetryObject = await openmct.objects.get(objectIdentifier); + const yamcsRealtimeProvider = await openmct.telemetry.findSubscriptionProvider(telemetryObject); + + return yamcsRealtimeProvider.getSubscriptionByObjectIdentifier(objectIdentifier).call; + + }); + + // Need to get call number of `Battery1_Temp` so we can use it in injected telemetry data + websocketWorker.evaluate(({call, COUNT}) => { + const messageEvents = []; + + for (let messageCount = 0; messageCount < COUNT; messageCount++) { + const message = { + "type": "parameters", + "call": call, + "seq": messageCount, + "data": { + "@type": "/yamcs.protobuf.processing.SubscribeParametersData", + "values": [ + { + "rawValue": { + "type": "FLOAT", + "floatValue": 10.204108 + }, + "engValue": { + "type": "FLOAT", + "floatValue": 10.204108 + }, + "acquisitionTime": new Date(Date.now() + messageCount).toISOString(), + "generationTime": new Date(Date.now() + messageCount).toISOString(), + "acquisitionStatus": "ACQUIRED", + "numericId": 1 + } + ] + } + }; + const event = new Event('message'); + event.data = JSON.stringify(message); + messageEvents.push(event); + } + + messageEvents.forEach(event => { + self.currentWebSocket.dispatchEvent(event); + }); + + }, { + call: callNumber, + COUNT: PARAMETER_VALUES_COUNT + }); + + // Subscribe to Battery1_Temp so we can confirm that the injected parameter values were received, + const telemetryValues = await page.evaluate(async () => { + const openmct = window.openmct; + const objectIdentifier = { + namespace: 'taxonomy', + key: '~myproject~Battery1_Temp' + }; + const telemetryObject = await openmct.objects.get(objectIdentifier); + + return new Promise((resolveWithTelemetry) => { + openmct.telemetry.subscribe(telemetryObject, (telemetry) => { + resolveWithTelemetry(telemetry); + }, {strategy: 'batch'}); + }); + }); + + expect(telemetryValues.length).toBe(PARAMETER_VALUES_COUNT); + const notification = page.getByRole('alert'); const count = await notification.count(); @@ -257,11 +355,14 @@ test.describe('Realtime telemetry displays', () => { // 2. Block the UI with a loop await page.evaluate(() => { return new Promise((resolveBlockingLoop) => { - //5s x 10Hz data = 50 telemetry values which should easily overrun the buffer length of 20. - let start = Date.now(); + const start = Date.now(); let now = Date.now(); - // Block the UI thread for 5s - while (now - start < 5000) { + // BUFFER_LENGTH / (AVG_MESSAGE_LENGTH_CHARS * MSGS_PER_SECOND) + // 1000000 / (500 * 10hz * 40 subscriptions) = ~ 5s (6 to be safe) + const durationToBlockFor = 6000; + + // Block the UI thread for 6s per above calculation + while (now - start < durationToBlockFor) { now = Date.now(); } @@ -283,8 +384,8 @@ test.describe('Realtime telemetry displays', () => { return new Promise((resolveBlockingLoop) => { let start = Date.now(); let now = Date.now(); - // Block the UI thread for 5s - while (now - start < 5000) { + // Block the UI thread for 6s + while (now - start < 6000) { now = Date.now(); } @@ -307,7 +408,7 @@ test.describe('Realtime telemetry displays', () => { test('Open MCT accurately batches telemetry when requested', async ({ page }) => { - // 1. Subscribe to batched telemetry, + // 1. Subscribe to batched telemetry,e const telemetryValues = await page.evaluate(async () => { const openmct = window.openmct; const telemetryObject = await openmct.objects.get({ @@ -342,7 +443,8 @@ test.describe('Realtime telemetry displays', () => { }); const formattedParameterArchiveTelemetry = toOpenMctTelemetryFormat(parameterArchiveTelemetry); sortOpenMctTelemetryAscending(formattedParameterArchiveTelemetry); - + console.log(`RT batch: ${JSON.stringify(telemetryValues)}`); + console.log(`Archive: ${JSON.stringify(formattedParameterArchiveTelemetry)}`); telemetryValues.forEach((telemetry, index) => { expect(telemetry.value).toBe(formattedParameterArchiveTelemetry[index].value); expect(telemetry.timestamp).toBe(formattedParameterArchiveTelemetry[index].timestamp); @@ -414,7 +516,7 @@ test.describe('Realtime telemetry displays', () => { } /** - * @param {import('playwright').Page} page + * @param {import('playwright').Page} page * @returns {Promise<{parameterNameText: string, parameterValueText: string}[]>} */ async function getParameterValuesFromAllGauges(page) { From 4ba66817a4ef7021dd2fd57ff54fb208973dc8ff Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Tue, 10 Sep 2024 21:38:54 -0700 Subject: [PATCH 04/17] Removed commented code --- src/providers/realtime-provider.js | 1 - tests/e2e/yamcs/realtimeData.e2e.spec.mjs | 1 - 2 files changed, 2 deletions(-) diff --git a/src/providers/realtime-provider.js b/src/providers/realtime-provider.js index cc194ed0..2be5200e 100644 --- a/src/providers/realtime-provider.js +++ b/src/providers/realtime-provider.js @@ -222,7 +222,6 @@ export default class RealtimeProvider { if (remoteClockValues !== undefined && remoteClockValues.length > 0) { const allClockValues = []; - // We only care about the most recent clock tick message in the batch. remoteClockValues.forEach((parameterValue) => { this.#convertMessageToDatumAndReportStaleness(parameterValue, subscriptionDetails, allClockValues); }); diff --git a/tests/e2e/yamcs/realtimeData.e2e.spec.mjs b/tests/e2e/yamcs/realtimeData.e2e.spec.mjs index 20251766..d23297dc 100644 --- a/tests/e2e/yamcs/realtimeData.e2e.spec.mjs +++ b/tests/e2e/yamcs/realtimeData.e2e.spec.mjs @@ -240,7 +240,6 @@ test.describe('Realtime telemetry displays', () => { }); test('Open MCT does not drop telemetry while app is loading', async ({ page }) => { - //TODO: Upgrade this test to cycle through notifications, don't just use the last visible one. const notification = page.getByRole('alert'); const count = await notification.count(); From e3884e7c4dc81506478fe10268aade53157cee29 Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Tue, 10 Sep 2024 21:39:30 -0700 Subject: [PATCH 05/17] Removed debugging code --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index a4087aa3..96b31112 100644 --- a/package.json +++ b/package.json @@ -29,7 +29,7 @@ "posttest:getopensource": "npm install", "test:e2e:smoke": "npm test --workspace tests/e2e/opensource -- --config=../playwright-quickstart.config.js --project=chromium quickstartSmoke", "test:e2e:quickstart": "npm test --workspace tests/e2e/opensource -- --config=../playwright-quickstart.config.js --project=chromium tests/e2e/yamcs/", - "test:e2e:quickstart:local": "npm test --workspace tests/e2e/opensource -- --config=../playwright-quickstart.config.js --project=local-chrome tests/e2e/yamcs/ --debug", + "test:e2e:quickstart:local": "npm test --workspace tests/e2e/opensource -- --config=../playwright-quickstart.config.js --project=local-chrome tests/e2e/yamcs/", "test:e2e:watch": "npm test --workspace tests/e2e/opensource -- --ui --config=../playwright-quickstart.config.js", "wait-for-yamcs": "wait-on http-get://localhost:8090/ -v" }, From 6b7d62467d7ee399b26f9cf4bd30550c4c0cb74e Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Wed, 11 Sep 2024 11:57:47 -0700 Subject: [PATCH 06/17] Fixed linting errors --- example/index.js | 3 ++- src/providers/realtime-provider.js | 2 -- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/example/index.js b/example/index.js index 17b20cf0..3e954e4a 100644 --- a/example/index.js +++ b/example/index.js @@ -62,7 +62,8 @@ const openmct = window.openmct; document.addEventListener("DOMContentLoaded", function () { openmct.start(); }); - openmct.install(openmct.plugins.RemoteClock({namespace: 'taxonomy', + openmct.install(openmct.plugins.RemoteClock({ + namespace: 'taxonomy', key: '~myproject~A' })); openmct.install( diff --git a/src/providers/realtime-provider.js b/src/providers/realtime-provider.js index 2be5200e..a08c500d 100644 --- a/src/providers/realtime-provider.js +++ b/src/providers/realtime-provider.js @@ -167,8 +167,6 @@ export default class RealtimeProvider { } getSubscriptionByObjectIdentifier(identifier) { - const objectKeystring = this.#openmct.objects.makeKeyString(identifier); - return Object.values(this.subscriptionsById).find(subscription => this.#openmct.objects.areIdsEqual(subscription.domainObject.identifier, identifier)); } From edeb422c4c6250ff87297ffa5a7de05dddecfcc5 Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Wed, 11 Sep 2024 12:21:40 -0700 Subject: [PATCH 07/17] Remove debug logging --- tests/e2e/yamcs/quickstartTools.mjs | 1 - tests/e2e/yamcs/realtimeData.e2e.spec.mjs | 2 -- 2 files changed, 3 deletions(-) diff --git a/tests/e2e/yamcs/quickstartTools.mjs b/tests/e2e/yamcs/quickstartTools.mjs index 4c4dd080..a755d81c 100644 --- a/tests/e2e/yamcs/quickstartTools.mjs +++ b/tests/e2e/yamcs/quickstartTools.mjs @@ -27,7 +27,6 @@ async function disableLink(yamcsURL) { } async function enableLink(yamcsURL) { - console.log(`YAMCS URL: ${yamcsURL}`); const url = new URL(`api/links/myproject/udp-in:enable`, yamcsURL); await fetch(url.toString(), { method: 'POST' diff --git a/tests/e2e/yamcs/realtimeData.e2e.spec.mjs b/tests/e2e/yamcs/realtimeData.e2e.spec.mjs index d23297dc..2acb0279 100644 --- a/tests/e2e/yamcs/realtimeData.e2e.spec.mjs +++ b/tests/e2e/yamcs/realtimeData.e2e.spec.mjs @@ -442,8 +442,6 @@ test.describe('Realtime telemetry displays', () => { }); const formattedParameterArchiveTelemetry = toOpenMctTelemetryFormat(parameterArchiveTelemetry); sortOpenMctTelemetryAscending(formattedParameterArchiveTelemetry); - console.log(`RT batch: ${JSON.stringify(telemetryValues)}`); - console.log(`Archive: ${JSON.stringify(formattedParameterArchiveTelemetry)}`); telemetryValues.forEach((telemetry, index) => { expect(telemetry.value).toBe(formattedParameterArchiveTelemetry[index].value); expect(telemetry.timestamp).toBe(formattedParameterArchiveTelemetry[index].timestamp); From f359ce9dfd54439ae92dd6da3b7416d2bb7e854b Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Sat, 21 Sep 2024 16:41:35 -0700 Subject: [PATCH 08/17] Improved comments --- tests/e2e/yamcs/realtimeData.e2e.spec.mjs | 30 +++++++++++++++++++---- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/tests/e2e/yamcs/realtimeData.e2e.spec.mjs b/tests/e2e/yamcs/realtimeData.e2e.spec.mjs index 2acb0279..4739ac26 100644 --- a/tests/e2e/yamcs/realtimeData.e2e.spec.mjs +++ b/tests/e2e/yamcs/realtimeData.e2e.spec.mjs @@ -251,7 +251,7 @@ test.describe('Realtime telemetry displays', () => { } }); - test.only('Open MCT does not drop telemetry when a burst of telemetry arrives', async ({ page }) => { + test.only('Open MCT does not drop telemetry when a burst of telemetry arrives that exceeds the length of the buffer', async ({ page }) => { const PARAMETER_VALUES_COUNT = 60; /** * A failure mode of the previous implementation of batching was when bursts of telemetry from a parameter arrived all at once. @@ -261,7 +261,15 @@ test.describe('Realtime telemetry displays', () => { // Disable real playback. We are going to inject our own batch of messages await disableLink(yamcsURL); - const callNumber = await page.evaluate(async () => { + /** + * Yamcs tracks subscriptions by "call number". The call number is assigned by Yamcs, + * so we don't know it ahead of time. We have to retrieve it at runtime after the subscription + * has been established. + * + * We need to know the call number, because it's how the receiver (Open MCT) ties a parameter + * value that is received over a WebSocket back to the correct subscription. + */ + const batteryTempParameterCallNumber = await page.evaluate(async () => { const openmct = window.openmct; const objectIdentifier = { namespace: 'taxonomy', @@ -274,13 +282,15 @@ test.describe('Realtime telemetry displays', () => { }); - // Need to get call number of `Battery1_Temp` so we can use it in injected telemetry data websocketWorker.evaluate(({call, COUNT}) => { const messageEvents = []; - + /** + * Inject a burst of 60 messages. + */ for (let messageCount = 0; messageCount < COUNT; messageCount++) { const message = { "type": "parameters", + //This is where we use the call number retrieved previously "call": call, "seq": messageCount, "data": { @@ -303,17 +313,27 @@ test.describe('Realtime telemetry displays', () => { ] } }; + /** + * We are building an array of Event objects of type 'message'. Dispatching an event of this + * type on a WebSocket will cause all listeners subscribed to 'message' events to receive it. + * The receiving code will not know the difference between an Event that is dispatched from + * code vs. one that caused by the arrival of data over the wire. + * @see https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/message_event + */ const event = new Event('message'); event.data = JSON.stringify(message); messageEvents.push(event); } + /** + * Dispatch the 60 WebSocket message events we just created + */ messageEvents.forEach(event => { self.currentWebSocket.dispatchEvent(event); }); }, { - call: callNumber, + call: batteryTempParameterCallNumber, COUNT: PARAMETER_VALUES_COUNT }); From 2cf6ddb2f6e7d344012d8c8a8d3a74e5e5edcab1 Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Sat, 21 Sep 2024 17:18:13 -0700 Subject: [PATCH 09/17] Remove from test. Oops --- tests/e2e/yamcs/realtimeData.e2e.spec.mjs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/e2e/yamcs/realtimeData.e2e.spec.mjs b/tests/e2e/yamcs/realtimeData.e2e.spec.mjs index 4739ac26..d661f959 100644 --- a/tests/e2e/yamcs/realtimeData.e2e.spec.mjs +++ b/tests/e2e/yamcs/realtimeData.e2e.spec.mjs @@ -251,7 +251,7 @@ test.describe('Realtime telemetry displays', () => { } }); - test.only('Open MCT does not drop telemetry when a burst of telemetry arrives that exceeds the length of the buffer', async ({ page }) => { + test('Open MCT does not drop telemetry when a burst of telemetry arrives that exceeds the length of the buffer', async ({ page }) => { const PARAMETER_VALUES_COUNT = 60; /** * A failure mode of the previous implementation of batching was when bursts of telemetry from a parameter arrived all at once. From 1d8ff21e68fa9fead04be133b8de67f1a25a071a Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Fri, 27 Sep 2024 14:19:41 -0700 Subject: [PATCH 10/17] Fix test issues --- src/providers/realtime-provider.js | 6 ++++++ tests/e2e/yamcs/realtimeData.e2e.spec.mjs | 10 ++++++---- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/providers/realtime-provider.js b/src/providers/realtime-provider.js index a08c500d..a5f6689e 100644 --- a/src/providers/realtime-provider.js +++ b/src/providers/realtime-provider.js @@ -318,6 +318,12 @@ export default class RealtimeProvider { if (isReply) { const id = message.data.replyTo; subscriptionDetails = this.subscriptionsById[id]; + + // Susbcriptions can be cancelled before we even get to this stage during tests due to rapid navigation. + if (!subscriptionDetails) { + return; + } + subscriptionDetails.call = call; // Subsequent retrieval uses a string, so for performance reasons we use a string as a key. this.subscriptionsByCall.set(call, subscriptionDetails); diff --git a/tests/e2e/yamcs/realtimeData.e2e.spec.mjs b/tests/e2e/yamcs/realtimeData.e2e.spec.mjs index d661f959..1240bcdc 100644 --- a/tests/e2e/yamcs/realtimeData.e2e.spec.mjs +++ b/tests/e2e/yamcs/realtimeData.e2e.spec.mjs @@ -37,7 +37,7 @@ const realTimeDisplayPath = fileURLToPath( // Wait 1s from when telemetry is received before sampling values in the UI. This is 1s because by default // Open MCT is configured to release batches of telemetry every 1s. So depending on when it is sampled it // may take up to 1s for telemetry to propagate to the UI from when it is received. -const TELEMETRY_PROPAGATION_TIME = 1000; +const TELEMETRY_PROPAGATION_TIME = 2000; const THIRTY_MINUTES = 30 * 60 * 1000; test.describe('Realtime telemetry displays', () => { @@ -251,7 +251,7 @@ test.describe('Realtime telemetry displays', () => { } }); - test('Open MCT does not drop telemetry when a burst of telemetry arrives that exceeds the length of the buffer', async ({ page }) => { + test('Open MCT does not drop telemetry when a burst of telemetry arrives that exceeds the length of 60 messages', async ({ page }) => { const PARAMETER_VALUES_COUNT = 60; /** * A failure mode of the previous implementation of batching was when bursts of telemetry from a parameter arrived all at once. @@ -352,8 +352,10 @@ test.describe('Realtime telemetry displays', () => { }, {strategy: 'batch'}); }); }); - - expect(telemetryValues.length).toBe(PARAMETER_VALUES_COUNT); + // To avoid test flake use >= instead of =. Because yamcs is also flowing data immediately prior to this test there + // can be some real data still in the buffer or in-transit. It's inherently stochastic because the Yamcs instance is not + // isolated between tests, but it doesn't invalidate the test in this case. + expect(telemetryValues.length).toBeGreaterThanOrEqual(PARAMETER_VALUES_COUNT); const notification = page.getByRole('alert'); const count = await notification.count(); From 3b47dab98592bb3719ddc7c790d3a60a5ec78251 Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Fri, 27 Sep 2024 14:42:04 -0700 Subject: [PATCH 11/17] Fix configuration --- example/index.js | 13 ------------- package.json | 2 +- 2 files changed, 1 insertion(+), 14 deletions(-) diff --git a/example/index.js b/example/index.js index 0ef13dcd..8d03aa2c 100644 --- a/example/index.js +++ b/example/index.js @@ -62,22 +62,9 @@ const openmct = window.openmct; document.addEventListener("DOMContentLoaded", function () { openmct.start(); }); - openmct.install(openmct.plugins.RemoteClock({ - namespace: 'taxonomy', - key: '~myproject~A' - })); openmct.install( openmct.plugins.Conductor({ menuOptions: [ - { - name: "Remote", - timeSystem: 'utc', - clock: 'remote-clock', - clockOffsets: { - start: -FIFTEEN_MINUTES, - end: FIFTEEN_SECONDS - } - }, { name: "Realtime", timeSystem: "utc", diff --git a/package.json b/package.json index 51711105..d15ea753 100644 --- a/package.json +++ b/package.json @@ -20,7 +20,7 @@ "lint:fix": "eslint src example --fix", "build:dist": "webpack --config ./.webpack/webpack.prod.mjs", "build:example": "npm install nasa/openmct#improved-buffering --no-save", - "build:example:master": "npm install nasa/openmct --no-save", + "build:example:master": "npm install nasa/openmct#improved-buffering --no-save", "build:example:currentbranch": "npm install nasa/openmct#$(git rev-parse --abbrev-ref HEAD) --no-save --verbose", "postbuild:example": "node check-optional-dependencies.mjs", "start": "npx webpack serve --config ./.webpack/webpack.dev.mjs", From c0408e3303a9e13d6db62a924c0b925e0b00d293 Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Fri, 27 Sep 2024 14:49:42 -0700 Subject: [PATCH 12/17] Fix linting issues --- example/index.js | 2 -- 1 file changed, 2 deletions(-) diff --git a/example/index.js b/example/index.js index 8d03aa2c..9350677e 100644 --- a/example/index.js +++ b/example/index.js @@ -46,9 +46,7 @@ const openmct = window.openmct; (() => { const ONE_SECOND = 1000; - const FIFTEEN_SECONDS = ONE_SECOND * 15; const ONE_MINUTE = ONE_SECOND * 60; - const FIFTEEN_MINUTES = ONE_MINUTE * 15; const THIRTY_MINUTES = ONE_MINUTE * 30; openmct.setAssetPath("/node_modules/openmct/dist"); From ab96697508a144ff3406d492ae3d0cf4f342ce81 Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Sat, 28 Sep 2024 17:56:58 -0700 Subject: [PATCH 13/17] Don't rely on negative logic for defining throttling rules --- src/providers/realtime-provider.js | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/providers/realtime-provider.js b/src/providers/realtime-provider.js index a5f6689e..6820740e 100644 --- a/src/providers/realtime-provider.js +++ b/src/providers/realtime-provider.js @@ -42,6 +42,9 @@ import { eventToTelemetryDatum, eventShouldBeFiltered } from './events.js'; const ONE_SECOND = 1000; const ONE_MILLION_CHARACTERS = 1000000; +//Everything except parameter messages are housekeeping and if they're dropped bad things can happen. +const PARAMETER_MESSAGES = '^{[\\s]*"type":\\s"parameters'; + export default class RealtimeProvider { #socketWorker = null; #openmct; @@ -62,6 +65,7 @@ export default class RealtimeProvider { this.subscriptionsByCall = new Map(); this.subscriptionsById = {}; this.#socketWorker = new openmct.telemetry.BatchingWebSocket(openmct); + this.#socketWorker.setThrottleMessagePattern(PARAMETER_MESSAGES); this.#openmct = openmct; this.#socketWorker.setThrottleRate(throttleRate); this.#socketWorker.setMaxBufferSize(maxBufferSize); From e379e3c07e9fcd58a397b76f978c8ed332735f74 Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Sat, 28 Sep 2024 17:57:10 -0700 Subject: [PATCH 14/17] Add test for edge-case relating to throttling --- tests/e2e/yamcs/realtimeData.e2e.spec.mjs | 211 +++++++--------------- 1 file changed, 63 insertions(+), 148 deletions(-) diff --git a/tests/e2e/yamcs/realtimeData.e2e.spec.mjs b/tests/e2e/yamcs/realtimeData.e2e.spec.mjs index 1240bcdc..5fd53a9b 100644 --- a/tests/e2e/yamcs/realtimeData.e2e.spec.mjs +++ b/tests/e2e/yamcs/realtimeData.e2e.spec.mjs @@ -52,19 +52,22 @@ test.describe('Realtime telemetry displays', () => { }); // Go to baseURL - await page.goto('./', { waitUntil: 'domcontentloaded' }); + await page.goto('./', { waitUntil: 'networkidle' }); await page.evaluate((thirtyMinutes) => { - const openmct = window.openmct; + return new Promise((resolve) => { + const openmct = window.openmct; - openmct.install(openmct.plugins.RemoteClock({ - namespace: "taxonomy", - key: "~myproject~Battery1_Temp" - })); + openmct.install(openmct.plugins.RemoteClock({ + namespace: "taxonomy", + key: "~myproject~Battery1_Temp" + })); - openmct.time.setClock('remote-clock'); - openmct.time.setClockOffsets({ - start: -thirtyMinutes, - end: 0 + openmct.time.setClock('remote-clock'); + openmct.time.setClockOffsets({ + start: -thirtyMinutes, + end: 15000 + }); + setTimeout(resolve, 2000); }); }, THIRTY_MINUTES); yamcsURL = new URL('/yamcs-proxy/', page.url()).toString(); @@ -250,151 +253,58 @@ test.describe('Realtime telemetry displays', () => { expect(notification).toHaveCount(0); } }); - - test('Open MCT does not drop telemetry when a burst of telemetry arrives that exceeds the length of 60 messages', async ({ page }) => { - const PARAMETER_VALUES_COUNT = 60; - /** - * A failure mode of the previous implementation of batching was when bursts of telemetry from a parameter arrived all at once. - * A burst of 60 messages will overwhelm a per-parameter telemetry buffer of 50, but will not overwhelm a larger shared buffer. - */ - - // Disable real playback. We are going to inject our own batch of messages - await disableLink(yamcsURL); - - /** - * Yamcs tracks subscriptions by "call number". The call number is assigned by Yamcs, - * so we don't know it ahead of time. We have to retrieve it at runtime after the subscription - * has been established. - * - * We need to know the call number, because it's how the receiver (Open MCT) ties a parameter - * value that is received over a WebSocket back to the correct subscription. - */ - const batteryTempParameterCallNumber = await page.evaluate(async () => { - const openmct = window.openmct; - const objectIdentifier = { - namespace: 'taxonomy', - key: '~myproject~Battery1_Temp' - }; - const telemetryObject = await openmct.objects.get(objectIdentifier); - const yamcsRealtimeProvider = await openmct.telemetry.findSubscriptionProvider(telemetryObject); - - return yamcsRealtimeProvider.getSubscriptionByObjectIdentifier(objectIdentifier).call; - - }); - - websocketWorker.evaluate(({call, COUNT}) => { - const messageEvents = []; - /** - * Inject a burst of 60 messages. - */ - for (let messageCount = 0; messageCount < COUNT; messageCount++) { - const message = { - "type": "parameters", - //This is where we use the call number retrieved previously - "call": call, - "seq": messageCount, - "data": { - "@type": "/yamcs.protobuf.processing.SubscribeParametersData", - "values": [ - { - "rawValue": { - "type": "FLOAT", - "floatValue": 10.204108 - }, - "engValue": { - "type": "FLOAT", - "floatValue": 10.204108 - }, - "acquisitionTime": new Date(Date.now() + messageCount).toISOString(), - "generationTime": new Date(Date.now() + messageCount).toISOString(), - "acquisitionStatus": "ACQUIRED", - "numericId": 1 - } - ] - } - }; - /** - * We are building an array of Event objects of type 'message'. Dispatching an event of this - * type on a WebSocket will cause all listeners subscribed to 'message' events to receive it. - * The receiving code will not know the difference between an Event that is dispatched from - * code vs. one that caused by the arrival of data over the wire. - * @see https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/message_event - */ - const event = new Event('message'); - event.data = JSON.stringify(message); - messageEvents.push(event); - } - - /** - * Dispatch the 60 WebSocket message events we just created - */ - messageEvents.forEach(event => { - self.currentWebSocket.dispatchEvent(event); - }); - - }, { - call: batteryTempParameterCallNumber, - COUNT: PARAMETER_VALUES_COUNT - }); - - // Subscribe to Battery1_Temp so we can confirm that the injected parameter values were received, - const telemetryValues = await page.evaluate(async () => { - const openmct = window.openmct; - const objectIdentifier = { - namespace: 'taxonomy', - key: '~myproject~Battery1_Temp' - }; - const telemetryObject = await openmct.objects.get(objectIdentifier); - - return new Promise((resolveWithTelemetry) => { - openmct.telemetry.subscribe(telemetryObject, (telemetry) => { - resolveWithTelemetry(telemetry); - }, {strategy: 'batch'}); - }); - }); - // To avoid test flake use >= instead of =. Because yamcs is also flowing data immediately prior to this test there - // can be some real data still in the buffer or in-transit. It's inherently stochastic because the Yamcs instance is not - // isolated between tests, but it doesn't invalidate the test in this case. - expect(telemetryValues.length).toBeGreaterThanOrEqual(PARAMETER_VALUES_COUNT); - - const notification = page.getByRole('alert'); - const count = await notification.count(); - - if (count > 0) { - const text = await notification.innerText(); - expect(text).not.toBe('Telemetry dropped due to client rate limiting.'); - } else { - expect(notification).toHaveCount(0); - } - }); - - test('Open MCT does drop telemetry when the UI is under load', async ({ page }) => { - // 1. Make sure the display is done loading, and populated with values (ie. we are in a steady state) - const ladTable = await getLadTableByName(page, 'Test LAD Table'); - await getParameterValuesFromLadTable(ladTable); - - // 2. Block the UI with a loop + /** + * This tests for an edge-case found during testing where throttling occurs during subscription handshaking with the server. + * In this scenario, subscribers never receive telemetry because the subscription was never properly initialized. + * This test confirms that after blocking the UI and inducing throttling, that all subscribed telemetry objects received telemetry. + */ + test('When the UI is blocked during initialization, does not drop subscription housekeeping messages', async ({ page }) => { + // 1. Block the UI await page.evaluate(() => { return new Promise((resolveBlockingLoop) => { - const start = Date.now(); + let start = Date.now(); let now = Date.now(); - // BUFFER_LENGTH / (AVG_MESSAGE_LENGTH_CHARS * MSGS_PER_SECOND) - // 1000000 / (500 * 10hz * 40 subscriptions) = ~ 5s (6 to be safe) - const durationToBlockFor = 6000; - - // Block the UI thread for 6s per above calculation - while (now - start < durationToBlockFor) { + // Block the UI thread for 6s + while (now - start < 10000) { now = Date.now(); } resolveBlockingLoop(); }); }); - // Check for telemetry dropped notification + + //Confirm that throttling occurred const notification = page.getByRole('alert'); - expect(notification).toHaveCount(1); const text = await notification.innerText(); expect(text).toBe('Telemetry dropped due to client rate limiting.'); + + //Confirm that all subscribed telemetry points receive telemetry. This tests that subscriptions were established successfully and + //tests for a failure mode where housekeeping telemetry was being dropped if the UI was blocked during initialization of telemetry subscriptions + const parametersToSubscribeTo = Object.values(namesToParametersMap).map(parameter => parameter.replaceAll('/', '~')); + const subscriptionsThatTelemetry = await page.evaluate(async (parameters) => { + const openmct = window.openmct; + const telemetryObjects = await Promise.all( + Object.values(parameters).map( + (parameterId) => openmct.objects.get( + { + namespace: 'taxonomy', + key: parameterId + } + ) + )); + const subscriptionsAllReturned = await Promise.all(telemetryObjects.map((telemetryObject) => { + return new Promise(resolve => { + const unsubscribe = openmct.telemetry.subscribe(telemetryObject, () => { + unsubscribe(); + resolve(true); + }); + }); + })); + + return subscriptionsAllReturned; + }, parametersToSubscribeTo); + + expect(subscriptionsThatTelemetry.length).toBe(parametersToSubscribeTo.length); }); test('Open MCT shows the latest telemetry after UI is temporarily blocked', async ({ page }) => { @@ -405,20 +315,25 @@ test.describe('Realtime telemetry displays', () => { return new Promise((resolveBlockingLoop) => { let start = Date.now(); let now = Date.now(); - // Block the UI thread for 6s - while (now - start < 6000) { + // Block the UI thread for 10s + while (now - start < 10000) { now = Date.now(); } - resolveBlockingLoop(); + requestIdleCallback(resolveBlockingLoop); }); }); + //Confirm that throttling occurred + const notification = page.getByRole('alert'); + const text = await notification.innerText(); + expect(text).toBe('Telemetry dropped due to client rate limiting.'); + // Disable playback await disableLink(yamcsURL); - // Wait 1 second for values to propagate to client and render on screen. - await page.waitForTimeout(TELEMETRY_PROPAGATION_TIME); + // Wait for values to propagate to client and render on screen. + await page.waitForTimeout(5000); const latestValueObjects = await latestParameterValues(Object.values(namesToParametersMap), yamcsURL); const parameterNamesToLatestValues = toParameterNameToValueMap(latestValueObjects); From c46c6582bc916d179af6fc890646cc4514e8dd38 Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Sat, 28 Sep 2024 18:09:08 -0700 Subject: [PATCH 15/17] Revert changes to timing --- tests/e2e/yamcs/realtimeData.e2e.spec.mjs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/e2e/yamcs/realtimeData.e2e.spec.mjs b/tests/e2e/yamcs/realtimeData.e2e.spec.mjs index 5fd53a9b..c7a1cc05 100644 --- a/tests/e2e/yamcs/realtimeData.e2e.spec.mjs +++ b/tests/e2e/yamcs/realtimeData.e2e.spec.mjs @@ -37,7 +37,7 @@ const realTimeDisplayPath = fileURLToPath( // Wait 1s from when telemetry is received before sampling values in the UI. This is 1s because by default // Open MCT is configured to release batches of telemetry every 1s. So depending on when it is sampled it // may take up to 1s for telemetry to propagate to the UI from when it is received. -const TELEMETRY_PROPAGATION_TIME = 2000; +const TELEMETRY_PROPAGATION_TIME = 1000; const THIRTY_MINUTES = 30 * 60 * 1000; test.describe('Realtime telemetry displays', () => { @@ -333,7 +333,7 @@ test.describe('Realtime telemetry displays', () => { await disableLink(yamcsURL); // Wait for values to propagate to client and render on screen. - await page.waitForTimeout(5000); + await page.waitForTimeout(TELEMETRY_PROPAGATION_TIME); const latestValueObjects = await latestParameterValues(Object.values(namesToParametersMap), yamcsURL); const parameterNamesToLatestValues = toParameterNameToValueMap(latestValueObjects); From 86a3846271af1e7dd55602ca19ed41fa83bb0571 Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Mon, 30 Sep 2024 14:39:09 -0700 Subject: [PATCH 16/17] Revert change to package.json --- package.json | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/package.json b/package.json index d15ea753..acf02948 100644 --- a/package.json +++ b/package.json @@ -19,9 +19,8 @@ "lint": "eslint src example", "lint:fix": "eslint src example --fix", "build:dist": "webpack --config ./.webpack/webpack.prod.mjs", - "build:example": "npm install nasa/openmct#improved-buffering --no-save", - "build:example:master": "npm install nasa/openmct#improved-buffering --no-save", - "build:example:currentbranch": "npm install nasa/openmct#$(git rev-parse --abbrev-ref HEAD) --no-save --verbose", + "build:example": "npm install openmct@unstable --no-save", + "build:example:master": "npm install nasa/openmct --no-save", "postbuild:example": "node check-optional-dependencies.mjs", "start": "npx webpack serve --config ./.webpack/webpack.dev.mjs", "start:coverage": "npx webpack serve --config ./.webpack/webpack.coverage.mjs", From cbcd4914a31beae05c66cfaacccae2e36c3b21bf Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Mon, 30 Sep 2024 14:52:16 -0700 Subject: [PATCH 17/17] Get changes to package.json from master --- package.json | 1 + 1 file changed, 1 insertion(+) diff --git a/package.json b/package.json index acf02948..03f0da86 100644 --- a/package.json +++ b/package.json @@ -21,6 +21,7 @@ "build:dist": "webpack --config ./.webpack/webpack.prod.mjs", "build:example": "npm install openmct@unstable --no-save", "build:example:master": "npm install nasa/openmct --no-save", + "build:example:currentbranch": "npm install nasa/openmct#$(git rev-parse --abbrev-ref HEAD) --no-save --verbose", "postbuild:example": "node check-optional-dependencies.mjs", "start": "npx webpack serve --config ./.webpack/webpack.dev.mjs", "start:coverage": "npx webpack serve --config ./.webpack/webpack.coverage.mjs",