repo-api: simplify DataTape.listData
yoursunny committed Nov 23, 2023
1 parent 665cbce commit 2a91f79
Showing 6 changed files with 68 additions and 49 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
@@ -27,7 +27,7 @@ jobs:
unittest:
strategy:
matrix:
-        node: [18, 20, 21]
+        node: [18, 20, '21.1']
fail-fast: false
runs-on: ubuntu-22.04
steps:
3 changes: 2 additions & 1 deletion packages/node-transport/test-fixture/buffer-breaker.ts
@@ -7,10 +7,11 @@ export class BufferBreaker extends Transform {

constructor() {
super({ readableObjectMode: true, writableObjectMode: false });
-    this.timer = setInterval(this.flushBuf, 100);
+    this.timer = setInterval(this.flushBuf, 50);
}

public override _transform(chunk: Buffer, enc: unknown, callback: (err?: Error) => void) {
+    void enc;
const buf = this.buf ? Buffer.concat([this.buf, chunk]) : chunk;
const count = Math.min(buf.length, Math.ceil(Math.random() * 1.5 * buf.length));
this.push(buf.subarray(0, count));
3 changes: 2 additions & 1 deletion packages/repo-api/package.json
@@ -37,7 +37,8 @@
    "p-event": "^6.0.0",
    "streaming-iterables": "^8.0.1",
    "throat": "^6.0.2",
-    "tslib": "^2.6.2"
+    "tslib": "^2.6.2",
+    "typescript-event-target": "^1.1.0"
},
"devDependencies": {
"@ndn/node-transport": "workspace:*",
25 changes: 22 additions & 3 deletions packages/repo-api/src/bulk-insert-initiator.ts
@@ -1,8 +1,10 @@
import type { L3Face } from "@ndn/l3face";
import type { Data } from "@ndn/packet";
+import { CustomEvent } from "@ndn/util";
import { pushable } from "it-pushable";
import pDefer, { type DeferredPromise } from "p-defer";
import { consume, map } from "streaming-iterables";
+import { TypedEventTarget } from "typescript-event-target";

import * as S from "./data-store";

@@ -11,17 +13,34 @@ interface InsertJob {
defer: DeferredPromise<undefined>;
}

+type EventMap = {
+  error: CustomEvent<Error>;
+};

/** Send packets to a bulk insertion target. */
-export class BulkInsertInitiator implements S.Close, S.Insert {
+export class BulkInsertInitiator extends TypedEventTarget<EventMap> implements S.Close, S.Insert {
private readonly jobs = pushable<InsertJob>({ objectMode: true });
private readonly faceTx: Promise<void>;

+  /**
+   * Constructor.
+   * @param face bulk insertion target.
+   * RX side is ignored.
+   * Data packets are sent to its TX side, errors raise 'error' event.
+   */
  constructor(face: L3Face) {
+    super();
    consume(face.rx).catch(() => undefined);
-    this.faceTx = face.tx(this.tx()).catch(() => undefined);
+    this.faceTx = face.tx(this.tx()).catch((err) => {
+      this.dispatchTypedEvent("error", new CustomEvent("error", { detail: err }));
+    });
}

-  public async close() {
+  /**
+   * Finish insertion and close the target.
+   * .insert() cannot be called after this.
+   */
+  public async close(): Promise<void> {
    this.jobs.end();
    await this.faceTx;
  }
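For context, a consumer of the new typed "error" event would look roughly like the sketch below. This is not code from the commit: the TCP endpoint is made up, and the exact TcpTransport.connect() and insert() call shapes should be checked against @ndn/node-transport and @ndn/repo-api.

import { L3Face } from "@ndn/l3face";
import { TcpTransport } from "@ndn/node-transport";
import { Data } from "@ndn/packet";
import { BulkInsertInitiator } from "@ndn/repo-api";

async function bulkInsertExample(): Promise<void> {
  // Connect to a bulk insertion target (host and port are placeholders).
  const transport = await TcpTransport.connect({ host: "127.0.0.1", port: 7376 });
  const bi = new BulkInsertInitiator(new L3Face(transport));

  // TX-side failures now surface as a typed "error" event; the Error object is in evt.detail.
  bi.addEventListener("error", (evt) => {
    console.error("bulk insertion failed:", evt.detail);
  });

  await bi.insert(new Data("/example/A"), new Data("/example/B"));
  await bi.close(); // finish insertion; insert() may not be called after close()
}

Previously a TX failure was silently swallowed by catch(() => undefined); dispatching a CustomEvent keeps the promise from rejecting unhandled while still letting the application observe the error.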
44 changes: 22 additions & 22 deletions packages/repo-api/src/data-tape.ts
@@ -21,16 +21,17 @@ interface WriteItem {
* DataTape is a file or stream that consists of a sequence of Data packets.
* This type implements DataStore interfaces on top of such a file or stream.
*/
-export class DataTape implements S.Close, S.ListNames, S.ListData, S.Get, S.Find, S.Insert {
+export class DataTape implements DataTape.Reader, DataTape.Writer {
  /**
   * Constructor.
   * @param stream a readable/writable stream, a function to (re)open the stream, or a filename.
   *
-   * If stream is a stream (instead of a function or a filename), only one method may be called once.
-   * Otherwise, methods can be called, but they must be sequenced because this type is non-thread-safe.
+   * DataTape.Reader methods are available only if the stream is readable.
+   * DataTape.Writer methods are available only if the stream is writable.
+   *
+   * If stream is a stream instance, it allows either one read or multiple writes.
+   * If stream is an opener function or filename, it allows multiple reads and writes.
+   * Function calls must be sequenced because this type is non-thread-safe.
   */
constructor(stream: NodeJS.ReadableStream | NodeJS.WritableStream | DataTape.OpenStream | string) {
switch (typeof stream) {
@@ -70,7 +71,7 @@ export class DataTape implements S.Close, S.ListNames, S.ListData, S.Get, S.Find
this.currentWriter = undefined;
}

-  private async useReader<R>(cb: (reader: AsyncIterable<Data>) => Promise<R>): Promise<R> {
+  private async useReader<R>(cb: (reader: AsyncIterable<Data>) => R | Promise<R>): Promise<R> {
let result: any;
await this.mutex(async () => {
await this.closeCurrentWriter();
@@ -148,24 +149,14 @@ export class DataTape implements S.Close, S.ListNames, S.ListData, S.Get, S.Find
return map((data) => data.name, this.listData(prefix));
}

-  public listData(prefix?: Name): AsyncIterable<Data> {
-    const output = pushable<Data>({ objectMode: true });
-    void (async () => {
-      try {
-        await this.useReader(async (reader) => {
-          for await (const data of reader) {
-            if (!prefix || prefix.isPrefixOf(data.name)) {
-              output.push(data);
-            }
-          }
-        });
-      } catch (err: unknown) {
-        output.end(err as Error);
-        return;
-      }
-      output.end();
-    })();
-    return output;
+  public async *listData(prefix?: Name): AsyncIterable<Data> {
+    yield* await this.useReader(async function*(reader): AsyncGenerator<Data> {
+      for await (const data of reader) {
+        if (!prefix || prefix.isPrefixOf(data.name)) {
+          yield data;
+        }
+      }
+    });
  }

private async findFirst(predicate: (data: Data) => Promisable<boolean>): Promise<Data | undefined> {
@@ -198,6 +189,15 @@ }
}

export namespace DataTape {
/** Desired mode of opening a stream. */
export type StreamMode = "read" | "append";

/** Function to open a stream for use by DataTape. */
export type OpenStream = (mode: StreamMode) => NodeJS.ReadableStream | NodeJS.WritableStream;

+  /** Interface of DataTape read operations. */
+  export type Reader = S.Close & S.ListNames & S.ListData & S.Get & S.Find;
+
+  /** Interface of DataTape write operations. */
+  export type Writer = S.Close & S.Insert;
}
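To illustrate the constructor contract and the new Reader/Writer split, here is a hypothetical usage sketch (not from the commit): the file path is invented, and the insert()/listData() call shapes should be verified against @ndn/repo-api.

import { createReadStream, createWriteStream } from "node:fs";

import { Data, Name } from "@ndn/packet";
import { DataTape } from "@ndn/repo-api";

// Opener function: DataTape re-opens the underlying file for each read or append pass,
// so both DataTape.Reader and DataTape.Writer methods remain usable on the same instance.
const open: DataTape.OpenStream = (mode: DataTape.StreamMode) =>
  mode === "read"
    ? createReadStream("/tmp/packets.dtape")
    : createWriteStream("/tmp/packets.dtape", { flags: "a" });

async function dataTapeExample(): Promise<void> {
  const tape = new DataTape(open);

  await tape.insert(new Data("/example/A/1"), new Data("/example/A/2")); // Writer side
  for await (const data of tape.listData(new Name("/example/A"))) { // Reader side
    console.log(data.name.toString());
  }
  await tape.close();
}

Because the rewritten listData() is an async generator, Data packets are filtered and yielded lazily as the caller iterates, rather than being pushed through an intermediate it-pushable queue as before.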
40 changes: 19 additions & 21 deletions packages/repo-api/tests/data-tape.t.ts
@@ -7,32 +7,19 @@ import { Data, Interest, Name } from "@ndn/packet";
import { Encoder } from "@ndn/tlv";
import { delay } from "@ndn/util";
import { BufferReadableMock, BufferWritableMock } from "stream-mock";
-import { collect, map, pipeline, writeToStream } from "streaming-iterables";
+import { collect } from "streaming-iterables";
import { dirSync as tmpDir } from "tmp";
import { beforeEach, describe, expect, test, vi } from "vitest";

import { BulkInsertInitiator, BulkInsertTarget, copy, DataTape } from "..";

function makeDataTapeReadStream(mode: DataTape.StreamMode): NodeJS.ReadableStream {
expect(mode).toBe("read");
-  const bb = new BufferBreaker();
-  void (async () => {
-    try {
-      await pipeline(
-        async function*() {
-          for (let i = 0; i < 500; ++i) {
-            yield new Data(`/A/${Math.trunc(i / 100)}/${i % 100}`);
-            if (i % 20 === 0) {
-              await delay(Math.random() * 5);
-            }
-          }
-        },
-        map((data) => Encoder.encode(data)),
-        writeToStream(bb),
-      );
-    } catch {} finally { bb.end(); }
-  })();
-  return bb;
+  return new BufferReadableMock((function*() {
+    for (let i = 0; i < 500; ++i) {
+      yield Encoder.encode(new Data(`/A/${Math.trunc(i / 100)}/${i % 100}`));
+    }
+  })()).pipe(new BufferBreaker());
}

function makeDataTapeAppendStream(): [open: DataTape.OpenStream, retrieve: () => Buffer] {
@@ -50,7 +37,10 @@ function makeDataTapeAppendStream(): [open: DataTape.OpenStream, retrieve: () =>

describe("DataTape reader", () => {
let tape: DataTape;
-  beforeEach(() => {tape = new DataTape(makeDataTapeReadStream);});
+  beforeEach(async () => {
+    tape = new DataTape(makeDataTapeReadStream);
+    await delay(200);
+  });

test("listNames", async () => {
const names = await collect(tape.listNames());
@@ -77,7 +67,7 @@

describe("DataTape file", () => {
let dir: string;
-  beforeEach(async () => {
+  beforeEach(() => {
const d = tmpDir({ unsafeCleanup: true });
dir = d.name;
return d.removeCallback;
@@ -106,10 +96,15 @@ async function testBulkInsertTarget(
parallel: 8,
});
await bi.accept(makeDataTapeReadStream("read"));
+  let total = 0;
+  for (const [i, call] of storeInsert.mock.calls.entries()) {
+    console.log(`storeInsert ${i} ${call.length} ${total += call.length}`);
+  }
expect(storeInsert).toHaveBeenCalledTimes(16);

await tape.close();
const readback = new DataTape(new BufferReadableMock(retrieve()));
+  // XXX 2023-11-23 this is failing on Node 21.2 but passing on Node 21.1
await expect(collect(readback.listData())).resolves.toHaveLength(500);
}

@@ -126,6 +121,9 @@ test("BulkInsertTarget make-stream", () => {
test("BulkInsertInitiator", async () => {
const transport = new MockTransport();
const bi = new BulkInsertInitiator(new L3Face(transport));
+  bi.addEventListener("error", (evt) => {
+    expect(evt.detail).toBeUndefined();
+  });
let n = 0;
for (let i = 0; i < 10; ++i) {
await delay(Math.random() * 20);
