From a4f1a7e3ed467338c5855cf8f8a3d14e042d9fb2 Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Fri, 18 Oct 2024 16:43:42 +1100 Subject: [PATCH] wip: applying cancellability to handlers --- src/git/http.ts | 18 +++++++++++------- src/git/utils.ts | 10 +++++++--- src/nodes/agent/handlers/VaultsGitPackGet.ts | 2 ++ src/nodes/agent/handlers/VaultsScan.ts | 4 ++++ src/vaults/VaultManager.ts | 14 +++++++++----- 5 files changed, 33 insertions(+), 15 deletions(-) diff --git a/src/git/http.ts b/src/git/http.ts index bd81dac36..87d45cf6e 100644 --- a/src/git/http.ts +++ b/src/git/http.ts @@ -10,6 +10,7 @@ import { Buffer } from 'buffer'; import git from 'isomorphic-git'; import * as gitUtils from './utils'; import * as utils from '../utils'; +import {ContextCancellable} from "@matrixai/contexts"; /** * Reference discovery @@ -118,7 +119,7 @@ async function* advertiseRefGenerator({ efs: EncryptedFS; dir: string; gitDir: string; -}): AsyncGenerator { +}, ctx: ContextCancellable): AsyncGenerator { // Providing side-band-64, symref for the HEAD and agent name capabilities const capabilityList = [ gitUtils.SIDE_BAND_64_CAPABILITY, @@ -134,14 +135,14 @@ async function* advertiseRefGenerator({ efs, dir, gitDir, - }); + }, ctx ); // PKT-LINE("# service=$servicename" LF) yield packetLineBuffer(gitUtils.REFERENCE_DISCOVERY_HEADER); // "0000" yield gitUtils.FLUSH_PACKET_BUFFER; // Ref_list - yield* referenceListGenerator(objectGenerator, capabilityList); + yield* referenceListGenerator(objectGenerator, capabilityList, ctx); // "0000" yield gitUtils.FLUSH_PACKET_BUFFER; } @@ -165,6 +166,7 @@ async function* advertiseRefGenerator({ async function* referenceListGenerator( objectGenerator: AsyncGenerator<[Reference, ObjectId], void, void>, capabilities: CapabilityList, + ctx: ContextCancellable, ): AsyncGenerator { // Cap-list = capability *(SP capability) const capabilitiesListBuffer = Buffer.from( @@ -175,6 +177,7 @@ async function* referenceListGenerator( // *ref_record let first = true; for await (const [name, objectId] of objectGenerator) { + ctx.signal.throwIfAborted(); if (first) { // PKT-LINE(obj-id SP name NUL cap_list LF) yield packetLineBuffer( @@ -351,7 +354,7 @@ async function* generatePackRequest({ dir: string; gitDir: string; body: Array; -}): AsyncGenerator { +}, ctx: ContextCancellable): AsyncGenerator { const [wants, haves, _capabilities] = await parsePackRequest(body); const objectIds = await gitUtils.listObjects({ efs: efs, @@ -359,7 +362,7 @@ async function* generatePackRequest({ gitDir: gitDir, wants, haves, - }); + }, ctx); // Reply that we have no common history and that we need to send everything yield packetLineBuffer(gitUtils.NAK_BUFFER); // Send everything over in pack format @@ -368,7 +371,7 @@ async function* generatePackRequest({ dir, gitDir, objectIds, - }); + }, ctx); // Send dummy progress data yield packetLineBuffer( gitUtils.DUMMY_PROGRESS_BUFFER, @@ -396,7 +399,7 @@ async function* generatePackData({ gitDir: string; objectIds: Array; chunkSize?: number; -}): AsyncGenerator { +}, ctx: ContextCancellable): AsyncGenerator { let packFile: PackObjectsResult; // In case of errors we don't want to throw them. This will result in the error being thrown into `isometric-git` // when it consumes the response. It handles this by logging out the error which we don't want to happen. @@ -423,6 +426,7 @@ async function* generatePackData({ // Streaming the packFile as chunks of the length specified by the `chunkSize`. // Each line is formatted as a `PKT-LINE` do { + ctx.signal.throwIfAborted(); const subBuffer = packFileBuffer.subarray(0, chunkSize); packFileBuffer = packFileBuffer.subarray(chunkSize); yield packetLineBuffer(subBuffer, gitUtils.CHANNEL_DATA); diff --git a/src/git/utils.ts b/src/git/utils.ts index c80b956bc..bd2074887 100644 --- a/src/git/utils.ts +++ b/src/git/utils.ts @@ -13,6 +13,7 @@ import git from 'isomorphic-git'; import { requestTypes } from './types'; import * as utils from '../utils'; import * as validationErrors from '../validation/errors'; +import {ContextCancellable} from "@matrixai/contexts"; // Constants // Total number of bytes per pack line minus the 4 size bytes and 1 channel byte @@ -75,7 +76,7 @@ async function* listReferencesGenerator({ efs: EncryptedFS; dir: string; gitDir: string; -}): AsyncGenerator<[Reference, ObjectId], void, void> { +}, ctx: ContextCancellable): AsyncGenerator<[Reference, ObjectId], void, void> { const refs: Array<[string, Promise]> = await git .listBranches({ fs: efs, @@ -84,6 +85,7 @@ async function* listReferencesGenerator({ }) .then((refs) => { return refs.map((ref) => { + ctx.signal.throwIfAborted(); return [ `${REFERENCES_STRING}${ref}`, git.resolveRef({ fs: efs, dir, gitdir: gitDir, ref: ref }), @@ -99,6 +101,7 @@ async function* listReferencesGenerator({ }); yield [HEAD_REFERENCE, resolvedHead]; for (const [key, refP] of refs) { + ctx.signal.throwIfAborted(); yield [key, await refP]; } } @@ -155,7 +158,7 @@ async function listObjects({ gitDir: string; wants: ObjectIdList; haves: ObjectIdList; -}): Promise { +}, ctx: ContextCancellable): Promise { const commits = new Set(); const trees = new Set(); const blobs = new Set(); @@ -163,6 +166,7 @@ async function listObjects({ const havesSet: Set = new Set(haves); async function walk(objectId: ObjectId, type: ObjectType): Promise { + ctx.signal.throwIfAborted(); // If object was listed as a have then we don't need to walk over it if (havesSet.has(objectId)) return; switch (type) { @@ -243,7 +247,7 @@ async function listObjectsAll({ }: { fs: EncryptedFS; gitDir: string; -}) { +}): Promise> { const objectsDirPath = path.join(gitDir, objectsDirName); const objectSet: Set = new Set(); const objectDirs = await fs.promises.readdir(objectsDirPath); diff --git a/src/nodes/agent/handlers/VaultsGitPackGet.ts b/src/nodes/agent/handlers/VaultsGitPackGet.ts index dcdc6846e..22259244a 100644 --- a/src/nodes/agent/handlers/VaultsGitPackGet.ts +++ b/src/nodes/agent/handlers/VaultsGitPackGet.ts @@ -1,5 +1,6 @@ import type { DB } from '@matrixai/db'; import type { JSONObject, JSONRPCRequest } from '@matrixai/rpc'; +import type {ContextTimed} from '@matrixai/contexts'; import type { VaultName } from '../../../vaults/types'; import type ACL from '../../../acl/ACL'; import type VaultManager from '../../../vaults/VaultManager'; @@ -24,6 +25,7 @@ class VaultsGitPackGet extends RawHandler<{ input: [JSONRPCRequest, ReadableStream], _cancel, meta, + ctx: ContextTimed, ): Promise<[JSONObject, ReadableStream]> => { const { vaultManager, acl, db } = this.container; const [headerMessage, inputStream] = input; diff --git a/src/nodes/agent/handlers/VaultsScan.ts b/src/nodes/agent/handlers/VaultsScan.ts index 9d2922cfe..5a89e9680 100644 --- a/src/nodes/agent/handlers/VaultsScan.ts +++ b/src/nodes/agent/handlers/VaultsScan.ts @@ -1,4 +1,5 @@ import type { DB } from '@matrixai/db'; +import type {ContextTimed} from '@matrixai/contexts'; import type { AgentRPCRequestParams, AgentRPCResponseResult, @@ -25,6 +26,7 @@ class VaultsScan extends ServerHandler< input: AgentRPCRequestParams, _cancel, meta, + ctx: ContextTimed, ): AsyncGenerator> { const { vaultManager, db } = this.container; const requestingNodeId = agentUtils.nodeIdFromMeta(meta); @@ -36,6 +38,7 @@ class VaultsScan extends ServerHandler< > { const listResponse = vaultManager.handleScanVaults( requestingNodeId, + ctx, tran, ); for await (const { @@ -43,6 +46,7 @@ class VaultsScan extends ServerHandler< vaultName, vaultPermissions, } of listResponse) { + ctx.signal.throwIfAborted(); yield { vaultIdEncoded: vaultsUtils.encodeVaultId(vaultId), vaultName, diff --git a/src/vaults/VaultManager.ts b/src/vaults/VaultManager.ts index 30a7183bd..8048f442c 100644 --- a/src/vaults/VaultManager.ts +++ b/src/vaults/VaultManager.ts @@ -40,6 +40,7 @@ import * as nodesUtils from '../nodes/utils'; import * as keysUtils from '../keys/utils'; import config from '../config'; import { mkdirExists } from '../utils/utils'; +import {ContextCancellable} from "@matrixai/contexts"; /** * Object map pattern for each vault @@ -838,12 +839,13 @@ class VaultManager { public async *handlePackRequest( vaultId: VaultId, body: Array, + ctx: ContextCancellable, tran?: DBTransaction, ): AsyncGenerator { if (tran == null) { // Lambda to maintain `this` context const handlePackRequest = (tran: DBTransaction) => - this.handlePackRequest(vaultId, body, tran); + this.handlePackRequest(vaultId, body, ctx, tran); return yield* this.db.withTransactionG(async function* (tran) { return yield* handlePackRequest(tran); }); @@ -853,8 +855,8 @@ class VaultManager { const efs = this.efs; yield* withG( [ - this.vaultLocks.lock([vaultId.toString(), RWLockWriter, 'read']), - vault.getLock().read(), + this.vaultLocks.lock([vaultId.toString(), RWLockWriter, 'read'], ctx), + vault.getLock().read(ctx), ], async function* (): AsyncGenerator { yield* gitHttp.generatePackRequest({ @@ -862,7 +864,7 @@ class VaultManager { dir: path.join(vaultsUtils.encodeVaultId(vaultId), 'contents'), gitDir: path.join(vaultsUtils.encodeVaultId(vaultId), '.git'), body: body, - }); + }, ctx); }, ); } @@ -900,6 +902,7 @@ class VaultManager { */ public async *handleScanVaults( nodeId: NodeId, + ctx: ContextCancellable, tran?: DBTransaction, ): AsyncGenerator<{ vaultId: VaultId; @@ -909,7 +912,7 @@ class VaultManager { if (tran == null) { // Lambda to maintain `this` context const handleScanVaults = (tran: DBTransaction) => - this.handleScanVaults(nodeId, tran); + this.handleScanVaults(nodeId, ctx, tran); return yield* this.db.withTransactionG(async function* (tran) { return yield* handleScanVaults(tran); }); @@ -932,6 +935,7 @@ class VaultManager { // Getting the list of vaults const vaults = permissions.vaults; for (const vaultIdString of Object.keys(vaults)) { + ctx.signal.throwIfAborted(); // Getting vault permissions const vaultId = IdInternal.fromString(vaultIdString); const vaultPermissions = Object.keys(