Skip to content

Commit

Permalink
Clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
akhenry committed Sep 11, 2024
1 parent cb558a3 commit ea7f6df
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 19 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"lint": "eslint src example",
"lint:fix": "eslint src example --fix",
"build:dist": "webpack --config ./.webpack/webpack.prod.mjs",
"build:example": "npm install openmct@unstable --no-save",
"build:example": "npm install nasa/openmct#improved-buffering --no-save",
"build:example:master": "npm install nasa/openmct --no-save",
"postbuild:example": "node check-optional-dependencies.mjs",
"start": "npx webpack serve --config ./.webpack/webpack.dev.mjs",
Expand All @@ -29,7 +29,7 @@
"posttest:getopensource": "npm install",
"test:e2e:smoke": "npm test --workspace tests/e2e/opensource -- --config=../playwright-quickstart.config.js --project=chromium quickstartSmoke",
"test:e2e:quickstart": "npm test --workspace tests/e2e/opensource -- --config=../playwright-quickstart.config.js --project=chromium tests/e2e/yamcs/",
"test:e2e:quickstart:local": "npm test --workspace tests/e2e/opensource -- --config=../playwright-quickstart.config.js --project=local-chrome tests/e2e/yamcs/",
"test:e2e:quickstart:local": "npm test --workspace tests/e2e/opensource -- --config=../playwright-quickstart.config.js --project=local-chrome tests/e2e/yamcs/ --debug",
"test:e2e:watch": "npm test --workspace tests/e2e/opensource -- --ui --config=../playwright-quickstart.config.js",
"wait-for-yamcs": "wait-on http-get://localhost:8090/ -v"
},
Expand Down
20 changes: 13 additions & 7 deletions src/providers/realtime-provider.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ export default class RealtimeProvider {
if (subscriptionDetails) {
this.sendUnsubscribeMessage(subscriptionDetails);

this.subscriptionsByCall.delete(subscriptionDetails.call.toString());
this.subscriptionsByCall.delete(subscriptionDetails.call);
delete this.subscriptionsById[id];
}
};
Expand All @@ -160,12 +160,18 @@ export default class RealtimeProvider {
this.sendUnsubscribeMessage(subscriptionDetails);

if (this.subscriptionsById[id]) {
this.subscriptionsByCall.delete(this.subscriptionsById[id].call.toString());
this.subscriptionsByCall.delete(this.subscriptionsById[id].call);
delete this.subscriptionsById[id];
}
};
}

