Skip to content

Commit

Permalink
wip: task reschedule resumtion
Browse files Browse the repository at this point in the history
  • Loading branch information
amydevs committed May 17, 2024
1 parent dbf2363 commit 1b53a32
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 10 deletions.
15 changes: 8 additions & 7 deletions src/discovery/Discovery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,10 @@ class Discovery {
}),
);
} catch (e) {
if (
e instanceof tasksErrors.ErrorTaskStop ||
e === discoveryStoppingTaskReason
) {
// We do not check for ErrorTaskStop because the
// below code relies on TaskManager being running
// to reschedule the vertex discovery task.
if (e === discoveryStoppingTaskReason) {
// We need to recreate the task for the vertex
const vertexId = gestaltsUtils.decodeGestaltId(vertex);
if (vertexId == null) never();
Expand Down Expand Up @@ -641,8 +641,9 @@ class Discovery {
lastProviderPaginationToken,
ctx,
);
lastProviderPaginationToken = lastProviderPaginationToken_;
const isAborted = ctx.signal.aborted;
lastProviderPaginationToken = lastProviderPaginationToken_;
const gestaltNodeIds: Array<GestaltId> = [];
// Link the identity with each node from its claims on the provider
// Iterate over each of the claims, even if ctx has aborted
for (const [claimId, claim] of Object.entries(identityClaims)) {
Expand All @@ -668,8 +669,8 @@ class Discovery {
},
},
);
// Add this vertex to the queue if it is not present
const gestaltNodeId: GestaltId = ['node', linkedVertexNodeId];
}
for (const gestaltNodeId of gestaltNodeIds) {
if (
!(await this.processedTimeGreaterThan(
gestaltNodeId,
Expand Down
89 changes: 88 additions & 1 deletion tests/discovery/Discovery.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ describe('Discovery', () => {
await discovery.stop();
await discovery.destroy();
});
test('discovery persistence across restarts', async () => {
test('node discovery persistence across restarts', async () => {
const discovery = await Discovery.createDiscovery({
db,
keyRing,
Expand Down Expand Up @@ -491,6 +491,93 @@ describe('Discovery', () => {
await discovery.stop();
await discovery.destroy();
});
test('identity discovery persistence across restarts', async () => {
const discovery = await Discovery.createDiscovery({
db,
keyRing,
gestaltGraph,
identitiesManager,
nodeManager,
taskManager,
logger,
});
await taskManager.startProcessing();
const identityId2 = 'other-gestalt2' as IdentityId;
await nodeA.identitiesManager.putToken(testToken.providerId, identityId2, {
accessToken: 'ghi789',
});
testProvider.users[identityId2] = {};
for (let i = 0; i < testProvider.pageSize * 2; i++) {
const identityClaim = {
typ: 'ClaimLinkIdentity',
iss: nodesUtils.encodeNodeId(nodeA.keyRing.getNodeId()),
sub: encodeProviderIdentityId([testProvider.id, identityId2]),
};
await nodeA.sigchain.addClaim(
identityClaim,
undefined,
async (token: Token<ClaimLinkIdentity>) => {
// Publishing in the callback to avoid adding bad claims
const claim = token.toSigned();
const identitySignedClaim = await testProvider.publishClaim(
identityId2,
claim,
);
// Append the ProviderIdentityClaimId to the token
const payload: ClaimLinkIdentity = {
...claim.payload,
providerIdentityClaimId: identitySignedClaim.id,
};
const newToken = Token.fromPayload(payload);
newToken.signWithPrivateKey(nodeA.keyRing.keyPair);
return newToken;
},
);
}

// Spy on getClaimsPage
let i = 0;
const firstPageCompletedP = utils.promise();
const getClaimsPageMock = jest.spyOn(testProvider, 'getClaimsPage');
getClaimsPageMock.mockImplementation(async function* (
...args: Parameters<typeof testProvider.getClaimsPage>
) {
const result: ReturnType<typeof testProvider.getClaimsPage> =
TestProvider.prototype.getClaimsPage.call(testProvider, ...args);
for await (const claim of result) {
if (args[1] === identityId2) {
if (i === testProvider.pageSize) {
// Trigger manual task stopping
firstPageCompletedP.resolveP();
}
i++;
}
yield claim;
}
});
await discovery.queueDiscoveryByIdentity(testToken.providerId, identityId2);

await firstPageCompletedP.p;
await taskManager.stopProcessing();
await discovery.stop();
await discovery.start();
await taskManager.startProcessing();

let existingTasks: number = 0;
do {
existingTasks = await discovery.waitForDiscoveryTasks();
} while (existingTasks > 0);
const gestalts = await AsyncIterable.as(
gestaltGraph.getGestalts(),
).toArray();
console.log(gestalts);
console.log(i);
delete testProvider.users[identityId2];
getClaimsPageMock.mockReset();
await taskManager.stopProcessing();
await discovery.stop();
await discovery.destroy();
});
test('processed vertices are queued for rediscovery', async () => {
const discovery = await Discovery.createDiscovery({
db,
Expand Down
3 changes: 1 addition & 2 deletions tests/identities/TestProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@ import * as tokenUtils from '@/tokens/utils';

class TestProvider extends Provider {
public readonly id: ProviderId;
public readonly pageSize = 10;

public linkIdCounter: number = 0;
public users: Record<IdentityId, POJO>;
public links: Record<ProviderIdentityClaimId, string>;
protected userLinks: Record<IdentityId, Array<ProviderIdentityClaimId>>;
protected userTokens: Record<string, IdentityId>;

protected readonly pageSize = 10;

public constructor(providerId: ProviderId = 'test-provider' as ProviderId) {
super();
this.id = providerId;
Expand Down

0 comments on commit 1b53a32

Please sign in to comment.