Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: preparation work for FDv2 feature store and persistence support #725

Open
wants to merge 6 commits into
base: ta/fdv2-temporary-holding
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,15 @@ export class EdgeFeatureStore implements LDFeatureStore {
}

init(allData: LDFeatureStoreDataStorage, callback: () => void): void {
this.applyChanges(true, allData, undefined, callback);
}

applyChanges(
basis: boolean,
data: LDFeatureStoreDataStorage,
selector: String | undefined,
callback: () => void,
): void {
callback();
}

Expand Down
9 changes: 9 additions & 0 deletions packages/shared/sdk-server-edge/src/api/EdgeFeatureStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,15 @@ export class EdgeFeatureStore implements LDFeatureStore {
}

init(allData: LDFeatureStoreDataStorage, callback: () => void): void {
this.applyChanges(true, allData, undefined, callback);
}

applyChanges(
basis: boolean,
data: LDFeatureStoreDataStorage,
selector: String | undefined,
callback: () => void,
): void {
callback();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const mockFeatureStore: LDFeatureStore = {
init: jest.fn(),
initialized: jest.fn(),
upsert: jest.fn(),
applyChanges: jest.fn(),
get: jest.fn(),
delete: jest.fn(),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ describe('createPayloadListenerFDv2', () => {
dataSourceUpdates = {
init: jest.fn(),
upsert: jest.fn(),
applyChanges: jest.fn(),
};
basisRecieved = jest.fn();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ describe('createStreamListeners', () => {
dataSourceUpdates = {
init: jest.fn(),
upsert: jest.fn(),
applyChanges: jest.fn(),
};
onPutCompleteHandler = jest.fn();
onPatchCompleteHandler = jest.fn();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,21 @@ export interface LDDataSourceUpdates {
* Will be called after the upsert operation is complete.
*/
upsert(kind: DataKind, data: LDKeyedFeatureStoreItem, callback: () => void): void;

/**
* @param basis If true, completely overwrites the current contents of the data store
* with the provided data. If false, upserts the items in the provided data. Upserts
* are made only if provided items have newer versions than existing items.
* @param data An object in which each key is the "namespace" of a collection (e.g. `"features"`) and
* the value is an object that maps keys to entities. The actual type of this parameter is
* `interfaces.FullDataSet<VersionedData>`.
* @param selector TODO
* @param callback Will be called after the changes are applied.
*/
applyChanges(
basis: boolean,
data: LDFeatureStoreDataStorage,
selector: String,
callback: () => void,
): void;
}
17 changes: 17 additions & 0 deletions packages/shared/sdk-server/src/api/subsystems/LDFeatureStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,23 @@ export interface LDFeatureStore {
*/
upsert(kind: DataKind, data: LDKeyedFeatureStoreItem, callback: () => void): void;

/**
* @param basis If true, completely overwrites the current contents of the data store
* with the provided data. If false, upserts the items in the provided data. Upserts
* are made only if provided items have newer versions than existing items.
* @param data An object in which each key is the "namespace" of a collection (e.g. `"features"`) and
* the value is an object that maps keys to entities. The actual type of this parameter is
* `interfaces.FullDataSet<VersionedData>`.
* @param selector TODO
* @param callback Will be called after the changes are applied.
*/
applyChanges(
basis: boolean,
data: LDFeatureStoreDataStorage,
selector: String | undefined,
callback: () => void,
): void;

/**
* Tests whether the store is initialized.
*
Expand Down
86 changes: 45 additions & 41 deletions packages/shared/sdk-server/src/data_sources/DataSourceUpdates.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,38 @@ export default class DataSourceUpdates implements LDDataSourceUpdates {
) {}

init(allData: LDFeatureStoreDataStorage, callback: () => void): void {
this.applyChanges(true, allData, undefined, callback); // basis is true for init. selector is undefined for FDv1 init
}

upsert(kind: DataKind, data: LDKeyedFeatureStoreItem, callback: () => void): void {
this.applyChanges(
false, // basis is false for upserts
{
[kind.namespace]: {
[data.key]: data,
},
},
undefined, // selector is undefined for FDv1 upsert
callback,
);
}

applyChanges(
basis: boolean,
data: LDFeatureStoreDataStorage,
selector: String | undefined,
callback: () => void,
): void {
const checkForChanges = this._hasEventListeners();
const doInit = (oldData?: LDFeatureStoreDataStorage) => {
this._featureStore.init(allData, () => {
const doApplyChanges = (oldData: LDFeatureStoreDataStorage) => {
this._featureStore.applyChanges(basis, data, selector, () => {
// Defer change events so they execute after the callback.
Promise.resolve().then(() => {
this._dependencyTracker.reset();
if (basis) {
this._dependencyTracker.reset();
}

Object.entries(allData).forEach(([namespace, items]) => {
Object.entries(data).forEach(([namespace, items]) => {
Object.keys(items || {}).forEach((key) => {
const item = items[key];
this._dependencyTracker.updateDependenciesFrom(
Expand All @@ -87,11 +111,18 @@ export default class DataSourceUpdates implements LDDataSourceUpdates {

if (checkForChanges) {
const updatedItems = new NamespacedDataSet<boolean>();
Object.keys(allData).forEach((namespace) => {
const oldDataForKind = oldData?.[namespace] || {};
const newDataForKind = allData[namespace];
const mergedData = { ...oldDataForKind, ...newDataForKind };
Object.keys(mergedData).forEach((key) => {
Object.keys(data).forEach((namespace) => {
const oldDataForKind = oldData[namespace];
const newDataForKind = data[namespace];
let iterateData;
if (basis) {
// for basis, need to iterate on all keys
iterateData = { ...oldDataForKind, ...newDataForKind };
} else {
// for non basis, only need to iterate on keys in incoming data
iterateData = { ...newDataForKind };
}
Object.keys(iterateData).forEach((key) => {
this.addIfModified(
namespace,
key,
Expand All @@ -101,55 +132,28 @@ export default class DataSourceUpdates implements LDDataSourceUpdates {
);
});
});

this.sendChangeEvents(updatedItems);
}
});
callback?.();
});
};

let oldData = {};
if (checkForChanges) {
// record old data before making changes to use for change calculations
this._featureStore.all(VersionedDataKinds.Features, (oldFlags) => {
this._featureStore.all(VersionedDataKinds.Segments, (oldSegments) => {
const oldData = {
oldData = {
[VersionedDataKinds.Features.namespace]: oldFlags,
[VersionedDataKinds.Segments.namespace]: oldSegments,
};
doInit(oldData);
});
});
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reviewers: These all calls feel a bit off, but without a refactoring of layer responsibility, I don't see a change that is worth the effort. Thoughts?

} else {
doInit();
}
}

upsert(kind: DataKind, data: LDKeyedFeatureStoreItem, callback: () => void): void {
const { key } = data;
const checkForChanges = this._hasEventListeners();
const doUpsert = (oldItem?: LDFeatureStoreItem | null) => {
this._featureStore.upsert(kind, data, () => {
// Defer change events so they execute after the callback.
Promise.resolve().then(() => {
this._dependencyTracker.updateDependenciesFrom(
kind.namespace,
key,
computeDependencies(kind.namespace, data),
);
if (checkForChanges) {
const updatedItems = new NamespacedDataSet<boolean>();
this.addIfModified(kind.namespace, key, oldItem, data, updatedItems);
this.sendChangeEvents(updatedItems);
}
});

callback?.();
});
};
if (checkForChanges) {
this._featureStore.get(kind, key, doUpsert);
} else {
doUpsert();
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reviewers: upsert and init have been combined and the distinction is now driven by the bool basis.

doApplyChanges(oldData);
}

addIfModified(
Expand Down
77 changes: 54 additions & 23 deletions packages/shared/sdk-server/src/store/InMemoryFeatureStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,6 @@ export default class InMemoryFeatureStore implements LDFeatureStore {

private _initCalled = false;

private _addItem(kind: DataKind, key: string, item: LDFeatureStoreItem) {
let items = this._allData[kind.namespace];
if (!items) {
items = {};
this._allData[kind.namespace] = items;
}
if (Object.hasOwnProperty.call(items, key)) {
const old = items[key];
if (!old || old.version < item.version) {
items[key] = item;
}
} else {
items[key] = item;
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reviewers: This logic has moved into apply changes.


get(kind: DataKind, key: string, callback: (res: LDFeatureStoreItem | null) => void): void {
const items = this._allData[kind.namespace];
if (items) {
Expand All @@ -53,19 +37,66 @@ export default class InMemoryFeatureStore implements LDFeatureStore {
}

init(allData: LDFeatureStoreDataStorage, callback: () => void): void {
this._initCalled = true;
this._allData = allData as LDFeatureStoreDataStorage;
callback?.();
this.applyChanges(true, allData, undefined, callback);
}

delete(kind: DataKind, key: string, version: number, callback: () => void): void {
const deletedItem = { version, deleted: true };
this._addItem(kind, key, deletedItem);
callback?.();
const item: LDKeyedFeatureStoreItem = { key, version, deleted: true };
this.applyChanges(
false,
{
[kind.namespace]: {
[key]: item,
},
},
undefined,
callback,
);
}

upsert(kind: DataKind, data: LDKeyedFeatureStoreItem, callback: () => void): void {
this._addItem(kind, data.key, data);
this.applyChanges(
false,
{
[kind.namespace]: {
[data.key]: data,
},
},
undefined,
callback,
);
}

applyChanges(
basis: boolean,
data: LDFeatureStoreDataStorage,
selector: String | undefined, // TODO handle selector
callback: () => void,
): void {
if (basis) {
this._initCalled = true;
this._allData = data;
} else {
Object.entries(data).forEach(([namespace, items]) => {
Object.keys(items || {}).forEach((key) => {
let existingItems = this._allData[namespace];
if (!existingItems) {
existingItems = {};
this._allData[namespace] = existingItems;
}
const item = items[key];
if (Object.hasOwnProperty.call(existingItems, key)) {
const old = existingItems[key];
if (!old || old.version < item.version) {
existingItems[key] = item;
}
} else {
existingItems[key] = item;
}
});
});
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reviewers: Guts of this logic comes from _addItem at the top of this file. Small tweaks from there.


callback?.();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,27 @@ export default class PersistentDataStoreWrapper implements LDFeatureStore {
this.upsert(kind, { key, version, deleted: true }, callback);
}

applyChanges(
_basis: boolean,
_data: LDFeatureStoreDataStorage,
_selector: String | undefined,
_callback: () => void,
): void {
// TODO: SDK-1029 - Transactional persistent store - update this to not iterate over items and instead send data to underlying PersistentDataStore
// no need for queue at the moment as init and upsert handle that, but as part of SDK-1029, queue may be needed
if (_basis) {
this.init(_data, _callback);
} else {
Object.entries(_data).forEach(([namespace, items]) => {
Object.keys(items || {}).forEach((key) => {
const item = items[key];
this.upsert({ namespace }, { key, ...item }, () => {});
});
});
_callback();
}
}
Copy link
Contributor Author

@tanderson-ld tanderson-ld Jan 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reviewers: This code is not shippable (why this PR is going to a temp branch) due to the intermediate states potentially being invalid, but this temporary code allows contract tests not checking intermediate state to pass.


close(): void {
this._itemCache?.close();
this._allItemsCache?.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ export default class DynamoDBFeatureStore implements LDFeatureStore {
this._wrapper.upsert(kind, data, callback);
}

applyChanges(
basis: boolean,
data: LDFeatureStoreDataStorage,
selector: String | undefined,
callback: () => void,
): void {
this._wrapper.applyChanges(basis, data, selector, callback);
}

initialized(callback: (isInitialized: boolean) => void): void {
this._wrapper.initialized(callback);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ export default class RedisFeatureStore implements LDFeatureStore {
this._wrapper.upsert(kind, data, callback);
}

applyChanges(
basis: boolean,
data: LDFeatureStoreDataStorage,
selector: String | undefined,
callback: () => void,
): void {
this._wrapper.applyChanges(basis, data, selector, callback);
}

initialized(callback: (isInitialized: boolean) => void): void {
this._wrapper.initialized(callback);
}
Expand Down
Loading