Skip to content

Commit

Permalink
feat: support sync plugin with subscribe and no get
Browse files Browse the repository at this point in the history
  • Loading branch information
jmeistrich committed Sep 28, 2024
1 parent 1e63c79 commit e8d409c
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 91 deletions.
197 changes: 106 additions & 91 deletions src/sync/syncObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -976,7 +976,9 @@ export function syncObservable<T>(
}
};

if (syncOptions.get) {
const { get, subscribe } = syncOptions;

if (get || subscribe) {
sync = async () => {
// If this node is not being observed or sync is not enabled then don't sync
if (isSynced && (!getNodeValue(getNode(syncState$)).isSyncEnabled || shouldIgnoreUnobserved(node, sync))) {
Expand All @@ -989,9 +991,8 @@ export function syncObservable<T>(
}
const lastSync = metadatas.get(obs$)?.lastSync;
const pending = localState.pendingChanges;
const get = syncOptions.get;

if (get) {
if (get || subscribe) {
const { waitFor } = syncOptions;

const runGet = () => {
Expand Down Expand Up @@ -1108,12 +1109,24 @@ export function syncObservable<T>(
value$: obs$,
lastSync,
update: (params: UpdateFnParams) => {
when(syncState$.isLoaded, () => {
when(waitFor || true, () => {
params.mode ||= syncOptions.mode || 'merge';
onChange(params);
});
});
when(
() => !get || syncState$.isLoaded.get(),
() => {
when(waitFor || true, () => {
params.mode ||= syncOptions.mode || 'merge';
onChange(params);

// If no get then we need to set the loaded state
if (!syncState$.isLoaded.peek()) {
syncState$.assign({
isLoaded: syncStateValue.numPendingRemoteLoads! < 1,
error: undefined,
isGetting: syncStateValue.numPendingGets! > 0,
});
}
});
},
);
},
refresh: () => when(syncState$.isLoaded, sync),
onError: (error: Error) => onGetError(error, { source: 'subscribe', subscribeParams }),
Expand All @@ -1129,94 +1142,96 @@ export function syncObservable<T>(
}
const existingValue = getNodeValue(node);

const onError = (error: Error) => onGetError(error, { getParams, source: 'get' });

const getParams: SyncedGetParams<T> = {
node,
value$: obs$,
value: isFunction(existingValue) || existingValue?.[symbolLinked] ? undefined : existingValue,
mode: syncOptions.mode!,
refresh: sync,
options: syncOptions,
lastSync,
updateLastSync: (lastSync: number) => (getParams.lastSync = lastSync),
onError,
retryNum: 0,
cancelRetry: false,
};

let modeBeforeReset: GetMode | undefined = undefined;
if (get) {
const onError = (error: Error) => onGetError(error, { getParams, source: 'get' });
const getParams: SyncedGetParams<T> = {
node,
value$: obs$,
value:
isFunction(existingValue) || existingValue?.[symbolLinked] ? undefined : existingValue,
mode: syncOptions.mode!,
refresh: sync,
options: syncOptions,
lastSync,
updateLastSync: (lastSync: number) => (getParams.lastSync = lastSync),
onError,
retryNum: 0,
cancelRetry: false,
};

const beforeGetParams: Parameters<Required<SyncedOptions<any>>['onBeforeGet']>[0] = {
value: getParams.value,
lastSync,
pendingChanges: pending && !isEmpty(pending) ? pending : undefined,
clearPendingChanges: async () => {
localState.pendingChanges = {};
await updateMetadataImmediate(obs$, localState, syncState$, syncOptions, {
pending: localState.pendingChanges,
});
},
resetCache: () => {
modeBeforeReset = getParams.mode;
getParams.mode = 'set';
return syncStateValue.resetPersistence?.();
},
cancel: false,
};
let modeBeforeReset: GetMode | undefined = undefined;

syncOptions.onBeforeGet?.(beforeGetParams);

if (!beforeGetParams.cancel) {
syncState$.assign({
numPendingGets: (syncStateValue.numPendingGets! || 0) + 1,
isGetting: true,
});
const got = runWithRetry(
getParams,
syncOptions.retry,
(retryEvent) => {
const params = getParams as SyncedGetParams<T>;
params.cancelRetry = retryEvent.cancelRetry;
params.retryNum = retryEvent.retryNum;
return get(params);
},
onError,
);
const numGets = (node.numGets = (node.numGets || 0) + 1);
const handle = (value: any) => {
syncState$.numPendingGets.set((v) => v! - 1);
if (isWaitingForLoad) {
isWaitingForLoad = false;
syncStateValue.numPendingRemoteLoads!--;
}
// If this is from an older Promise than one that resolved already,
// ignore it as the newer one wins
if (numGets >= (node.getNumResolved || 0)) {
node.getNumResolved = node.numGets;

onChange({
value,
lastSync: getParams.lastSync,
mode: getParams.mode!,
const beforeGetParams: Parameters<Required<SyncedOptions<any>>['onBeforeGet']>[0] = {
value: getParams.value,
lastSync,
pendingChanges: pending && !isEmpty(pending) ? pending : undefined,
clearPendingChanges: async () => {
localState.pendingChanges = {};
await updateMetadataImmediate(obs$, localState, syncState$, syncOptions, {
pending: localState.pendingChanges,
});
}
},
resetCache: () => {
modeBeforeReset = getParams.mode;
getParams.mode = 'set';
return syncStateValue.resetPersistence?.();
},
cancel: false,
};

if (modeBeforeReset) {
getParams.mode = modeBeforeReset;
modeBeforeReset = undefined;
}
syncOptions.onBeforeGet?.(beforeGetParams);

if (!beforeGetParams.cancel) {
syncState$.assign({
isLoaded: syncStateValue.numPendingRemoteLoads! < 1,
error: undefined,
isGetting: syncStateValue.numPendingGets! > 0,
numPendingGets: (syncStateValue.numPendingGets! || 0) + 1,
isGetting: true,
});
};
if (isPromise(got)) {
got.then(handle).catch(onError);
} else {
handle(got);
const got = runWithRetry(
getParams,
syncOptions.retry,
(retryEvent) => {
const params = getParams as SyncedGetParams<T>;
params.cancelRetry = retryEvent.cancelRetry;
params.retryNum = retryEvent.retryNum;
return get(params);
},
onError,
);
const numGets = (node.numGets = (node.numGets || 0) + 1);
const handle = (value: any) => {
syncState$.numPendingGets.set((v) => v! - 1);
if (isWaitingForLoad) {
isWaitingForLoad = false;
syncStateValue.numPendingRemoteLoads!--;
}
// If this is from an older Promise than one that resolved already,
// ignore it as the newer one wins
if (numGets >= (node.getNumResolved || 0)) {
node.getNumResolved = node.numGets;

onChange({
value,
lastSync: getParams.lastSync,
mode: getParams.mode!,
});
}

if (modeBeforeReset) {
getParams.mode = modeBeforeReset;
modeBeforeReset = undefined;
}

syncState$.assign({
isLoaded: syncStateValue.numPendingRemoteLoads! < 1,
error: undefined,
isGetting: syncStateValue.numPendingGets! > 0,
});
};
if (isPromise(got)) {
got.then(handle).catch(onError);
} else {
handle(got);
}
}
}
};
Expand Down Expand Up @@ -1296,7 +1311,7 @@ export function syncObservable<T>(
// When all is loaded locally we can start syncing and listening for changes
when(onAllPersistLoaded, function (this: any) {
// If remote is not manual, then sync() is called automatically
if (syncOptions.get && syncOptions.syncMode === 'auto') {
if ((syncOptions.get || syncOptions.subscribe) && syncOptions.syncMode === 'auto') {
sync();
}

Expand Down
30 changes: 30 additions & 0 deletions tests/crud.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2042,6 +2042,36 @@ describe('subscribe', () => {
await promiseTimeout(1);
expect(obs.get()).toEqual({ 2: { id: 2 } });
});
test('subscribe with no list', async () => {
const obs = observable(
syncedCrud({
as: 'object',
subscribe: ({ update }) => {
update({ value: [{ id: 1 }] });
},
}),
);

expect(obs.get()).toEqual({ 1: { id: 1 } });
});
test('subscribe with no list async', async () => {
const obs = observable(
syncedCrud({
as: 'object',
subscribe: ({ update }) => {
setTimeout(() => {
update({ value: [{ id: 1 }] });
}, 0);
},
}),
);

expect(obs.get()).toEqual(undefined);

await promiseTimeout(1);

expect(obs.get()).toEqual({ 1: { id: 1 } });
});
});
describe('onSaved', () => {
test('without onSaved updates with id', async () => {
Expand Down

0 comments on commit e8d409c

Please sign in to comment.