Skip to content

Commit

Permalink
Improved buffering (#477)
Browse files Browse the repository at this point in the history
* Modifies telemetry subscription handling in openmct-yamcs adapter to support improvements to the telemetry API in Open MCT.
* Removes the Yamcs-specific batching strategy previously required . Open MCT now uses a more flexible one-size fits all strategy
* Adds a new test to catch the failure mode identified in the previous strategy.
  • Loading branch information
akhenry authored Sep 30, 2024
1 parent f1b3922 commit ac4b0f3
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 165 deletions.
11 changes: 8 additions & 3 deletions example/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ const config = {
yamcsProcessor: "realtime",
yamcsFolder: "myproject",
throttleRate: 1000,
maxBatchSize: 20
// Batch size is specified in characers as there is no performant way of calculating true
// memory usage of a string buffer in real-time.
// String characters can be 8 or 16 bits in JavaScript, depending on the code page used.
// Thus 500,000 characters requires up to 16MB of memory (1,000,000 * 16).
maxBufferSize: 1000000
};
const STATUS_STYLES = {
NO_STATUS: {
Expand Down Expand Up @@ -41,7 +45,9 @@ const STATUS_STYLES = {
const openmct = window.openmct;

(() => {
const THIRTY_MINUTES = 30 * 60 * 1000;
const ONE_SECOND = 1000;
const ONE_MINUTE = ONE_SECOND * 60;
const THIRTY_MINUTES = ONE_MINUTE * 30;

openmct.setAssetPath("/node_modules/openmct/dist");

Expand All @@ -54,7 +60,6 @@ const openmct = window.openmct;
document.addEventListener("DOMContentLoaded", function () {
openmct.start();
});

openmct.install(
openmct.plugins.Conductor({
menuOptions: [
Expand Down
2 changes: 1 addition & 1 deletion src/openmct-yamcs.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ export default function install(
configuration.yamcsInstance,
configuration.yamcsProcessor,
configuration.throttleRate,
configuration.maxBatchSize
configuration.maxBufferSize
);
openmct.telemetry.addProvider(realtimeTelemetryProvider);
realtimeTelemetryProvider.connect();
Expand Down
272 changes: 143 additions & 129 deletions src/providers/realtime-provider.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,17 @@ import {
import { commandToTelemetryDatum } from './commands.js';
import { eventToTelemetryDatum, eventShouldBeFiltered } from './events.js';

const ONE_SECOND = 1000;
const ONE_MILLION_CHARACTERS = 1000000;

//Everything except parameter messages are housekeeping and if they're dropped bad things can happen.
const PARAMETER_MESSAGES = '^{[\\s]*"type":\\s"parameters';

export default class RealtimeProvider {
#socketWorker = null;
#openmct;

constructor(openmct, url, instance, processor = 'realtime', maxBatchWait = 1000, maxBatchSize = 15) {
constructor(openmct, url, instance, processor = 'realtime', throttleRate = ONE_SECOND, maxBufferSize = ONE_MILLION_CHARACTERS) {
this.url = url;
this.instance = instance;
this.processor = processor;
Expand All @@ -59,8 +65,10 @@ export default class RealtimeProvider {
this.subscriptionsByCall = new Map();
this.subscriptionsById = {};
this.#socketWorker = new openmct.telemetry.BatchingWebSocket(openmct);
this.#socketWorker.setThrottleMessagePattern(PARAMETER_MESSAGES);
this.#openmct = openmct;
this.#setBatchingStrategy(maxBatchWait, maxBatchSize);
this.#socketWorker.setThrottleRate(throttleRate);
this.#socketWorker.setMaxBufferSize(maxBufferSize);

this.addSupportedObjectTypes(Object.values(OBJECT_TYPES));
this.addSupportedDataTypes(Object.values(DATA_TYPES));
Expand All @@ -82,33 +90,6 @@ export default class RealtimeProvider {
this.#setCallFromClock(clock);
}
}
#setBatchingStrategy(maxBatchWait, maxBatchSize) {
// This strategy batches parameter value messages
this.#socketWorker.setBatchingStrategy({
/* istanbul ignore next */
shouldBatchMessage: /* istanbul ignore next */ (message) => {
// If a parameter value message, the message type will be "parameters"
// The type field is always located at a character offset of 13 and
// if it is "parameters" will be 10 characters long.
const type = message.substring(13, 23);

return type === 'parameters';
},
/* istanbul ignore next */
getBatchIdFromMessage: /* istanbul ignore next */ (message) => {
// Only dealing with "parameters" messages at this point. The call number
// identifies the parameter, and is used for batching. Will be located
// at a character offset of 36. Because it is of indeterminate length
// (we don't know the number) we have to do a sequential search forward
// from the 37th character for a terminating ",".
const callNumber = message.substring(36, message.indexOf(",", 37));

return callNumber;
}
});
this.#socketWorker.setMaxBatchWait(maxBatchWait);
this.#socketWorker.setMaxBatchSize(maxBatchSize);
}

addSupportedObjectTypes(types) {
types.forEach(type => this.supportedObjectTypes[type] = type);
Expand Down Expand Up @@ -158,7 +139,7 @@ export default class RealtimeProvider {
if (subscriptionDetails) {
this.sendUnsubscribeMessage(subscriptionDetails);

this.subscriptionsByCall.delete(subscriptionDetails.call.toString());
this.subscriptionsByCall.delete(subscriptionDetails.call);
delete this.subscriptionsById[id];
}
};
Expand All @@ -183,12 +164,16 @@ export default class RealtimeProvider {
this.sendUnsubscribeMessage(subscriptionDetails);

if (this.subscriptionsById[id]) {
this.subscriptionsByCall.delete(this.subscriptionsById[id].call.toString());
this.subscriptionsByCall.delete(this.subscriptionsById[id].call);
delete this.subscriptionsById[id];
}
};
}

getSubscriptionByObjectIdentifier(identifier) {
return Object.values(this.subscriptionsById).find(subscription => this.#openmct.objects.areIdsEqual(subscription.domainObject.identifier, identifier));
}

buildSubscriptionDetails(domainObject, callback, options) {
let subscriptionId = this.lastSubscriptionId++;
let subscriptionDetails = {
Expand Down Expand Up @@ -224,52 +209,78 @@ export default class RealtimeProvider {
});

if (correspondingSubscription !== undefined) {
this.remoteClockCallNumber = correspondingSubscription.call.toString();
this.remoteClockCallNumber = correspondingSubscription.call;
} else {
delete this.remoteClockCallNumber;
}
}

#processBatchQueue(batchQueue, call) {
let subscriptionDetails = this.subscriptionsByCall.get(call);
let telemetryData = [];
#processParameterUpdates(parameterValuesByCall) {
//If remote clock active, process its value before any telemetry values to ensure the bounds are always up to date.
if (this.remoteClockCallNumber !== undefined) {
const remoteClockValues = parameterValuesByCall.get(this.remoteClockCallNumber);
const subscriptionDetails = this.subscriptionsByCall.get(this.remoteClockCallNumber);

// possibly cancelled
if (!subscriptionDetails) {
return;
}
if (remoteClockValues !== undefined && remoteClockValues.length > 0) {
const allClockValues = [];

batchQueue.forEach((rawMessage) => {
const message = JSON.parse(rawMessage);
const values = message.data.values || [];
const parentName = subscriptionDetails.domainObject.name;
remoteClockValues.forEach((parameterValue) => {
this.#convertMessageToDatumAndReportStaleness(parameterValue, subscriptionDetails, allClockValues);
});

values.forEach(parameter => {
const datum = convertYamcsToOpenMctDatum(parameter, parentName);
if (allClockValues.length > 0) {
subscriptionDetails.callback(allClockValues);
}

// Delete so we don't process it twice.
parameterValuesByCall.delete(this.remoteClockCallNumber);
}
}

if (this.observingStaleness[subscriptionDetails.name] !== undefined) {
const status = STALENESS_STATUS_MAP[parameter.acquisitionStatus];
// Now process all non-clock parameter updates
for (const [call, parameterValues] of parameterValuesByCall.entries()) {
const allTelemetryData = [];
const subscriptionDetails = this.subscriptionsByCall.get(call);

if (this.observingStaleness[subscriptionDetails.name].response.isStale !== status) {
const stalenesResponseObject = buildStalenessResponseObject(
status,
parameter[METADATA_TIME_KEY]
);
this.observingStaleness[subscriptionDetails.name].response = stalenesResponseObject;
this.observingStaleness[subscriptionDetails.name].callback(stalenesResponseObject);
}
}
// possibly cancelled
if (!subscriptionDetails) {
continue;
}

addLimitInformation(parameter, datum);
telemetryData.push(datum);
parameterValues.forEach((parameterValue) => {
this.#convertMessageToDatumAndReportStaleness(parameterValue, subscriptionDetails, allTelemetryData);
});
});

if (telemetryData.length > 0) {
subscriptionDetails.callback(telemetryData);
if (allTelemetryData.length > 0) {
subscriptionDetails.callback(allTelemetryData);
}
}
}

#convertMessageToDatumAndReportStaleness(parameterValue, subscriptionDetails, allTelemetryData) {
const values = parameterValue.data.values || [];
const parentName = subscriptionDetails.domainObject.name;
values.forEach(parameter => {
const datum = convertYamcsToOpenMctDatum(parameter, parentName);

if (this.observingStaleness[subscriptionDetails.name] !== undefined) {
const status = STALENESS_STATUS_MAP[parameter.acquisitionStatus];

if (this.observingStaleness[subscriptionDetails.name].response.isStale !== status) {
const stalenesResponseObject = buildStalenessResponseObject(
status,
parameter[METADATA_TIME_KEY]
);
this.observingStaleness[subscriptionDetails.name].response = stalenesResponseObject;
this.observingStaleness[subscriptionDetails.name].callback(stalenesResponseObject);
}
}

addLimitInformation(parameter, datum);
allTelemetryData.push(datum);
});
}

connect() {
if (this.connected) {
return;
Expand All @@ -285,84 +296,87 @@ export default class RealtimeProvider {
});

this.#socketWorker.addEventListener('batch', (batchEvent) => {
const batch = batchEvent.detail;

let remoteClockValue;
// If remote clock active, process its value before any telemetry values to ensure the bounds are always up to date.
if (this.remoteClockCallNumber !== undefined) {
remoteClockValue = batch[this.remoteClockCallNumber];
if (remoteClockValue !== undefined) {
this.#processBatchQueue(batch[this.remoteClockCallNumber], this.remoteClockCallNumber);

// Delete so we don't process it twice.
delete batch[this.remoteClockCallNumber];
}
}
const newBatch = batchEvent.detail;
const parametersByCall = new Map();
newBatch.forEach(messageString => {
const message = JSON.parse(messageString);
const call = message.call;
if (message.type === 'parameters') {
// First, group parameter updates by call
let arrayOfParametersForCall = parametersByCall.get(call);

if (arrayOfParametersForCall === undefined) {
arrayOfParametersForCall = [];
parametersByCall.set(call, arrayOfParametersForCall);
}

Object.keys(batch).forEach((call) => {
this.#processBatchQueue(batch[call], call);
});
});
arrayOfParametersForCall.push(message);
} else {
if (!this.isSupportedDataType(message.type)) {
return;
}

this.#socketWorker.addEventListener('message', (messageEvent) => {
const message = JSON.parse(messageEvent.detail);
if (!this.isSupportedDataType(message.type)) {
return;
}
const isReply = message.type === DATA_TYPES.DATA_TYPE_REPLY;
let subscriptionDetails;

const isReply = message.type === DATA_TYPES.DATA_TYPE_REPLY;
const call = message.call;
let subscriptionDetails;
if (isReply) {
const id = message.data.replyTo;
subscriptionDetails = this.subscriptionsById[id];

if (isReply) {
const id = message.data.replyTo;
subscriptionDetails = this.subscriptionsById[id];
subscriptionDetails.call = call;
// Subsequent retrieval uses a string, so for performance reasons we use a string as a key.
this.subscriptionsByCall.set(call.toString(), subscriptionDetails);
// Susbcriptions can be cancelled before we even get to this stage during tests due to rapid navigation.
if (!subscriptionDetails) {
return;
}

const remoteClockIdentifier = this.#openmct.time.getClock()?.identifier;
const isRemoteClockActive = remoteClockIdentifier !== undefined;
subscriptionDetails.call = call;
// Subsequent retrieval uses a string, so for performance reasons we use a string as a key.
this.subscriptionsByCall.set(call, subscriptionDetails);

if (isRemoteClockActive && subscriptionDetails.domainObject.identifier.key === remoteClockIdentifier.key) {
this.remoteClockCallNumber = call.toString();
}
} else {
subscriptionDetails = this.subscriptionsByCall.get(message.call.toString());
const remoteClockIdentifier = this.#openmct.time.getClock()?.identifier;
const isRemoteClockActive = remoteClockIdentifier !== undefined;

// possibly cancelled
if (!subscriptionDetails) {
return;
}

if (this.isCommandMessage(message)) {
const datum = commandToTelemetryDatum(message.data);
subscriptionDetails.callback(datum);
} else if (this.isEventMessage(message)) {
if (eventShouldBeFiltered(message.data, subscriptionDetails.options)) {
// ignore event
if (isRemoteClockActive && subscriptionDetails.domainObject.identifier.key === remoteClockIdentifier.key) {
this.remoteClockCallNumber = call;
}
} else {
const datum = eventToTelemetryDatum(message.data);
subscriptionDetails.callback(datum);
}
} else if (this.isMdbChangesMessage(message)) {
if (!this.isParameterType(message)) {
return;
}

const parameterName = message.data.parameterOverride.parameter;
if (this.observingLimitChanges[parameterName] !== undefined) {
const alarmRange = message.data.parameterOverride.defaultAlarm?.staticAlarmRange ?? [];
this.observingLimitChanges[parameterName].callback(getLimitFromAlarmRange(alarmRange));
subscriptionDetails = this.subscriptionsByCall.get(message.call);

// possibly cancelled
if (!subscriptionDetails) {
return;
}

if (this.isCommandMessage(message)) {
const datum = commandToTelemetryDatum(message.data);
subscriptionDetails.callback(datum);
} else if (this.isEventMessage(message)) {
if (eventShouldBeFiltered(message.data, subscriptionDetails.options)) {
// ignore event
} else {
const datum = eventToTelemetryDatum(message.data);
subscriptionDetails.callback(datum);
}
} else if (this.isMdbChangesMessage(message)) {
if (!this.isParameterType(message)) {
return;
}

const parameterName = message.data.parameterOverride.parameter;
if (this.observingLimitChanges[parameterName] !== undefined) {
const alarmRange = message.data.parameterOverride.defaultAlarm?.staticAlarmRange ?? [];
this.observingLimitChanges[parameterName].callback(getLimitFromAlarmRange(alarmRange));
}

if (subscriptionDetails.callback) {
subscriptionDetails.callback(message.data);
}
} else {
subscriptionDetails.callback(message.data);
}
}

if (subscriptionDetails.callback) {
subscriptionDetails.callback(message.data);
}
} else {
subscriptionDetails.callback(message.data);
}
}
});
this.#processParameterUpdates(parametersByCall);
});
}

Expand Down
Loading

0 comments on commit ac4b0f3

Please sign in to comment.