diff --git a/packages/shared/common/__tests__/internal/fdv2/PayloadReader.test.ts b/packages/shared/common/__tests__/internal/fdv2/PayloadReader.test.ts new file mode 100644 index 000000000..221206212 --- /dev/null +++ b/packages/shared/common/__tests__/internal/fdv2/PayloadReader.test.ts @@ -0,0 +1,314 @@ +import { EventListener, EventName, LDLogger } from '../../../src/api'; +import { EventStream, Payload, PayloadReader } from '../../../src/internal/fdv2/payloadReader'; + +class MockEventStream implements EventStream { + private _listeners: Record = {}; + + addEventListener(eventName: EventName, listener: EventListener): void { + this._listeners[eventName] = listener; + } + + simulateEvent(eventName: EventName, event: { data?: string }) { + this._listeners[eventName](event); + } +} + +it('it sets basis to true when intent code is xfer-full', () => { + const mockStream = new MockEventStream(); + const receivedPayloads: Payload[] = []; + const readerUnderTest = new PayloadReader(mockStream, { + mockKind: (it) => it, // obj processor that just returns the same obj + }); + readerUnderTest.addPayloadListener((it) => { + receivedPayloads.push(it); + }); + + mockStream.simulateEvent('server-intent', { + data: '{"payloads": [{"code": "xfer-full", "id": "mockId"}]}', + }); + mockStream.simulateEvent('payload-transferred', { + data: '{"state": "mockState", "version": 1}', + }); + expect(receivedPayloads.length).toEqual(1); + expect(receivedPayloads[0].id).toEqual('mockId'); + expect(receivedPayloads[0].state).toEqual('mockState'); + expect(receivedPayloads[0].basis).toEqual(true); +}); + +it('it sets basis to false when intent code is xfer-changes', () => { + const mockStream = new MockEventStream(); + const receivedPayloads: Payload[] = []; + const readerUnderTest = new PayloadReader(mockStream, { + mockKind: (it) => it, // obj processor that just returns the same obj + }); + readerUnderTest.addPayloadListener((it) => { + receivedPayloads.push(it); + }); + + mockStream.simulateEvent('server-intent', { + data: '{"payloads": [{"code": "xfer-changes", "id": "mockId"}]}', + }); + mockStream.simulateEvent('payload-transferred', { + data: '{"state": "mockState", "version": 1}', + }); + expect(receivedPayloads.length).toEqual(1); + expect(receivedPayloads[0].id).toEqual('mockId'); + expect(receivedPayloads[0].state).toEqual('mockState'); + expect(receivedPayloads[0].basis).toEqual(false); +}); + +it('it includes multiple types of updates in payload', () => { + const mockStream = new MockEventStream(); + const receivedPayloads: Payload[] = []; + const readerUnderTest = new PayloadReader(mockStream, { + mockKind: (it) => it, // obj processor that just returns the same obj + }); + readerUnderTest.addPayloadListener((it) => { + receivedPayloads.push(it); + }); + + mockStream.simulateEvent('server-intent', { + data: '{"payloads": [{"code": "xfer-full", "id": "mockId"}]}', + }); + mockStream.simulateEvent('put-object', { + data: '{"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}', + }); + mockStream.simulateEvent('delete-object', { + data: '{"kind": "mockKind", "key": "flagB", "version": 123}', + }); + mockStream.simulateEvent('put-object', { + data: '{"kind": "mockKind", "key": "flagC", "version": 123, "object": {"objectFieldC": "objectValueC"}}', + }); + mockStream.simulateEvent('payload-transferred', { + data: '{"state": "mockState", "version": 1}', + }); + expect(receivedPayloads.length).toEqual(1); + expect(receivedPayloads[0].id).toEqual('mockId'); + expect(receivedPayloads[0].state).toEqual('mockState'); + expect(receivedPayloads[0].basis).toEqual(true); + expect(receivedPayloads[0].updates.length).toEqual(3); + expect(receivedPayloads[0].updates[0].object).toEqual({ objectFieldA: 'objectValueA' }); + expect(receivedPayloads[0].updates[0].deleted).toEqual(undefined); + expect(receivedPayloads[0].updates[1].object).toEqual(undefined); + expect(receivedPayloads[0].updates[1].deleted).toEqual(true); + expect(receivedPayloads[0].updates[2].object).toEqual({ objectFieldC: 'objectValueC' }); + expect(receivedPayloads[0].updates[2].deleted).toEqual(undefined); +}); + +it('it does not include messages thats are not between server-intent and payloader-transferred', () => { + const mockStream = new MockEventStream(); + const receivedPayloads: Payload[] = []; + const readerUnderTest = new PayloadReader(mockStream, { + mockKind: (it) => it, // obj processor that just returns the same obj + }); + readerUnderTest.addPayloadListener((it) => { + receivedPayloads.push(it); + }); + + mockStream.simulateEvent('put-object', { + data: '{"kind": "mockKind", "key": "flagShouldIgnore", "version": 123, "object": {"objectFieldShouldIgnore": "objectValueShouldIgnore"}}', + }); + mockStream.simulateEvent('server-intent', { + data: '{"payloads": [{"code": "xfer-full", "id": "mockId"}]}', + }); + mockStream.simulateEvent('put-object', { + data: '{"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}', + }); + mockStream.simulateEvent('payload-transferred', { + data: '{"state": "mockState", "version": 1}', + }); + expect(receivedPayloads.length).toEqual(1); + expect(receivedPayloads[0].updates.length).toEqual(1); + expect(receivedPayloads[0].updates[0].object).toEqual({ objectFieldA: 'objectValueA' }); +}); + +it('logs prescribed message when goodbye event is encountered', () => { + const mockLogger: LDLogger = { + error: jest.fn(), + warn: jest.fn(), + info: jest.fn(), + debug: jest.fn(), + }; + const mockStream = new MockEventStream(); + const receivedPayloads: Payload[] = []; + const readerUnderTest = new PayloadReader( + mockStream, + { + mockKind: (it) => it, // obj processor that just returns the same obj + }, + undefined, + mockLogger, + ); + readerUnderTest.addPayloadListener((it) => { + receivedPayloads.push(it); + }); + + mockStream.simulateEvent('goodbye', { + data: '{"reason": "Bye"}', + }); + + expect(receivedPayloads.length).toEqual(0); + expect(mockLogger.info).toHaveBeenCalledWith( + 'Goodbye was received from the LaunchDarkly connection with reason: Bye.', + ); +}); + +it('logs prescribed message when error event is encountered', () => { + const mockLogger: LDLogger = { + error: jest.fn(), + warn: jest.fn(), + info: jest.fn(), + debug: jest.fn(), + }; + const mockStream = new MockEventStream(); + const receivedPayloads: Payload[] = []; + const readerUnderTest = new PayloadReader( + mockStream, + { + mockKind: (it) => it, // obj processor that just returns the same obj + }, + undefined, + mockLogger, + ); + readerUnderTest.addPayloadListener((it) => { + receivedPayloads.push(it); + }); + + mockStream.simulateEvent('server-intent', { + data: '{"payloads": [{"code": "xfer-full", "id": "mockId"}]}', + }); + mockStream.simulateEvent('put-object', { + data: '{"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}', + }); + mockStream.simulateEvent('error', { + data: '{"reason": "Womp womp"}', + }); + mockStream.simulateEvent('payload-transferred', { + data: '{"state": "mockState", "version": 1}', + }); + expect(receivedPayloads.length).toEqual(0); + expect(mockLogger.info).toHaveBeenCalledWith( + 'An issue was encountered receiving updates for payload mockId with reason: Womp womp. Automatic retry will occur.', + ); +}); + +it('discards partially transferred data when an error is encountered', () => { + const mockLogger: LDLogger = { + error: jest.fn(), + warn: jest.fn(), + info: jest.fn(), + debug: jest.fn(), + }; + const mockStream = new MockEventStream(); + const receivedPayloads: Payload[] = []; + const readerUnderTest = new PayloadReader( + mockStream, + { + mockKind: (it) => it, // obj processor that just returns the same obj + }, + undefined, + mockLogger, + ); + readerUnderTest.addPayloadListener((it) => { + receivedPayloads.push(it); + }); + + mockStream.simulateEvent('server-intent', { + data: '{"payloads": [{"code": "xfer-full", "id": "mockId"}]}', + }); + mockStream.simulateEvent('put-object', { + data: '{"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}', + }); + mockStream.simulateEvent('error', { + data: '{"reason": "Womp womp"}', + }); + mockStream.simulateEvent('payload-transferred', { + data: '{"state": "mockState", "version": 1}', + }); + mockStream.simulateEvent('server-intent', { + data: '{"payloads": [{"code": "xfer-full", "id": "mockId2"}]}', + }); + mockStream.simulateEvent('put-object', { + data: '{"kind": "mockKind", "key": "flagX", "version": 123, "object": {"objectFieldX": "objectValueX"}}', + }); + mockStream.simulateEvent('delete-object', { + data: '{"kind": "mockKind", "key": "flagY", "version": 123}', + }); + mockStream.simulateEvent('put-object', { + data: '{"kind": "mockKind", "key": "flagZ", "version": 123, "object": {"objectFieldZ": "objectValueZ"}}', + }); + mockStream.simulateEvent('payload-transferred', { + data: '{"state": "mockState2", "version": 1}', + }); + expect(receivedPayloads.length).toEqual(1); + expect(receivedPayloads[0].id).toEqual('mockId2'); + expect(receivedPayloads[0].state).toEqual('mockState2'); + expect(receivedPayloads[0].basis).toEqual(true); + expect(receivedPayloads[0].updates.length).toEqual(3); + expect(receivedPayloads[0].updates[0].object).toEqual({ objectFieldX: 'objectValueX' }); + expect(receivedPayloads[0].updates[0].deleted).toEqual(undefined); + expect(receivedPayloads[0].updates[1].object).toEqual(undefined); + expect(receivedPayloads[0].updates[1].deleted).toEqual(true); + expect(receivedPayloads[0].updates[2].object).toEqual({ objectFieldZ: 'objectValueZ' }); + expect(receivedPayloads[0].updates[2].deleted).toEqual(undefined); +}); + +it('silently ignores unrecognized kinds', () => { + const mockStream = new MockEventStream(); + const receivedPayloads: Payload[] = []; + const readerUnderTest = new PayloadReader(mockStream, { + mockKind: (it) => it, // obj processor that just returns the same obj + }); + readerUnderTest.addPayloadListener((it) => { + receivedPayloads.push(it); + }); + + mockStream.simulateEvent('server-intent', { + data: '{"payloads": [{"code": "xfer-full", "id": "mockId"}]}', + }); + mockStream.simulateEvent('put-object', { + data: '{"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}', + }); + mockStream.simulateEvent('put-object', { + data: '{"kind": "ItsMeYourBrotherUnrecognizedKind", "key": "unrecognized", "version": 123, "object": {"unrecognized": "unrecognized"}}', + }); + mockStream.simulateEvent('payload-transferred', { + data: '{"state": "mockState", "version": 1}', + }); + expect(receivedPayloads.length).toEqual(1); + expect(receivedPayloads[0].id).toEqual('mockId'); + expect(receivedPayloads[0].state).toEqual('mockState'); + expect(receivedPayloads[0].basis).toEqual(true); + expect(receivedPayloads[0].updates.length).toEqual(1); + expect(receivedPayloads[0].updates[0].object).toEqual({ objectFieldA: 'objectValueA' }); +}); + +it('ignores additional payloads beyond the first payload in the server-intent message', () => { + const mockStream = new MockEventStream(); + const receivedPayloads: Payload[] = []; + const readerUnderTest = new PayloadReader(mockStream, { + mockKind: (it) => it, // obj processor that just returns the same obj + }); + readerUnderTest.addPayloadListener((it) => { + receivedPayloads.push(it); + }); + + mockStream.simulateEvent('server-intent', { + data: '{"payloads": [{"code": "xfer-full", "id": "mockId"},{"code": "IShouldBeIgnored", "id": "IShouldBeIgnored"}]}', + }); + mockStream.simulateEvent('put-object', { + data: '{"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}', + }); + mockStream.simulateEvent('put-object', { + data: '{"kind": "ItsMeYourBrotherUnrecognizedKind", "key": "unrecognized", "version": 123, "object": {"unrecognized": "unrecognized"}}', + }); + mockStream.simulateEvent('payload-transferred', { + data: '{"state": "mockState", "version": 1}', + }); + expect(receivedPayloads.length).toEqual(1); + expect(receivedPayloads[0].id).toEqual('mockId'); + expect(receivedPayloads[0].state).toEqual('mockState'); + expect(receivedPayloads[0].basis).toEqual(true); + expect(receivedPayloads[0].updates.length).toEqual(1); + expect(receivedPayloads[0].updates[0].object).toEqual({ objectFieldA: 'objectValueA' }); +}); diff --git a/packages/shared/common/src/api/platform/EventSource.ts b/packages/shared/common/src/api/platform/EventSource.ts index 55bef9bfe..f44fc830b 100644 --- a/packages/shared/common/src/api/platform/EventSource.ts +++ b/packages/shared/common/src/api/platform/EventSource.ts @@ -1,6 +1,6 @@ import type { HttpErrorResponse } from './Requests'; -export type EventName = 'delete' | 'patch' | 'ping' | 'put'; +export type EventName = string; export type EventListener = (event?: { data?: any }) => void; export type ProcessStreamResponse = { deserializeData: (data: string) => any; diff --git a/packages/shared/common/src/internal/fdv2/index.ts b/packages/shared/common/src/internal/fdv2/index.ts new file mode 100644 index 000000000..4c4a88773 --- /dev/null +++ b/packages/shared/common/src/internal/fdv2/index.ts @@ -0,0 +1,3 @@ +import { Payload, PayloadListener, PayloadReader, Update } from './payloadReader'; + +export { Payload, PayloadListener, PayloadReader, Update }; diff --git a/packages/shared/common/src/internal/fdv2/payloadReader.ts b/packages/shared/common/src/internal/fdv2/payloadReader.ts new file mode 100644 index 000000000..68b1320fc --- /dev/null +++ b/packages/shared/common/src/internal/fdv2/payloadReader.ts @@ -0,0 +1,219 @@ +/* eslint-disable no-underscore-dangle */ +import { EventListener, EventName, LDLogger } from '../../api'; +import { DataSourceErrorKind } from '../../datasource'; +import { DeleteObject, PayloadTransferred, PutObject, ServerIntentData } from './proto'; + +// Facade interface to contain only ability to add event listeners +export interface EventStream { + addEventListener(type: EventName, listener: EventListener): void; +} + +// Used to define object processing between deserialization and payload listener invocation. This can be +// used provide object sanitization logic. +export interface ObjProcessors { + [kind: string]: (object: any) => any; +} + +// Represents information for one keyed object. +export interface Update { + kind: string; + key: string; + version: number; + object?: any; + deleted?: boolean; +} + +// Represents a collection of updates from the FDv2 services. If basis is true, the set of updates represents the +// complete state of the payload. +export interface Payload { + id: string; + version: number; + state: string; + basis: boolean; + updates: Update[]; +} + +export type PayloadListener = (payload: Payload) => void; + +/** + * A FDv2 PayloadReader can be used to parse payloads from a stream of FDv2 events. It will send payloads + * to the PayloadListeners as the payloads are received. Invalid series of events may be dropped silently, + * but the payload reader will continue to operate. + */ +export class PayloadReader { + private _listeners: PayloadListener[] = []; + + private _tempId?: string = undefined; + private _tempBasis?: boolean = undefined; + private _tempUpdates: Update[] = []; + + /** + * Creates a PayloadReader + * + * @param eventStream event stream of FDv2 events + * @param _objProcessors defines object processors for each object kind. + * @param _errorHandler that will be called with errors as they are encountered + * @param _logger for logging + */ + constructor( + eventStream: EventStream, + private readonly _objProcessors: ObjProcessors, + private readonly _errorHandler?: (errorKind: DataSourceErrorKind, message: string) => void, + private readonly _logger?: LDLogger, + ) { + this._attachHandler(eventStream, 'server-intent', this._processServerIntent); + this._attachHandler(eventStream, 'put-object', this._processPutObject); + this._attachHandler(eventStream, 'delete-object', this._processDeleteObject); + this._attachHandler(eventStream, 'payload-transferred', this._processPayloadTransferred); + this._attachHandler(eventStream, 'goodbye', this._processGoodbye); + this._attachHandler(eventStream, 'error', this._processError); + } + + addPayloadListener(listener: PayloadListener) { + this._listeners.push(listener); + } + + removePayloadListener(listener: PayloadListener) { + const index = this._listeners.indexOf(listener, 0); + if (index > -1) { + this._listeners.splice(index, 1); + } + } + + private _attachHandler(stream: EventStream, eventName: string, processor: (obj: any) => void) { + stream.addEventListener(eventName, async (event?: { data?: string }) => { + if (event?.data) { + this._logger?.debug(`Received ${eventName} event. Data is ${event.data}`); + try { + processor(JSON.parse(event.data)); + } catch { + this._logger?.error( + `Stream received data that was unable to be processed in "${eventName}" message`, + ); + this._logger?.debug(`Data follows: ${event.data}`); + this._errorHandler?.(DataSourceErrorKind.InvalidData, 'Malformed data in event stream'); + } + } else { + this._errorHandler?.(DataSourceErrorKind.Unknown, 'Unexpected message from event stream'); + } + }); + } + + private _processObj(kind: string, jsonObj: any): any { + return this._objProcessors[kind]?.(jsonObj); + } + + private _processServerIntent = (data: ServerIntentData) => { + // clear state in prep for handling data + this._resetState(); + + // if there's no payloads, return + if (!data.payloads.length) { + return; + } + // at the time of writing this, it was agreed upon that SDKs could assume exactly 1 element in this list. In the future, a negotiation of protocol version will be required to remove this assumption. + const payload = data.payloads[0]; + + switch (payload?.code) { + case 'xfer-full': + this._tempBasis = true; + break; + case 'xfer-changes': + case 'none': + this._tempBasis = false; + break; + default: + // unrecognized intent code, return + return; + } + + this._tempId = payload?.id; + }; + + private _processPutObject = (data: PutObject) => { + // if the following properties haven't been provided by now, we should ignore the event + if ( + !this._tempId || // server intent hasn't been recieved yet. + !data.kind || + !data.key || + !data.version || + !data.object + ) { + return; + } + + const obj = this._processObj(data.kind, data.object); + if (!obj) { + this._logger?.warn(`Unable to prcoess object for kind: '${data.kind}'`); + // ignore unrecognized kinds + return; + } + + this._tempUpdates.push({ + kind: data.kind, + key: data.key, + version: data.version, + object: obj, + // intentionally omit deleted for this put + }); + }; + + private _processDeleteObject = (data: DeleteObject) => { + // if the following properties haven't been provided by now, we should ignore the event + if (!this._tempId || !data.kind || !data.key || !data.version) { + return; + } + + this._tempUpdates.push({ + kind: data.kind, + key: data.key, + version: data.version, + // intentionally omit object for this delete + deleted: true, + }); + }; + + private _processPayloadTransferred = (data: PayloadTransferred) => { + // if the following properties haven't been provided by now, we should reset + if ( + !this._tempId || // server intent hasn't been recieved yet. + !data.state || + !data.version || + this._tempBasis === undefined + ) { + this._resetState(); // a reset is best defensive action since payload transferred terminates a payload + return; + } + + const payload: Payload = { + id: this._tempId!, + version: data.version, + state: data.state, + basis: this._tempBasis, + updates: this._tempUpdates, + }; + + this._listeners.forEach((it) => it(payload)); + this._resetState(); + }; + + private _processGoodbye = (data: any) => { + this._logger?.info( + `Goodbye was received from the LaunchDarkly connection with reason: ${data.reason}.`, + ); + this._resetState(); + }; + + private _processError = (data: any) => { + this._logger?.info( + `An issue was encountered receiving updates for payload ${this._tempId} with reason: ${data.reason}. Automatic retry will occur.`, + ); + this._resetState(); + }; + + private _resetState() { + this._tempId = undefined; + this._tempBasis = undefined; + this._tempUpdates = []; + } +} diff --git a/packages/shared/common/src/internal/fdv2/proto.ts b/packages/shared/common/src/internal/fdv2/proto.ts new file mode 100644 index 000000000..2500a7e0f --- /dev/null +++ b/packages/shared/common/src/internal/fdv2/proto.ts @@ -0,0 +1,35 @@ +export interface Event { + event: string; + data: any; +} + +export interface ServerIntentData { + payloads: PayloadIntent[]; +} + +export type IntentCode = 'xfer-full' | 'xfer-changes' | 'none'; + +export interface PayloadIntent { + id: string; + target: number; + code: IntentCode; + reason: string; +} + +export interface PutObject { + kind: string; + key: string; + version: number; + object: any; +} + +export interface DeleteObject { + kind: string; + key: string; + version: number; +} + +export interface PayloadTransferred { + state: string; + version: number; +} diff --git a/packages/shared/common/src/internal/index.ts b/packages/shared/common/src/internal/index.ts index 27abb7993..282da8f91 100644 --- a/packages/shared/common/src/internal/index.ts +++ b/packages/shared/common/src/internal/index.ts @@ -2,3 +2,4 @@ export * from './context'; export * from './diagnostics'; export * from './evaluation'; export * from './events'; +export * from './fdv2'; diff --git a/packages/shared/sdk-server/__tests__/data_sources/StreamingProcessorFDv2.test.ts b/packages/shared/sdk-server/__tests__/data_sources/StreamingProcessorFDv2.test.ts new file mode 100644 index 000000000..c30a91c44 --- /dev/null +++ b/packages/shared/sdk-server/__tests__/data_sources/StreamingProcessorFDv2.test.ts @@ -0,0 +1,310 @@ +import { + DataSourceErrorKind, + defaultHeaders, + Info, + internal, + LDLogger, + LDStreamingError, + subsystem, +} from '@launchdarkly/js-sdk-common'; + +import StreamingProcessorFDv2 from '../../src/data_sources/StreamingProcessorFDv2'; +import { createBasicPlatform } from '../createBasicPlatform'; + +let logger: LDLogger; + +const serviceEndpoints = { + events: '', + polling: '', + streaming: 'https://mockstream.ld.com', + diagnosticEventPath: '/diagnostic', + analyticsEventPath: '/bulk', + includeAuthorizationHeader: true, +}; + +function getBasicConfiguration(inLogger: LDLogger) { + return { + sdkKey: 'testSdkKey', + serviceEndpoints, + logger: inLogger, + }; +} + +const dateNowString = '2023-08-10'; +const sdkKey = 'my-sdk-key'; +const events = { + 'server-intent': { + data: '{"payloads": [{"code": "xfer-full", "id": "mockId"}]}', + }, + 'put-object': { + data: '{"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}', + }, + 'payload-transferred': { + data: '{"state": "mockState", "version": 1}', + }, +}; + +let basicPlatform: any; + +beforeEach(() => { + basicPlatform = createBasicPlatform(); + logger = { + error: jest.fn(), + warn: jest.fn(), + info: jest.fn(), + debug: jest.fn(), + }; +}); + +const createMockEventSource = (streamUri: string = '', options: any = {}) => ({ + streamUri, + options, + onclose: jest.fn(), + addEventListener: jest.fn(), + close: jest.fn(), +}); + +describe('given a stream processor with mock event source', () => { + let info: Info; + let streamingProcessor: subsystem.LDStreamProcessor; + let diagnosticsManager: internal.DiagnosticsManager; + let listener: internal.PayloadListener; + let mockEventSource: any; + let mockErrorHandler: jest.Mock; + let simulateEvents: (e?: any) => void; + let simulateError: (e: { status: number; message: string }) => boolean; + + beforeAll(() => { + jest.useFakeTimers(); + jest.setSystemTime(new Date(dateNowString)); + }); + + afterAll(() => { + jest.useRealTimers(); + }); + + beforeEach(() => { + mockErrorHandler = jest.fn(); + + info = basicPlatform.info; + + basicPlatform.requests = { + createEventSource: jest.fn((streamUri: string, options: any) => { + mockEventSource = createMockEventSource(streamUri, options); + return mockEventSource; + }), + } as any; + simulateEvents = (e: any = events) => { + mockEventSource.addEventListener.mock.calls[0][1](e['server-intent']); // server intent listener + mockEventSource.addEventListener.mock.calls[1][1](e['put-object']); // put listener + mockEventSource.addEventListener.mock.calls[3][1](e['payload-transferred']); // payload transferred listener + }; + simulateError = (e: { status: number; message: string }): boolean => + mockEventSource.options.errorFilter(e); + + listener = jest.fn(); + + diagnosticsManager = new internal.DiagnosticsManager(sdkKey, basicPlatform, {}); + streamingProcessor = new StreamingProcessorFDv2( + { + basicConfiguration: getBasicConfiguration(logger), + platform: basicPlatform, + }, + '/all', + [], + listener, + { + authorization: 'my-sdk-key', + 'user-agent': 'TestUserAgent/2.0.2', + 'x-launchdarkly-wrapper': 'Rapper/1.2.3', + }, + diagnosticsManager, + mockErrorHandler, + ); + + jest.spyOn(streamingProcessor, 'stop'); + streamingProcessor.start(); + }); + + afterEach(() => { + streamingProcessor.close(); + jest.resetAllMocks(); + }); + + it('uses expected uri and eventSource init args', () => { + expect(basicPlatform.requests.createEventSource).toBeCalledWith( + `${serviceEndpoints.streaming}/all`, + { + errorFilter: expect.any(Function), + headers: defaultHeaders(sdkKey, info, undefined), + initialRetryDelayMillis: 1000, + readTimeoutMillis: 300000, + retryResetIntervalMillis: 60000, + }, + ); + }); + + it('sets streamInitialReconnectDelay correctly', () => { + streamingProcessor = new StreamingProcessorFDv2( + { + basicConfiguration: getBasicConfiguration(logger), + platform: basicPlatform, + }, + '/all', + [], + listener, + { + authorization: 'my-sdk-key', + 'user-agent': 'TestUserAgent/2.0.2', + 'x-launchdarkly-wrapper': 'Rapper/1.2.3', + }, + diagnosticsManager, + mockErrorHandler, + 22, + ); + streamingProcessor.start(); + + expect(basicPlatform.requests.createEventSource).toHaveBeenLastCalledWith( + `${serviceEndpoints.streaming}/all`, + { + errorFilter: expect.any(Function), + headers: defaultHeaders(sdkKey, info, undefined), + initialRetryDelayMillis: 22000, + readTimeoutMillis: 300000, + retryResetIntervalMillis: 60000, + }, + ); + }); + + it('adds listeners', () => { + expect(mockEventSource.addEventListener).toHaveBeenNthCalledWith( + 1, + 'server-intent', + expect.any(Function), + ); + expect(mockEventSource.addEventListener).toHaveBeenNthCalledWith( + 2, + 'put-object', + expect.any(Function), + ); + expect(mockEventSource.addEventListener).toHaveBeenNthCalledWith( + 3, + 'delete-object', + expect.any(Function), + ); + expect(mockEventSource.addEventListener).toHaveBeenNthCalledWith( + 4, + 'payload-transferred', + expect.any(Function), + ); + expect(mockEventSource.addEventListener).toHaveBeenNthCalledWith( + 5, + 'goodbye', + expect.any(Function), + ); + expect(mockEventSource.addEventListener).toHaveBeenNthCalledWith( + 6, + 'error', + expect.any(Function), + ); + }); + + it('executes payload listener', () => { + simulateEvents(); + expect(listener).toHaveBeenCalled(); + }); + + it('passes error to callback if json data is malformed', async () => { + simulateEvents({ + 'server-intent': { + data: '{"payloads": [{"intent INTENTIONAL CORRUPTION MUWAHAHAHA', + }, + }); + + expect(mockErrorHandler.mock.calls[0][0].kind).toEqual(DataSourceErrorKind.InvalidData); + expect(mockErrorHandler.mock.calls[0][0].message).toEqual('Malformed data in event stream'); + }); + + it('calls error handler if event.data prop is missing', async () => { + simulateEvents({ + 'server-intent': { + notData: '{"payloads": [{"intentCode": "xfer-full", "id": "mockId"}]}', + }, + 'put-object': { + notData: + '{"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}', + }, + 'payload-transferred': { + notData: '{"state": "mockState", "version": 1}', + }, + }); + expect(listener).not.toHaveBeenCalled(); + expect(mockErrorHandler.mock.calls[0][0].kind).toEqual(DataSourceErrorKind.Unknown); + expect(mockErrorHandler.mock.calls[0][0].message).toMatch(/unexpected message/i); + }); + + it('closes and stops', async () => { + streamingProcessor.close(); + + expect(streamingProcessor.stop).toBeCalled(); + expect(mockEventSource.close).toBeCalled(); + // @ts-ignore + expect(streamingProcessor.eventSource).toBeUndefined(); + }); + + it('creates a stream init event', async () => { + const startTime = Date.now(); + simulateEvents(); + + const diagnosticEvent = diagnosticsManager.createStatsEventAndReset(0, 0, 0); + expect(diagnosticEvent.streamInits.length).toEqual(1); + const si = diagnosticEvent.streamInits[0]; + expect(si.timestamp).toEqual(startTime); + expect(si.failed).toBeFalsy(); + expect(si.durationMillis).toBeGreaterThanOrEqual(0); + }); + + describe.each([400, 408, 429, 500, 503])('given recoverable http errors', (status) => { + it(`continues retrying after error: ${status}`, () => { + const startTime = Date.now(); + const testError = { status, message: 'retry. recoverable.' }; + const willRetry = simulateError(testError); + + expect(willRetry).toBeTruthy(); + expect(mockErrorHandler).not.toBeCalled(); + expect(logger.warn).toBeCalledWith( + expect.stringMatching(new RegExp(`${status}.*will retry`)), + ); + + const diagnosticEvent = diagnosticsManager.createStatsEventAndReset(0, 0, 0); + expect(diagnosticEvent.streamInits.length).toEqual(1); + const si = diagnosticEvent.streamInits[0]; + expect(si.timestamp).toEqual(startTime); + expect(si.failed).toBeTruthy(); + expect(si.durationMillis).toBeGreaterThanOrEqual(0); + }); + }); + + describe.each([401, 403])('given irrecoverable http errors', (status) => { + it(`stops retrying after error: ${status}`, () => { + const startTime = Date.now(); + const testError = { status, message: 'stopping. irrecoverable.' }; + const willRetry = simulateError(testError); + + expect(willRetry).toBeFalsy(); + expect(mockErrorHandler).toBeCalledWith( + new LDStreamingError(DataSourceErrorKind.Unknown, testError.message, testError.status), + ); + expect(logger.error).toBeCalledWith( + expect.stringMatching(new RegExp(`${status}.*permanently`)), + ); + + const diagnosticEvent = diagnosticsManager.createStatsEventAndReset(0, 0, 0); + expect(diagnosticEvent.streamInits.length).toEqual(1); + const si = diagnosticEvent.streamInits[0]; + expect(si.timestamp).toEqual(startTime); + expect(si.failed).toBeTruthy(); + expect(si.durationMillis).toBeGreaterThanOrEqual(0); + }); + }); +}); diff --git a/packages/shared/sdk-server/__tests__/data_sources/createPayloadListenersFDv2.test.ts b/packages/shared/sdk-server/__tests__/data_sources/createPayloadListenersFDv2.test.ts new file mode 100644 index 000000000..967eb658f --- /dev/null +++ b/packages/shared/sdk-server/__tests__/data_sources/createPayloadListenersFDv2.test.ts @@ -0,0 +1,142 @@ +import { LDLogger } from '@launchdarkly/js-sdk-common'; + +import { LDDataSourceUpdates } from '../../src/api/subsystems'; +import { createPayloadListener } from '../../src/data_sources/createPayloadListenerFDv2'; + +jest.mock('../../src/store/serialization'); + +let logger: LDLogger; + +beforeEach(() => { + logger = { + error: jest.fn(), + warn: jest.fn(), + info: jest.fn(), + debug: jest.fn(), + }; +}); + +const fullTransferPayload = { + id: 'payloadID', + version: 99, + state: 'initial', + basis: true, + updates: [ + { + kind: 'flag', + key: 'flagkey', + version: 1, + object: { + key: 'flagkey', + version: 1, + }, + }, + { + kind: 'segment', + key: 'segkey', + version: 1, + object: { + key: 'segkey', + version: 2, + }, + }, + ], +}; + +const changesTransferPayload = { + id: 'payloadID', + version: 99, + state: 'changes', + basis: false, + updates: [ + { + kind: 'flag', + key: 'flagkey', + version: 1, + object: { + key: 'flagkey', + version: 1, + }, + }, + { + kind: 'segment', + key: 'segkey', + version: 2, + object: { + key: 'segkey', + version: 2, + }, + }, + { + kind: 'flag', + key: 'deletedFlag', + version: 3, + object: { + key: 'deletedFlag', + version: 3, + }, + deleted: true, + }, + ], +}; + +describe('createPayloadListenerFDv2', () => { + let dataSourceUpdates: LDDataSourceUpdates; + let basisRecieved: jest.Mock; + + beforeEach(() => { + dataSourceUpdates = { + init: jest.fn(), + upsert: jest.fn(), + }; + basisRecieved = jest.fn(); + }); + + afterEach(() => { + jest.resetAllMocks(); + }); + + test('data source init is called', async () => { + const listener = createPayloadListener(dataSourceUpdates, logger, basisRecieved); + listener(fullTransferPayload); + + expect(logger.debug).toBeCalledWith(expect.stringMatching(/initializing/i)); + expect(dataSourceUpdates.init).toBeCalledWith( + { + features: { + flagkey: { key: 'flagkey', version: 1 }, + }, + segments: { + segkey: { key: 'segkey', version: 2 }, + }, + }, + basisRecieved, + ); + }); + + test('data source upsert is called', async () => { + const listener = createPayloadListener(dataSourceUpdates, logger, basisRecieved); + listener(changesTransferPayload); + + expect(logger.debug).toBeCalledWith(expect.stringMatching(/updating/i)); + expect(dataSourceUpdates.upsert).toHaveBeenCalledTimes(3); + expect(dataSourceUpdates.upsert).toHaveBeenNthCalledWith( + 1, + { namespace: 'features' }, + { key: 'flagkey', version: 1 }, + expect.anything(), + ); + expect(dataSourceUpdates.upsert).toHaveBeenNthCalledWith( + 2, + { namespace: 'segments' }, + { key: 'segkey', version: 2 }, + expect.anything(), + ); + expect(dataSourceUpdates.upsert).toHaveBeenNthCalledWith( + 3, + { namespace: 'features' }, + { key: 'deletedFlag', version: 3, deleted: true }, + expect.anything(), + ); + }); +}); diff --git a/packages/shared/sdk-server/src/data_sources/createStreamListeners.test.ts b/packages/shared/sdk-server/__tests__/data_sources/createStreamListeners.test.ts similarity index 95% rename from packages/shared/sdk-server/src/data_sources/createStreamListeners.test.ts rename to packages/shared/sdk-server/__tests__/data_sources/createStreamListeners.test.ts index 02df75aad..3237b9417 100644 --- a/packages/shared/sdk-server/src/data_sources/createStreamListeners.test.ts +++ b/packages/shared/sdk-server/__tests__/data_sources/createStreamListeners.test.ts @@ -1,11 +1,11 @@ import { LDLogger } from '@launchdarkly/js-sdk-common'; -import { LDDataSourceUpdates } from '../api/subsystems'; -import { deserializeAll, deserializeDelete, deserializePatch } from '../store/serialization'; -import VersionedDataKinds from '../store/VersionedDataKinds'; -import { createStreamListeners } from './createStreamListeners'; +import { LDDataSourceUpdates } from '../../src/api/subsystems'; +import { createStreamListeners } from '../../src/data_sources/createStreamListeners'; +import { deserializeAll, deserializeDelete, deserializePatch } from '../../src/store/serialization'; +import VersionedDataKinds from '../../src/store/VersionedDataKinds'; -jest.mock('../store/serialization'); +jest.mock('../../src/store/serialization'); let logger: LDLogger; diff --git a/packages/shared/sdk-server/src/data_sources/StreamingProcessorFDv2.ts b/packages/shared/sdk-server/src/data_sources/StreamingProcessorFDv2.ts new file mode 100644 index 000000000..901c3d3ec --- /dev/null +++ b/packages/shared/sdk-server/src/data_sources/StreamingProcessorFDv2.ts @@ -0,0 +1,158 @@ +import { + ClientContext, + DataSourceErrorKind, + EventSource, + getStreamingUri, + httpErrorMessage, + HttpErrorResponse, + internal, + LDHeaders, + LDLogger, + LDStreamingError, + Requests, + shouldRetry, + StreamingErrorHandler, + subsystem, +} from '@launchdarkly/js-sdk-common'; +import { PayloadListener } from '@launchdarkly/js-sdk-common/dist/esm/internal'; + +import { Flag } from '../evaluation/data/Flag'; +import { Segment } from '../evaluation/data/Segment'; +import { processFlag, processSegment } from '../store/serialization'; + +// TODO: consider naming this StreamingDatasource +export default class StreamingProcessorFDv2 implements subsystem.LDStreamProcessor { + private readonly _headers: { [key: string]: string | string[] }; + private readonly _streamUri: string; + private readonly _logger?: LDLogger; + + private _eventSource?: EventSource; + private _requests: Requests; + private _connectionAttemptStartTime?: number; + + constructor( + clientContext: ClientContext, + streamUriPath: string, + parameters: { key: string; value: string }[], + private readonly _payloadListener: PayloadListener, + baseHeaders: LDHeaders, + private readonly _diagnosticsManager?: internal.DiagnosticsManager, + private readonly _errorHandler?: StreamingErrorHandler, + private readonly _streamInitialReconnectDelay = 1, + ) { + const { basicConfiguration, platform } = clientContext; + const { logger } = basicConfiguration; + const { requests } = platform; + + this._headers = { ...baseHeaders }; + this._logger = logger; + this._requests = requests; + this._streamUri = getStreamingUri( + basicConfiguration.serviceEndpoints, + streamUriPath, + parameters, + ); + } + + private _logConnectionStarted() { + this._connectionAttemptStartTime = Date.now(); + } + + private _logConnectionResult(success: boolean) { + if (this._connectionAttemptStartTime && this._diagnosticsManager) { + this._diagnosticsManager.recordStreamInit( + this._connectionAttemptStartTime, + !success, + Date.now() - this._connectionAttemptStartTime, + ); + } + + this._connectionAttemptStartTime = undefined; + } + + /** + * This is a wrapper around the passed errorHandler which adds additional + * diagnostics and logging logic. + * + * @param err The error to be logged and handled. + * @return boolean whether to retry the connection. + * + * @private + */ + private _retryAndHandleError(err: HttpErrorResponse) { + if (!shouldRetry(err)) { + this._logConnectionResult(false); + this._errorHandler?.( + new LDStreamingError(DataSourceErrorKind.ErrorResponse, err.message, err.status), + ); + this._logger?.error(httpErrorMessage(err, 'streaming request')); + return false; + } + + this._logger?.warn(httpErrorMessage(err, 'streaming request', 'will retry')); + this._logConnectionResult(false); + this._logConnectionStarted(); + return true; + } + + start() { + this._logConnectionStarted(); + + const eventSource = this._requests.createEventSource(this._streamUri, { + headers: this._headers, + errorFilter: (error: HttpErrorResponse) => this._retryAndHandleError(error), + initialRetryDelayMillis: 1000 * this._streamInitialReconnectDelay, + readTimeoutMillis: 5 * 60 * 1000, + retryResetIntervalMillis: 60 * 1000, + }); + this._eventSource = eventSource; + const payloadReader = new internal.PayloadReader( + eventSource, + { + flag: (flag: Flag) => { + processFlag(flag); + return flag; + }, + segment: (segment: Segment) => { + processSegment(segment); + return segment; + }, + }, + (errorKind: DataSourceErrorKind, message: string) => { + this._errorHandler?.(new LDStreamingError(errorKind, message)); + }, + this._logger, + ); + payloadReader.addPayloadListener(() => { + // TODO: discuss if it is satisfactory to switch from setting connection result on single event to getting a payload. Need + // to double check the handling in the ServerIntent:none case. That may not trigger these payload listeners. + this._logConnectionResult(true); + }); + payloadReader.addPayloadListener(this._payloadListener); + + eventSource.onclose = () => { + this._logger?.info('Closed LaunchDarkly stream connection'); + }; + + eventSource.onerror = () => { + // The work is done by `errorFilter`. + }; + + eventSource.onopen = () => { + this._logger?.info('Opened LaunchDarkly stream connection'); + }; + + eventSource.onretrying = (e) => { + this._logger?.info(`Will retry stream connection in ${e.delayMillis} milliseconds`); + }; + } + + stop() { + this._eventSource?.close(); + this._eventSource = undefined; + } + + close() { + this.stop(); + } +} diff --git a/packages/shared/sdk-server/src/data_sources/createPayloadListenerFDv2.ts b/packages/shared/sdk-server/src/data_sources/createPayloadListenerFDv2.ts new file mode 100644 index 000000000..1b03bbf0f --- /dev/null +++ b/packages/shared/sdk-server/src/data_sources/createPayloadListenerFDv2.ts @@ -0,0 +1,76 @@ +import { internal, LDLogger, VoidFunction } from '@launchdarkly/js-sdk-common'; + +import { + LDDataSourceUpdates, + LDFeatureStoreDataStorage, + LDKeyedFeatureStoreItem, +} from '../api/subsystems'; + +const namespaceForKind = (kind: string) => { + switch (kind) { + case 'flag': + return 'features'; + case 'segment': + return 'segments'; + default: + return kind; + } +}; + +export const createPayloadListener = + ( + dataSourceUpdates: LDDataSourceUpdates, + logger?: LDLogger, + basisReceived: VoidFunction = () => {}, + ) => + (payload: internal.Payload) => { + // This conversion from FDv2 updates to the existing types used with DataSourceUpdates should be temporary. Eventually + // DataSourceUpdates will support update(...) taking in the list of updates. + if (payload.basis) { + // convert basis to init param structure + // TODO: SDK-850 - remove conversion as part of FDv2 Persistence work + const converted: LDFeatureStoreDataStorage = {}; + payload.updates.forEach((it: internal.Update) => { + const namespace = namespaceForKind(it.kind); + if (converted[namespace]) { + // entry for kind already exists, add key + converted[namespace][it.key] = { + version: it.version, + deleted: it.deleted, + ...it.object, + }; + } else { + // entry for kind doesn't exist, add kind and key + converted[namespace] = { + [it.key]: { + version: it.version, + deleted: it.deleted, + ...it.object, + }, + }; + } + }); + + logger?.debug('Initializing all data'); + dataSourceUpdates.init(converted, basisReceived); + } else { + // convert data to upsert param + // TODO: SDK-850 - remove conversion as part of FDv2 Persistence work + payload.updates.forEach((it: internal.Update) => { + const converted: LDKeyedFeatureStoreItem = { + key: it.key, + version: it.version, + deleted: it.deleted, + ...it.object, + }; + + if (it.deleted) { + logger?.debug(`Deleting ${it.key} in ${it.kind}`); + } else { + logger?.debug(`Updating ${it.key} in ${it.kind}`); + } + + dataSourceUpdates.upsert({ namespace: namespaceForKind(it.kind) }, converted, () => {}); + }); + } + };