Skip to content

Commit

Permalink
feat(client-presence): initial telemetry (microsoft#22656)
Browse files Browse the repository at this point in the history
Add monitoring context `Presence`
Two events:
- `PresenceInitiated` logged when runtime instantiates `Presence`
feature.
- `JoinResponse` logged when a client manages response to another
client's Join message.

Add basic test infrastructure to check expected events and protocol
handling.
- Add mock IEphemeralRuntime implementation with signal tracking
- Refactor SessionId assignment to allow testing to specify it (mocking
underlying functions used generate it is not supported with ESM)
- Update/fix Presence test name hierarchy
  • Loading branch information
jason-ha authored Sep 29, 2024
1 parent d1eade6 commit 6fd547b
Show file tree
Hide file tree
Showing 13 changed files with 635 additions and 68 deletions.
8 changes: 7 additions & 1 deletion packages/framework/presence/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -126,18 +126,23 @@
"@fluidframework/id-compressor": "workspace:~",
"@fluidframework/runtime-definitions": "workspace:~",
"@fluidframework/runtime-utils": "workspace:~",
"@fluidframework/shared-object-base": "workspace:~"
"@fluidframework/shared-object-base": "workspace:~",
"@fluidframework/telemetry-utils": "workspace:~"
},
"devDependencies": {
"@arethetypeswrong/cli": "^0.15.2",
"@biomejs/biome": "~1.8.3",
"@fluid-tools/build-cli": "^0.46.0",
"@fluidframework/build-common": "^2.0.3",
"@fluidframework/build-tools": "^0.46.0",
"@fluidframework/driver-definitions": "workspace:~",
"@fluidframework/eslint-config-fluid": "^5.4.0",
"@fluidframework/test-runtime-utils": "workspace:~",
"@fluidframework/test-utils": "workspace:~",
"@microsoft/api-extractor": "7.47.8",
"@types/mocha": "^9.1.1",
"@types/node": "^18.19.0",
"@types/sinon": "^17.0.3",
"c8": "^8.0.1",
"concurrently": "^8.2.1",
"copyfiles": "^2.4.1",
Expand All @@ -148,6 +153,7 @@
"mocha-multi-reporters": "^1.5.1",
"prettier": "~3.0.3",
"rimraf": "^4.4.0",
"sinon": "^17.0.1",
"typescript": "~5.4.5"
},
"fluidBuild": {
Expand Down
8 changes: 6 additions & 2 deletions packages/framework/presence/src/internalTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import type { IContainerRuntime } from "@fluidframework/container-runtime-definitions/internal";
import type { IFluidDataStoreRuntime } from "@fluidframework/datastore-definitions/internal";
import type { MonitoringContext } from "@fluidframework/telemetry-utils/internal";

import type { InternalTypes } from "./exposedInternalTypes.js";
import type { ClientSessionId, IPresence, ISessionClient } from "./presence.js";
Expand Down Expand Up @@ -44,14 +45,17 @@ export const brandedObjectEntries = Object.entries as <K extends string, T>(
export type IEphemeralRuntime = Pick<
(IContainerRuntime & IRuntimeInternal) | IFluidDataStoreRuntime,
"clientId" | "connected" | "getQuorum" | "off" | "on" | "submitSignal"
>;
> &
Partial<Pick<IFluidDataStoreRuntime, "logger">>;

/**
* Collection of utilities provided by PresenceManager that are used by presence sub-components.
*
* @internal
*/
export type PresenceManagerInternal = Pick<IPresence, "getAttendee">;
export type PresenceManagerInternal = Pick<IPresence, "getAttendee"> & {
readonly mc: MonitoringContext | undefined;
};

/**
* @internal
Expand Down
100 changes: 67 additions & 33 deletions packages/framework/presence/src/presenceDatastoreManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -250,39 +250,7 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {

if (message.type === joinMessageType) {
assert(this.runtime.connected, "Received presence join signal while not connected");
const updateProviders = message.content.updateProviders;
this.refreshBroadcastRequested = true;
// We must be connected to receive this message, so clientId should be defined.
// If it isn't then, not really a problem; just won't be in provider or quorum list.
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const clientId = this.runtime.clientId!;
if (updateProviders.includes(clientId)) {
// Send all current state to the new client
this.broadcastAllKnownState();
} else {
// Schedule a broadcast to the new client after a delay only to send if
// another broadcast hasn't been seen in the meantime. The delay is based
// on the position in the quorum list. It doesn't have to be a stable
// list across all clients. We need something to provide suggested order
// to prevent a flood of broadcasts.
const quorumMembers = this.runtime.getQuorum().getMembers();
const indexOfSelf =
quorumMembers.get(clientId)?.sequenceNumber ??
// Index past quorum members + arbitrary additional offset up to 10
quorumMembers.size + Math.random() * 10;
// These numbers have been chosen arbitrarily to start with.
// 20 is minimum wait time, 20 is the additional wait time per provider
// given an chance before us with named providers given more time.
const waitTime = 20 + 20 * (3 * updateProviders.length + indexOfSelf);
setTimeout(() => {
// Make sure a broadcast is still needed and we are currently connected.
// If not connected, nothing we can do.
if (this.refreshBroadcastRequested && this.runtime.connected) {
// TODO: Add telemetry for this attempt to satisfy join
this.broadcastAllKnownState();
}
}, waitTime);
}
this.prepareJoinResponse(message.content.updateProviders, message.clientId);
} else {
assert(message.type === datastoreUpdateMessageType, 0xa3b /* Unexpected message type */);
if (message.content.isComplete) {
Expand Down Expand Up @@ -311,4 +279,70 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
}
}
}

