Skip to content

Commit

Permalink
endpoint: stop using applyMixins
Browse files Browse the repository at this point in the history
  • Loading branch information
yoursunny committed Dec 20, 2023
1 parent f124a89 commit 14e35f1
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 170 deletions.
1 change: 0 additions & 1 deletion packages/endpoint/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
"@ndn/fw": "workspace:*",
"@ndn/packet": "workspace:*",
"@ndn/util": "workspace:*",
"applymixins": "^1.1.0",
"it-pushable": "^3.2.3",
"streaming-iterables": "^8.0.1",
"tslib": "^2.6.2"
Expand Down
139 changes: 67 additions & 72 deletions packages/endpoint/src/consumer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { CancelInterest, type Forwarder, FwPacket } from "@ndn/fw";
import { Data, Interest, type NameLike, type Verifier } from "@ndn/packet";
import { Data, Interest, type Verifier } from "@ndn/packet";
import { pushable } from "it-pushable";

import { makeRetxGenerator, type RetxPolicy } from "./retx";
Expand Down Expand Up @@ -41,86 +41,81 @@ export interface ConsumerContext extends Promise<Data> {
readonly nRetx: number;
}

/** Consumer functionality of Endpoint. */
export class EndpointConsumer {
declare public fw: Forwarder;
declare public opts: ConsumerOptions;

/** Consume a single piece of Data. */
public consume(interestInput: Interest | NameLike, opts: ConsumerOptions = {}): ConsumerContext {
const interest = interestInput instanceof Interest ? interestInput : new Interest(interestInput);
const {
export function makeConsumer(
fw: Forwarder,
interest: Interest,
{
describe = `consume(${interest.name})`,
signal,
modifyInterest,
retx,
verifier,
} = { ...this.opts, ...opts };
Interest.makeModifyFunc(modifyInterest)(interest);

let nRetx = -1;
const retxGen = makeRetxGenerator(retx)(interest.lifetime)[Symbol.iterator]();

const promise = new Promise<Data>((resolve, reject) => {
const rx = pushable<FwPacket>({ objectMode: true });

let timer: NodeJS.Timeout | number | undefined;
const cancelRetx = () => {
clearTimeout(timer);
timer = undefined;
};

const sendInterest = () => {
cancelRetx();
const { value, done } = retxGen.next();
if (!done) {
timer = setTimeout(sendInterest, value);
}
rx.push(FwPacket.create(interest));
++nRetx;
};

const onAbort = () => {
cancelRetx();
rx.push(new CancelInterest(interest));
};
signal?.addEventListener("abort", onAbort);

this.fw.addFace({
rx,
async tx(iterable) {
for await (const pkt of iterable) {
if (pkt.l3 instanceof Data) {
try {
await verifier?.verify(pkt.l3);
} catch (err: unknown) {
reject(new Error(`Data verify failed: ${err} @${describe}`));
break;
}
resolve(pkt.l3);
break;
}
if (pkt.reject && !timer) {
reject(new Error(`Interest rejected: ${pkt.reject} @${describe}`));
}: ConsumerOptions,
): ConsumerContext {
Interest.makeModifyFunc(modifyInterest)(interest);

let nRetx = -1;
const retxGen = makeRetxGenerator(retx)(interest.lifetime)[Symbol.iterator]();

const promise = new Promise<Data>((resolve, reject) => {
const rx = pushable<FwPacket>({ objectMode: true });

let timer: NodeJS.Timeout | number | undefined;
const cancelRetx = () => {
clearTimeout(timer);
timer = undefined;
};

const sendInterest = () => {
cancelRetx();
const { value, done } = retxGen.next();
if (!done) {
timer = setTimeout(sendInterest, value);
}
rx.push(FwPacket.create(interest));
++nRetx;
};

const onAbort = () => {
cancelRetx();
rx.push(new CancelInterest(interest));
};
signal?.addEventListener("abort", onAbort);

fw.addFace({
rx,
async tx(iterable) {
for await (const pkt of iterable) {
if (pkt.l3 instanceof Data) {
try {
await verifier?.verify(pkt.l3);
} catch (err: unknown) {
reject(new Error(`Data verify failed: ${err} @${describe}`));
break;
}
resolve(pkt.l3);
break;
}
if (pkt.reject && !timer) {
reject(new Error(`Interest rejected: ${pkt.reject} @${describe}`));
break;
}
cancelRetx();
signal?.removeEventListener("abort", onAbort);
rx.end();
},
}
cancelRetx();
signal?.removeEventListener("abort", onAbort);
rx.end();
},
{
describe,
local: true,
});

sendInterest();
},
{
describe,
local: true,
});

return Object.defineProperties(promise, {
interest: { value: interest },
nRetx: { get() { return nRetx; } },
}) as ConsumerContext;
}
sendInterest();
});

return Object.defineProperties(promise, {
interest: { value: interest },
nRetx: { get() { return nRetx; } },
}) as ConsumerContext;
}
38 changes: 31 additions & 7 deletions packages/endpoint/src/endpoint.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Forwarder } from "@ndn/fw";
import applyMixins from "applymixins";
import { Forwarder, type FwFace } from "@ndn/fw";
import { Interest, Name, type NameLike } from "@ndn/packet";

import { type ConsumerOptions, EndpointConsumer } from "./consumer";
import { EndpointProducer, type ProducerOptions } from "./producer";
import { type ConsumerContext, type ConsumerOptions, makeConsumer } from "./consumer";
import { type Producer, type ProducerHandler, ProducerImpl, type ProducerOptions } from "./producer";

export interface Options extends ConsumerOptions, ProducerOptions {
fw?: Forwarder;
Expand All @@ -18,13 +18,37 @@ export class Endpoint {
constructor(public readonly opts: Options = {}) {
this.fw = opts.fw ?? Forwarder.getDefault();
}

/**
* Retrieve a single piece of Data.
* @param interest Interest or Interest name.
*/
public consume(interest: Interest | NameLike, opts: ConsumerOptions = {}): ConsumerContext {
return makeConsumer(
this.fw,
interest instanceof Interest ? interest : new Interest(interest),
{ ...this.opts, ...opts },
);
}

/**
* Start a producer.
* @param prefix prefix registration; if undefined, prefixes may be added later.
* @param handler function to handle incoming Interest.
*/
public produce(prefix: NameLike | undefined, handler: ProducerHandler, opts: ProducerOptions = {}): Producer {
return new ProducerImpl(
this.fw,
prefix === undefined ? undefined : Name.from(prefix),
handler,
{ ...this.opts, ...opts },
);
}
}
export interface Endpoint extends EndpointConsumer, EndpointProducer {}
applyMixins(Endpoint, [EndpointConsumer, EndpointProducer]);

export namespace Endpoint {
/** Delete default Forwarder instance (mainly for unit testing). */
export const deleteDefaultForwarder = Forwarder.deleteDefault;

export type RouteAnnouncement = EndpointProducer.RouteAnnouncement;
export type RouteAnnouncement = FwFace.RouteAnnouncement;
}
Loading

0 comments on commit 14e35f1

Please sign in to comment.