diff --git a/benches/suites/git/gitClone.ts b/benches/suites/git/gitClone.ts index 93ff2ce6ef..70519da073 100644 --- a/benches/suites/git/gitClone.ts +++ b/benches/suites/git/gitClone.ts @@ -1,12 +1,20 @@ +import type { ContextTimed } from '@matrixai/contexts'; +import type { ReadableWritablePair } from 'stream/web'; +import type { JSONObject, JSONRPCRequest, RPCStream } from '@matrixai/rpc'; +import type { POJO } from '@'; import fs from 'fs'; import path from 'path'; import os from 'os'; +import { ReadableStream, TransformStream } from 'stream/web'; import Logger, { LogLevel, StreamHandler } from '@matrixai/logger'; import b from 'benny'; import { EncryptedFS } from 'encryptedfs'; import git from 'isomorphic-git'; +import { RawCaller, RawHandler, RPCClient, RPCServer } from '@matrixai/rpc'; +import * as utils from '@/utils'; import { summaryName, suiteCommon } from '../../utils'; import * as gitTestUtils from '../../../tests/git/utils'; +import * as gitHttp from '../../../src/git/http'; import * as keysUtils from '../../../src/keys/utils'; async function main() { @@ -87,7 +95,202 @@ async function main() { ...testGitState, }); + class GitAdvertiseHandler extends RawHandler<{ + fs: EncryptedFS; + dir: string; + gitDir: string; + }> { + public handle = async ( + input: [JSONRPCRequest, ReadableStream], + ): Promise<[JSONObject, ReadableStream]> => { + const { fs, dir, gitDir } = this.container; + const [, inputStream] = input; + await inputStream.cancel(); + + let advertiseRefGenerator: AsyncGenerator; + const stream = new ReadableStream({ + start: async () => { + advertiseRefGenerator = gitHttp.advertiseRefGenerator({ + fs: fs as any, + dir, + gitDir, + }); + }, + pull: async (controller) => { + const result = await advertiseRefGenerator.next(); + if (result.done) { + controller.close(); + return; + } else { + controller.enqueue(result.value); + } + }, + cancel: async (reason) => { + await advertiseRefGenerator.throw(reason).catch(() => {}); + }, + }); + return [{}, stream]; + }; + } + + class GitPackHandler extends RawHandler<{ + fs: EncryptedFS; + dir: string; + gitDir: string; + }> { + public handle = async ( + input: [JSONRPCRequest, ReadableStream], + ): Promise<[JSONObject, ReadableStream]> => { + const { fs, dir, gitDir } = this.container; + const [, inputStream] = input; + + let gitPackgenerator: AsyncGenerator; + const stream = new ReadableStream({ + start: async () => { + const body: Array = []; + for await (const message of inputStream) { + body.push(Buffer.from(message)); + } + gitPackgenerator = gitHttp.generatePackRequest({ + fs: fs as any, + dir, + gitDir, + body, + }); + }, + pull: async (controller) => { + const result = await gitPackgenerator.next(); + if (result.done) { + controller.close(); + return; + } else { + controller.enqueue(result.value); + } + }, + cancel: async (reason) => { + await gitPackgenerator.throw(reason).catch(() => {}); + }, + }); + return [{}, stream]; + }; + } + // Creating RPC + const rpcServer = new RPCServer({ + logger: logger.getChild('RPCServer'), + }); + await rpcServer.start({ + manifest: { + gitAdvertiseFs: new GitAdvertiseHandler(gitDirsFs), + gitAdvertiseEfs: new GitAdvertiseHandler(gitDirsEfs), + gitPackFs: new GitPackHandler(gitDirsFs), + gitPackEfs: new GitPackHandler(gitDirsEfs), + }, + }); + + function createPassthroughStream() { + const forwardPass = new TransformStream({ + transform: (chunk, controller) => { + // Console.log('forward -- ', chunk.toString()); + controller.enqueue(chunk); + }, + }); + const reversePass = new TransformStream({ + transform: (chunk, controller) => { + // Console.log('reverse -- ', chunk.toString()); + controller.enqueue(chunk); + }, + }); + const clientPair: ReadableWritablePair = { + readable: reversePass.readable, + writable: forwardPass.writable, + }; + const serverPair: ReadableWritablePair = { + readable: forwardPass.readable, + writable: reversePass.writable, + }; + return { + clientPair, + serverPair, + }; + } + + const rpcClient = new RPCClient({ + manifest: { + gitAdvertiseFs: new RawCaller(), + gitAdvertiseEfs: new RawCaller(), + gitPackFs: new RawCaller(), + gitPackEfs: new RawCaller(), + }, + async streamFactory( + ctx: ContextTimed, + ): Promise> { + const { clientPair, serverPair } = createPassthroughStream< + Uint8Array, + Uint8Array + >(); + rpcServer.handleStream({ + ...serverPair, + cancel: () => {}, + }); + return { + ...clientPair, + cancel: () => {}, + }; + }, + }); + + function request({ type }: { type: 'fs' | 'efs' }) { + return async ({ + url, + method = 'GET', + headers = {}, + body = [Buffer.from('')], + }: { + url: string; + method: string; + headers: POJO; + body: Array; + }) => { + // Console.log('body', body.map(v => v.toString())) + if (method === 'GET') { + // Send back the GET request info response + const advertiseRefResponse = + type === 'fs' + ? await rpcClient.methods.gitAdvertiseFs({}) + : await rpcClient.methods.gitAdvertiseEfs({}); + // Await advertiseRefResponse.writable.close(); + + return { + url: url, + method: method, + body: advertiseRefResponse.readable, + headers: headers, + statusCode: 200, + statusMessage: 'OK', + }; + } else if (method === 'POST') { + const packResponse = + type === 'fs' + ? await rpcClient.methods.gitPackFs({}) + : await rpcClient.methods.gitPackEfs({}); + const writer = packResponse.writable.getWriter(); + for (const buffer of body) await writer.write(buffer); + await writer.close(); + + return { + url: url, + method: method, + body: packResponse.readable, + headers: headers, + statusCode: 200, + statusMessage: 'OK', + }; + } else { + utils.never(); + } + }; + } const summary = await b.suite( summaryName(__filename), @@ -107,8 +310,21 @@ async function main() { url: 'http://', }); }), - b.add('git clone with rpc', async () => { - // TODO: run test with request over RPC. + b.add('git clone with fs + rpc', async () => { + await git.clone({ + fs: efs, + dir: gitDirsEfs.dir, + http: { request: request({ type: 'fs' }) }, + url: 'http://', + }); + }), + b.add('git clone with efs + rpc', async () => { + await git.clone({ + fs: efs, + dir: gitDirsEfs.dir, + http: { request: request({ type: 'efs' }) }, + url: 'http://', + }); }), ...suiteCommon, ); diff --git a/tests/git/utils.ts b/tests/git/utils.ts index e3b2590a4a..96c2323dc2 100644 --- a/tests/git/utils.ts +++ b/tests/git/utils.ts @@ -150,8 +150,8 @@ function generateTestNegotiationLine(data: NegotiationTestData, rest: Buffer) { // Used to print out the contents of an `Buffer` iterable for testing async function* tapGen( - gen: AsyncIterable, -): AsyncGenerator { + gen: AsyncIterable, +): AsyncGenerator { let acc = ''; for await (const line of gen) { acc += line.toString();