chore: using array of filenames instead of generator of treenodes for serialization

[ci skip]
aryanjassal committed Sep 3, 2024
1 parent 8f886cf commit fc78adc
Showing 6 changed files with 87 additions and 93 deletions.
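In effect, the commit swaps the input of fileTree.serializerStreamFactory from an AsyncGenerator<TreeNode> produced by globWalk to a plain Array<string> of file paths, and drops the now-unused iNode field from the content header. A rough before/after sketch of the call site, assuming the surrounding handler setup shown below:

// Before: serialize whatever the glob walk yields
const contents = fileTree.serializerStreamFactory(
  fs,
  fileTree.globWalk({ fs, basePath: '.', pattern: secretName, yieldFiles: true, yieldDirectories: false }),
);

// After: serialize an explicit list of file names
const contents = fileTree.serializerStreamFactory(fs, secretNames);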
55 changes: 22 additions & 33 deletions src/client/handlers/VaultsSecretsGet.ts
@@ -1,6 +1,7 @@
import type { DB } from '@matrixai/db';
import type { JSONObject, JSONRPCRequest } from '@matrixai/rpc';
import type VaultManager from '../../vaults/VaultManager';
import { ReadableStream } from 'stream/web';
import { RawHandler } from '@matrixai/rpc';
import { validateSync } from '../../validation';
import { matchSync } from '../../utils';
@@ -16,8 +17,6 @@ class VaultsSecretsGet extends RawHandler<{
}> {
public handle = async (
input: [JSONRPCRequest, ReadableStream<Uint8Array>],
_cancel: any,
_ctx: any,
): Promise<[JSONObject, ReadableStream<Uint8Array>]> => {
const { vaultManager, db } = this.container;
const [headerMessage, inputStream] = input;
@@ -27,24 +26,26 @@
if (params == undefined)
throw new validationErrors.ErrorParse('Input params cannot be undefined');

const { nameOrId, secretName }: { nameOrId: string; secretName: string } =
validateSync(
(keyPath, value) => {
return matchSync(keyPath)(
[
['nameOrId', 'secretName'],
() => {
return value as string;
},
],
() => value,
);
},
{
nameOrId: params.vaultNameOrId,
secretName: params.secretName,
},
);
const {
nameOrId,
secretNames,
}: { nameOrId: string; secretNames: Array<string> } = validateSync(
(keyPath, value) => {
return matchSync(keyPath)(
[['nameOrId'], () => value as string],
[['secretNames'], () => value as Array<string>],
() => value,
);
},
{
nameOrId: params.nameOrId,
secretNames: params.secretNames,
},
);
const secretContentsGen = db.withTransactionG(
async function* (tran): AsyncGenerator<Uint8Array, void, void> {
const vaultIdFromName = await vaultManager.getVaultId(nameOrId, tran);
@@ -60,19 +61,7 @@
void,
void
> {
const contents = fileTree.serializerStreamFactory(
fs,
fileTree.globWalk({
fs: fs,
basePath: '.',
pattern: secretName,
yieldRoot: false,
yieldStats: false,
yieldFiles: true,
yieldParents: false,
yieldDirectories: false,
}),
);
const contents = fileTree.serializerStreamFactory(fs, secretNames);
for await (const chunk of contents) {
yield chunk;
}
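Callers now pass a secretNames array instead of a single secretName; the added test in vaults.test.ts below drives the handler like this (shape taken from that test, output handling abbreviated):

const response = await rpcClient.methods.vaultsSecretsGet({
  nameOrId: vaultIdEncoded,
  secretNames: ['test-secret', 's2'],
});
for await (const chunk of response.readable) {
  // chunks are the serialized content headers plus file data
}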
12 changes: 3 additions & 9 deletions src/utils/utils.ts
@@ -11,6 +11,7 @@ import process from 'process';
import path from 'path';
import nodesEvents from 'events';
import lexi from 'lexicographic-integer';
import { ReadableStream } from 'stream/web';
import { PromiseCancellable } from '@matrixai/async-cancellable';
import { timedCancellable } from '@matrixai/contexts/dist/functions';
import * as utilsErrors from './errors';
@@ -544,15 +545,8 @@ function setMaxListeners(
async function* streamToAsyncGenerator<T>(
stream: ReadableStream<T>,
): AsyncGenerator<T, void, unknown> {
const reader = stream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
if (value !== undefined) yield value!;
}
} finally {
reader.releaseLock();
for await (const chunk of stream) {
yield chunk;
}
}
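The simplified helper relies on Node's stream/web ReadableStream being async-iterable, which is also why the import was added above. A minimal sketch of the behaviour, with a toy stream that is not part of the diff:

import { ReadableStream } from 'stream/web';

// A toy stream that yields two chunks and closes
const stream = new ReadableStream<number>({
  start: (controller) => {
    controller.enqueue(1);
    controller.enqueue(2);
    controller.close();
  },
});

// Inside an async context:
for await (const value of streamToAsyncGenerator(stream)) {
  console.log(value); // 1, then 2
}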

68 changes: 24 additions & 44 deletions src/vaults/fileTree.ts
@@ -205,7 +205,7 @@ function generateGenericHeader(headerData: HeaderGeneric): Uint8Array {
* Creates the content header which identifies the content with the length.
* Data should follow this header.
* Formatted as...
* generic_header(10) | total_size(8)[data_size + header_size] | i_node(4) | 'D'(1)
* generic_header(10) | total_size(8)[data_size + header_size] | 'D'(1)
*/
function generateContentHeader(headerData: HeaderContent): Uint8Array {
const contentHeader = new Uint8Array(HeaderSize.CONTENT);
@@ -215,8 +215,7 @@ function generateContentHeader(headerData: HeaderContent): Uint8Array {
contentHeader.byteLength,
);
dataView.setBigUint64(0, headerData.dataSize, false);
dataView.setUint32(8, headerData.iNode, false);
dataView.setUint8(12, HeaderMagic.END);
dataView.setUint8(8, HeaderMagic.END);
return contentHeader;
}
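With iNode removed, the content header shrinks from 13 to 9 bytes: an 8-byte big-endian dataSize followed by the single 'D' magic byte, matching the HeaderSize.CONTENT change in types.ts further down. A small layout sketch under that assumption (0x44 is assumed to be the value of HeaderMagic.END, i.e. 'D'):

// 9-byte content header: dataSize (u64, big-endian) + magic byte
const header = new Uint8Array(9);
const view = new DataView(header.buffer);
view.setBigUint64(0, 1024n, false); // dataSize
view.setUint8(8, 0x44); // 'D', assumed HeaderMagic.END

// Reading it back, mirroring parseContentHeader
const dataSize = view.getBigUint64(0, false); // 1024n
const magic = view.getUint8(8); // 0x44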

@@ -249,8 +248,7 @@ function parseContentHeader(data: Uint8Array): Parsed<HeaderContent> {
const dataView = new DataView(data.buffer, data.byteOffset, data.byteLength);
if (data.byteLength < HeaderSize.CONTENT) return { remainder: data };
const dataSize = dataView.getBigUint64(0, false);
const iNode = dataView.getUint32(8, false);
const magicByte = dataView.getUint8(12);
const magicByte = dataView.getUint8(8);
if (magicByte !== HeaderMagic.END) {
throw new validationErrors.ErrorParse(
`invalid magic byte, should be "${HeaderMagic.END}", found "${magicByte}"`,
@@ -259,7 +257,6 @@ function parseContentHeader(data: Uint8Array): Parsed<HeaderContent> {
return {
data: {
dataSize,
iNode,
},
remainder: data.subarray(HeaderSize.CONTENT),
};
@@ -276,7 +273,6 @@ function parseContentHeader(data: Uint8Array): Parsed<HeaderContent> {
async function* encodeContent(
fs: FileSystem | FileSystemReadable,
path: string,
iNode: number,
chunkSize: number = 1024 * 4,
): AsyncGenerator<Uint8Array, void, void> {
const fd = await fs.promises.open(path, 'r');
@@ -317,7 +313,6 @@ async function* encodeContent(
}),
generateContentHeader({
dataSize: BigInt(stats.size),
iNode,
}),
]);
while (true) {
@@ -332,52 +327,38 @@
}

/**
* Takes an AsyncGenerator<TreeNode> and serializes it into a `ReadableStream<UInt8Array>`
* Takes an Array<string> of file paths and serializes their contents into a `ReadableStream<UInt8Array>`
* @param fs
* @param treeGen - An AsyncGenerator<TreeNode> that yields the files and directories of a file tree.
* @param filePaths - An array of file paths to be serialized.
*/
function serializerStreamFactory(
fs: FileSystem | FileSystemReadable,
treeGen: AsyncGenerator<TreeNode, void, void>,
filePaths: Array<string>,
): ReadableStream<Uint8Array> {
let contentsGen: AsyncGenerator<Uint8Array, void, void> | undefined;
let fileNode: TreeNode | undefined;
async function getNextContentChunk(): Promise<Uint8Array | undefined> {
while (true) {
if (contentsGen == null) {
// Keep consuming values if the result is not a file
while (true) {
const result = await treeGen.next();
if (result.done) return undefined;
if (result.value.type === 'FILE') {
fileNode = result.value;
break;
}
}
contentsGen = encodeContent(fs, fileNode.path, fileNode.iNode);
}
const contentChunk = await contentsGen.next();
if (!contentChunk.done) return contentChunk.value;
contentsGen = undefined;
}
}
async function cleanup(reason: unknown) {
await treeGen?.throw(reason).catch(() => {});
await contentsGen?.throw(reason).catch(() => {});
}
let contentsGen: AsyncGenerator<Uint8Array> | undefined;
return new ReadableStream<Uint8Array>({
pull: async (controller) => {
try {
const contentChunk = await getNextContentChunk();
if (contentChunk == null) return controller.close();
else controller.enqueue(contentChunk);
while (true) {
if (contentsGen == null) {
const path = filePaths.shift();
if (path == null) return controller.close();
contentsGen = encodeContent(fs, path);
}
const { done, value } = await contentsGen.next();
if (!done) {
controller.enqueue(value);
return;
}
contentsGen = undefined;
}
} catch (e) {
await cleanup(e);
await contentsGen?.throw(e).catch(() => {});
return controller.error(e);
}
},
cancel: async (reason) => {
await cleanup(reason);
await contentsGen?.throw(reason).catch(() => {});
},
});
}
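With the new signature, serialization can be driven from nothing more than a list of paths, and its output still round-trips through parserTransformStreamFactory, as the updated fileTree.test.ts below exercises. A condensed sketch of that round trip (paths are illustrative):

const serialized = fileTree.serializerStreamFactory(fs, ['file-1', 'file-2']);
const parsed = serialized.pipeThrough(fileTree.parserTransformStreamFactory());
// Inside an async context:
for await (const item of parsed) {
  // item is either a parsed ContentNode header or a Uint8Array data chunk
}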
@@ -473,9 +454,8 @@ function parserTransformStreamFactory(): TransformStream<
const contentHeader = parseContentHeader(genericHeader.remainder);
if (contentHeader.data == null) return;

const { dataSize, iNode } = contentHeader.data;
controller.enqueue({ type: 'CONTENT', dataSize, iNode });
contentLength = dataSize;
contentLength = contentHeader.data.dataSize;
controller.enqueue({ type: 'CONTENT', dataSize: contentLength });
workingBuffer = contentHeader.remainder;
}
// We yield the whole buffer, or split it for the next header
4 changes: 1 addition & 3 deletions src/vaults/types.ts
@@ -145,7 +145,6 @@ type TreeNode = {

type ContentNode = {
type: 'CONTENT';
iNode: number;
dataSize: bigint;
};
type DoneMessage = { type: 'DONE' };
@@ -181,12 +180,11 @@
};
type HeaderContent = {
dataSize: bigint;
iNode: number;
};

enum HeaderSize {
GENERIC = 2,
CONTENT = 13,
CONTENT = 9,
}

enum HeaderType {
34 changes: 34 additions & 0 deletions tests/client/handlers/vaults.test.ts
@@ -1470,6 +1470,40 @@ describe('vaultsSecretsNew and vaultsSecretsDelete, vaultsSecretsGet', () => {
vaultsErrors.ErrorSecretsSecretUndefined,
);
});
// TODO: TEST
test('view output', async () => {
const secret = 'test-secret';
const vaultId = await vaultManager.createVault('test-vault');
const vaultIdEncoded = vaultsUtils.encodeVaultId(vaultId);
await rpcClient.methods.vaultsSecretsNew({
nameOrId: vaultIdEncoded,
secretName: secret,
secretContent: Buffer.from('test-secret-contents-1').toString('binary'),
});
await rpcClient.methods.vaultsSecretsNew({
nameOrId: vaultIdEncoded,
secretName: 's2',
secretContent: Buffer.from('test-secret-contents-abc').toString('binary'),
});
const response = await rpcClient.methods.vaultsSecretsGet({
nameOrId: vaultIdEncoded,
secretNames: ['test-secret','s2'],
});
// const secretContent = response.meta?.result;
const data: Array<Uint8Array> = [];
for await (const d of response.readable) data.push(d);
// console.log(new TextDecoder().decode(Buffer.concat(data)));
const output = Buffer.concat(data)
.toString('utf-8')
.split('')
.map(char => {
const code = char.charCodeAt(0);
return code >= 32 && code <= 126 ? char : `\\x${code.toString(16).padStart(2, '0')}`;
})
.join('');
console.log(output);

})
});
describe('vaultsSecretsNewDir and vaultsSecretsList', () => {
const logger = new Logger('vaultsSecretsNewDirList test', LogLevel.WARN, [
7 changes: 3 additions & 4 deletions tests/vaults/fileTree.test.ts
@@ -722,10 +722,9 @@ describe('fileTree', () => {
yieldParents: false,
yieldDirectories: false,
});
const serializedStream = fileTree.serializerStreamFactory(
fs,
fileTreeGen,
);
const data: Array<string> = [];
for await (const p of fileTreeGen) data.push(p.path);
const serializedStream = fileTree.serializerStreamFactory(fs, data);
const parserTransform = fileTree.parserTransformStreamFactory();
const outputStream = serializedStream.pipeThrough(parserTransform);
const output: Array<ContentNode | Uint8Array> = [];
