From 1b53a32226564494dec09aa792528c8d6927ca7e Mon Sep 17 00:00:00 2001 From: Amy Yan Date: Fri, 17 May 2024 15:18:12 +1000 Subject: [PATCH] wip: task reschedule resumtion --- src/discovery/Discovery.ts | 15 +++--- tests/discovery/Discovery.test.ts | 89 ++++++++++++++++++++++++++++++- tests/identities/TestProvider.ts | 3 +- 3 files changed, 97 insertions(+), 10 deletions(-) diff --git a/src/discovery/Discovery.ts b/src/discovery/Discovery.ts index f28bb80506..24929ba307 100644 --- a/src/discovery/Discovery.ts +++ b/src/discovery/Discovery.ts @@ -167,10 +167,10 @@ class Discovery { }), ); } catch (e) { - if ( - e instanceof tasksErrors.ErrorTaskStop || - e === discoveryStoppingTaskReason - ) { + // We do not check for ErrorTaskStop because the + // below code relies on TaskManager being running + // to reschedule the vertex discovery task. + if (e === discoveryStoppingTaskReason) { // We need to recreate the task for the vertex const vertexId = gestaltsUtils.decodeGestaltId(vertex); if (vertexId == null) never(); @@ -641,8 +641,9 @@ class Discovery { lastProviderPaginationToken, ctx, ); - lastProviderPaginationToken = lastProviderPaginationToken_; const isAborted = ctx.signal.aborted; + lastProviderPaginationToken = lastProviderPaginationToken_; + const gestaltNodeIds: Array = []; // Link the identity with each node from its claims on the provider // Iterate over each of the claims, even if ctx has aborted for (const [claimId, claim] of Object.entries(identityClaims)) { @@ -668,8 +669,8 @@ class Discovery { }, }, ); - // Add this vertex to the queue if it is not present - const gestaltNodeId: GestaltId = ['node', linkedVertexNodeId]; + } + for (const gestaltNodeId of gestaltNodeIds) { if ( !(await this.processedTimeGreaterThan( gestaltNodeId, diff --git a/tests/discovery/Discovery.test.ts b/tests/discovery/Discovery.test.ts index 028136e634..1dcb9d34d7 100644 --- a/tests/discovery/Discovery.test.ts +++ b/tests/discovery/Discovery.test.ts @@ -451,7 +451,7 @@ describe('Discovery', () => { await discovery.stop(); await discovery.destroy(); }); - test('discovery persistence across restarts', async () => { + test('node discovery persistence across restarts', async () => { const discovery = await Discovery.createDiscovery({ db, keyRing, @@ -491,6 +491,93 @@ describe('Discovery', () => { await discovery.stop(); await discovery.destroy(); }); + test('identity discovery persistence across restarts', async () => { + const discovery = await Discovery.createDiscovery({ + db, + keyRing, + gestaltGraph, + identitiesManager, + nodeManager, + taskManager, + logger, + }); + await taskManager.startProcessing(); + const identityId2 = 'other-gestalt2' as IdentityId; + await nodeA.identitiesManager.putToken(testToken.providerId, identityId2, { + accessToken: 'ghi789', + }); + testProvider.users[identityId2] = {}; + for (let i = 0; i < testProvider.pageSize * 2; i++) { + const identityClaim = { + typ: 'ClaimLinkIdentity', + iss: nodesUtils.encodeNodeId(nodeA.keyRing.getNodeId()), + sub: encodeProviderIdentityId([testProvider.id, identityId2]), + }; + await nodeA.sigchain.addClaim( + identityClaim, + undefined, + async (token: Token) => { + // Publishing in the callback to avoid adding bad claims + const claim = token.toSigned(); + const identitySignedClaim = await testProvider.publishClaim( + identityId2, + claim, + ); + // Append the ProviderIdentityClaimId to the token + const payload: ClaimLinkIdentity = { + ...claim.payload, + providerIdentityClaimId: identitySignedClaim.id, + }; + const newToken = Token.fromPayload(payload); + newToken.signWithPrivateKey(nodeA.keyRing.keyPair); + return newToken; + }, + ); + } + + // Spy on getClaimsPage + let i = 0; + const firstPageCompletedP = utils.promise(); + const getClaimsPageMock = jest.spyOn(testProvider, 'getClaimsPage'); + getClaimsPageMock.mockImplementation(async function* ( + ...args: Parameters + ) { + const result: ReturnType = + TestProvider.prototype.getClaimsPage.call(testProvider, ...args); + for await (const claim of result) { + if (args[1] === identityId2) { + if (i === testProvider.pageSize) { + // Trigger manual task stopping + firstPageCompletedP.resolveP(); + } + i++; + } + yield claim; + } + }); + await discovery.queueDiscoveryByIdentity(testToken.providerId, identityId2); + + await firstPageCompletedP.p; + await taskManager.stopProcessing(); + await discovery.stop(); + await discovery.start(); + await taskManager.startProcessing(); + + let existingTasks: number = 0; + do { + existingTasks = await discovery.waitForDiscoveryTasks(); + } while (existingTasks > 0); + const gestalts = await AsyncIterable.as( + gestaltGraph.getGestalts(), + ).toArray(); + console.log(gestalts); + console.log(i); + delete testProvider.users[identityId2]; + getClaimsPageMock.mockReset(); + await taskManager.stopProcessing(); + await discovery.stop(); + await discovery.destroy(); + }); test('processed vertices are queued for rediscovery', async () => { const discovery = await Discovery.createDiscovery({ db, diff --git a/tests/identities/TestProvider.ts b/tests/identities/TestProvider.ts index e1d71684ad..b451c63426 100644 --- a/tests/identities/TestProvider.ts +++ b/tests/identities/TestProvider.ts @@ -20,6 +20,7 @@ import * as tokenUtils from '@/tokens/utils'; class TestProvider extends Provider { public readonly id: ProviderId; + public readonly pageSize = 10; public linkIdCounter: number = 0; public users: Record; @@ -27,8 +28,6 @@ class TestProvider extends Provider { protected userLinks: Record>; protected userTokens: Record; - protected readonly pageSize = 10; - public constructor(providerId: ProviderId = 'test-provider' as ProviderId) { super(); this.id = providerId;