Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DTP-1078] Use lexico timeserials and siteCode field in StateMessages #1926

Open
wants to merge 1 commit into
base: DTP-963/liveobjects-customer-typings
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion scripts/moduleReport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,6 @@ async function checkLiveObjectsPluginFiles() {
'src/plugins/liveobjects/objectid.ts',
'src/plugins/liveobjects/statemessage.ts',
'src/plugins/liveobjects/syncliveobjectsdatapool.ts',
'src/plugins/liveobjects/timeserial.ts',
]);

return checkBundleFiles(pluginBundleInfo, allowedFiles, 100);
Expand Down
13 changes: 7 additions & 6 deletions src/plugins/liveobjects/livecounter.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { LiveObject, LiveObjectData, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject';
import { LiveObjects } from './liveobjects';
import { StateCounterOp, StateMessage, StateObject, StateOperation, StateOperationAction } from './statemessage';
import { DefaultTimeserial } from './timeserial';

export interface LiveCounterData extends LiveObjectData {
data: number;
Expand Down Expand Up @@ -49,19 +48,20 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
);
}

const opOriginTimeserial = DefaultTimeserial.calculateTimeserial(this._client, msg.serial);
if (!this._canApplyOperation(opOriginTimeserial)) {
const opOriginTimeserial = msg.serial!;
const opSiteCode = msg.siteCode!;
if (!this._canApplyOperation(opOriginTimeserial, opSiteCode)) {
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MICRO,
'LiveCounter.applyOperation()',
`skipping ${op.action} op: op timeserial ${opOriginTimeserial.toString()} <= site timeserial ${this._siteTimeserials[opOriginTimeserial.siteCode].toString()}; objectId=${this._objectId}`,
`skipping ${op.action} op: op timeserial ${opOriginTimeserial.toString()} <= site timeserial ${this._siteTimeserials[opSiteCode]?.toString()}; objectId=${this._objectId}`,
);
return;
}
// should update stored site timeserial immediately. doesn't matter if we successfully apply the op,
// as it's important to mark that the op was processed by the object
this._siteTimeserials[opOriginTimeserial.siteCode] = opOriginTimeserial;
this._siteTimeserials[opSiteCode] = opOriginTimeserial;

let update: LiveCounterUpdate | LiveObjectUpdateNoop;
switch (op.action) {
Expand Down Expand Up @@ -125,7 +125,8 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
// override all relevant data for this object with data from the state object
this._createOperationIsMerged = false;
this._dataRef = { data: stateObject.counter?.count ?? 0 };
this._siteTimeserials = this._timeserialMapFromStringMap(stateObject.siteTimeserials);
// should default to empty map if site timeserials do not exist on the state object, so that any future operation can be applied to this object
this._siteTimeserials = stateObject.siteTimeserials ?? {};
if (!this._client.Utils.isNil(stateObject.createOp)) {
this._mergeInitialDataFromCreateOperation(stateObject.createOp);
}
Expand Down
73 changes: 47 additions & 26 deletions src/plugins/liveobjects/livemap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import {
StateOperationAction,
StateValue,
} from './statemessage';
import { DefaultTimeserial, Timeserial } from './timeserial';

export interface ObjectIdStateData {
/** A reference to another state object, used to support composable state objects. */
Expand All @@ -34,7 +33,7 @@ export type StateData = ObjectIdStateData | ValueStateData;

export interface MapEntry {
tombstone: boolean;
timeserial: Timeserial;
timeserial: string | undefined;
data: StateData | undefined;
}

Expand Down Expand Up @@ -131,19 +130,20 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
);
}

const opOriginTimeserial = DefaultTimeserial.calculateTimeserial(this._client, msg.serial);
if (!this._canApplyOperation(opOriginTimeserial)) {
const opOriginTimeserial = msg.serial!;
const opSiteCode = msg.siteCode!;
if (!this._canApplyOperation(opOriginTimeserial, opSiteCode)) {
VeskeR marked this conversation as resolved.
Show resolved Hide resolved
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MICRO,
'LiveMap.applyOperation()',
`skipping ${op.action} op: op timeserial ${opOriginTimeserial.toString()} <= site timeserial ${this._siteTimeserials[opOriginTimeserial.siteCode].toString()}; objectId=${this._objectId}`,
`skipping ${op.action} op: op timeserial ${opOriginTimeserial.toString()} <= site timeserial ${this._siteTimeserials[opSiteCode]?.toString()}; objectId=${this._objectId}`,
);
return;
}
// should update stored site timeserial immediately. doesn't matter if we successfully apply the op,
// as it's important to mark that the op was processed by the object
this._siteTimeserials[opOriginTimeserial.siteCode] = opOriginTimeserial;
this._siteTimeserials[opSiteCode] = opOriginTimeserial;
VeskeR marked this conversation as resolved.
Show resolved Hide resolved