/**
* Handles responding to another client joining the session.
*
* @param updateProviders - list of client connection id's that requestor selected
* to provide response
* @param requestor - `requestor` is only used in telemetry. While it is the requestor's
* client connection id, that is not most important. It is important that this is a
* unique shared id across all clients that might respond as we want to monitor the
* response patterns. The convenience of being client connection id will allow
* correlation with other telemetry where it is often called just `clientId`.
*/
private prepareJoinResponse(
updateProviders: ClientConnectionId[],
requestor: ClientConnectionId,
): void {
this.refreshBroadcastRequested = true;
// We must be connected to receive this message, so clientId should be defined.
// If it isn't then, not really a problem; just won't be in provider or quorum list.
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const clientId = this.runtime.clientId!;
// const requestor = message.clientId;
if (updateProviders.includes(clientId)) {
// Send all current state to the new client
this.broadcastAllKnownState();
this.presence.mc?.logger.sendTelemetryEvent({
eventName: "JoinResponse",
details: {
type: "broadcastAll",
requestor,
role: "primary",
},
});
} else {
// Schedule a broadcast to the new client after a delay only to send if
// another broadcast hasn't been seen in the meantime. The delay is based
// on the position in the quorum list. It doesn't have to be a stable
// list across all clients. We need something to provide suggested order
// to prevent a flood of broadcasts.
const quorumMembers = this.runtime.getQuorum().getMembers();
const indexOfSelf =
quorumMembers.get(clientId)?.sequenceNumber ??
// Index past quorum members + arbitrary additional offset up to 10
quorumMembers.size + Math.random() * 10;
// These numbers have been chosen arbitrarily to start with.
// 20 is minimum wait time, 20 is the additional wait time per provider
// given an chance before us with named providers given more time.
const waitTime = 20 + 20 * (3 * updateProviders.length + indexOfSelf);
setTimeout(() => {
// Make sure a broadcast is still needed and we are currently connected.
// If not connected, nothing we can do.
if (this.refreshBroadcastRequested && this.runtime.connected) {
this.broadcastAllKnownState();
this.presence.mc?.logger.sendTelemetryEvent({
eventName: "JoinResponse",
details: {
type: "broadcastAll",
requestor,
role: "secondary",
order: indexOfSelf,
},
});
}
}, waitTime);
}
}
}
36 changes: 24 additions & 12 deletions packages/framework/presence/src/presenceManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
*/

import { createSessionId } from "@fluidframework/id-compressor/internal";
import type { MonitoringContext } from "@fluidframework/telemetry-utils/internal";
import { createChildMonitoringContext } from "@fluidframework/telemetry-utils/internal";

import type { ClientConnectionId } from "./baseTypes.js";
import type { IEphemeralRuntime, PresenceManagerInternal } from "./internalTypes.js";
Expand Down Expand Up @@ -43,17 +45,26 @@ class PresenceManager
implements IPresence, PresenceExtensionInterface, PresenceManagerInternal
{
private readonly datastoreManager: PresenceDatastoreManager;
private readonly selfAttendee: ISessionClient = {
sessionId: createSessionId() as ClientSessionId,
currentConnectionId: () => {
throw new Error("Client has never been connected");
},
};
private readonly attendees = new Map<ClientConnectionId | ClientSessionId, ISessionClient>([
[this.selfAttendee.sessionId, this.selfAttendee],
]);

public constructor(runtime: IEphemeralRuntime) {
private readonly selfAttendee: ISessionClient;
private readonly attendees = new Map<ClientConnectionId | ClientSessionId, ISessionClient>();

public readonly mc: MonitoringContext | undefined = undefined;

public constructor(runtime: IEphemeralRuntime, clientSessionId: ClientSessionId) {
this.selfAttendee = {
sessionId: clientSessionId,
currentConnectionId: () => {
throw new Error("Client has never been connected");
},
};
this.attendees.set(clientSessionId, this.selfAttendee);

const logger = runtime.logger;
if (logger) {
this.mc = createChildMonitoringContext({ logger, namespace: "Presence" });
this.mc.logger.sendTelemetryEvent({ eventName: "PresenceInstantiated" });
}

// If already connected (now or in the past), populate self and attendees.
const originalClientId = runtime.clientId;
if (originalClientId !== undefined) {
Expand Down Expand Up @@ -137,6 +148,7 @@ class PresenceManager
*/
export function createPresenceManager(
runtime: IEphemeralRuntime,
clientSessionId: ClientSessionId = createSessionId() as ClientSessionId,
): IPresence & PresenceExtensionInterface {
return new PresenceManager(runtime);
return new PresenceManager(runtime, clientSessionId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import type { LatestMapItemValueClientData } from "../index.js";
import { LatestMap } from "../index.js";
import type { IPresence } from "../presence.js";

describe("LatestMapValueManager", () => {
/**
* See {@link checkCompiles} below
*/
it("API use compiles", () => {});
describe("Presence", () => {
describe("LatestMapValueManager", () => {
/**
* See {@link checkCompiles} below
*/
it("API use compiles", () => {});
});
});

// ---- test (example) code ----
Expand Down
12 changes: 7 additions & 5 deletions packages/framework/presence/src/test/latestValueManager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import type { LatestValueClientData } from "../index.js";
import { Latest } from "../index.js";
import type { IPresence } from "../presence.js";

describe("LatestValueManager", () => {
/**
* See {@link checkCompiles} below
*/
it("API use compiles", () => {});
describe("Presence", () => {
describe("LatestValueManager", () => {
/**
* See {@link checkCompiles} below
*/
it("API use compiles", () => {});
});
});

// ---- test (example) code ----
Expand Down
Loading

0 comments on commit 6fd547b

Please sign in to comment.