Skip to content

Commit

Permalink
Add test for edge-case relating to throttling
Browse files Browse the repository at this point in the history
  • Loading branch information
akhenry committed Sep 29, 2024
1 parent ab96697 commit e379e3c
Showing 1 changed file with 63 additions and 148 deletions.
211 changes: 63 additions & 148 deletions tests/e2e/yamcs/realtimeData.e2e.spec.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,22 @@ test.describe('Realtime telemetry displays', () => {
});

// Go to baseURL
await page.goto('./', { waitUntil: 'domcontentloaded' });
await page.goto('./', { waitUntil: 'networkidle' });
await page.evaluate((thirtyMinutes) => {
const openmct = window.openmct;
return new Promise((resolve) => {
const openmct = window.openmct;

openmct.install(openmct.plugins.RemoteClock({
namespace: "taxonomy",
key: "~myproject~Battery1_Temp"
}));
openmct.install(openmct.plugins.RemoteClock({
namespace: "taxonomy",
key: "~myproject~Battery1_Temp"
}));

openmct.time.setClock('remote-clock');
openmct.time.setClockOffsets({
start: -thirtyMinutes,
end: 0
openmct.time.setClock('remote-clock');
openmct.time.setClockOffsets({
start: -thirtyMinutes,
end: 15000
});
setTimeout(resolve, 2000);
});
}, THIRTY_MINUTES);
yamcsURL = new URL('/yamcs-proxy/', page.url()).toString();
Expand Down Expand Up @@ -250,151 +253,58 @@ test.describe('Realtime telemetry displays', () => {
expect(notification).toHaveCount(0);
}
});

test('Open MCT does not drop telemetry when a burst of telemetry arrives that exceeds the length of 60 messages', async ({ page }) => {
const PARAMETER_VALUES_COUNT = 60;
/**
* A failure mode of the previous implementation of batching was when bursts of telemetry from a parameter arrived all at once.
* A burst of 60 messages will overwhelm a per-parameter telemetry buffer of 50, but will not overwhelm a larger shared buffer.
*/

// Disable real playback. We are going to inject our own batch of messages
await disableLink(yamcsURL);

/**
* Yamcs tracks subscriptions by "call number". The call number is assigned by Yamcs,
* so we don't know it ahead of time. We have to retrieve it at runtime after the subscription
* has been established.
*
* We need to know the call number, because it's how the receiver (Open MCT) ties a parameter
* value that is received over a WebSocket back to the correct subscription.
*/
const batteryTempParameterCallNumber = await page.evaluate(async () => {
const openmct = window.openmct;
const objectIdentifier = {
namespace: 'taxonomy',
key: '~myproject~Battery1_Temp'
};
const telemetryObject = await openmct.objects.get(objectIdentifier);
const yamcsRealtimeProvider = await openmct.telemetry.findSubscriptionProvider(telemetryObject);

return yamcsRealtimeProvider.getSubscriptionByObjectIdentifier(objectIdentifier).call;

});

websocketWorker.evaluate(({call, COUNT}) => {
const messageEvents = [];
/**
* Inject a burst of 60 messages.
*/
for (let messageCount = 0; messageCount < COUNT; messageCount++) {
const message = {
"type": "parameters",
//This is where we use the call number retrieved previously
"call": call,
"seq": messageCount,
"data": {
"@type": "/yamcs.protobuf.processing.SubscribeParametersData",
"values": [
{
"rawValue": {
"type": "FLOAT",
"floatValue": 10.204108
},
"engValue": {
"type": "FLOAT",
"floatValue": 10.204108
},
"acquisitionTime": new Date(Date.now() + messageCount).toISOString(),
"generationTime": new Date(Date.now() + messageCount).toISOString(),
"acquisitionStatus": "ACQUIRED",
"numericId": 1
}
]
}
};
/**
* We are building an array of Event objects of type 'message'. Dispatching an event of this
* type on a WebSocket will cause all listeners subscribed to 'message' events to receive it.
* The receiving code will not know the difference between an Event that is dispatched from
* code vs. one that caused by the arrival of data over the wire.
* @see https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/message_event
*/
const event = new Event('message');
event.data = JSON.stringify(message);
messageEvents.push(event);
}

/**
* Dispatch the 60 WebSocket message events we just created
*/
messageEvents.forEach(event => {
self.currentWebSocket.dispatchEvent(event);
});

}, {
call: batteryTempParameterCallNumber,
COUNT: PARAMETER_VALUES_COUNT
});

// Subscribe to Battery1_Temp so we can confirm that the injected parameter values were received,
const telemetryValues = await page.evaluate(async () => {
const openmct = window.openmct;
const objectIdentifier = {
namespace: 'taxonomy',
key: '~myproject~Battery1_Temp'
};
const telemetryObject = await openmct.objects.get(objectIdentifier);

return new Promise((resolveWithTelemetry) => {
openmct.telemetry.subscribe(telemetryObject, (telemetry) => {
resolveWithTelemetry(telemetry);
}, {strategy: 'batch'});
});
});
// To avoid test flake use >= instead of =. Because yamcs is also flowing data immediately prior to this test there
// can be some real data still in the buffer or in-transit. It's inherently stochastic because the Yamcs instance is not
// isolated between tests, but it doesn't invalidate the test in this case.
expect(telemetryValues.length).toBeGreaterThanOrEqual(PARAMETER_VALUES_COUNT);

const notification = page.getByRole('alert');
const count = await notification.count();

if (count > 0) {
const text = await notification.innerText();
expect(text).not.toBe('Telemetry dropped due to client rate limiting.');
} else {
expect(notification).toHaveCount(0);
}
});

test('Open MCT does drop telemetry when the UI is under load', async ({ page }) => {
// 1. Make sure the display is done loading, and populated with values (ie. we are in a steady state)
const ladTable = await getLadTableByName(page, 'Test LAD Table');
await getParameterValuesFromLadTable(ladTable);

// 2. Block the UI with a loop
/**
* This tests for an edge-case found during testing where throttling occurs during subscription handshaking with the server.
* In this scenario, subscribers never receive telemetry because the subscription was never properly initialized.
* This test confirms that after blocking the UI and inducing throttling, that all subscribed telemetry objects received telemetry.
*/
test('When the UI is blocked during initialization, does not drop subscription housekeeping messages', async ({ page }) => {
// 1. Block the UI
await page.evaluate(() => {
return new Promise((resolveBlockingLoop) => {
const start = Date.now();
let start = Date.now();
let now = Date.now();
// BUFFER_LENGTH / (AVG_MESSAGE_LENGTH_CHARS * MSGS_PER_SECOND)
// 1000000 / (500 * 10hz * 40 subscriptions) = ~ 5s (6 to be safe)
const durationToBlockFor = 6000;

// Block the UI thread for 6s per above calculation
while (now - start < durationToBlockFor) {
// Block the UI thread for 6s
while (now - start < 10000) {
now = Date.now();
}

resolveBlockingLoop();
});
});
// Check for telemetry dropped notification

//Confirm that throttling occurred
const notification = page.getByRole('alert');
expect(notification).toHaveCount(1);
const text = await notification.innerText();
expect(text).toBe('Telemetry dropped due to client rate limiting.');

//Confirm that all subscribed telemetry points receive telemetry. This tests that subscriptions were established successfully and
//tests for a failure mode where housekeeping telemetry was being dropped if the UI was blocked during initialization of telemetry subscriptions
const parametersToSubscribeTo = Object.values(namesToParametersMap).map(parameter => parameter.replaceAll('/', '~'));
const subscriptionsThatTelemetry = await page.evaluate(async (parameters) => {
const openmct = window.openmct;
const telemetryObjects = await Promise.all(
Object.values(parameters).map(
(parameterId) => openmct.objects.get(
{
namespace: 'taxonomy',
key: parameterId
}
)
));
const subscriptionsAllReturned = await Promise.all(telemetryObjects.map((telemetryObject) => {
return new Promise(resolve => {
const unsubscribe = openmct.telemetry.subscribe(telemetryObject, () => {
unsubscribe();
resolve(true);
});
});
}));

return subscriptionsAllReturned;
}, parametersToSubscribeTo);

expect(subscriptionsThatTelemetry.length).toBe(parametersToSubscribeTo.length);
});

test('Open MCT shows the latest telemetry after UI is temporarily blocked', async ({ page }) => {
Expand All @@ -405,20 +315,25 @@ test.describe('Realtime telemetry displays', () => {
return new Promise((resolveBlockingLoop) => {
let start = Date.now();
let now = Date.now();
// Block the UI thread for 6s
while (now - start < 6000) {
// Block the UI thread for 10s
while (now - start < 10000) {
now = Date.now();
}

resolveBlockingLoop();
requestIdleCallback(resolveBlockingLoop);
});
});

//Confirm that throttling occurred
const notification = page.getByRole('alert');
const text = await notification.innerText();
expect(text).toBe('Telemetry dropped due to client rate limiting.');

// Disable playback
await disableLink(yamcsURL);

// Wait 1 second for values to propagate to client and render on screen.
await page.waitForTimeout(TELEMETRY_PROPAGATION_TIME);
// Wait for values to propagate to client and render on screen.
await page.waitForTimeout(5000);

const latestValueObjects = await latestParameterValues(Object.values(namesToParametersMap), yamcsURL);
const parameterNamesToLatestValues = toParameterNameToValueMap(latestValueObjects);
Expand Down

0 comments on commit e379e3c

Please sign in to comment.