getSubscriptionByObjectIdentifier(identifier) {
const objectKeystring = this.#openmct.objects.makeKeyString(identifier);

Check failure on line 170 in src/providers/realtime-provider.js

View workflow job for this annotation

GitHub Actions / lint

'objectKeystring' is assigned a value but never used

return Object.values(this.subscriptionsById).find(subscription => this.#openmct.objects.areIdsEqual(subscription.domainObject.identifier, identifier));
}

buildSubscriptionDetails(domainObject, callback, options) {
let subscriptionId = this.lastSubscriptionId++;
let subscriptionDetails = {
Expand Down Expand Up @@ -201,7 +207,7 @@ export default class RealtimeProvider {
});

if (correspondingSubscription !== undefined) {
this.remoteClockCallNumber = correspondingSubscription.call.toString();
this.remoteClockCallNumber = correspondingSubscription.call;
} else {
delete this.remoteClockCallNumber;
}
Expand All @@ -217,11 +223,12 @@ export default class RealtimeProvider {
const allClockValues = [];

// We only care about the most recent clock tick message in the batch.
this.#convertMessageToDatumAndReportStaleness(remoteClockValues[remoteClockValues.length - 1], subscriptionDetails, allClockValues);
remoteClockValues.forEach((parameterValue) => {
this.#convertMessageToDatumAndReportStaleness(parameterValue, subscriptionDetails, allClockValues);
});

if (allClockValues.length > 0) {
// A single parameter update message can include multiple values, again we only care about the most recent one.
subscriptionDetails.callback(allClockValues[allClockValues.length - 1]);
subscriptionDetails.callback(allClockValues);
}

// Delete so we don't process it twice.
Expand Down Expand Up @@ -291,7 +298,6 @@ export default class RealtimeProvider {
const newBatch = batchEvent.detail;
const parametersByCall = new Map();
newBatch.forEach(messageString => {
//const message = JSON.parse(messageString);
const message = JSON.parse(messageString);
const call = message.call;
if (message.type === 'parameters') {
Expand Down
1 change: 1 addition & 0 deletions tests/e2e/yamcs/quickstartTools.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ async function disableLink(yamcsURL) {
}

async function enableLink(yamcsURL) {
console.log(`YAMCS URL: ${yamcsURL}`);
const url = new URL(`api/links/myproject/udp-in:enable`, yamcsURL);
await fetch(url.toString(), {
method: 'POST'
Expand Down
122 changes: 112 additions & 10 deletions tests/e2e/yamcs/realtimeData.e2e.spec.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ test.describe('Realtime telemetry displays', () => {

test('Correctly shows the latest values', async ({ page }) => {
// Wait a reasonable amount of time for new telemetry to come in.
// There is nothing significant about the number chosen.
// There is nothing significant about the number chosen. It's
// long enough to ensure we have new telemetry, short enough that
// it doesn't significantly increase test time.
const WAIT_FOR_MORE_TELEMETRY = 3000;

const ladTable = await getLadTableByName(page, 'Test LAD Table');
Expand Down Expand Up @@ -238,6 +240,102 @@ test.describe('Realtime telemetry displays', () => {
});

test('Open MCT does not drop telemetry while app is loading', async ({ page }) => {
//TODO: Upgrade this test to cycle through notifications, don't just use the last visible one.
const notification = page.getByRole('alert');
const count = await notification.count();

if (count > 0) {
const text = await notification.innerText();
expect(text).not.toBe('Telemetry dropped due to client rate limiting.');
} else {
expect(notification).toHaveCount(0);
}
});

test.only('Open MCT does not drop telemetry when a burst of telemetry arrives', async ({ page }) => {
const PARAMETER_VALUES_COUNT = 60;
/**
* A failure mode of the previous implementation of batching was when bursts of telemetry from a parameter arrived all at once.
* A burst of 60 messages will overwhelm a per-parameter telemetry buffer of 50, but will not overwhelm a larger shared buffer.
*/

// Disable real playback. We are going to inject our own batch of messages
await disableLink(yamcsURL);

const callNumber = await page.evaluate(async () => {
const openmct = window.openmct;
const objectIdentifier = {
namespace: 'taxonomy',
key: '~myproject~Battery1_Temp'
};
const telemetryObject = await openmct.objects.get(objectIdentifier);
const yamcsRealtimeProvider = await openmct.telemetry.findSubscriptionProvider(telemetryObject);

return yamcsRealtimeProvider.getSubscriptionByObjectIdentifier(objectIdentifier).call;

});

// Need to get call number of `Battery1_Temp` so we can use it in injected telemetry data
websocketWorker.evaluate(({call, COUNT}) => {
const messageEvents = [];

for (let messageCount = 0; messageCount < COUNT; messageCount++) {
const message = {
"type": "parameters",
"call": call,
"seq": messageCount,
"data": {
"@type": "/yamcs.protobuf.processing.SubscribeParametersData",
"values": [
{
"rawValue": {
"type": "FLOAT",
"floatValue": 10.204108
},
"engValue": {
"type": "FLOAT",
"floatValue": 10.204108
},
"acquisitionTime": new Date(Date.now() + messageCount).toISOString(),
"generationTime": new Date(Date.now() + messageCount).toISOString(),
"acquisitionStatus": "ACQUIRED",
"numericId": 1
}
]
}
};
const event = new Event('message');
event.data = JSON.stringify(message);
messageEvents.push(event);
}

messageEvents.forEach(event => {
self.currentWebSocket.dispatchEvent(event);
});

}, {
call: callNumber,
COUNT: PARAMETER_VALUES_COUNT
});

// Subscribe to Battery1_Temp so we can confirm that the injected parameter values were received,
const telemetryValues = await page.evaluate(async () => {
const openmct = window.openmct;
const objectIdentifier = {
namespace: 'taxonomy',
key: '~myproject~Battery1_Temp'
};
const telemetryObject = await openmct.objects.get(objectIdentifier);

return new Promise((resolveWithTelemetry) => {
openmct.telemetry.subscribe(telemetryObject, (telemetry) => {
resolveWithTelemetry(telemetry);
}, {strategy: 'batch'});
});
});

expect(telemetryValues.length).toBe(PARAMETER_VALUES_COUNT);

const notification = page.getByRole('alert');
const count = await notification.count();

Expand All @@ -257,11 +355,14 @@ test.describe('Realtime telemetry displays', () => {
// 2. Block the UI with a loop
await page.evaluate(() => {
return new Promise((resolveBlockingLoop) => {
//5s x 10Hz data = 50 telemetry values which should easily overrun the buffer length of 20.
let start = Date.now();
const start = Date.now();
let now = Date.now();
// Block the UI thread for 5s
while (now - start < 5000) {
// BUFFER_LENGTH / (AVG_MESSAGE_LENGTH_CHARS * MSGS_PER_SECOND)
// 1000000 / (500 * 10hz * 40 subscriptions) = ~ 5s (6 to be safe)
const durationToBlockFor = 6000;

// Block the UI thread for 6s per above calculation
while (now - start < durationToBlockFor) {
now = Date.now();
}

Expand All @@ -283,8 +384,8 @@ test.describe('Realtime telemetry displays', () => {
return new Promise((resolveBlockingLoop) => {
let start = Date.now();
let now = Date.now();
// Block the UI thread for 5s
while (now - start < 5000) {
// Block the UI thread for 6s
while (now - start < 6000) {
now = Date.now();
}

Expand All @@ -307,7 +408,7 @@ test.describe('Realtime telemetry displays', () => {

test('Open MCT accurately batches telemetry when requested', async ({ page }) => {

// 1. Subscribe to batched telemetry,
// 1. Subscribe to batched telemetry,e
const telemetryValues = await page.evaluate(async () => {
const openmct = window.openmct;
const telemetryObject = await openmct.objects.get({
Expand Down Expand Up @@ -342,7 +443,8 @@ test.describe('Realtime telemetry displays', () => {
});
const formattedParameterArchiveTelemetry = toOpenMctTelemetryFormat(parameterArchiveTelemetry);
sortOpenMctTelemetryAscending(formattedParameterArchiveTelemetry);

console.log(`RT batch: ${JSON.stringify(telemetryValues)}`);
console.log(`Archive: ${JSON.stringify(formattedParameterArchiveTelemetry)}`);
telemetryValues.forEach((telemetry, index) => {
expect(telemetry.value).toBe(formattedParameterArchiveTelemetry[index].value);
expect(telemetry.timestamp).toBe(formattedParameterArchiveTelemetry[index].timestamp);
Expand Down Expand Up @@ -414,7 +516,7 @@ test.describe('Realtime telemetry displays', () => {
}

/**
* @param {import('playwright').Page} page
* @param {import('playwright').Page} page
* @returns {Promise<{parameterNameText: string, parameterValueText: string}[]>}
*/
async function getParameterValuesFromAllGauges(page) {
Expand Down

0 comments on commit ea7f6df

Please sign in to comment.