From ea7f6df379545b5a61319154ac2baa2878271a04 Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Tue, 10 Sep 2024 20:38:01 -0700 Subject: [PATCH] Clean up --- package.json | 4 +- src/providers/realtime-provider.js | 20 ++-- tests/e2e/yamcs/quickstartTools.mjs | 1 + tests/e2e/yamcs/realtimeData.e2e.spec.mjs | 122 ++++++++++++++++++++-- 4 files changed, 128 insertions(+), 19 deletions(-) diff --git a/package.json b/package.json index acf02948..a4087aa3 100644 --- a/package.json +++ b/package.json @@ -19,7 +19,7 @@ "lint": "eslint src example", "lint:fix": "eslint src example --fix", "build:dist": "webpack --config ./.webpack/webpack.prod.mjs", - "build:example": "npm install openmct@unstable --no-save", + "build:example": "npm install nasa/openmct#improved-buffering --no-save", "build:example:master": "npm install nasa/openmct --no-save", "postbuild:example": "node check-optional-dependencies.mjs", "start": "npx webpack serve --config ./.webpack/webpack.dev.mjs", @@ -29,7 +29,7 @@ "posttest:getopensource": "npm install", "test:e2e:smoke": "npm test --workspace tests/e2e/opensource -- --config=../playwright-quickstart.config.js --project=chromium quickstartSmoke", "test:e2e:quickstart": "npm test --workspace tests/e2e/opensource -- --config=../playwright-quickstart.config.js --project=chromium tests/e2e/yamcs/", - "test:e2e:quickstart:local": "npm test --workspace tests/e2e/opensource -- --config=../playwright-quickstart.config.js --project=local-chrome tests/e2e/yamcs/", + "test:e2e:quickstart:local": "npm test --workspace tests/e2e/opensource -- --config=../playwright-quickstart.config.js --project=local-chrome tests/e2e/yamcs/ --debug", "test:e2e:watch": "npm test --workspace tests/e2e/opensource -- --ui --config=../playwright-quickstart.config.js", "wait-for-yamcs": "wait-on http-get://localhost:8090/ -v" }, diff --git a/src/providers/realtime-provider.js b/src/providers/realtime-provider.js index 28da2367..cc194ed0 100644 --- a/src/providers/realtime-provider.js +++ b/src/providers/realtime-provider.js @@ -135,7 +135,7 @@ export default class RealtimeProvider { if (subscriptionDetails) { this.sendUnsubscribeMessage(subscriptionDetails); - this.subscriptionsByCall.delete(subscriptionDetails.call.toString()); + this.subscriptionsByCall.delete(subscriptionDetails.call); delete this.subscriptionsById[id]; } }; @@ -160,12 +160,18 @@ export default class RealtimeProvider { this.sendUnsubscribeMessage(subscriptionDetails); if (this.subscriptionsById[id]) { - this.subscriptionsByCall.delete(this.subscriptionsById[id].call.toString()); + this.subscriptionsByCall.delete(this.subscriptionsById[id].call); delete this.subscriptionsById[id]; } }; } + getSubscriptionByObjectIdentifier(identifier) { + const objectKeystring = this.#openmct.objects.makeKeyString(identifier); + + return Object.values(this.subscriptionsById).find(subscription => this.#openmct.objects.areIdsEqual(subscription.domainObject.identifier, identifier)); + } + buildSubscriptionDetails(domainObject, callback, options) { let subscriptionId = this.lastSubscriptionId++; let subscriptionDetails = { @@ -201,7 +207,7 @@ export default class RealtimeProvider { }); if (correspondingSubscription !== undefined) { - this.remoteClockCallNumber = correspondingSubscription.call.toString(); + this.remoteClockCallNumber = correspondingSubscription.call; } else { delete this.remoteClockCallNumber; } @@ -217,11 +223,12 @@ export default class RealtimeProvider { const allClockValues = []; // We only care about the most recent clock tick message in the batch. - this.#convertMessageToDatumAndReportStaleness(remoteClockValues[remoteClockValues.length - 1], subscriptionDetails, allClockValues); + remoteClockValues.forEach((parameterValue) => { + this.#convertMessageToDatumAndReportStaleness(parameterValue, subscriptionDetails, allClockValues); + }); if (allClockValues.length > 0) { - // A single parameter update message can include multiple values, again we only care about the most recent one. - subscriptionDetails.callback(allClockValues[allClockValues.length - 1]); + subscriptionDetails.callback(allClockValues); } // Delete so we don't process it twice. @@ -291,7 +298,6 @@ export default class RealtimeProvider { const newBatch = batchEvent.detail; const parametersByCall = new Map(); newBatch.forEach(messageString => { - //const message = JSON.parse(messageString); const message = JSON.parse(messageString); const call = message.call; if (message.type === 'parameters') { diff --git a/tests/e2e/yamcs/quickstartTools.mjs b/tests/e2e/yamcs/quickstartTools.mjs index a755d81c..4c4dd080 100644 --- a/tests/e2e/yamcs/quickstartTools.mjs +++ b/tests/e2e/yamcs/quickstartTools.mjs @@ -27,6 +27,7 @@ async function disableLink(yamcsURL) { } async function enableLink(yamcsURL) { + console.log(`YAMCS URL: ${yamcsURL}`); const url = new URL(`api/links/myproject/udp-in:enable`, yamcsURL); await fetch(url.toString(), { method: 'POST' diff --git a/tests/e2e/yamcs/realtimeData.e2e.spec.mjs b/tests/e2e/yamcs/realtimeData.e2e.spec.mjs index 4aad789a..20251766 100644 --- a/tests/e2e/yamcs/realtimeData.e2e.spec.mjs +++ b/tests/e2e/yamcs/realtimeData.e2e.spec.mjs @@ -119,7 +119,9 @@ test.describe('Realtime telemetry displays', () => { test('Correctly shows the latest values', async ({ page }) => { // Wait a reasonable amount of time for new telemetry to come in. - // There is nothing significant about the number chosen. + // There is nothing significant about the number chosen. It's + // long enough to ensure we have new telemetry, short enough that + // it doesn't significantly increase test time. const WAIT_FOR_MORE_TELEMETRY = 3000; const ladTable = await getLadTableByName(page, 'Test LAD Table'); @@ -238,6 +240,102 @@ test.describe('Realtime telemetry displays', () => { }); test('Open MCT does not drop telemetry while app is loading', async ({ page }) => { + //TODO: Upgrade this test to cycle through notifications, don't just use the last visible one. + const notification = page.getByRole('alert'); + const count = await notification.count(); + + if (count > 0) { + const text = await notification.innerText(); + expect(text).not.toBe('Telemetry dropped due to client rate limiting.'); + } else { + expect(notification).toHaveCount(0); + } + }); + + test.only('Open MCT does not drop telemetry when a burst of telemetry arrives', async ({ page }) => { + const PARAMETER_VALUES_COUNT = 60; + /** + * A failure mode of the previous implementation of batching was when bursts of telemetry from a parameter arrived all at once. + * A burst of 60 messages will overwhelm a per-parameter telemetry buffer of 50, but will not overwhelm a larger shared buffer. + */ + + // Disable real playback. We are going to inject our own batch of messages + await disableLink(yamcsURL); + + const callNumber = await page.evaluate(async () => { + const openmct = window.openmct; + const objectIdentifier = { + namespace: 'taxonomy', + key: '~myproject~Battery1_Temp' + }; + const telemetryObject = await openmct.objects.get(objectIdentifier); + const yamcsRealtimeProvider = await openmct.telemetry.findSubscriptionProvider(telemetryObject); + + return yamcsRealtimeProvider.getSubscriptionByObjectIdentifier(objectIdentifier).call; + + }); + + // Need to get call number of `Battery1_Temp` so we can use it in injected telemetry data + websocketWorker.evaluate(({call, COUNT}) => { + const messageEvents = []; + + for (let messageCount = 0; messageCount < COUNT; messageCount++) { + const message = { + "type": "parameters", + "call": call, + "seq": messageCount, + "data": { + "@type": "/yamcs.protobuf.processing.SubscribeParametersData", + "values": [ + { + "rawValue": { + "type": "FLOAT", + "floatValue": 10.204108 + }, + "engValue": { + "type": "FLOAT", + "floatValue": 10.204108 + }, + "acquisitionTime": new Date(Date.now() + messageCount).toISOString(), + "generationTime": new Date(Date.now() + messageCount).toISOString(), + "acquisitionStatus": "ACQUIRED", + "numericId": 1 + } + ] + } + }; + const event = new Event('message'); + event.data = JSON.stringify(message); + messageEvents.push(event); + } + + messageEvents.forEach(event => { + self.currentWebSocket.dispatchEvent(event); + }); + + }, { + call: callNumber, + COUNT: PARAMETER_VALUES_COUNT + }); + + // Subscribe to Battery1_Temp so we can confirm that the injected parameter values were received, + const telemetryValues = await page.evaluate(async () => { + const openmct = window.openmct; + const objectIdentifier = { + namespace: 'taxonomy', + key: '~myproject~Battery1_Temp' + }; + const telemetryObject = await openmct.objects.get(objectIdentifier); + + return new Promise((resolveWithTelemetry) => { + openmct.telemetry.subscribe(telemetryObject, (telemetry) => { + resolveWithTelemetry(telemetry); + }, {strategy: 'batch'}); + }); + }); + + expect(telemetryValues.length).toBe(PARAMETER_VALUES_COUNT); + const notification = page.getByRole('alert'); const count = await notification.count(); @@ -257,11 +355,14 @@ test.describe('Realtime telemetry displays', () => { // 2. Block the UI with a loop await page.evaluate(() => { return new Promise((resolveBlockingLoop) => { - //5s x 10Hz data = 50 telemetry values which should easily overrun the buffer length of 20. - let start = Date.now(); + const start = Date.now(); let now = Date.now(); - // Block the UI thread for 5s - while (now - start < 5000) { + // BUFFER_LENGTH / (AVG_MESSAGE_LENGTH_CHARS * MSGS_PER_SECOND) + // 1000000 / (500 * 10hz * 40 subscriptions) = ~ 5s (6 to be safe) + const durationToBlockFor = 6000; + + // Block the UI thread for 6s per above calculation + while (now - start < durationToBlockFor) { now = Date.now(); } @@ -283,8 +384,8 @@ test.describe('Realtime telemetry displays', () => { return new Promise((resolveBlockingLoop) => { let start = Date.now(); let now = Date.now(); - // Block the UI thread for 5s - while (now - start < 5000) { + // Block the UI thread for 6s + while (now - start < 6000) { now = Date.now(); } @@ -307,7 +408,7 @@ test.describe('Realtime telemetry displays', () => { test('Open MCT accurately batches telemetry when requested', async ({ page }) => { - // 1. Subscribe to batched telemetry, + // 1. Subscribe to batched telemetry,e const telemetryValues = await page.evaluate(async () => { const openmct = window.openmct; const telemetryObject = await openmct.objects.get({ @@ -342,7 +443,8 @@ test.describe('Realtime telemetry displays', () => { }); const formattedParameterArchiveTelemetry = toOpenMctTelemetryFormat(parameterArchiveTelemetry); sortOpenMctTelemetryAscending(formattedParameterArchiveTelemetry); - + console.log(`RT batch: ${JSON.stringify(telemetryValues)}`); + console.log(`Archive: ${JSON.stringify(formattedParameterArchiveTelemetry)}`); telemetryValues.forEach((telemetry, index) => { expect(telemetry.value).toBe(formattedParameterArchiveTelemetry[index].value); expect(telemetry.timestamp).toBe(formattedParameterArchiveTelemetry[index].timestamp); @@ -414,7 +516,7 @@ test.describe('Realtime telemetry displays', () => { } /** - * @param {import('playwright').Page} page + * @param {import('playwright').Page} page * @returns {Promise<{parameterNameText: string, parameterValueText: string}[]>} */ async function getParameterValuesFromAllGauges(page) {