Skip to content

Commit

Permalink
Allow HTTP writer to handle binary data
Browse files Browse the repository at this point in the history
  • Loading branch information
julianrojas87 authored and ajuvercr committed Feb 6, 2024
1 parent 592d84b commit 0e4628c
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 4 deletions.
4 changes: 2 additions & 2 deletions src/connectors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export type ReaderConstructor<C extends Config> = (config: C) => {
};

export type WriterConstructor<C extends Config> = (config: C) => {
writer: Writer<string>;
writer: Writer<string | Buffer>;
init: () => Promise<void>;
};

Expand Down Expand Up @@ -131,7 +131,7 @@ export class ChannelFactory {
throw "Unknown reader channel " + config.ty.value;
}

createWriter(config: Config): Writer<string> {
createWriter(config: Config): Writer<string | Buffer> {
if (config.ty.equals(Conn.FileWriterChannel)) {
const { writer, init } = startFileStreamWriter(<FileWriterConfig>config);
this.inits.push(init);
Expand Down
2 changes: 1 addition & 1 deletion src/connectors/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ export const startHttpStreamWriter: WriterConstructor<HttpWriterConfig> = (
) => {
const requestConfig = <https.RequestOptions>new URL(config.endpoint);

const push = async (item: string): Promise<void> => {
const push = async (item: string | Buffer): Promise<void> => {
await new Promise((res) => {
const options = {
hostname: requestConfig.hostname,
Expand Down
39 changes: 38 additions & 1 deletion test/connectors/http.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import * as conn from "../../src/connectors";
import { HttpReaderConfig, HttpWriterConfig } from "../../src/connectors/http";

describe("connector-http", () => {
test("Should write -> HTTP -> read", async () => {
test("Should write -> HTTP -> read (string)", async () => {
const readerConfig: HttpReaderConfig = {
endpoint: "localhost",
port: 8080,
Expand All @@ -19,6 +19,7 @@ describe("connector-http", () => {
const factory = new conn.ChannelFactory();
const reader = factory.createReader(readerConfig);
const writer = factory.createWriter(writerConfig);

reader.data((data) => {
items.push(data);
});
Expand All @@ -36,6 +37,42 @@ describe("connector-http", () => {

await Promise.all([reader.end(), writer.end()]);
});

test("Should write -> HTTP -> read (Buffer)", async () => {
const readerConfig: HttpReaderConfig = {
endpoint: "localhost",
port: 8081,
binary: true,
ty: conn.Conn.HttpReaderChannel,
};
const writerConfig: HttpWriterConfig = {
endpoint: "http://localhost:8081",
method: "POST",
ty: conn.Conn.HttpWriterChannel,
};

const factory = new conn.ChannelFactory();
const reader = factory.createReader(readerConfig);
const writer = factory.createWriter(writerConfig);

reader.data((data) => {
expect(Buffer.isBuffer(data)).toBeTruthy();
items.push(data.toString());
});

await factory.init();

const items: unknown[] = [];

await writer.push(Buffer.from("test1", "utf8"));
await sleep(200);
await writer.push(Buffer.from("test2", "utf8"));
await sleep(200);

expect(items).toEqual(["test1", "test2"]);

await Promise.all([reader.end(), writer.end()]);
});
});

function sleep(x: number): Promise<unknown> {
Expand Down

0 comments on commit 0e4628c

Please sign in to comment.