Skip to content

Commit

Permalink
feat: Allow writer to be ended and error on writing to closed writer
Browse files Browse the repository at this point in the history
  • Loading branch information
smessie committed Aug 26, 2024
1 parent 5ed2cd7 commit f70ec13
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 20 deletions.
7 changes: 7 additions & 0 deletions src/connectors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ export class ChannelFactory {
export interface Writer<T> {
push(item: T): Promise<void>;
end(): Promise<void>;
on(event: "end", listener: Handler<void>): this;
}

export interface Stream<T> {
Expand All @@ -242,6 +243,8 @@ export class SimpleStream<T> implements Stream<T> {
public readonly disconnect: () => Promise<void>;
public lastElement?: T | undefined;

private ended = false;

public constructor(onDisconnect?: () => Promise<void>) {
this.disconnect = onDisconnect || (async () => {});
}
Expand All @@ -252,13 +255,17 @@ export class SimpleStream<T> implements Stream<T> {
}

public async push(data: T): Promise<void> {
if (this.ended) {
throw new Error("Trying to push to a stream that has ended!");
}
this.lastElement = data;
await Promise.all(this.dataHandlers.map((handler) => handler(data)));
}

public async end(): Promise<void> {
await this.disconnect();
await Promise.all(this.endHandlers.map((handler) => handler()));
this.ended = true;
}

public on(event: "data", listener: Handler<T>): this;
Expand Down
8 changes: 4 additions & 4 deletions src/connectors/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,21 +121,21 @@ export const startFileStreamWriter: WriterConstructor<FileWriterConfig> = (
: `${process.cwd()}/${config.path}`;
const encoding: BufferEncoding = <BufferEncoding>config.encoding || "utf-8";

const writer = new SimpleStream<string>();

const init = async () => {
if (!config.onReplace) {
await writeFile(path, "", { encoding });
}
};

const push = async (item: string): Promise<void> => {
writer.push = async (item: string): Promise<void> => {
if (config.onReplace) {
await writeFile(path, item, { encoding });
} else {
await appendFile(path, item, { encoding });
}
};

const end = async (): Promise<void> => {};

return { writer: { push, end }, init };
return { writer, init };
};
8 changes: 4 additions & 4 deletions src/connectors/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ export const startHttpStreamWriter: WriterConstructor<HttpWriterConfig> = (
) => {
const requestConfig = <https.RequestOptions>new URL(config.endpoint);

const push = async (item: string | Buffer): Promise<void> => {
const writer = new SimpleStream<string | Buffer>();

writer.push = async (item: string | Buffer): Promise<void> => {
await new Promise((resolve) => {
const options = {
hostname: requestConfig.hostname,
Expand All @@ -125,7 +127,5 @@ export const startHttpStreamWriter: WriterConstructor<HttpWriterConfig> = (
});
};

const end = async (): Promise<void> => {};

return { writer: { push, end }, init: async () => {} };
return { writer, init: async () => {} };
};
13 changes: 7 additions & 6 deletions src/connectors/kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,16 @@ export const startKafkaStreamWriter: WriterConstructor<KafkaWriterConfig> = (
const kafka = new Kafka(<KafkaConfig>brokerConfig);

const producer = kafka.producer(config.producer);

const writer = new SimpleStream<string>(async () => {
await producer.disconnect();
});

const init = () => producer.connect();

const push = async (item: string): Promise<void> => {
writer.push = async (item: string): Promise<void> => {
await producer.send({ topic, messages: [{ value: item }] });
};

const end = async (): Promise<void> => {
await producer.disconnect();
};

return { writer: { push, end }, init };
return { writer, init };
};
12 changes: 6 additions & 6 deletions src/connectors/ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,14 @@ export const startWsStreamWriter: WriterConstructor<WsWriterConfig> = (
ws.on("open", () => console.log("open"));
};

const writer = new SimpleStream<string>(async () => {
ws.close();
});

/* eslint-disable-next-line @typescript-eslint/no-explicit-any */
const push = async (item: any): Promise<void> => {
writer.push = async (item: any): Promise<void> => {
ws.send(item);
};

const end = async (): Promise<void> => {
ws.close();
};

return { writer: { push, end }, init };
return { writer, init };
};

0 comments on commit f70ec13

Please sign in to comment.