From f70ec13cf02ea6482f7410c9859dd7d7eb1946d7 Mon Sep 17 00:00:00 2001 From: smessie Date: Mon, 26 Aug 2024 09:01:52 +0200 Subject: [PATCH] feat: Allow writer to be ended and error on writing to closed writer --- src/connectors.ts | 7 +++++++ src/connectors/file.ts | 8 ++++---- src/connectors/http.ts | 8 ++++---- src/connectors/kafka.ts | 13 +++++++------ src/connectors/ws.ts | 12 ++++++------ 5 files changed, 28 insertions(+), 20 deletions(-) diff --git a/src/connectors.ts b/src/connectors.ts index f47099e..b7ae6f6 100644 --- a/src/connectors.ts +++ b/src/connectors.ts @@ -223,6 +223,7 @@ export class ChannelFactory { export interface Writer { push(item: T): Promise; end(): Promise; + on(event: "end", listener: Handler): this; } export interface Stream { @@ -242,6 +243,8 @@ export class SimpleStream implements Stream { public readonly disconnect: () => Promise; public lastElement?: T | undefined; + private ended = false; + public constructor(onDisconnect?: () => Promise) { this.disconnect = onDisconnect || (async () => {}); } @@ -252,6 +255,9 @@ export class SimpleStream implements Stream { } public async push(data: T): Promise { + if (this.ended) { + throw new Error("Trying to push to a stream that has ended!"); + } this.lastElement = data; await Promise.all(this.dataHandlers.map((handler) => handler(data))); } @@ -259,6 +265,7 @@ export class SimpleStream implements Stream { public async end(): Promise { await this.disconnect(); await Promise.all(this.endHandlers.map((handler) => handler())); + this.ended = true; } public on(event: "data", listener: Handler): this; diff --git a/src/connectors/file.ts b/src/connectors/file.ts index 2217f1a..7be4110 100644 --- a/src/connectors/file.ts +++ b/src/connectors/file.ts @@ -121,13 +121,15 @@ export const startFileStreamWriter: WriterConstructor = ( : `${process.cwd()}/${config.path}`; const encoding: BufferEncoding = config.encoding || "utf-8"; + const writer = new SimpleStream(); + const init = async () => { if (!config.onReplace) { await writeFile(path, "", { encoding }); } }; - const push = async (item: string): Promise => { + writer.push = async (item: string): Promise => { if (config.onReplace) { await writeFile(path, item, { encoding }); } else { @@ -135,7 +137,5 @@ export const startFileStreamWriter: WriterConstructor = ( } }; - const end = async (): Promise => {}; - - return { writer: { push, end }, init }; + return { writer, init }; }; diff --git a/src/connectors/http.ts b/src/connectors/http.ts index 96b4874..c0a2111 100644 --- a/src/connectors/http.ts +++ b/src/connectors/http.ts @@ -101,7 +101,9 @@ export const startHttpStreamWriter: WriterConstructor = ( ) => { const requestConfig = new URL(config.endpoint); - const push = async (item: string | Buffer): Promise => { + const writer = new SimpleStream(); + + writer.push = async (item: string | Buffer): Promise => { await new Promise((resolve) => { const options = { hostname: requestConfig.hostname, @@ -125,7 +127,5 @@ export const startHttpStreamWriter: WriterConstructor = ( }); }; - const end = async (): Promise => {}; - - return { writer: { push, end }, init: async () => {} }; + return { writer, init: async () => {} }; }; diff --git a/src/connectors/kafka.ts b/src/connectors/kafka.ts index 4e2762d..2ac13b4 100644 --- a/src/connectors/kafka.ts +++ b/src/connectors/kafka.ts @@ -138,15 +138,16 @@ export const startKafkaStreamWriter: WriterConstructor = ( const kafka = new Kafka(brokerConfig); const producer = kafka.producer(config.producer); + + const writer = new SimpleStream(async () => { + await producer.disconnect(); + }); + const init = () => producer.connect(); - const push = async (item: string): Promise => { + writer.push = async (item: string): Promise => { await producer.send({ topic, messages: [{ value: item }] }); }; - const end = async (): Promise => { - await producer.disconnect(); - }; - - return { writer: { push, end }, init }; + return { writer, init }; }; diff --git a/src/connectors/ws.ts b/src/connectors/ws.ts index cd7fbe4..cff627d 100644 --- a/src/connectors/ws.ts +++ b/src/connectors/ws.ts @@ -94,14 +94,14 @@ export const startWsStreamWriter: WriterConstructor = ( ws.on("open", () => console.log("open")); }; + const writer = new SimpleStream(async () => { + ws.close(); + }); + /* eslint-disable-next-line @typescript-eslint/no-explicit-any */ - const push = async (item: any): Promise => { + writer.push = async (item: any): Promise => { ws.send(item); }; - const end = async (): Promise => { - ws.close(); - }; - - return { writer: { push, end }, init }; + return { writer, init }; };