diff --git a/src/connectors.ts b/src/connectors.ts index 4503890..4aa9f2d 100644 --- a/src/connectors.ts +++ b/src/connectors.ts @@ -55,7 +55,7 @@ export type ReaderConstructor = (config: C) => { }; export type WriterConstructor = (config: C) => { - writer: Writer; + writer: Writer; init: () => Promise; }; @@ -131,7 +131,7 @@ export class ChannelFactory { throw "Unknown reader channel " + config.ty.value; } - createWriter(config: Config): Writer { + createWriter(config: Config): Writer { if (config.ty.equals(Conn.FileWriterChannel)) { const { writer, init } = startFileStreamWriter(config); this.inits.push(init); diff --git a/src/connectors/http.ts b/src/connectors/http.ts index 62ddac3..9753041 100644 --- a/src/connectors/http.ts +++ b/src/connectors/http.ts @@ -93,7 +93,7 @@ export const startHttpStreamWriter: WriterConstructor = ( ) => { const requestConfig = new URL(config.endpoint); - const push = async (item: string): Promise => { + const push = async (item: string | Buffer): Promise => { await new Promise((res) => { const options = { hostname: requestConfig.hostname, diff --git a/test/connectors/http.test.ts b/test/connectors/http.test.ts index fcb010a..1d09d2a 100644 --- a/test/connectors/http.test.ts +++ b/test/connectors/http.test.ts @@ -3,7 +3,7 @@ import * as conn from "../../src/connectors"; import { HttpReaderConfig, HttpWriterConfig } from "../../src/connectors/http"; describe("connector-http", () => { - test("Should write -> HTTP -> read", async () => { + test("Should write -> HTTP -> read (string)", async () => { const readerConfig: HttpReaderConfig = { endpoint: "localhost", port: 8080, @@ -19,6 +19,7 @@ describe("connector-http", () => { const factory = new conn.ChannelFactory(); const reader = factory.createReader(readerConfig); const writer = factory.createWriter(writerConfig); + reader.data((data) => { items.push(data); }); @@ -36,6 +37,42 @@ describe("connector-http", () => { await Promise.all([reader.end(), writer.end()]); }); + + test("Should write -> HTTP -> read (Buffer)", async () => { + const readerConfig: HttpReaderConfig = { + endpoint: "localhost", + port: 8081, + binary: true, + ty: conn.Conn.HttpReaderChannel, + }; + const writerConfig: HttpWriterConfig = { + endpoint: "http://localhost:8081", + method: "POST", + ty: conn.Conn.HttpWriterChannel, + }; + + const factory = new conn.ChannelFactory(); + const reader = factory.createReader(readerConfig); + const writer = factory.createWriter(writerConfig); + + reader.data((data) => { + expect(Buffer.isBuffer(data)).toBeTruthy(); + items.push(data.toString()); + }); + + await factory.init(); + + const items: unknown[] = []; + + await writer.push(Buffer.from("test1", "utf8")); + await sleep(200); + await writer.push(Buffer.from("test2", "utf8")); + await sleep(200); + + expect(items).toEqual(["test1", "test2"]); + + await Promise.all([reader.end(), writer.end()]); + }); }); function sleep(x: number): Promise {