Skip to content

Commit

Permalink
update rdf-lens
Browse files Browse the repository at this point in the history
  • Loading branch information
ajuvercr committed Apr 8, 2024
1 parent a1be863 commit 9185b9d
Show file tree
Hide file tree
Showing 19 changed files with 160 additions and 846 deletions.
2 changes: 1 addition & 1 deletion input.ttl
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<jr> a js:JsReaderChannel.
<jw> a js:JsWriterChannel.
[ ] a js:Send;
js:msg "Hallo world";
js:msg "Hello";
js:sendWriter <jw>.

[ ] a js:Resc;
Expand Down
44 changes: 34 additions & 10 deletions ontology.ttl
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>.
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>.
@prefix dc: <http://purl.org/dc/terms/>.
@prefix rdfl: <https://w3id.org/rdf-lens/ontology#>.

[ ] a sh:NodeShape;
sh:targetClass :Channel;
sh:property [
sh:path ( );
sh:name "id";
sh:maxCount 1;
sh:minCount 1;
sh:datatype xsd:iri;
], [
sh:path :reader;
Expand All @@ -30,34 +32,56 @@
[ ] a sh:NodeShape;
sh:targetClass :ReaderChannel;
sh:property [
sh:path [ sh:inversePath :reader ];
sh:name "channel";
sh:maxCount 1;
sh:class :Channel;
], [
sh:path rdf:type;
sh:name "ty";
sh:maxCount 1;
sh:datatype xsd:iri;
], [
sh:path ( );
sh:name "config";
sh:maxCount 1;
sh:minCount 1;
sh:class rdfl:TypedExtract;
].

[ ] a sh:NodeShape;
sh:targetClass :WriterChannel;
sh:property [
sh:path [ sh:inversePath :writer ];
sh:name "channel";
sh:maxCount 1;
sh:class :Channel;
], [
sh:path rdf:type;
sh:name "ty";
sh:maxCount 1;
sh:minCount 1;
sh:datatype xsd:iri;
], [
sh:path ( );
sh:name "config";
sh:maxCount 1;
sh:minCount 1;
sh:class rdfl:TypedExtract;
].

js:JsChannel rdfs:subClassOf :Channel.
js:JsReaderChannel rdfs:subClassOf :ReaderChannel.
[ ] a sh:NodeShape;
sh:targetClass js:JsReaderChannel;
sh:property [
sh:path [ sh:inversePath :reader ];
sh:name "channel";
sh:maxCount 1;
sh:minCount 1;
sh:class :Channel;
].

js:JsWriterChannel rdfs:subClassOf :WriterChannel.
[ ] a sh:NodeShape;
sh:targetClass js:JsWriterChannel;
sh:property [
sh:path [ sh:inversePath :writer ];
sh:name "channel";
sh:maxCount 1;
sh:minCount 1;
sh:class :Channel;
].

#
js:JsChannel a :Channel;
Expand Down
12 changes: 6 additions & 6 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"command-line-usage": "^6.1.3",
"kafkajs": "^2.2.4",
"n3": "^1.17.1",
"rdf-lens": "^1.1.3",
"rdf-lens": "^1.2.3",
"stream-to-array": "^2.3.0",
"ws": "^8.14.2"
},
Expand Down
8 changes: 5 additions & 3 deletions processor/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ function streamToString(ev) {
export async function send(msg, writer) {
const host = "0.0.0.0";
const port = 8000;
const requestListener = function (req, res) {
streamToString(req).then((st) => writer.push(st.toString()));
const requestListener = async function (req, res) {
const data = await streamToString(req);
const ret = `${msg} ${data}`;
await writer.push(ret);
res.writeHead(200);
res.end(msg);
res.end(ret);
};
const server = http.createServer(requestListener);

Expand Down
45 changes: 27 additions & 18 deletions src/connectors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { createTermNamespace } from "@treecg/types";

import { NamedNode, Term } from "@rdfjs/types";
import {
FileReaderConfig,
FileWriterConfig,
startFileStreamReader,
startFileStreamWriter,
Expand All @@ -12,7 +11,6 @@ export * from "./connectors/file";
import {
startWsStreamReader,
startWsStreamWriter,
WsReaderConfig,
WsWriterConfig,
} from "./connectors/ws";
export * from "./connectors/ws";
Expand Down Expand Up @@ -43,18 +41,21 @@ export const Conn = createTermNamespace(
"KafkaWriterChannel",
"WsReaderChannel",
"WsWriterChannel",
"WriterChannel",
"ReaderChannel",
);

export interface Config {
export interface Config<T> {
ty: NamedNode;
config: T;
}

export type ReaderConstructor<C extends Config> = (config: C) => {
export type ReaderConstructor<C> = (config: C) => {
reader: Stream<string | Buffer>;
init: () => Promise<void>;
};

export type WriterConstructor<C extends Config> = (config: C) => {
export type WriterConstructor<C> = (config: C) => {
writer: Writer<string | Buffer>;
init: () => Promise<void>;
};
Expand All @@ -67,7 +68,7 @@ export const JsOntology = createTermNamespace(
"JsWriterChannel",
);
type JsChannel = {
channel?: {
channel: {
id: Term;
};
};
Expand All @@ -77,37 +78,39 @@ export class ChannelFactory {
private jsChannelsNamedNodes: { [label: string]: SimpleStream<string> } = {};
private jsChannelsBlankNodes: { [label: string]: SimpleStream<string> } = {};

createReader(config: Config): Stream<string | Buffer> {
createReader(config: Config<any>): Stream<string | Buffer> {
if (config.ty.equals(Conn.FileReaderChannel)) {
const { reader, init } = startFileStreamReader(<FileReaderConfig>config);
const { reader, init } = startFileStreamReader(config.config);
this.inits.push(init);

return reader;
}

if (config.ty.equals(Conn.WsReaderChannel)) {
const { reader, init } = startWsStreamReader(<WsReaderConfig>config);
const { reader, init } = startWsStreamReader(config.config);
this.inits.push(init);

return reader;
}

if (config.ty.equals(Conn.KafkaReaderChannel)) {
const { reader, init } = startKafkaStreamReader(
<KafkaReaderConfig>config,
<KafkaReaderConfig>config.config,
);
this.inits.push(init);
return reader;
}

if (config.ty.equals(Conn.HttpReaderChannel)) {
const { reader, init } = startHttpStreamReader(<HttpReaderConfig>config);
const { reader, init } = startHttpStreamReader(
<HttpReaderConfig>config.config,
);
this.inits.push(init);
return reader;
}

if (config.ty.equals(JsOntology.JsReaderChannel)) {
const c = <JsChannel>config;
const c = <JsChannel>config.config;
if (c.channel) {
const id = c.channel.id.value;
if (c.channel.id.termType === "NamedNode") {
Expand All @@ -131,37 +134,43 @@ export class ChannelFactory {
throw "Unknown reader channel " + config.ty.value;
}

createWriter(config: Config): Writer<string | Buffer> {
createWriter(config: Config<any>): Writer<string | Buffer> {
if (config.ty.equals(Conn.FileWriterChannel)) {
const { writer, init } = startFileStreamWriter(<FileWriterConfig>config);
const { writer, init } = startFileStreamWriter(
<FileWriterConfig>config.config,
);
this.inits.push(init);

return writer;
}

if (config.ty.equals(Conn.WsWriterChannel)) {
const { writer, init } = startWsStreamWriter(<WsWriterConfig>config);
const { writer, init } = startWsStreamWriter(
<WsWriterConfig>config.config,
);
this.inits.push(init);

return writer;
}

if (config.ty.equals(Conn.KafkaWriterChannel)) {
const { writer, init } = startKafkaStreamWriter(
<KafkaWriterConfig>config,
<KafkaWriterConfig>config.config,
);
this.inits.push(init);
return writer;
}

if (config.ty.equals(Conn.HttpWriterChannel)) {
const { writer, init } = startHttpStreamWriter(<HttpWriterConfig>config);
const { writer, init } = startHttpStreamWriter(
<HttpWriterConfig>config.config,
);
this.inits.push(init);
return writer;
}

if (config.ty.equals(JsOntology.JsWriterChannel)) {
const c = <JsChannel>config;
const c = <JsChannel>config.config;
if (c.channel) {
const id = c.channel.id.value;
if (c.channel.id.termType === "NamedNode") {
Expand Down
6 changes: 2 additions & 4 deletions src/connectors/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { appendFile, readFile, stat, writeFile } from "fs/promises";
import { isAbsolute } from "path";
import { watch } from "node:fs";
import {
Config,
ReaderConstructor,
SimpleStream,
WriterConstructor,
Expand All @@ -13,14 +12,14 @@ interface FileError extends Error {
code: string;
}

export interface FileReaderConfig extends Config {
export interface FileReaderConfig {
path: string;
onReplace: boolean;
readFirstContent?: boolean;
encoding?: string;
}

export interface FileWriterConfig extends Config {
export interface FileWriterConfig {
path: string;
onReplace: boolean;
readFirstContent?: boolean;
Expand Down Expand Up @@ -99,7 +98,6 @@ export const startFileStreamReader: ReaderConstructor<FileReaderConfig> = (
);

if (config.onReplace && config.readFirstContent) {
console.log("reading first content");
const content = await readFile(path, { encoding });
await reader.push(content);
}
Expand Down
4 changes: 2 additions & 2 deletions src/connectors/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ function streamToString(
});
}

export interface HttpReaderConfig extends Config {
export interface HttpReaderConfig {
endpoint: string;
port: number;
binary: boolean;
Expand Down Expand Up @@ -92,7 +92,7 @@ export const startHttpStreamReader: ReaderConstructor<HttpReaderConfig> = (
return { reader: stream, init };
};

export interface HttpWriterConfig extends Config {
export interface HttpWriterConfig {
endpoint: string;
method: string;
}
Expand Down
4 changes: 2 additions & 2 deletions src/connectors/kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export interface CSTopic {
topic: string;
fromBeginning?: boolean;
}
export interface KafkaReaderConfig extends Config {
export interface KafkaReaderConfig {
topic: {
name: string;
fromBeginning?: boolean;
Expand Down Expand Up @@ -106,7 +106,7 @@ export const startKafkaStreamReader: ReaderConstructor<KafkaReaderConfig> = (
return { reader: stream, init };
};

export interface KafkaWriterConfig extends Config {
export interface KafkaWriterConfig {
topic: {
name: string;
};
Expand Down
4 changes: 2 additions & 2 deletions src/connectors/ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
import { RawData, WebSocket } from "ws";
import { WebSocketServer } from "ws";

export interface WsWriterConfig extends Config {
export interface WsWriterConfig {
url: string;
}

Expand All @@ -27,7 +27,7 @@ function connectWs(url: string): Promise<WebSocket> {
return new Promise((res) => _connectWs(url, res));
}

export interface WsReaderConfig extends Config {
export interface WsReaderConfig {
host: string;
port: number;
}
Expand Down
Loading

0 comments on commit 9185b9d

Please sign in to comment.