refactor: the stream logic of reading the json data. (#419)
easy1090 authored Jul 15, 2024
1 parent 586982c commit 55441e4
Showing 8 changed files with 140 additions and 52 deletions.
6 changes: 6 additions & 0 deletions .changeset/long-seahorses-leave.md
@@ -0,0 +1,6 @@
+---
+'@rsdoctor/utils': patch
+'@rsdoctor/sdk': patch
+---
+
+fix(sdk): the error of Buffer string limit
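
For context on the limit this changeset refers to: V8 caps the length of a single string (about 512 MB on 64-bit builds), so stringifying a very large report in one JSON.stringify call throws a RangeError ("Invalid string length"). Below is a minimal sketch of the catch-and-fallback shape this commit adopts in core.ts; serializeReport and the streamStringify parameter are invented stand-ins for the SDK's internals, not the actual API.

async function serializeReport(
  data: unknown,
  // Stand-in for the streaming Json.stringify helper from @rsdoctor/utils.
  streamStringify: (data: unknown) => Promise<string | string[]>,
): Promise<string | string[]> {
  try {
    // Fast path: the report still fits in a single V8 string.
    return JSON.stringify(data);
  } catch (error) {
    // RangeError for oversized documents: fall back to the chunked,
    // stream-based serializer, which yields an array of bounded pieces.
    return streamStringify(data);
  }
}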
55 changes: 33 additions & 22 deletions packages/sdk/src/sdk/sdk/core.ts
@@ -125,28 +125,38 @@ export abstract class SDKCore<T extends RsdoctorSDKOptions>
     this.diskManifestPath = manifest;
     await File.fse.ensureDir(outputDir);

-    /** write sharding files and get disk result */
-    const dataUrls: DataWithUrl[] = await Promise.all(
-      Object.keys(storeData).map(async (key) => {
-        const data = storeData[key];
-        // not use filesharding when the data is not object.
-        if (typeof data !== 'object') {
-          return {
-            name: key,
-            files: data,
-          };
-        }
-        const jsonstr: string = await (async () => {
-          try {
-            return JSON.stringify(data);
-          } catch (error) {
-            // use the stream json stringify when call JSON.stringify failed due to the json is too large.
-            return Json.stringify(data);
-          }
-        })();
-        return this.writeToFolder(jsonstr, outputDir, key);
-      }),
-    );
+    const urlsPromiseList: (Promise<DataWithUrl> | DataWithUrl)[] = [];
+
+    for (let key of Object.keys(storeData)) {
+      const data = storeData[key];
+      // not use filesharding when the data is not object.
+      if (typeof data !== 'object') {
+        urlsPromiseList.push({
+          name: key,
+          files: data,
+        });
+      }
+      const jsonstr: string | string[] = await (async () => {
+        try {
+          return JSON.stringify(data);
+        } catch (error) {
+          // use the stream json stringify when call JSON.stringify failed due to the json is too large.
+          return Json.stringify(data);
+        }
+      })();
+
+      if (Array.isArray(jsonstr)) {
+        const urls = jsonstr.map((str, index) => {
+          return this.writeToFolder(str, outputDir, key, index + 1);
+        });
+        urlsPromiseList.push(...urls);
+      } else {
+        urlsPromiseList.push(this.writeToFolder(jsonstr, outputDir, key));
+      }
+    }
+
+    /** write sharding files and get disk result */
+    const dataUrls: DataWithUrl[] = await Promise.all(urlsPromiseList);

     debug(
       () =>
@@ -194,10 +204,11 @@ export abstract class SDKCore<T extends RsdoctorSDKOptions>
     jsonstr: string,
     dir: string,
     key: string,
+    index?: number,
   ): Promise<DataWithUrl> {
     const sharding = new File.FileSharding(Algorithm.compressText(jsonstr));
     const folder = path.resolve(dir, key);
-    const writer = sharding.writeStringToFolder(folder);
+    const writer = sharding.writeStringToFolder(folder, '', index);
     return writer.then((item) => {
       const res: DataWithUrl = {
         name: key,
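
The old Promise.all over a map could only ever produce one DataWithUrl per store key; the urlsPromiseList accumulator exists because a key whose JSON goes through the streaming path now fans out into one writeToFolder call per chunk. A minimal sketch of that fan-out, with ShardResult as a simplified stand-in for DataWithUrl and write for this.writeToFolder:

type ShardResult = { name: string; files: string | { path: string }[] };

async function collectWrites(
  key: string,
  jsonstr: string | string[],
  write: (part: string, key: string, index?: number) => Promise<ShardResult>,
): Promise<ShardResult[]> {
  const pending: (ShardResult | Promise<ShardResult>)[] = [];
  if (Array.isArray(jsonstr)) {
    // One key, several chunks: each gets a 1-based index so the shard
    // filenames written on disk do not collide.
    pending.push(...jsonstr.map((part, i) => write(part, key, i + 1)));
  } else {
    pending.push(write(jsonstr, key));
  }
  // Promise.all accepts plain values alongside promises, which is why
  // urlsPromiseList can mix already-resolved entries with pending writes.
  return Promise.all(pending);
}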
2 changes: 1 addition & 1 deletion packages/sdk/src/sdk/utils/upload.ts
@@ -5,7 +5,7 @@ export const transformDataUrls = (
 ): Record<string, string[] | string> => {
   return d.reduce((t: { [key: string]: string[] | string }, item) => {
     t[item.name] = Array.isArray(item.files)
-      ? item.files.map((e) => e.path)
+      ? item.files.map((e) => e.path).concat(t[item.name] || [])
       : item.files;
     return t;
   }, {});
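
The appended .concat(...) is what lets sharded output survive the reduce: a key that now yields several DataWithUrl entries used to have all but the last one overwritten. A self-contained illustration (the type is simplified and the paths are invented):

type DataWithUrl = { name: string; files: string | { path: string }[] };

const transformDataUrls = (
  d: DataWithUrl[],
): Record<string, string[] | string> =>
  d.reduce((t: { [key: string]: string[] | string }, item) => {
    t[item.name] = Array.isArray(item.files)
      ? item.files.map((e) => e.path).concat(t[item.name] || [])
      : item.files;
    return t;
  }, {});

// Two shards written for the same key are merged rather than overwritten
// (the later shard's path lands first because of the concat order):
console.log(
  transformDataUrls([
    { name: 'moduleGraph', files: [{ path: '/out/moduleGraph/0.json' }] },
    { name: 'moduleGraph', files: [{ path: '/out/moduleGraph/1.json' }] },
  ]),
); // { moduleGraph: ['/out/moduleGraph/1.json', '/out/moduleGraph/0.json'] }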
8 changes: 4 additions & 4 deletions packages/utils/src/build/file/sharding.ts
@@ -12,7 +12,7 @@ export class FileSharding {
   /**
    * @param ext the extension name of the output file (must starts with ".")
    */
-  public createVirtualShardingFiles(ext = '') {
+  public createVirtualShardingFiles(ext = '', index = 0) {
     const bf = Buffer.from(this.content, this.encoding);
     const res: Buffer[] = [];
     const threshold = this.limitBytes;
@@ -23,17 +23,17 @@ export class FileSharding {
       tmpBytes += threshold;
     }

-    return res.map((e, i) => ({ filename: `${i}${ext}`, content: e }));
+    return res.map((e, i) => ({ filename: `${i + index}${ext}`, content: e }));
   }

   /**
    * @param folder absolute path of folder which used to save string sharding files.
    * @param ext the extension name of the output file (must starts with ".")
    */
-  public async writeStringToFolder(folder: string, ext = '') {
+  public async writeStringToFolder(folder: string, ext = '', index?: number) {
     const dist = path.resolve(folder);
     await fse.ensureDir(dist);
-    const res = this.createVirtualShardingFiles(ext);
+    const res = this.createVirtualShardingFiles(ext, index);

     await Promise.all(
       res.map(
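
The new index parameter simply offsets the numeric shard filenames, so shards produced for a later JSON chunk of a key can continue the numbering started by an earlier one instead of clobbering 0, 1, 2, .... A toy re-implementation of just the naming rule (not the FileSharding class itself):

// Each output shard is named `${i + index}${ext}`, as in createVirtualShardingFiles.
function shardNames(count: number, ext = '', index = 0): string[] {
  return Array.from({ length: count }, (_, i) => `${i + index}${ext}`);
}

console.log(shardNames(3));        // ['0', '1', '2']  -- previous behavior
console.log(shardNames(3, '', 1)); // ['1', '2', '3']  -- offset by a chunk index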
65 changes: 50 additions & 15 deletions packages/utils/src/build/json.ts
@@ -1,38 +1,73 @@
 import { JsonStreamStringify } from 'json-stream-stringify';
-import { PassThrough } from 'stream';
+import { SDK } from '@rsdoctor/types';
+import { dirname, join } from 'path';
+import { Package } from 'src/common';
+import { Transform } from 'stream';

+const maxFileSize = 1024 * 1024 * 400; // maximum length of each file, measured in bytes, with 400MB as an example.
+
 export function stringify<T, P = T extends undefined ? undefined : string>(
   json: T,
   replacer?: (this: any, key: string, value: any) => any,
   space?: string | number,
   cycle?: boolean,
 ): Promise<P> {
+  const jsonList: string[] = [];
   if (json && typeof json === 'object') {
     return new Promise((resolve, reject) => {
-      let res = '';
-      const pt = new PassThrough();
       const stream = new JsonStreamStringify(json, replacer, space, cycle);

-      pt.on('data', (chunk) => {
-        res += chunk;
-      });
+      let currentLength = 0;
+      let currentContent = '';

-      pt.on('end', () => {
-        return resolve(res as P);
-      });
+      const batchProcessor = new Transform({
+        readableObjectMode: true,
+        transform(chunk, _encoding, callback) {
+          const lines = chunk.toString().split('\n');

-      pt.on('error', (err) => {
-        return reject(err);
-      });
+          lines.forEach((line: string | any[]) => {
+            if (currentLength + line.length > maxFileSize) {
+              // Exceeds the maximum length: save the current content.
+              jsonList.push(currentContent);
+              currentContent = '';
+              currentLength = 0;
+            }
+
+            if (line.length) {
+              currentContent += line;
+              currentLength += line.length;
+            }
+          });

-      stream.on('error', (err) => {
-        return reject(err);
+          callback();
+        },
       });

-      stream.pipe(pt);
+      stream
+        // .pipe(split2(/\n/))
+        .pipe(batchProcessor)
+        .on('data', (line: string | any[]) => {
+          if (currentLength + line.length > maxFileSize) {
+            // Exceeding the maximum length, closing the current file stream.
+            jsonList.push(currentContent);
+            currentContent = '';
+            currentLength = 0;
+          }
+
+          if (line.length) {
+            currentContent += line;
+            currentLength += line.length;
+          }
+        })
+        .on('end', () => {
+          if (jsonList.length < 1) {
+            jsonList.push(currentContent);
+          }
+          resolve(jsonList as P);
+        })
+        .on('error', (err: any) => {
+          return reject(err);
+        });
     });
   }

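
The core idea of the rewrite above is that json-stream-stringify emits the document as a stream of small pieces, so no single intermediate string ever approaches V8's length cap; the Transform merely groups those pieces into bounded chunks. A self-contained sketch of that technique, runnable outside Rsdoctor (the 16-byte cap is deliberately tiny for demonstration):

import { JsonStreamStringify } from 'json-stream-stringify';

function streamToChunks(json: unknown, maxBytes: number): Promise<string[]> {
  return new Promise((resolve, reject) => {
    const chunks: string[] = [];
    let current = '';
    new JsonStreamStringify(json)
      .on('data', (piece: string | Buffer) => {
        const text = piece.toString();
        // Cut a new chunk before the running buffer would exceed the cap.
        if (current.length + text.length > maxBytes && current.length) {
          chunks.push(current);
          current = '';
        }
        current += text;
      })
      .on('end', () => {
        if (current.length) chunks.push(current);
        resolve(chunks);
      })
      .on('error', reject);
  });
}

// Concatenating the chunks restores the exact JSON document.
streamToChunks({ a: 1, list: [1, 2, 3, 4, 5] }, 16).then((chunks) =>
  console.log(chunks.join('') === JSON.stringify({ a: 1, list: [1, 2, 3, 4, 5] })), // true
);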
37 changes: 37 additions & 0 deletions packages/utils/tests/__snapshots__/json.test.ts.snap
@@ -0,0 +1,37 @@
+// Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html
+
+exports[`test src/json.ts > stringify() > Array & Object 1`] = `
+[
+  "[\"abcde\"]",
+]
+`;
+
+exports[`test src/json.ts > stringify() > Array & Object 2`] = `
+[
+  "[\"abcde\",123,null,null,true,false]",
+]
+`;
+
+exports[`test src/json.ts > stringify() > Array & Object 3`] = `
+[
+  "[{\"a\":1,\"c\":null},1,[2,{\"k\":1}]]",
+]
+`;
+
+exports[`test src/json.ts > stringify() > Array & Object 4`] = `
+[
+  "{\"a\":1,\"c\":null}",
+]
+`;
+
+exports[`test src/json.ts > stringify() > Array & Object 5`] = `
+[
+  "{\"a\":1,\"c\":null,\"d\":{\"e\":23}}",
+]
+`;
+
+exports[`test src/json.ts > stringify() > Array & Object 6`] = `
+[
+  "{\"d\":{\"e\":23,\"f\":null,\"h\":{\"a\":1}}}",
+]
+`;
16 changes: 8 additions & 8 deletions packages/utils/tests/json.test.ts
@@ -13,29 +13,29 @@ describe('test src/json.ts', () => {
     });

     it('Array & Object', async () => {
-      expect(await Json.stringify(['abcde'])).toEqual('["abcde"]');
+      expect(await Json.stringify(['abcde'])).toMatchSnapshot();
       expect(
         await Json.stringify(['abcde', 123, null, undefined, true, false]),
-      ).toEqual('["abcde",123,null,null,true,false]');
+      ).toMatchSnapshot();
       expect(
         await Json.stringify([
           { a: 1, b: undefined, c: null },
           1,
           [2, { k: 1 }],
         ]),
-      ).toEqual('[{"a":1,"c":null},1,[2,{"k":1}]]');
+      ).toMatchSnapshot();

-      expect(await Json.stringify({ a: 1, b: undefined, c: null })).toEqual(
-        '{"a":1,"c":null}',
-      );
+      expect(
+        await Json.stringify({ a: 1, b: undefined, c: null }),
+      ).toMatchSnapshot();
       expect(
         await Json.stringify({ a: 1, b: undefined, c: null, d: { e: 23 } }),
-      ).toEqual('{"a":1,"c":null,"d":{"e":23}}');
+      ).toMatchSnapshot();
       expect(
         await Json.stringify({
           d: { e: 23, f: null, g: undefined, h: { a: 1 } },
         }),
-      ).toEqual('{"d":{"e":23,"f":null,"h":{"a":1}}}');
+      ).toMatchSnapshot();
     });
   });
 });
3 changes: 1 addition & 2 deletions pnpm-lock.yaml

