repo-api: don't use L3Face in BulkInsert*
yoursunny committed Nov 26, 2023
1 parent 5324bc5 commit 15207f4
Showing 15 changed files with 122 additions and 118 deletions.
1 change: 0 additions & 1 deletion mk/build-post.mjs
@@ -52,7 +52,6 @@ const CJS_IMPORTS = new Set([
   "@yoursunny/asn1",
   "applymixins",
   "buffer-compare",
-  "duplexify",
   "encoding-down",
   "event-iterator",
   "fast-chunk-string",
25 changes: 20 additions & 5 deletions packages/l3face/src/rxtx-stream.ts
@@ -1,10 +1,18 @@
+import type { Socket } from "node:net";
+
 import { Decoder } from "@ndn/tlv";
 import { safeIter } from "@ndn/util";
 import { pEvent } from "p-event";
 import { writeToStream } from "streaming-iterables";
 
 import type { Transport } from "./transport";
+// import { pipeline } from "node:stream/promises";
 
+/**
+ * Parse TLVs from input stream.
+ * @param conn input stream, such as a socket.
+ * @returns AsyncIterable of TLVs.
+ */
 export async function* rxFromStream(conn: NodeJS.ReadableStream): Transport.Rx {
   let leftover = Buffer.alloc(0);
   for await (const chunk of safeIter(conn as AsyncIterable<Buffer>)) {
@@ -27,17 +35,24 @@ export async function* rxFromStream(conn: NodeJS.ReadableStream): Transport.Rx {
   }
 }
 
+/**
+ * Pipe encoded packets to output stream.
+ * @param conn output stream, such as a socket.
+ * @returns a function that accepts AsyncIterable of Uint8Array containing encoded packets.
+ */
 export function txToStream(conn: NodeJS.WritableStream): Transport.Tx {
   return async (iterable: AsyncIterable<Uint8Array>) => {
     try {
       await writeToStream(conn, iterable);
     } finally {
-      conn.end();
-      try { await pEvent(conn, "finish", { timeout: 100 }); } catch {}
+      try {
+        conn.end();
+        await pEvent(conn, "finish", { timeout: 100 });
+      } catch {}
 
-      const destroyable = conn as unknown as { destroy?: () => void };
-      if (typeof destroyable.destroy === "function") {
-        destroyable.destroy();
+      const socket = conn as Socket;
+      if (typeof socket.destroy === "function") {
+        socket.destroy();
       }
     }
   };
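Usage sketch (not part of this commit): how the two helpers pair up on a TCP socket, assuming `txToStream` is exported from `@ndn/l3face` alongside `rxFromStream`; the forwarder address and the placeholder TLV are illustrative.

```ts
import { connect } from "node:net";

import { rxFromStream, txToStream } from "@ndn/l3face";

// Connect to a local forwarder (host/port are placeholders).
const socket = connect({ host: "127.0.0.1", port: 6363, noDelay: true });

// tx: send one pre-encoded TLV, then end the iterable; txToStream then
// ends the socket, waits up to 100ms for "finish", and destroys it.
void txToStream(socket)((async function*() {
  yield Uint8Array.of(0x64, 0x00); // empty LpPacket as a placeholder TLV
})());

// rx: one Decoder.Tlv per TLV element read from the socket
for await (const { type, length } of rxFromStream(socket)) {
  console.log(`received TLV-TYPE=${type} TLV-LENGTH=${length}`);
}
```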
14 changes: 7 additions & 7 deletions packages/l3face/test-fixture/mock-transport.ts
@@ -1,7 +1,6 @@
 import { Decoder, type Encodable, Encoder } from "@ndn/tlv";
 import { abortableSource, AbortError as IteratorAbortError } from "abortable-iterator";
 import { pushable } from "it-pushable";
-import { consume, pipeline, tap } from "streaming-iterables";
 
 import { Transport } from "..";
 
@@ -15,8 +14,11 @@ export class MockTransport extends Transport {
   }
 
   public recv(pkt: Encodable) {
-    const tlv = new Decoder(Encoder.encode(pkt)).read();
+    const wire = Encoder.encode(pkt);
+    const decoder = new Decoder(wire);
+    const tlv = decoder.read();
     this.rx.push(tlv);
+    decoder.throwUnlessEof();
   }
 
   public close(err?: Error) {
@@ -26,11 +28,9 @@ export class MockTransport extends Transport {
 
   public override readonly tx = async (iterable: AsyncIterable<Uint8Array>) => {
     try {
-      await pipeline(
-        () => abortableSource(iterable, this.closing.signal),
-        tap((pkt) => this.send(pkt)),
-        consume,
-      );
+      for await (const pkt of abortableSource(iterable, this.closing.signal)) {
+        this.send(pkt);
+      }
     } catch (err: unknown) {
       if (!(err instanceof IteratorAbortError)) {
         throw err;
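The stricter `recv()` relies on `Decoder.throwUnlessEof()` to reject trailing bytes after the first TLV. A self-contained sketch of that check; the byte values are illustrative.

```ts
import { Decoder } from "@ndn/tlv";

// A buffer holding one complete TLV plus one stray trailing byte.
const wire = Uint8Array.of(
  0x08, 0x01, 0x41, // TLV: TLV-TYPE=8, TLV-LENGTH=1, TLV-VALUE="A"
  0xFF, // junk byte after the first TLV
);
const decoder = new Decoder(wire);
const tlv = decoder.read(); // succeeds: reads the complete first TLV
decoder.throwUnlessEof(); // throws: the stray 0xFF remains unread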
2 changes: 1 addition & 1 deletion packages/repo-api/README.md
@@ -14,7 +14,7 @@ This package defines the programming interface of an abstract Data repository, a
 * `listNames(prefix?)` iterates over names of stored Data.
 * `listData(prefix?)` iterates over stored Data.
 * `insert(options?, ...pkts)` inserts Data packets.
-* `erase(...names)` deletes Data packets.
+* `delete(...names)` deletes Data packets.
 
 ## DataTape
 
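A usage sketch of the interface described above, assuming a concrete `DataStore` implementation (e.g. from `@ndn/repo`); the import and packet names are illustrative.

```ts
import { Data, Name } from "@ndn/packet";
import type { DataStore } from "@ndn/repo";

declare const store: DataStore; // any implementation of these interfaces

// insert two Data packets, list what is stored under /A, delete one
await store.insert(new Data("/A/1"), new Data("/A/2"));
for await (const name of store.listNames(new Name("/A"))) {
  console.log(`stored: ${name}`);
}
await store.delete(new Name("/A/1"));
```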
4 changes: 1 addition & 3 deletions packages/repo-api/package.json
@@ -30,8 +30,8 @@
     "@ndn/naming-convention2": "workspace:*",
     "@ndn/packet": "workspace:*",
     "@ndn/rdr": "workspace:*",
+    "@ndn/tlv": "workspace:*",
     "@ndn/util": "workspace:*",
-    "duplexify": "^4.1.2",
     "is-stream": "^3.0.0",
     "it-pushable": "^3.2.3",
     "p-defer": "^4.0.0",
@@ -43,8 +43,6 @@
   },
   "devDependencies": {
     "@ndn/node-transport": "workspace:*",
-    "@ndn/tlv": "workspace:*",
-    "@types/duplexify": "^3.6.4",
     "@types/tmp": "^0.2.6",
     "stream-mock": "^2.0.5",
     "tmp": "^0.2.1",
17 changes: 7 additions & 10 deletions packages/repo-api/src/bulk-insert-initiator.ts
@@ -8,7 +8,7 @@ import { TypedEventTarget } from "typescript-event-target";
 
 import * as S from "./data-store";
 
-interface InsertJob {
+interface Burst {
   pkts: AsyncIterable<Data>;
   defer: DeferredPromise<undefined>;
 }
@@ -19,7 +19,7 @@ type EventMap = {
 
 /** Send packets to a bulk insertion target. */
 export class BulkInsertInitiator extends TypedEventTarget<EventMap> implements S.Close, S.Insert {
-  private readonly jobs = pushable<InsertJob>({ objectMode: true });
+  private readonly queue = pushable<Burst>({ objectMode: true });
   private readonly faceTx: Promise<void>;
 
   /**
@@ -41,7 +41,7 @@ export class BulkInsertInitiator extends TypedEventTarget<EventMap> implements S
    * .insert() cannot be called after this.
    */
   public async close(): Promise<void> {
-    this.jobs.end();
+    this.queue.end();
     await this.faceTx;
   }
 
@@ -53,16 +53,13 @@ export class BulkInsertInitiator extends TypedEventTarget<EventMap> implements S
    */
   public async insert(...args: S.Insert.Args<{}>): Promise<void> {
     const { pkts } = S.Insert.parseArgs<{}>(args);
-    const job: InsertJob = {
-      pkts,
-      defer: pDefer(),
-    };
-    this.jobs.push(job);
-    return job.defer.promise;
+    const defer = pDefer<undefined>();
+    this.queue.push({ pkts, defer });
+    return defer.promise;
   }
 
   private async *tx(): AsyncIterable<{ l3: Data }> {
-    for await (const job of this.jobs) {
+    for await (const job of this.queue) {
       yield* map((data) => ({ l3: data }), job.pkts);
       job.defer.resolve();
     }
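The `insert()`/`tx()` handoff is a producer-consumer queue over `it-pushable`: each burst carries a deferred promise that resolves once the burst has been fully yielded. A self-contained miniature, with strings standing in for Data packets.

```ts
import { pushable } from "it-pushable";
import pDefer, { type DeferredPromise } from "p-defer";

interface Burst {
  pkts: AsyncIterable<string>;
  defer: DeferredPromise<undefined>;
}

const queue = pushable<Burst>({ objectMode: true });

// producer: enqueue a burst; the promise settles when the burst is sent
function insert(pkts: AsyncIterable<string>): Promise<undefined> {
  const defer = pDefer<undefined>();
  queue.push({ pkts, defer });
  return defer.promise;
}

// consumer: drain bursts in order, resolving each deferred afterwards
async function drain(): Promise<void> {
  for await (const { pkts, defer } of queue) {
    for await (const pkt of pkts) {
      console.log(`send ${pkt}`);
    }
    defer.resolve();
  }
}

void drain();
await insert((async function*() { yield "pkt0"; yield "pkt1"; })());
queue.end();
```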
7 changes: 6 additions & 1 deletion packages/repo-api/src/bulk-insert-target.ts
@@ -13,7 +13,12 @@ export class BulkInsertTarget {
 
   private constructor(private readonly store: S.Insert<any>, private readonly opts?: any) {}
 
-  public accept(stream: NodeJS.ReadableStream): Promise<void> {
+  /**
+   * Accept bulk insertion from a stream of Data packets.
+   * @param stream input stream.
+   * @returns number of Data packets inserted.
+   */
+  public accept(stream: NodeJS.ReadableStream): Promise<number> {
     const src = new DataTape(stream);
     return copy(src, this.store, this.opts);
   }
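With `accept()` now resolving to a count, a caller such as a TCP bulk-insert server can log per-connection totals. A hedged sketch: the `BulkInsertTarget` instance is assumed to be constructed elsewhere, and the port number follows repo-ng convention rather than anything in this commit.

```ts
import { createServer } from "node:net";

import type { BulkInsertTarget } from "@ndn/repo-api";

declare const bi: BulkInsertTarget; // constructed around a DataStore elsewhere

createServer((socket) => {
  void bi.accept(socket).then(
    (n) => console.log(`bulk insert: ${n} packets`), // new return value
    () => socket.destroy(),
  );
}).listen(7376); // bulk insertion port, repo-ng convention (an assumption)
```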
16 changes: 11 additions & 5 deletions packages/repo-api/src/copy.ts
@@ -17,13 +17,14 @@ export interface CopyOptions {
  * @param prefix name prefix to select Data packets.
  * @param dst destination DataStore.
  * @param opts insert options and copy batching options.
+ * @returns number of Data packets copied.
  */
 export async function copy<InsertOptions extends {} = never>(src: S.ListData, prefix: Name,
-  dst: S.Insert<InsertOptions>, opts?: CopyOptions & InsertOptions): Promise<void>;
+  dst: S.Insert<InsertOptions>, opts?: CopyOptions & InsertOptions): Promise<number>;
 export async function copy<InsertOptions extends {} = never>(src: S.ListData,
-  dst: S.Insert<InsertOptions>, opts?: CopyOptions & InsertOptions): Promise<void>;
+  dst: S.Insert<InsertOptions>, opts?: CopyOptions & InsertOptions): Promise<number>;
 
-export async function copy(src: S.ListData, arg2: any, arg3?: any, arg4?: any): Promise<void> {
+export async function copy(src: S.ListData, arg2: any, arg3?: any, arg4?: any): Promise<number> {
   const [prefix, dst, opts = {}]: [Name | undefined, S.Insert<any>, any] =
     arg2 instanceof Name ? [arg2, arg3, arg4] : [undefined, arg2, arg3];
   const {
@@ -34,10 +35,15 @@ export async function copy(src: S.ListData, arg2: any, arg3?: any, arg4?: any):
   delete insertOpts.batch;
   delete insertOpts.parallel;
 
-  return pipeline(
+  let n = 0;
+  await pipeline(
     () => src.listData(prefix),
     batch(batchSize),
-    transform(parallel, (pkts) => dst.insert(insertOpts, ...pkts)),
+    transform(parallel, async (pkts) => {
+      await dst.insert(insertOpts, ...pkts);
+      n += pkts.length;
+    }),
     consume,
   );
+  return n;
 }
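A hedged sketch of calling `copy()` with the new return value; `src` and `dst` stand for any stores implementing `listData()` and `insert()`, and the prefix and batching numbers are illustrative.

```ts
import { Name } from "@ndn/packet";
import { copy } from "@ndn/repo-api";

declare const src: any; // a store implementing listData()
declare const dst: any; // a store implementing insert()

// copy everything under /A, 64 packets per insert, 8 inserts in flight
const n = await copy(src, new Name("/A"), dst, { batch: 64, parallel: 8 });
console.log(`copied ${n} packets`);
```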
104 changes: 29 additions & 75 deletions packages/repo-api/src/data-tape.ts
@@ -1,22 +1,16 @@
-import { L3Face, StreamTransport } from "@ndn/l3face";
+import { rxFromStream } from "@ndn/l3face";
 import { Data, type Interest, type Name } from "@ndn/packet";
-import duplexify from "duplexify";
+import { Encoder } from "@ndn/tlv";
+import { assert } from "@ndn/util";
 import { isReadableStream, isWritableStream } from "is-stream";
-import { type Pushable, pushable } from "it-pushable";
-import pDefer, { type DeferredPromise } from "p-defer";
 import { pEvent } from "p-event";
-import { consume, filter, map, pipeline } from "streaming-iterables";
+import { filter, map, pipeline, writeToStream } from "streaming-iterables";
 import throat from "throat";
 import type { Promisable } from "type-fest";
 
 import * as S from "./data-store";
 import { makeOpenFileStreamFunction } from "./data-tape-file_node";
 
-interface WriteItem {
-  pkts: AsyncIterable<Data>;
-  done: DeferredPromise<void>;
-}
-
 /**
  * DataTape is a file or stream that consists of a sequence of Data packets.
  * This type implements DataStore interfaces on top of such a file or stream.
@@ -59,89 +53,49 @@ export class DataTape implements DataTape.Reader, DataTape.Writer {
 
   private readonly makeStream: (mode: DataTape.StreamMode) => NodeJS.ReadableStream | NodeJS.WritableStream;
   private readonly mutex = throat(1);
-  private currentWriter?: [L3Face, Pushable<WriteItem>];
+  private currentWriter?: NodeJS.WritableStream;
 
   private async closeCurrentWriter() {
     if (!this.currentWriter) {
       return;
     }
-    const [face, tx] = this.currentWriter;
-    tx.end();
-    await pEvent(face, "close");
+    this.currentWriter.end();
+    await pEvent(this.currentWriter, "finish");
     this.currentWriter = undefined;
   }
 
   private async useReader<R>(cb: (reader: AsyncIterable<Data>) => Promisable<R>): Promise<R> {
-    let result: any;
-    await this.mutex(async () => {
+    return this.mutex(async (): Promise<R> => {
       await this.closeCurrentWriter();
 
       const stream = this.makeStream("read");
-      if (!isReadableStream(stream)) {
-        throw new Error("stream is not Readable");
-      }
-
-      const duplex = duplexify(undefined, stream);
-      const defer = pDefer<void>();
-      const close = () => defer.resolve();
-      duplex.once("end", close);
-
-      const face = new L3Face(new StreamTransport(duplex));
-      // eslint-disable-next-line require-yield
-      void face.tx((async function*() { await defer.promise; })());
-
-      const reader = pipeline(
-        () => face.rx,
-        map((pkt) => pkt.l3),
-        filter((pkt): pkt is Data => pkt instanceof Data),
+      assert(isReadableStream(stream), "stream is not Readable");
+
+      return pipeline(
+        () => rxFromStream(stream),
+        map(({ decoder }) => {
+          try {
+            return decoder.decode(Data);
+          } catch {
+            return undefined;
+          }
+        }),
+        filter((data): data is Data => data instanceof Data),
+        cb,
       );
-
-      try {
-        result = await cb(reader);
-      } finally {
-        close();
-      }
     });
-    return result;
   }
 
   private async useWriter(cb: (write: (pkts: AsyncIterable<Data>) => Promise<void>) => Promise<void>) {
     await this.mutex(async () => {
-      if (!this.currentWriter) {
-        const stream = this.makeStream("append");
-        if (!isWritableStream(stream)) {
-          throw new Error("stream is not Writable");
-        }
-
-        const duplex = duplexify(stream, undefined);
-        const face = new L3Face(new StreamTransport(duplex));
-        consume(face.rx).catch(() => undefined);
-
-        const tx = pushable<WriteItem>({ objectMode: true });
-        face.tx((async function*() {
-          for await (const item of tx) {
-            try {
-              yield* map((l3) => ({ l3 }), item.pkts);
-            } catch (err: unknown) {
-              item.done.reject(err);
-              return;
-            }
-            item.done.resolve();
-          }
-        })()).then(() => duplex.end(), () => undefined);
-
-        this.currentWriter = [face, tx];
-      }
-
-      await cb(async (pkts) => {
-        const item = {
-          pkts,
-          done: pDefer<void>(),
-        };
-        const [, tx] = this.currentWriter!;
-        tx.push(item);
-        await item.done.promise;
-      });
+      this.currentWriter ??= this.makeStream("append") as NodeJS.WritableStream;
+      assert(isWritableStream(this.currentWriter), "stream is not Writable");
+
+      await cb((pkts) => pipeline(
+        () => pkts,
+        map((pkt) => Encoder.encode(pkt)),
+        writeToStream(this.currentWriter!),
+      ));
     });
   }
 
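After this rewrite, DataTape's read path is a plain decode loop over `rxFromStream()` and its write path encodes with `Encoder.encode()` and pipes via `writeToStream()`. A hedged usage sketch; the file name and packet names are illustrative.

```ts
import { Data, Name } from "@ndn/packet";
import { DataTape } from "@ndn/repo-api";

const tape = new DataTape("/tmp/demo.dtape"); // file is opened on demand

// write path: each Data is Encoder.encode()d and piped into the append stream
await tape.insert(new Data("/A/1"), new Data("/A/2"));

// read path: rxFromStream() splits TLVs; undecodable elements are skipped
for await (const data of tape.listData(new Name("/A"))) {
  console.log(`${data.name}`);
}
await tape.close();
```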
4 changes: 0 additions & 4 deletions packages/repo-api/tests/data-tape.t.ts
@@ -96,10 +96,6 @@ async function testBulkInsertTarget(
     parallel: 8,
   });
   await bi.accept(makeDataTapeReadStream("read"));
-  let total = 0;
-  for (const [i, call] of storeInsert.mock.calls.entries()) {
-    console.log(`storeInsert ${i} ${call.length} ${total += call.length}`);
-  }
   expect(storeInsert).toHaveBeenCalledTimes(16);
 
   await tape.close();
17 changes: 17 additions & 0 deletions packages/repo-cli/README.md
@@ -3,3 +3,20 @@
 This package is part of [NDNts](https://yoursunny.com/p/NDNts/), Named Data Networking libraries for the modern web.
 
 **ndnts-repo** is a command line utility to run and benchmark `@ndn/repo` and other repo implementations.
+
+## `ndnts-repo server`: Repo Server
+
+```bash
+# with NFD running, start the repo server
+ndnts-repo server --store=/tmp/repo --rdr=true
+```
+
+## `ndnts-repo fillbi`: Fill Repo with Demo Data
+
+```bash
+# with repo server running, fill Data via bulk insertion
+ndnts-repo fillbi --prefix=/repodemo --start=0 --count=32768
+
+# retrieve one Data packet
+ndnpeek /repodemo/seq=$RANDOM | ndn-dissect
+```