let update: LiveMapUpdate | LiveObjectUpdateNoop;
switch (op.action) {
Expand Down Expand Up @@ -233,7 +233,8 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
// override all relevant data for this object with data from the state object
this._createOperationIsMerged = false;
this._dataRef = this._liveMapDataFromMapEntries(stateObject.map?.entries ?? {});
this._siteTimeserials = this._timeserialMapFromStringMap(stateObject.siteTimeserials);
// should default to empty map if site timeserials do not exist on the state object, so that any future operation can be applied to this object
this._siteTimeserials = stateObject.siteTimeserials ?? {};
if (!this._client.Utils.isNil(stateObject.createOp)) {
this._mergeInitialDataFromCreateOperation(stateObject.createOp);
}
Expand Down Expand Up @@ -311,9 +312,7 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
// we can do this by iterating over entries from MAP_CREATE op and apply changes on per-key basis as if we had MAP_SET, MAP_REMOVE operations.
Object.entries(stateOperation.map.entries ?? {}).forEach(([key, entry]) => {
// for MAP_CREATE op we must use dedicated timeserial field available on an entry, instead of a timeserial on a message
const opOriginTimeserial = entry.timeserial
? DefaultTimeserial.calculateTimeserial(this._client, entry.timeserial)
: DefaultTimeserial.zeroValueTimeserial(this._client);
const opOriginTimeserial = entry.timeserial;
let update: LiveMapUpdate | LiveObjectUpdateNoop;
if (entry.tombstone === true) {
// entry in MAP_CREATE op is removed, try to apply MAP_REMOVE op
Expand Down Expand Up @@ -370,20 +369,17 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
return this._mergeInitialDataFromCreateOperation(op);
}

private _applyMapSet(op: StateMapOp, opOriginTimeserial: Timeserial): LiveMapUpdate | LiveObjectUpdateNoop {
private _applyMapSet(op: StateMapOp, opOriginTimeserial: string | undefined): LiveMapUpdate | LiveObjectUpdateNoop {
const { ErrorInfo, Utils } = this._client;

const existingEntry = this._dataRef.data.get(op.key);
if (
existingEntry &&
(opOriginTimeserial.before(existingEntry.timeserial) || opOriginTimeserial.equal(existingEntry.timeserial))
) {
if (existingEntry && !this._canApplyMapOperation(existingEntry.timeserial, opOriginTimeserial)) {
// the operation's origin timeserial <= the entry's timeserial, ignore the operation.
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MICRO,
'LiveMap._applyMapSet()',
`skipping update for key="${op.key}": op timeserial ${opOriginTimeserial.toString()} <= entry timeserial ${existingEntry.timeserial.toString()}; objectId=${this._objectId}`,
`skipping update for key="${op.key}": op timeserial ${opOriginTimeserial?.toString()} <= entry timeserial ${existingEntry.timeserial?.toString()}; objectId=${this._objectId}`,
);
return { noop: true };
}
Expand Down Expand Up @@ -423,18 +419,18 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
return { update: { [op.key]: 'updated' } };
}

private _applyMapRemove(op: StateMapOp, opOriginTimeserial: Timeserial): LiveMapUpdate | LiveObjectUpdateNoop {
private _applyMapRemove(
op: StateMapOp,
opOriginTimeserial: string | undefined,
): LiveMapUpdate | LiveObjectUpdateNoop {
const existingEntry = this._dataRef.data.get(op.key);
if (
existingEntry &&
(opOriginTimeserial.before(existingEntry.timeserial) || opOriginTimeserial.equal(existingEntry.timeserial))
) {
if (existingEntry && !this._canApplyMapOperation(existingEntry.timeserial, opOriginTimeserial)) {
// the operation's origin timeserial <= the entry's timeserial, ignore the operation.
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MICRO,
'LiveMap._applyMapRemove()',
`skipping remove for key="${op.key}": op timeserial ${opOriginTimeserial.toString()} <= entry timeserial ${existingEntry.timeserial.toString()}; objectId=${this._objectId}`,
`skipping remove for key="${op.key}": op timeserial ${opOriginTimeserial?.toString()} <= entry timeserial ${existingEntry.timeserial?.toString()}; objectId=${this._objectId}`,
);
return { noop: true };
}
Expand All @@ -455,6 +451,34 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
return { update: { [op.key]: 'removed' } };
}

/**
* Returns true if the origin timeserials of the given operation and entry indicate that
* the operation should be applied to the entry, following the CRDT semantics of this LiveMap.
*/
private _canApplyMapOperation(entryTimeserial: string | undefined, opTimeserial: string | undefined): boolean {
// for LWW CRDT semantics (the only supported LiveMap semantic) an operation
// should only be applied if its timeserial is strictly greater ("after") than an entry's timeserial.

if (!entryTimeserial && !opTimeserial) {
// if both timeserials are nullish or emptry strings, we treat them as the "earliest possible" timeserials,
// in which case they are "equal", so the operation should not be applied
return false;
}

if (!entryTimeserial) {
// any op timeserial is greater than non-existing entry timeserial
return true;
}

if (!opTimeserial) {
// non-existing op timeserial is lower than any entry timeserial
return false;
}

// if both timeserials exist, compare them lexicographically
return opTimeserial > entryTimeserial;
}

private _liveMapDataFromMapEntries(entries: Record<string, StateMapEntry>): LiveMapData {
const liveMapData: LiveMapData = {
data: new Map<string, MapEntry>(),
Expand All @@ -470,10 +494,7 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
}

const liveDataEntry: MapEntry = {
...entry,
timeserial: entry.timeserial
? DefaultTimeserial.calculateTimeserial(this._client, entry.timeserial)
: DefaultTimeserial.zeroValueTimeserial(this._client),
timeserial: entry.timeserial,
// true only if we received explicit true. otherwise always false
tombstone: entry.tombstone === true,
data: liveData,
Expand Down
28 changes: 11 additions & 17 deletions src/plugins/liveobjects/liveobject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import type BaseClient from 'common/lib/client/baseclient';
import type EventEmitter from 'common/lib/util/eventemitter';
import { LiveObjects } from './liveobjects';
import { StateMessage, StateObject, StateOperation } from './statemessage';
import { DefaultTimeserial, Timeserial } from './timeserial';

enum LiveObjectEvents {
Updated = 'Updated',
Expand Down Expand Up @@ -38,7 +37,7 @@ export abstract class LiveObject<
* and all state operations applied to the object.
*/
protected _dataRef: TData;
protected _siteTimeserials: Record<string, Timeserial>;
protected _siteTimeserials: Record<string, string>;
protected _createOperationIsMerged: boolean;

protected constructor(
Expand Down Expand Up @@ -106,22 +105,17 @@ export abstract class LiveObject<
* An operation should be applied if the origin timeserial is strictly greater than the timeserial in the site timeserials for the same site.
* If the site timeserials do not contain a timeserial for the site of the origin timeserial, the operation should be applied.
*/
protected _canApplyOperation(opOriginTimeserial: Timeserial): boolean {
const siteTimeserial = this._siteTimeserials[opOriginTimeserial.siteCode];
return !siteTimeserial || opOriginTimeserial.after(siteTimeserial);
}
protected _canApplyOperation(opOriginTimeserial: string | undefined, opSiteCode: string | undefined): boolean {
if (!opOriginTimeserial) {
throw new this._client.ErrorInfo(`Invalid timeserial: ${opOriginTimeserial}`, 50000, 500);
}

if (!opSiteCode) {
throw new this._client.ErrorInfo(`Invalid site code: ${opSiteCode}`, 50000, 500);
}

protected _timeserialMapFromStringMap(stringTimeserialsMap: Record<string, string>): Record<string, Timeserial> {
const objTimeserialsMap = Object.entries(stringTimeserialsMap).reduce(
(acc, v) => {
const [key, timeserialString] = v;
acc[key] = DefaultTimeserial.calculateTimeserial(this._client, timeserialString);
return acc;
},
{} as Record<string, Timeserial>,
);

return objTimeserialsMap;
const siteTimeserial = this._siteTimeserials[opSiteCode];
return !siteTimeserial || opOriginTimeserial > siteTimeserial;
}

private _createObjectId(): string {
Expand Down
8 changes: 6 additions & 2 deletions src/plugins/liveobjects/statemessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ export interface StateMapEntry {
/**
* The *origin* timeserial of the last operation that was applied to the map entry.
*
* It is optional in a MAP_CREATE operation and might be missing, in which case the client should default to using zero-value timeserial,
* which is the "earliest possible" timeserial. This will allow any other operation to update the field based on a timeserial comparison.
* It is optional in a MAP_CREATE operation and might be missing, in which case the client should use a nullish value for it
* and treat it as the "earliest possible" timeserial for comparison purposes.
*/
timeserial?: string;
/** The data that represents the value of the map entry. */
Expand Down Expand Up @@ -140,6 +140,8 @@ export class StateMessage {
object?: StateObject;
/** Timeserial format. Contains the origin timeserial for this state message. */
serial?: string;
/** Site code corresponding to this message's timeserial */
siteCode?: string;

constructor(private _platform: typeof Platform) {}

Expand Down Expand Up @@ -357,12 +359,14 @@ export class StateMessage {
if (this.timestamp) result += '; timestamp=' + this.timestamp;
if (this.clientId) result += '; clientId=' + this.clientId;
if (this.connectionId) result += '; connectionId=' + this.connectionId;
if (this.channel) result += '; channel=' + this.channel;
// TODO: prettify output for operation and object and encode buffers.
// see examples for data in Message and PresenceMessage
if (this.operation) result += '; operation=' + JSON.stringify(this.operation);
if (this.object) result += '; object=' + JSON.stringify(this.object);
if (this.extras) result += '; extras=' + JSON.stringify(this.extras);
if (this.serial) result += '; serial=' + this.serial;
if (this.siteCode) result += '; siteCode=' + this.siteCode;

result += ']';

Expand Down
Loading
Loading