Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow HTTP channel reader to handle binary data #5

Merged
merged 4 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions channels/http.ttl
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@
sh:maxCount 1;
sh:datatype xsd:integer;
sh:description "Used port";
], [
sh:path :binary;
sh:name "binary";
sh:maxCount 1;
sh:datatype xsd:boolean;
sh:description "Stream raw bytes if true";
].

[ ] a sh:NodeShape;
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@ajuvercr/js-runner",
"version": "0.1.12",
"version": "0.1.13",
"type": "module",
"description": "",
"main": "./dist/index.js",
Expand All @@ -12,7 +12,7 @@
],
"types": "./dist/index.d.ts",
"bin": {
"js-runner": "./bin/bundle.mjs"
"js-runner": "bin/bundle.mjs"
},
"scripts": {
"build": "tsc && bun build ./bin/js-runner.js --outfile bin/bundle.mjs --target node && npm run build:recompose",
Expand Down
8 changes: 4 additions & 4 deletions src/connectors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ export interface Config {
}

export type ReaderConstructor<C extends Config> = (config: C) => {
reader: Stream<string>;
reader: Stream<string | Buffer>;
init: () => Promise<void>;
};

export type WriterConstructor<C extends Config> = (config: C) => {
writer: Writer<string>;
writer: Writer<string | Buffer>;
init: () => Promise<void>;
};

Expand All @@ -77,7 +77,7 @@ export class ChannelFactory {
private jsChannelsNamedNodes: { [label: string]: SimpleStream<string> } = {};
private jsChannelsBlankNodes: { [label: string]: SimpleStream<string> } = {};

createReader(config: Config): Stream<string> {
createReader(config: Config): Stream<string | Buffer> {
if (config.ty.equals(Conn.FileReaderChannel)) {
const { reader, init } = startFileStreamReader(<FileReaderConfig>config);
this.inits.push(init);
Expand Down Expand Up @@ -131,7 +131,7 @@ export class ChannelFactory {
throw "Unknown reader channel " + config.ty.value;
}

createWriter(config: Config): Writer<string> {
createWriter(config: Config): Writer<string | Buffer> {
if (config.ty.equals(Conn.FileWriterChannel)) {
const { writer, init } = startFileStreamWriter(<FileWriterConfig>config);
this.inits.push(init);
Expand Down
14 changes: 9 additions & 5 deletions src/connectors/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,31 @@ import {
WriterConstructor,
} from "../connectors";

function streamToString(stream: Readable): Promise<string> {
function streamToString(stream: Readable, binary: boolean): Promise<string | Buffer> {
const datas = <Buffer[]>[];
return new Promise((res) => {
stream.on("data", (data) => {
datas.push(data);
});
stream.on("end", () => res(Buffer.concat(datas).toString()));
stream.on("end", () => {
const streamData = Buffer.concat(datas);
res(binary ? streamData : streamData.toString())
});
});
}

export interface HttpReaderConfig extends Config {
endpoint: string;
port: number;
binary: boolean;
}

export const startHttpStreamReader: ReaderConstructor<HttpReaderConfig> = (
config,
) => {
let server: Server;

const stream = new SimpleStream<string>(
const stream = new SimpleStream<string | Buffer>(
() =>
new Promise((res) => {
const cb = (): void => res();
Expand All @@ -52,7 +56,7 @@ export const startHttpStreamReader: ReaderConstructor<HttpReaderConfig> = (
res: ServerResponse,
) {
try {
const content = await streamToString(req);
const content = await streamToString(req, config.binary);
stream.push(content).catch((error) => {
throw error;
});
Expand Down Expand Up @@ -89,7 +93,7 @@ export const startHttpStreamWriter: WriterConstructor<HttpWriterConfig> = (
) => {
const requestConfig = <https.RequestOptions>new URL(config.endpoint);

const push = async (item: string): Promise<void> => {
const push = async (item: string | Buffer): Promise<void> => {
await new Promise((res) => {
const options = {
hostname: requestConfig.hostname,
Expand Down
40 changes: 39 additions & 1 deletion test/connectors/http.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ import * as conn from "../../src/connectors";
import { HttpReaderConfig, HttpWriterConfig } from "../../src/connectors/http";

describe("connector-http", () => {
test("Should write -> HTTP -> read", async () => {
test("Should write -> HTTP -> read (string)", async () => {
const readerConfig: HttpReaderConfig = {
endpoint: "localhost",
port: 8080,
binary: false,
ty: conn.Conn.HttpReaderChannel,
};
const writerConfig: HttpWriterConfig = {
Expand All @@ -18,6 +19,7 @@ describe("connector-http", () => {
const factory = new conn.ChannelFactory();
const reader = factory.createReader(readerConfig);
const writer = factory.createWriter(writerConfig);

reader.data((data) => {
items.push(data);
});
Expand All @@ -35,6 +37,42 @@ describe("connector-http", () => {

await Promise.all([reader.end(), writer.end()]);
});

test("Should write -> HTTP -> read (Buffer)", async () => {
const readerConfig: HttpReaderConfig = {
endpoint: "localhost",
port: 8081,
binary: true,
ty: conn.Conn.HttpReaderChannel,
};
const writerConfig: HttpWriterConfig = {
endpoint: "http://localhost:8081",
method: "POST",
ty: conn.Conn.HttpWriterChannel,
};

const factory = new conn.ChannelFactory();
const reader = factory.createReader(readerConfig);
const writer = factory.createWriter(writerConfig);

reader.data((data) => {
expect(Buffer.isBuffer(data)).toBeTruthy();
items.push(data.toString());
});

await factory.init();

const items: unknown[] = [];

await writer.push(Buffer.from("test1", "utf8"));
await sleep(200);
await writer.push(Buffer.from("test2", "utf8"));
await sleep(200);

expect(items).toEqual(["test1", "test2"]);

await Promise.all([reader.end(), writer.end()]);
});
});

function sleep(x: number): Promise<unknown> {
Expand Down
Loading