diff --git a/build.gradle.kts b/build.gradle.kts index 2ae991d..a85355a 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -91,6 +91,10 @@ dependencies { // TOML parser. implementation("org.tomlj:tomlj:1.1.1") + // Arrow functional library. + implementation("io.arrow-kt:arrow-core:1.2.4") + implementation("io.arrow-kt:arrow-fx-coroutines:1.2.4") + // gRPC implementation("io.grpc:grpc-netty:1.64.0") implementation("io.grpc:grpc-protobuf:1.64.0") @@ -108,6 +112,7 @@ dependencies { // RDF dependencies. implementation("org.apache.jena:apache-jena-libs:5.0.0") implementation("org.apache.jena:jena-arq:5.0.0") + implementation("org.apache.jena:jena-shacl:5.0.0") // Hide SLF4J warnings. implementation("org.slf4j:slf4j-nop:2.0.7") diff --git a/proto/intermediate.proto b/proto/intermediate.proto index ea08161..5e01899 100644 --- a/proto/intermediate.proto +++ b/proto/intermediate.proto @@ -23,10 +23,17 @@ enum IRParameterCount { LIST = 1; } +message IRParameters { + map parameters = 1; +} + message IRParameter { - IRParameterType type = 1; - IRParameterPresence presence = 2; - IRParameterCount count = 3; + oneof type { + IRParameterType simple = 1; + IRParameters complex = 2; + } + IRParameterPresence presence = 3; + IRParameterCount count = 4; } message IRProcessor { @@ -35,9 +42,23 @@ message IRProcessor { map metadata = 3; } +message IRArgumentSimple { + repeated string value = 1; +} + +message IRArgumentMap { + map arguments = 1; +} + +message IRArgumentComplex { + repeated IRArgumentMap value = 1; +} + message IRArgument { - IRParameter parameter = 1; - repeated string value = 2; + oneof value { + IRArgumentSimple simple = 1; + IRArgumentComplex complex = 2; + } } message IRStage { diff --git a/runners/nodejs/src/proto/intermediate.ts b/runners/nodejs/src/proto/intermediate.ts index 2788eeb..c83fd89 100644 --- a/runners/nodejs/src/proto/intermediate.ts +++ b/runners/nodejs/src/proto/intermediate.ts @@ -156,8 +156,18 @@ export function iRParameterCountToJSON(object: IRParameterCount): string { } } +export interface IRParameters { + parameters: { [key: string]: IRParameter }; +} + +export interface IRParameters_ParametersEntry { + key: string; + value: IRParameter | undefined; +} + export interface IRParameter { - type: IRParameterType; + simple?: IRParameterType | undefined; + complex?: IRParameters | undefined; presence: IRParameterPresence; count: IRParameterCount; } @@ -178,11 +188,28 @@ export interface IRProcessor_MetadataEntry { value: string; } -export interface IRArgument { - parameter: IRParameter | undefined; +export interface IRArgumentSimple { value: string[]; } +export interface IRArgumentMap { + arguments: { [key: string]: IRArgument }; +} + +export interface IRArgumentMap_ArgumentsEntry { + key: string; + value: IRArgument | undefined; +} + +export interface IRArgumentComplex { + value: IRArgumentMap[]; +} + +export interface IRArgument { + simple?: IRArgumentSimple | undefined; + complex?: IRArgumentComplex | undefined; +} + export interface IRStage { uri: string; processor: IRProcessor | undefined; @@ -194,8 +221,194 @@ export interface IRStage_ArgumentsEntry { value: IRArgument | undefined; } +function createBaseIRParameters(): IRParameters { + return { parameters: {} }; +} + +export const IRParameters = { + encode( + message: IRParameters, + writer: _m0.Writer = _m0.Writer.create(), + ): _m0.Writer { + Object.entries(message.parameters).forEach(([key, value]) => { + IRParameters_ParametersEntry.encode( + { key: key as any, value }, + writer.uint32(10).fork(), + ).ldelim(); + }); + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): IRParameters { + const reader = + input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseIRParameters(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + + const entry1 = IRParameters_ParametersEntry.decode( + reader, + reader.uint32(), + ); + if (entry1.value !== undefined) { + message.parameters[entry1.key] = entry1.value; + } + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): IRParameters { + return { + parameters: isObject(object.parameters) + ? Object.entries(object.parameters).reduce<{ + [key: string]: IRParameter; + }>((acc, [key, value]) => { + acc[key] = IRParameter.fromJSON(value); + return acc; + }, {}) + : {}, + }; + }, + + toJSON(message: IRParameters): unknown { + const obj: any = {}; + if (message.parameters) { + const entries = Object.entries(message.parameters); + if (entries.length > 0) { + obj.parameters = {}; + entries.forEach(([k, v]) => { + obj.parameters[k] = IRParameter.toJSON(v); + }); + } + } + return obj; + }, + + create, I>>( + base?: I, + ): IRParameters { + return IRParameters.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>( + object: I, + ): IRParameters { + const message = createBaseIRParameters(); + message.parameters = Object.entries(object.parameters ?? {}).reduce<{ + [key: string]: IRParameter; + }>((acc, [key, value]) => { + if (value !== undefined) { + acc[key] = IRParameter.fromPartial(value); + } + return acc; + }, {}); + return message; + }, +}; + +function createBaseIRParameters_ParametersEntry(): IRParameters_ParametersEntry { + return { key: "", value: undefined }; +} + +export const IRParameters_ParametersEntry = { + encode( + message: IRParameters_ParametersEntry, + writer: _m0.Writer = _m0.Writer.create(), + ): _m0.Writer { + if (message.key !== "") { + writer.uint32(10).string(message.key); + } + if (message.value !== undefined) { + IRParameter.encode(message.value, writer.uint32(18).fork()).ldelim(); + } + return writer; + }, + + decode( + input: _m0.Reader | Uint8Array, + length?: number, + ): IRParameters_ParametersEntry { + const reader = + input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseIRParameters_ParametersEntry(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + + message.key = reader.string(); + continue; + case 2: + if (tag !== 18) { + break; + } + + message.value = IRParameter.decode(reader, reader.uint32()); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): IRParameters_ParametersEntry { + return { + key: isSet(object.key) ? globalThis.String(object.key) : "", + value: isSet(object.value) + ? IRParameter.fromJSON(object.value) + : undefined, + }; + }, + + toJSON(message: IRParameters_ParametersEntry): unknown { + const obj: any = {}; + if (message.key !== "") { + obj.key = message.key; + } + if (message.value !== undefined) { + obj.value = IRParameter.toJSON(message.value); + } + return obj; + }, + + create, I>>( + base?: I, + ): IRParameters_ParametersEntry { + return IRParameters_ParametersEntry.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>( + object: I, + ): IRParameters_ParametersEntry { + const message = createBaseIRParameters_ParametersEntry(); + message.key = object.key ?? ""; + message.value = + object.value !== undefined && object.value !== null + ? IRParameter.fromPartial(object.value) + : undefined; + return message; + }, +}; + function createBaseIRParameter(): IRParameter { - return { type: 0, presence: 0, count: 0 }; + return { simple: undefined, complex: undefined, presence: 0, count: 0 }; } export const IRParameter = { @@ -203,14 +416,17 @@ export const IRParameter = { message: IRParameter, writer: _m0.Writer = _m0.Writer.create(), ): _m0.Writer { - if (message.type !== 0) { - writer.uint32(8).int32(message.type); + if (message.simple !== undefined) { + writer.uint32(8).int32(message.simple); + } + if (message.complex !== undefined) { + IRParameters.encode(message.complex, writer.uint32(18).fork()).ldelim(); } if (message.presence !== 0) { - writer.uint32(16).int32(message.presence); + writer.uint32(24).int32(message.presence); } if (message.count !== 0) { - writer.uint32(24).int32(message.count); + writer.uint32(32).int32(message.count); } return writer; }, @@ -228,20 +444,27 @@ export const IRParameter = { break; } - message.type = reader.int32() as any; + message.simple = reader.int32() as any; continue; case 2: - if (tag !== 16) { + if (tag !== 18) { break; } - message.presence = reader.int32() as any; + message.complex = IRParameters.decode(reader, reader.uint32()); continue; case 3: if (tag !== 24) { break; } + message.presence = reader.int32() as any; + continue; + case 4: + if (tag !== 32) { + break; + } + message.count = reader.int32() as any; continue; } @@ -255,7 +478,12 @@ export const IRParameter = { fromJSON(object: any): IRParameter { return { - type: isSet(object.type) ? iRParameterTypeFromJSON(object.type) : 0, + simple: isSet(object.simple) + ? iRParameterTypeFromJSON(object.simple) + : undefined, + complex: isSet(object.complex) + ? IRParameters.fromJSON(object.complex) + : undefined, presence: isSet(object.presence) ? iRParameterPresenceFromJSON(object.presence) : 0, @@ -265,8 +493,11 @@ export const IRParameter = { toJSON(message: IRParameter): unknown { const obj: any = {}; - if (message.type !== 0) { - obj.type = iRParameterTypeToJSON(message.type); + if (message.simple !== undefined) { + obj.simple = iRParameterTypeToJSON(message.simple); + } + if (message.complex !== undefined) { + obj.complex = IRParameters.toJSON(message.complex); } if (message.presence !== 0) { obj.presence = iRParameterPresenceToJSON(message.presence); @@ -284,7 +515,11 @@ export const IRParameter = { object: I, ): IRParameter { const message = createBaseIRParameter(); - message.type = object.type ?? 0; + message.simple = object.simple ?? undefined; + message.complex = + object.complex !== undefined && object.complex !== null + ? IRParameters.fromPartial(object.complex) + : undefined; message.presence = object.presence ?? 0; message.count = object.count ?? 0; return message; @@ -620,8 +855,333 @@ export const IRProcessor_MetadataEntry = { }, }; +function createBaseIRArgumentSimple(): IRArgumentSimple { + return { value: [] }; +} + +export const IRArgumentSimple = { + encode( + message: IRArgumentSimple, + writer: _m0.Writer = _m0.Writer.create(), + ): _m0.Writer { + for (const v of message.value) { + writer.uint32(10).string(v!); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): IRArgumentSimple { + const reader = + input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseIRArgumentSimple(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + + message.value.push(reader.string()); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): IRArgumentSimple { + return { + value: globalThis.Array.isArray(object?.value) + ? object.value.map((e: any) => globalThis.String(e)) + : [], + }; + }, + + toJSON(message: IRArgumentSimple): unknown { + const obj: any = {}; + if (message.value?.length) { + obj.value = message.value; + } + return obj; + }, + + create, I>>( + base?: I, + ): IRArgumentSimple { + return IRArgumentSimple.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>( + object: I, + ): IRArgumentSimple { + const message = createBaseIRArgumentSimple(); + message.value = object.value?.map((e) => e) || []; + return message; + }, +}; + +function createBaseIRArgumentMap(): IRArgumentMap { + return { arguments: {} }; +} + +export const IRArgumentMap = { + encode( + message: IRArgumentMap, + writer: _m0.Writer = _m0.Writer.create(), + ): _m0.Writer { + Object.entries(message.arguments).forEach(([key, value]) => { + IRArgumentMap_ArgumentsEntry.encode( + { key: key as any, value }, + writer.uint32(10).fork(), + ).ldelim(); + }); + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): IRArgumentMap { + const reader = + input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseIRArgumentMap(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + + const entry1 = IRArgumentMap_ArgumentsEntry.decode( + reader, + reader.uint32(), + ); + if (entry1.value !== undefined) { + message.arguments[entry1.key] = entry1.value; + } + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): IRArgumentMap { + return { + arguments: isObject(object.arguments) + ? Object.entries(object.arguments).reduce<{ + [key: string]: IRArgument; + }>((acc, [key, value]) => { + acc[key] = IRArgument.fromJSON(value); + return acc; + }, {}) + : {}, + }; + }, + + toJSON(message: IRArgumentMap): unknown { + const obj: any = {}; + if (message.arguments) { + const entries = Object.entries(message.arguments); + if (entries.length > 0) { + obj.arguments = {}; + entries.forEach(([k, v]) => { + obj.arguments[k] = IRArgument.toJSON(v); + }); + } + } + return obj; + }, + + create, I>>( + base?: I, + ): IRArgumentMap { + return IRArgumentMap.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>( + object: I, + ): IRArgumentMap { + const message = createBaseIRArgumentMap(); + message.arguments = Object.entries(object.arguments ?? {}).reduce<{ + [key: string]: IRArgument; + }>((acc, [key, value]) => { + if (value !== undefined) { + acc[key] = IRArgument.fromPartial(value); + } + return acc; + }, {}); + return message; + }, +}; + +function createBaseIRArgumentMap_ArgumentsEntry(): IRArgumentMap_ArgumentsEntry { + return { key: "", value: undefined }; +} + +export const IRArgumentMap_ArgumentsEntry = { + encode( + message: IRArgumentMap_ArgumentsEntry, + writer: _m0.Writer = _m0.Writer.create(), + ): _m0.Writer { + if (message.key !== "") { + writer.uint32(10).string(message.key); + } + if (message.value !== undefined) { + IRArgument.encode(message.value, writer.uint32(18).fork()).ldelim(); + } + return writer; + }, + + decode( + input: _m0.Reader | Uint8Array, + length?: number, + ): IRArgumentMap_ArgumentsEntry { + const reader = + input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseIRArgumentMap_ArgumentsEntry(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + + message.key = reader.string(); + continue; + case 2: + if (tag !== 18) { + break; + } + + message.value = IRArgument.decode(reader, reader.uint32()); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): IRArgumentMap_ArgumentsEntry { + return { + key: isSet(object.key) ? globalThis.String(object.key) : "", + value: isSet(object.value) + ? IRArgument.fromJSON(object.value) + : undefined, + }; + }, + + toJSON(message: IRArgumentMap_ArgumentsEntry): unknown { + const obj: any = {}; + if (message.key !== "") { + obj.key = message.key; + } + if (message.value !== undefined) { + obj.value = IRArgument.toJSON(message.value); + } + return obj; + }, + + create, I>>( + base?: I, + ): IRArgumentMap_ArgumentsEntry { + return IRArgumentMap_ArgumentsEntry.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>( + object: I, + ): IRArgumentMap_ArgumentsEntry { + const message = createBaseIRArgumentMap_ArgumentsEntry(); + message.key = object.key ?? ""; + message.value = + object.value !== undefined && object.value !== null + ? IRArgument.fromPartial(object.value) + : undefined; + return message; + }, +}; + +function createBaseIRArgumentComplex(): IRArgumentComplex { + return { value: [] }; +} + +export const IRArgumentComplex = { + encode( + message: IRArgumentComplex, + writer: _m0.Writer = _m0.Writer.create(), + ): _m0.Writer { + for (const v of message.value) { + IRArgumentMap.encode(v!, writer.uint32(10).fork()).ldelim(); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): IRArgumentComplex { + const reader = + input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseIRArgumentComplex(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + + message.value.push(IRArgumentMap.decode(reader, reader.uint32())); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): IRArgumentComplex { + return { + value: globalThis.Array.isArray(object?.value) + ? object.value.map((e: any) => IRArgumentMap.fromJSON(e)) + : [], + }; + }, + + toJSON(message: IRArgumentComplex): unknown { + const obj: any = {}; + if (message.value?.length) { + obj.value = message.value.map((e) => IRArgumentMap.toJSON(e)); + } + return obj; + }, + + create, I>>( + base?: I, + ): IRArgumentComplex { + return IRArgumentComplex.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>( + object: I, + ): IRArgumentComplex { + const message = createBaseIRArgumentComplex(); + message.value = + object.value?.map((e) => IRArgumentMap.fromPartial(e)) || []; + return message; + }, +}; + function createBaseIRArgument(): IRArgument { - return { parameter: undefined, value: [] }; + return { simple: undefined, complex: undefined }; } export const IRArgument = { @@ -629,11 +1189,17 @@ export const IRArgument = { message: IRArgument, writer: _m0.Writer = _m0.Writer.create(), ): _m0.Writer { - if (message.parameter !== undefined) { - IRParameter.encode(message.parameter, writer.uint32(10).fork()).ldelim(); + if (message.simple !== undefined) { + IRArgumentSimple.encode( + message.simple, + writer.uint32(10).fork(), + ).ldelim(); } - for (const v of message.value) { - writer.uint32(18).string(v!); + if (message.complex !== undefined) { + IRArgumentComplex.encode( + message.complex, + writer.uint32(18).fork(), + ).ldelim(); } return writer; }, @@ -651,14 +1217,14 @@ export const IRArgument = { break; } - message.parameter = IRParameter.decode(reader, reader.uint32()); + message.simple = IRArgumentSimple.decode(reader, reader.uint32()); continue; case 2: if (tag !== 18) { break; } - message.value.push(reader.string()); + message.complex = IRArgumentComplex.decode(reader, reader.uint32()); continue; } if ((tag & 7) === 4 || tag === 0) { @@ -671,22 +1237,22 @@ export const IRArgument = { fromJSON(object: any): IRArgument { return { - parameter: isSet(object.parameter) - ? IRParameter.fromJSON(object.parameter) + simple: isSet(object.simple) + ? IRArgumentSimple.fromJSON(object.simple) + : undefined, + complex: isSet(object.complex) + ? IRArgumentComplex.fromJSON(object.complex) : undefined, - value: globalThis.Array.isArray(object?.value) - ? object.value.map((e: any) => globalThis.String(e)) - : [], }; }, toJSON(message: IRArgument): unknown { const obj: any = {}; - if (message.parameter !== undefined) { - obj.parameter = IRParameter.toJSON(message.parameter); + if (message.simple !== undefined) { + obj.simple = IRArgumentSimple.toJSON(message.simple); } - if (message.value?.length) { - obj.value = message.value; + if (message.complex !== undefined) { + obj.complex = IRArgumentComplex.toJSON(message.complex); } return obj; }, @@ -698,11 +1264,14 @@ export const IRArgument = { object: I, ): IRArgument { const message = createBaseIRArgument(); - message.parameter = - object.parameter !== undefined && object.parameter !== null - ? IRParameter.fromPartial(object.parameter) + message.simple = + object.simple !== undefined && object.simple !== null + ? IRArgumentSimple.fromPartial(object.simple) + : undefined; + message.complex = + object.complex !== undefined && object.complex !== null + ? IRArgumentComplex.fromPartial(object.complex) : undefined; - message.value = object.value?.map((e) => e) || []; return message; }, }; diff --git a/runners/nodejs/src/runtime/runner.ts b/runners/nodejs/src/runtime/runner.ts index 77a2e71..41f663c 100644 --- a/runners/nodejs/src/runtime/runner.ts +++ b/runners/nodejs/src/runtime/runner.ts @@ -1,4 +1,11 @@ -import { IRParameterType, IRStage } from "../proto/intermediate"; +import { + IRArgument, + IRParameter, + IRParameterCount, + IRParameterPresence, + IRParameterType, + IRStage, +} from "../proto/intermediate"; import { ChannelData } from "../proto"; import { Subject, Subscription } from "rxjs"; import { Processor } from "../interfaces/processor"; @@ -28,6 +35,85 @@ export class Runner { }); } + parseArgumentSimple(arg: IRParameterType, value: string): unknown { + if (arg == IRParameterType.STRING) { + return value; + } else if (arg == IRParameterType.INT) { + return Number.parseInt(value); + } else if (arg == IRParameterType.FLOAT) { + return Number.parseFloat(value); + } else if (arg == IRParameterType.BOOLEAN) { + return value == "true"; + } else if (arg == IRParameterType.READER) { + const subject = new Subject(); + this.readers.set(value, subject); + return new Reader(subject); + } else if (arg == IRParameterType.WRITER) { + const subject = new Subject(); + subject.subscribe((data) => { + this.outgoing.next({ + destinationUri: value, + data: data, + }); + }); + return new Writer(subject); + } else { + throw new Error("Invalid argument type"); + } + } + + parseArgument(arg: IRArgument, param: IRParameter): unknown[] { + if (arg.complex && param.complex) { + const params: Map = new Map( + Object.entries(param.complex.parameters), + ); + return arg.complex.value.map((map) => { + const args: Map = new Map( + Object.entries(map.arguments), + ); + return this.parseArguments(args, params); + }); + } + + if (arg.simple && param.simple) { + return arg.simple.value.map((value) => + this.parseArgumentSimple(param.simple!, value), + ); + } + + throw new Error("Invalid argument type"); + } + + parseArguments( + args: Map, + params: Map, + ): Map { + const result = new Map(); + + for (const [name, arg] of args) { + const param = params.get(name)!; + const parsed = this.parseArgument(arg, param); + + if (param.count == IRParameterCount.SINGLE) { + if (parsed.length > 1) { + throw new Error(`Too many arguments for ${name}`); + } + + result.set(name, parsed[0]); + } else { + result.set(name, parsed); + } + } + + for (const [name, param] of params) { + if (param.presence == IRParameterPresence.REQUIRED && !result.has(name)) { + throw new Error(`Missing required argument ${name}`); + } + } + + return result; + } + async load(stage: IRStage): Promise { /** Load the processor into Node.js */ const absolutePath = path.resolve(stage.processor!.metadata.import); @@ -35,32 +121,12 @@ export class Runner { const constructor = processor.default; /** Parse the stage's arguments. */ - const args: Map = new Map(); - Object.entries(stage.arguments).map(([name, arg]) => { - if (arg.parameter!.type == IRParameterType.READER) { - const subject = new Subject(); - const reader = new Reader(subject); - this.readers.set(arg.value[0], subject); - args.set(name, reader); - } else if (arg.parameter!.type == IRParameterType.WRITER) { - const subject = new Subject(); - subject.subscribe((data) => { - this.outgoing.next({ - destinationUri: arg.value[0], - data: data, - }); - }); - const writer = new Writer(subject); - args.set(name, writer); - } else { - console.error( - new Error(`Unsupported parameter type ${arg.parameter!.type}`), - ); - } - }); + const args = new Map(Object.entries(stage.arguments)); + const params = new Map(Object.entries(stage.processor!.parameters!)); + const parsedArguments = this.parseArguments(args, params); try { - const processorInstance = new constructor(args); + const processorInstance = new constructor(parsedArguments); this.stages.set(stage.uri, processorInstance); } catch (e) { console.error(e); @@ -81,10 +147,5 @@ export class Runner { }); } - async halt(): Promise { - console.log("Halting stages."); - this.incomingSubscription.unsubscribe(); - } - static shared = new Runner(); } diff --git a/src/main/kotlin/Orchestrator.kt b/src/main/kotlin/Orchestrator.kt index 7b218a6..df96374 100644 --- a/src/main/kotlin/Orchestrator.kt +++ b/src/main/kotlin/Orchestrator.kt @@ -9,7 +9,6 @@ import kotlinx.coroutines.withTimeout import runner.Runner import runner.impl.NodeRunner import runner.jvm.JVMRunner -import technology.idlab.parser.intermediate.IRParameter import technology.idlab.parser.intermediate.IRStage import technology.idlab.util.Log @@ -35,10 +34,7 @@ class Orchestrator(stages: Set) { runner.load(stage) // Find all the readers in the stage. - stage.arguments - .filter { it.value.parameter.type == IRParameter.Type.READER } - .mapValues { (_, argument) -> argument.value[0] } - .forEach { (_, uri) -> this.readers[uri] = runner } + stage.getReaders().forEach { this.readers[it] = runner } } /** Execute all stages in all the runtimes. */ diff --git a/src/main/kotlin/parser/Parser.kt b/src/main/kotlin/parser/Parser.kt index 57c91dc..f3033c5 100644 --- a/src/main/kotlin/parser/Parser.kt +++ b/src/main/kotlin/parser/Parser.kt @@ -1,7 +1,7 @@ package technology.idlab.parser import java.io.File -import technology.idlab.parser.impl.TomlParser +import technology.idlab.parser.impl.RDFParser import technology.idlab.parser.intermediate.IRProcessor import technology.idlab.parser.intermediate.IRStage import technology.idlab.util.Log @@ -22,7 +22,7 @@ abstract class Parser { /* Create a parser based on the file extension. */ fun create(file: File): Parser { return when (file.extension) { - "toml" -> TomlParser(file) + "ttl" -> RDFParser(file) else -> Log.shared.fatal("Unsupported file extension: ${file.extension}") } } diff --git a/src/main/kotlin/parser/impl/RDFParser.kt b/src/main/kotlin/parser/impl/RDFParser.kt index 7d78ba5..c1f21d9 100644 --- a/src/main/kotlin/parser/impl/RDFParser.kt +++ b/src/main/kotlin/parser/impl/RDFParser.kt @@ -3,8 +3,14 @@ package technology.idlab.parser.impl import java.io.File import org.apache.jena.rdf.model.Model import org.apache.jena.rdf.model.ModelFactory +import org.apache.jena.rdf.model.Property +import org.apache.jena.rdf.model.RDFNode +import org.apache.jena.rdf.model.Resource +import org.apache.jena.rdf.model.ResourceFactory.createProperty +import org.apache.jena.rdf.model.ResourceFactory.createResource +import org.apache.jena.shacl.vocabulary.SHACLM +import org.apache.jena.vocabulary.RDF import runner.Runner -import technology.idlab.extensions.query import technology.idlab.extensions.validate import technology.idlab.parser.Parser import technology.idlab.parser.intermediate.IRArgument @@ -13,150 +19,250 @@ import technology.idlab.parser.intermediate.IRProcessor import technology.idlab.parser.intermediate.IRStage import technology.idlab.util.Log -class RDFParser(file: File) : Parser() { - /* The pipeline config contains additional SHACL shapes. */ - private val config = File(this.javaClass.getResource("/pipeline.ttl")!!.toURI()) +private class CONN { + companion object { + const val NS = "https://www.rdf-connect.com/#" + val NAMESPACE = createResource(NS)!! + val processor = createProperty("${NS}Processor")!! + val stage = createProperty("${NS}stage")!! + val channel = createProperty("${NS}Channel")!! + val target = createProperty("${NS}target")!! + val metadata = createProperty("${NS}metadata")!! + val arguments = createProperty("${NS}arguments")!! + val kotlinRunner = createResource("${NS}Kotlin")!! + } +} - /* Parse the RDF file into an Apache Jena model. */ - private val model: Model = - ModelFactory.createDefaultModel() - .read(config.inputStream(), null, "TURTLE") - .read(file.inputStream(), null, "TURTLE") - .validate() +private fun Resource.toRunnerTarget(): Runner.Target { + return when (this) { + CONN.kotlinRunner -> Runner.Target.JVM + else -> Log.shared.fatal("Unknown runner type: $this") + } +} - /* Cache the processors. */ - private val processors: Map +/** + * Maps a resource to an IRParameter.Type based on the URI. Note that this implementation is + * actually quite slow, and we should probably use Apache Jena native APIs here. + */ +private fun Resource.toIRParameterType(): IRParameter.Type { + return when (this.uri) { + "http://www.w3.org/2001/XMLSchema#boolean" -> IRParameter.Type.BOOLEAN + "http://www.w3.org/2001/XMLSchema#byte" -> IRParameter.Type.BYTE + "http://www.w3.org/2001/XMLSchema#datetime" -> IRParameter.Type.DATE + "http://www.w3.org/2001/XMLSchema#double" -> IRParameter.Type.DOUBLE + "http://www.w3.org/2001/XMLSchema#float" -> IRParameter.Type.FLOAT + "http://www.w3.org/2001/XMLSchema#int" -> IRParameter.Type.INT + "http://www.w3.org/2001/XMLSchema#long" -> IRParameter.Type.LONG + "http://www.w3.org/2001/XMLSchema#string" -> IRParameter.Type.STRING + "http://www.rdf-connect.com/#/writer" -> IRParameter.Type.WRITER + "http://www.rdf-connect.com/#/reader" -> IRParameter.Type.READER + else -> Log.shared.fatal("Unknown datatype: ${this.uri}") + } +} - /* Cache stages as well. */ - private val stages: Map - - init { - val processors = mutableListOf() - - model.query("/queries/processors.sparql") { - // Get URI - val uri = it.get("uri").asResource().toString() - - // Get target. - val targetString = it.get("target").asLiteral().string - val target = Runner.Target.fromString(targetString) - - // Parse parameters. - val parameterBindings = mapOf("?processor" to uri) - val parameters = mutableMapOf() - model.query("/queries/parameters.sparql", parameterBindings) { query -> - val path = query.get("path").asResource().toString().substringAfterLast("/") - val datatype = query.get("datatype").asResource().toString() - val minCount = - try { - query.get("minCount").asLiteral().int - } catch (e: Exception) { - null - } - val maxCount = - try { - query.get("maxCount").asLiteral().int - } catch (e: Exception) { - null - } - - // Check if the parameter is required or optional. - val presence = - if (minCount != null && minCount > 0) { - IRParameter.Presence.REQUIRED - } else { - IRParameter.Presence.OPTIONAL - } - - // Check if the parameter is a list. - val count = - if (maxCount != null && maxCount == 1) { - IRParameter.Count.SINGLE - } else { - IRParameter.Count.LIST - } - - // Parse the datatype. - // TODO: Check whether or not this gets compiled to a performant data structure, and not - // a series of if-else statements, since that would have time complexity O(n^2). - val type = - when (datatype) { - "http://www.w3.org/2001/XMLSchema#boolean" -> IRParameter.Type.BOOLEAN - "http://www.w3.org/2001/XMLSchema#byte" -> IRParameter.Type.BYTE - "http://www.w3.org/2001/XMLSchema#datetime" -> IRParameter.Type.DATE - "http://www.w3.org/2001/XMLSchema#double" -> IRParameter.Type.DOUBLE - "http://www.w3.org/2001/XMLSchema#float" -> IRParameter.Type.FLOAT - "http://www.w3.org/2001/XMLSchema#int" -> IRParameter.Type.INT - "http://www.w3.org/2001/XMLSchema#long" -> IRParameter.Type.LONG - "http://www.w3.org/2001/XMLSchema#string" -> IRParameter.Type.STRING - "http://www.rdf-connect.com/#/writer" -> IRParameter.Type.WRITER - "http://www.rdf-connect.com/#/reader" -> IRParameter.Type.READER - else -> Log.shared.fatal("Unknown datatype: $datatype") - } - - val parameter = IRParameter(type, presence, count) - parameters[path] = parameter +/** + * Return the first object which corresponds to a subject and predicate. Returns null if not found. + */ +private fun Model.objectOfProperty(resource: Resource, property: Property): RDFNode? { + return try { + this.listObjectsOfProperty(resource, property).next() + } catch (e: NoSuchElementException) { + null + } +} + +/** + * Return the first subject which corresponds to a predicate and object. Returns null if not found. + */ +private fun Model.subjectWithProperty(property: Property, obj: RDFNode): Resource? { + return try { + this.listSubjectsWithProperty(property, obj).next() + } catch (e: NoSuchElementException) { + null + } +} + +/** + * Create a mapping of String to IRParameter from a SHACL property. This is a recursive + * implementation that will automatically parse nested classes. + */ +private fun Model.parseSHACLProperty(property: Resource): Pair { + // Retrieve required fields. + val minCount = objectOfProperty(property, SHACLM.minCount)?.asLiteral()?.int + val maxCount = objectOfProperty(property, SHACLM.maxCount)?.asLiteral()?.int + val node = objectOfProperty(property, SHACLM.node)?.asResource() + val datatype = objectOfProperty(property, SHACLM.datatype)?.asResource() + + // Retrieve the path of the property. + val path = + try { + objectOfProperty(property, SHACLM.name)!!.asLiteral().string + } catch (e: Exception) { + Log.shared.fatal("SHACL property must have a name.") } - // Parse metadata. - val metadataBuilder = mutableMapOf() - val metadataBindings = mapOf("?processor" to uri) - model.query("/queries/metadata.sparql", metadataBindings) { query -> - val key = query.get("key").asResource().toString().substringAfterLast("#") - val value = query.get("value").asLiteral().string - metadataBuilder[key] = value + // Determine the presence. + val presence = + if (minCount != null && minCount > 0) { + IRParameter.Presence.REQUIRED + } else { + IRParameter.Presence.OPTIONAL } - val metadata = metadataBuilder.toMap() - // Append as result. - val processor = IRProcessor(uri, target, parameters, metadata) - processors.add(processor) - } + // Determine the count. + val count = + if (maxCount != null && maxCount == 1) { + IRParameter.Count.SINGLE + } else { + IRParameter.Count.LIST + } - this.processors = processors.associateBy { it.uri } + // Create a new parameter object. + val parameter = + if (datatype != null) { + IRParameter(simple = datatype.toIRParameterType(), presence = presence, count = count) + } else if (node != null) { + IRParameter(complex = parseSHACLShape(node), presence = presence, count = count) + } else { + Log.shared.fatal("SHACL property must have either a datatype or a class.") + } + + // Return the parameter mapped to its path. + return Pair(path, parameter) +} + +/** + * Parse a SHACL shape into a mapping of String to IRParameter. This is a recursive implementation + * that will automatically parse nested classes. + */ +private fun Model.parseSHACLShape(shape: Resource): Map { + val result = mutableMapOf() + + for (property in listObjectsOfProperty(shape, SHACLM.property)) { + val (key, parameter) = parseSHACLProperty(property.asResource()) + result[key] = parameter } - init { - val stages = mutableListOf() - - model.query("/queries/stages.sparql") { - // Get URI - val uri = it.get("uri").asResource().toString() - - // Get processor target. - val processorURI = it.get("processor").asResource().toString() - val processor = - processors[processorURI] ?: Log.shared.fatal("Unknown processor: $processorURI") - - // Parse arguments. - val builder = mutableMapOf>() - val bindings = mapOf("?stage" to uri) - model.query("/queries/arguments.sparql", bindings) { query -> - val key = query.get("key").asResource().toString().substringAfterLast("/") - val value = query.get("value").asLiteral().string - val args = builder.getOrPut(key) { mutableListOf() } - args.add(value) - } + return result +} - val arguments = - builder.mapValues { (key, value) -> - val parameter = processor.parameters[key]!! - return@mapValues IRArgument(parameter, value) - } +private fun Model.nameOfSHACLPath(path: Resource): String { + val property = + subjectWithProperty(SHACLM.path, path) + ?: Log.shared.fatal("No property found for path: $path") + return objectOfProperty(property, SHACLM.name)?.asLiteral()?.string + ?: Log.shared.fatal("No name found for path: $path") +} + +/** + * Parse the arguments of a stage. This is a recursive implementation that will automatically parse + * nested classes. Recursion will continue until all objects found are literals. + */ +private fun Model.parseArguments(node: Resource): Map { + val simple = mutableMapOf>() + val complex = mutableMapOf>>() + + // Go over each triple of the resource. If it is a literal, add it to the simple list. Otherwise, + // call recursively and add it to the complex list. + for (triple in listStatements(node, null, null as RDFNode?)) { + val key = nameOfSHACLPath(triple.predicate) + val value = triple.`object` - // Append as result. - val stage = IRStage(uri, processor, arguments) - stages.add(stage) + if (value.isLiteral) { + val list = simple.getOrPut(key) { mutableListOf() } + list.add(value.asLiteral().string) + } else if (value.isResource) { + val list = complex.getOrPut(key) { mutableListOf() } + val nested = parseArguments(value.asResource()) + list.add(nested) + } else { + Log.shared.fatal("Unknown RDFNode type: $value") } + } - this.stages = stages.associateBy { it.uri } + // Combine both simple and complex mappings as a single map to IRArguments. + return simple.mapValues { (_, value) -> IRArgument(simple = value) } + + complex.mapValues { (_, value) -> IRArgument(complex = value) } +} + +private fun Model.parseProcessor(processor: Resource): IRProcessor { + val uri = processor.toString() + + // Determine the target runner. + val target = objectOfProperty(processor, CONN.target)!!.asResource().toRunnerTarget() + + // Parse the parameters by SHACL shape. + val shape = + subjectWithProperty(SHACLM.targetClass, processor) + ?: Log.shared.fatal("No shape found for processor: ${processor}") + val parameters = + listObjectsOfProperty(shape, SHACLM.property) + .toList() + .find { + val path = objectOfProperty(it.asResource(), SHACLM.path)?.asResource() + return@find path == CONN.arguments + } + ?.let { objectOfProperty(it.asResource(), SHACLM.node)?.asResource() } + ?.let { parseSHACLShape(it) } + ?: Log.shared.fatal("No argument shape found for processor: $processor") + + // Parse metadata. + val metadata = mutableMapOf() + for (entry in this.listObjectsOfProperty(processor, CONN.metadata)) { + val literal = + try { + entry.asLiteral().string + } catch (e: Exception) { + Log.shared.fatal("Metadata must be a literal.") + } + val (key, value) = literal.split(':') + metadata[key.trim()] = value.trim() } + return IRProcessor(uri, target, parameters, metadata) +} + +private fun Model.parseProcessors(): List { + return listSubjectsWithProperty(RDF.type, CONN.processor).toList().map { parseProcessor(it) } +} + +private fun Model.parseStages(): List { + return listSubjectsWithProperty(RDF.type, CONN.processor) + .toList() + .map { processor -> + listSubjectsWithProperty(RDF.type, processor).toList().map { stage -> + val arguments = + objectOfProperty(stage, CONN.arguments)?.asResource() + ?: Log.shared.fatal("No arguments found for stage: $stage") + + IRStage(stage.uri, parseProcessor(processor), parseArguments(arguments)) + } + } + .flatten() +} + +class RDFParser(file: File) : Parser() { + /* The pipeline config contains additional SHACL shapes. */ + private val config = File(this.javaClass.getResource("/pipeline.ttl")!!.toURI()) + + /* Parse the RDF file into an Apache Jena model. */ + private val model: Model = + ModelFactory.createDefaultModel() + .read(config.inputStream(), null, "TURTLE") + .read(file.inputStream(), null, "TURTLE") + .validate() + + /* Cache the processors. */ + private val processors: List = model.parseProcessors() + + /* Cache stages as well. */ + private val stages: List = model.parseStages() + override fun processors(): List { - return processors.values.toList() + return processors } override fun stages(): List { - return stages.values.toList() + return stages } } diff --git a/src/main/kotlin/parser/impl/TomlParser.kt b/src/main/kotlin/parser/impl/TomlParser.kt deleted file mode 100644 index 2e15ccd..0000000 --- a/src/main/kotlin/parser/impl/TomlParser.kt +++ /dev/null @@ -1,168 +0,0 @@ -package technology.idlab.parser.impl - -import java.io.File -import org.tomlj.Toml -import org.tomlj.TomlParseResult -import org.tomlj.TomlTable -import runner.Runner -import technology.idlab.parser.Parser -import technology.idlab.parser.intermediate.IRArgument -import technology.idlab.parser.intermediate.IRParameter -import technology.idlab.parser.intermediate.IRProcessor -import technology.idlab.parser.intermediate.IRStage -import technology.idlab.util.Log - -private fun TomlTable.toIRParameter(): IRParameter { - val typeString = this.getString("type") ?: Log.shared.fatal("No type found for parameter.") - val presenceString = - this.getString("presence") ?: Log.shared.fatal("No presence found for parameter.") - val countString = this.getString("count") ?: Log.shared.fatal("No count found for parameter.") - - // Parse type. - val type = - when (typeString) { - "string" -> IRParameter.Type.STRING - "integer" -> IRParameter.Type.INT - "float" -> IRParameter.Type.FLOAT - "writer" -> IRParameter.Type.WRITER - "reader" -> IRParameter.Type.READER - "date" -> IRParameter.Type.DATE - "boolean" -> IRParameter.Type.BOOLEAN - else -> Log.shared.fatal("Unknown type: $typeString") - } - - // Parse presence. - val presence = - when (presenceString) { - "required" -> IRParameter.Presence.REQUIRED - "optional" -> IRParameter.Presence.OPTIONAL - else -> Log.shared.fatal("Unknown presence: $presenceString") - } - - // Parse count. - val count = - when (countString) { - "single" -> IRParameter.Count.SINGLE - "list" -> IRParameter.Count.LIST - else -> Log.shared.fatal("Unknown count: $countString") - } - - return IRParameter(type, presence, count) -} - -private fun TomlTable.toIRArguments(parameters: Map): Map { - val results = mutableMapOf() - - this.keySet().forEach { name -> - if (this.isArray(name)) { - val valuesArray = this.getArray(name)!! - val values = mutableListOf() - - for (i in 0 until valuesArray.size()) { - val value = valuesArray.get(i).toString() - values.add(value) - } - - results[name] = IRArgument(parameters[name]!!, values) - } else { - val value = this.get(name).toString() - results[name] = IRArgument(parameters[name]!!, listOf(value)) - } - } - - return results -} - -/** - * Deserialize a TOML table into an IRProcessor. - * - * @param uri The URI of the processor, which needs to be explicitly provided since it is the key to - * the processor in the TOML table. - */ -private fun TomlTable.toIRProcessor(uri: String): IRProcessor { - // Get runner target. - val targetString = this.getString("target") ?: Log.shared.fatal("No target found for processor.") - val target = Runner.Target.fromString(targetString) - - // Check if any parameters are provided. - if (!this.contains("parameters")) { - return IRProcessor(uri, target) - } - - // Parse the parameters. - val parametersArray = this.getArray("parameters")!! - val result = mutableMapOf() - for (i in 0 until parametersArray.size()) { - val table = parametersArray.getTable(i) - val name = table.getString("name") ?: Log.shared.fatal("No name found for parameter.") - val parameter = parametersArray.getTable(i).toIRParameter() - result[name] = parameter - } - - // Parse metadata. - val metadata = mutableMapOf() - this.keySet().forEach { - if (it != "parameters" && it != "target") { - metadata[it] = this.getString(it) ?: Log.shared.fatal("No value found for metadata key: $it") - } - } - - return IRProcessor(uri, target, result, metadata) -} - -private fun TomlTable.toIRStage(processors: Map, uri: String): IRStage { - // Get processor. - val processorURI = - this.getString("processor") ?: Log.shared.fatal("No processor found for stage.") - val processor = processors[processorURI] ?: Log.shared.fatal("Unknown processor: $processorURI") - - // Parse arguments. - val arguments = this.getTable("arguments")?.toIRArguments(processor.parameters) ?: emptyMap() - return IRStage(uri, processor, arguments) -} - -class TomlParser(file: File) : Parser() { - /* Parse the TOML file directly and only once. */ - private var toml: TomlParseResult = Toml.parse(file.inputStream()) - - /* Save the processors in a handy map. */ - private val processors: Map - - /* Save stages as well. */ - private val stages: Map - - init { - // Get the processors table. - val processorsTable = toml.getTable("processors") - - // Iterate over the processor names and save them. - val processors = - processorsTable?.keySet()?.map { uri -> - val table = processorsTable.getTable(uri)!! - return@map table.toIRProcessor(uri) - } - - this.processors = processors?.associateBy { it.uri } ?: emptyMap() - } - - init { - val stagesTable = toml.getTable("stages") - - // Iterate over the processor names and save them. - val stages = - stagesTable?.keySet()?.map { uri -> - val table = stagesTable.getTable(uri)!! - return@map table.toIRStage(processors, uri) - } - - this.stages = stages?.associateBy { it.uri } ?: emptyMap() - } - - override fun processors(): List { - return processors.values.toList() - } - - override fun stages(): List { - return stages.values.toList() - } -} diff --git a/src/main/kotlin/parser/intermediate/IRArgument.kt b/src/main/kotlin/parser/intermediate/IRArgument.kt index d8c316c..b224a0a 100644 --- a/src/main/kotlin/parser/intermediate/IRArgument.kt +++ b/src/main/kotlin/parser/intermediate/IRArgument.kt @@ -1,8 +1,41 @@ package technology.idlab.parser.intermediate +import technology.idlab.util.Log + +/** + * Intermediate representation of an argument. These can be either simple or complex, meaning they + * contain either a list of values or a map of key-value pairs. Note that it doesn't matter whether + * the presence of an argument is required or optional, as well as whether it is a single value or a + * list of values. This is because the parser should have already taken care of these details. + */ data class IRArgument( - // Parameter. - val parameter: IRParameter, - // Concrete but unparsed value. - val value: List, -) + // In case of simple: concrete but unparsed value. + private val simple: List? = null, + // In case of complex: list of key-value pairs. + private val complex: List>? = null, +) { + enum class Kind { + SIMPLE, + COMPLEX, + } + + val kind = if (simple != null) Kind.SIMPLE else Kind.COMPLEX + + init { + if (simple == null && complex == null) { + Log.shared.fatal("IRArgument has no values.") + } + + if (simple != null && complex != null) { + Log.shared.fatal("IRArgument has both simple and complex values.") + } + } + + fun getSimple(): List { + return simple ?: Log.shared.fatal("IRArgument is not simple.") + } + + fun getComplex(): List> { + return complex ?: Log.shared.fatal("IRArgument is not complex.") + } +} diff --git a/src/main/kotlin/parser/intermediate/IRParameter.kt b/src/main/kotlin/parser/intermediate/IRParameter.kt index f799312..a3e35d9 100644 --- a/src/main/kotlin/parser/intermediate/IRParameter.kt +++ b/src/main/kotlin/parser/intermediate/IRParameter.kt @@ -1,13 +1,22 @@ package technology.idlab.parser.intermediate +import technology.idlab.util.Log + data class IRParameter( - // The type of the argument. - val type: Type, + // In case of simple: concrete but unparsed value. + private val simple: Type? = null, + // In case of complex: list of key-value pairs. + private val complex: Map? = null, // Whether the argument is required. val presence: Presence, // Whether the argument is a single value or a list of values. val count: Count, ) { + enum class Kind { + SIMPLE, + COMPLEX, + } + /* The data type of the argument. */ enum class Type { BOOLEAN, @@ -35,4 +44,28 @@ data class IRParameter( SINGLE, LIST, } + + val kind = if (simple != null) Kind.SIMPLE else Kind.COMPLEX + + init { + if (simple == null && complex == null) { + Log.shared.fatal("IRParameter has no values.") + } + + if (simple != null && complex != null) { + Log.shared.fatal("IRParameter has both simple and complex values.") + } + } + + fun getSimple(): Type { + return simple ?: Log.shared.fatal("IRParameter is not simple.") + } + + fun getComplex(): Map { + return complex ?: Log.shared.fatal("IRParameter is not complex.") + } + + operator fun get(key: String): IRParameter { + return complex?.get(key) ?: Log.shared.fatal("IRParameter is not complex.") + } } diff --git a/src/main/kotlin/parser/intermediate/IRStage.kt b/src/main/kotlin/parser/intermediate/IRStage.kt index 2784216..e6de6f9 100644 --- a/src/main/kotlin/parser/intermediate/IRStage.kt +++ b/src/main/kotlin/parser/intermediate/IRStage.kt @@ -1,5 +1,31 @@ package technology.idlab.parser.intermediate +import arrow.core.zip + +private fun getReaders(options: Map>): List { + val results = mutableListOf() + + options.values.forEach { (parameter, arguments) -> + when (parameter.kind) { + IRParameter.Kind.SIMPLE -> { + if (parameter.getSimple() == IRParameter.Type.READER) { + val uri = arguments.getSimple().first() + results.add(uri) + } + } + IRParameter.Kind.COMPLEX -> { + val nestedParams = parameter.getComplex() + arguments.getComplex().forEach { argument -> + val nestedOptions = getReaders(nestedParams.zip(argument)) + results.addAll(nestedOptions) + } + } + } + } + + return results +} + data class IRStage( // The URI of the stage. val uri: String, @@ -7,4 +33,8 @@ data class IRStage( val processor: IRProcessor, // Concrete but unparsed arguments for the stage. val arguments: Map = emptyMap(), -) +) { + fun getReaders(): List { + return getReaders(processor.parameters.zip(arguments)) + } +} diff --git a/src/main/kotlin/runner/impl/GRPCRunner.kt b/src/main/kotlin/runner/impl/GRPCRunner.kt index dc0b5d1..5da2362 100644 --- a/src/main/kotlin/runner/impl/GRPCRunner.kt +++ b/src/main/kotlin/runner/impl/GRPCRunner.kt @@ -50,9 +50,33 @@ private fun IRParameter.Count.toGRPC(): GRPC.IRParameterCount { } } +private fun Map.toGRPC(): GRPC.IRParameters { + return GRPC.IRParameters.newBuilder().putAllParameters(mapValues { it.value.toGRPC() }).build() +} + +private fun List.toGRPC(): GRPC.IRArgumentSimple { + return GRPC.IRArgumentSimple.newBuilder().addAllValue(this).build() +} + +private fun List>.toGRPC(): GRPC.IRArgumentComplex { + val arguments = map { it.toGRPC() } + val builder = GRPC.IRArgumentComplex.newBuilder() + builder.addAllValue(arguments) + return builder.build() +} + +private fun Map.toGRPC(): GRPC.IRArgumentMap { + val builder = GRPC.IRArgumentMap.newBuilder() + forEach { (key, value) -> builder.putArguments(key, value.toGRPC()) } + return builder.build() +} + private fun IRParameter.toGRPC(): GRPC.IRParameter { val builder = GRPC.IRParameter.newBuilder() - builder.setType(type.toGRPC()) + when (kind) { + IRParameter.Kind.SIMPLE -> builder.setSimple(getSimple().toGRPC()) + IRParameter.Kind.COMPLEX -> builder.setComplex(getComplex().toGRPC()) + } builder.setPresence(presence.toGRPC()) builder.setCount(count.toGRPC()) return builder.build() @@ -60,8 +84,12 @@ private fun IRParameter.toGRPC(): GRPC.IRParameter { private fun IRArgument.toGRPC(): GRPC.IRArgument { val builder = GRPC.IRArgument.newBuilder() - builder.setParameter(this.parameter.toGRPC()) - builder.addAllValue(this.value) + when (kind) { + IRArgument.Kind.SIMPLE -> builder.setSimple(getSimple().toGRPC()) + IRArgument.Kind.COMPLEX -> { + builder.setComplex(getComplex().toGRPC()) + } + } return builder.build() } diff --git a/src/main/kotlin/runner/jvm/JVMRunner.kt b/src/main/kotlin/runner/jvm/JVMRunner.kt index 371af69..97f5745 100644 --- a/src/main/kotlin/runner/jvm/JVMRunner.kt +++ b/src/main/kotlin/runner/jvm/JVMRunner.kt @@ -1,5 +1,6 @@ package runner.jvm +import arrow.core.zip import kotlinx.coroutines.Job import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.channels.Channel @@ -8,11 +9,12 @@ import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import kotlinx.coroutines.withTimeout import org.jetbrains.kotlin.backend.common.push -import org.jetbrains.kotlin.utils.addToStdlib.ifFalse import runner.Runner +import technology.idlab.parser.intermediate.IRArgument import technology.idlab.parser.intermediate.IRParameter import technology.idlab.parser.intermediate.IRStage import technology.idlab.util.Log +import technology.idlab.util.Log.Cause.* class JVMRunner( fromProcessors: Channel, @@ -28,8 +30,7 @@ class JVMRunner( override suspend fun load(stage: IRStage) { /** Load the class into the JVM> */ - val className = - stage.processor.metadata["class"] ?: Log.shared.fatal("The processor has no class key set.") + val className = stage.processor.metadata["class"] ?: Log.shared.fatal(JVM_RUNNER_STAGE_NO_CLASS) val clazz = Class.forName(className) as Class<*> /** Check if instantiatable. */ @@ -38,30 +39,7 @@ class JVMRunner( } /** Build the argument map. */ - val arguments = mutableMapOf() - - for ((name, arg) in stage.arguments) { - /** Create concrete instances. */ - val concrete = arg.value.map { instantiate(arg.parameter.type, it) } - - /** - * If an array is expected, simply pass the value directly. Otherwise, pass the first - * variable. - */ - if (arg.parameter.count == IRParameter.Count.LIST) { - arguments[name] = concrete - } else { - assert(concrete.size == 1) - assert(arg.parameter.count == IRParameter.Count.SINGLE) - arguments[name] = concrete[0] - } - } - - /** Check if the non-optional arguments were set. */ - stage.processor.parameters - .filter { it.value.presence == IRParameter.Presence.REQUIRED } - .all { it.key in arguments.keys } - .ifFalse { Log.shared.fatal("Required argument not set.") } + val arguments = this.instantiate(stage.processor.parameters.zip(stage.arguments)) /** Initialize the stage with the new map. */ val constructor = clazz.getConstructor(Map::class.java) @@ -106,6 +84,23 @@ class JVMRunner( jobs.map { it.apply { it.cancel() } }.forEach { it.join() } } + private fun instantiate( + serialized: Map> + ): Map { + return serialized.mapValues { (_, map) -> + val (parameter, arguments) = map + + when (parameter.kind) { + IRParameter.Kind.SIMPLE -> { + arguments.getSimple().map { instantiate(parameter.getSimple(), it) } + } + IRParameter.Kind.COMPLEX -> { + arguments.getComplex().map { instantiate(parameter.getComplex().zip(it)) } + } + } + } + } + private fun instantiate(type: IRParameter.Type, value: String): Any { return when (type) { IRParameter.Type.BOOLEAN -> value.toBoolean() diff --git a/src/main/kotlin/runner/jvm/Processor.kt b/src/main/kotlin/runner/jvm/Processor.kt index 5b29c40..6afa58b 100644 --- a/src/main/kotlin/runner/jvm/Processor.kt +++ b/src/main/kotlin/runner/jvm/Processor.kt @@ -17,16 +17,17 @@ abstract class Processor( @JvmField protected val log = Log.shared fun getArgument(name: String): T { - val result = arguments[name] as T ?: log.fatal("Argument $name is missing") - return result + val result = arguments.get(name) as List ?: log.fatal("Argument $name is missing") + return result.get(0) ?: log.fatal("Argument $name is missing") } fun getNullableArgument(name: String): T? { - return arguments[name] as T? + val result = arguments.get(name) as List? + return result?.get(0) } fun getOptionalArgument(name: String): Optional { - val result = arguments[name] ?: log.fatal("Argument $name is missing") + val result = (arguments.get(name) as List?)?.get(0) ?: log.fatal("Argument $name is missing") if (result is Optional<*>) { return result as Optional diff --git a/src/main/kotlin/util/Log.kt b/src/main/kotlin/util/Log.kt index c23e57a..d7318f5 100644 --- a/src/main/kotlin/util/Log.kt +++ b/src/main/kotlin/util/Log.kt @@ -7,6 +7,10 @@ import kotlin.Exception import technology.idlab.exception.RunnerException class Log private constructor() { + enum class Cause(val message: String) { + JVM_RUNNER_STAGE_NO_CLASS("The processor has no class key set."), + } + enum class Level { INFO, SEVERE, @@ -104,6 +108,8 @@ class Log private constructor() { throw RunnerException() } + fun fatal(cause: Cause): Nothing = fatal(cause.message) + fun debug(message: String) { toConsole(message, Level.DEBUG) } diff --git a/src/main/resources/queries/arguments.sparql b/src/main/resources/queries/arguments.sparql deleted file mode 100644 index 9ebb5b8..0000000 --- a/src/main/resources/queries/arguments.sparql +++ /dev/null @@ -1,12 +0,0 @@ -PREFIX conn: -PREFIX rdf: -PREFIX sh: - -SELECT - ?key - ?value - -WHERE { - ?stage ?key ?value. - FILTER (rdf:type != ?key) -} diff --git a/src/main/resources/queries/metadata.sparql b/src/main/resources/queries/metadata.sparql deleted file mode 100644 index b639a22..0000000 --- a/src/main/resources/queries/metadata.sparql +++ /dev/null @@ -1,14 +0,0 @@ -PREFIX conn: -PREFIX rdf: - -SELECT - ?key - ?value - -WHERE { - ?processor a conn:Processor. - ?processor ?key ?value. - - FILTER(?key != rdf:type) - FILTER(?key != conn:target) -} diff --git a/src/main/resources/queries/parameters.sparql b/src/main/resources/queries/parameters.sparql deleted file mode 100644 index e93a345..0000000 --- a/src/main/resources/queries/parameters.sparql +++ /dev/null @@ -1,19 +0,0 @@ -PREFIX conn: -PREFIX sh: - -SELECT - ?path - ?datatype - ?minCount - ?maxCount - -WHERE { - ?shape sh:targetClass ?processor. - ?shape sh:property ?property. - - ?property sh:datatype ?datatype. - ?property sh:path ?path. - -OPTIONAL { ?property sh:minCount ?minCount. } - OPTIONAL { ?property sh:maxCount ?maxCount. } -} diff --git a/src/main/resources/queries/processors.sparql b/src/main/resources/queries/processors.sparql deleted file mode 100644 index 6990c42..0000000 --- a/src/main/resources/queries/processors.sparql +++ /dev/null @@ -1,10 +0,0 @@ -PREFIX conn: - -SELECT - ?uri - ?target - -WHERE { - ?uri a conn:Processor. - ?uri conn:target ?target. -} diff --git a/src/main/resources/queries/stages.sparql b/src/main/resources/queries/stages.sparql deleted file mode 100644 index 654dbc9..0000000 --- a/src/main/resources/queries/stages.sparql +++ /dev/null @@ -1,10 +0,0 @@ -PREFIX conn: - -SELECT - ?uri - ?processor - -WHERE { - ?uri a ?processor. - ?processor a conn:Processor. -} diff --git a/src/test/kotlin/parser/ParserTest.kt b/src/test/kotlin/parser/ParserTest.kt index 27c8b2c..01813db 100644 --- a/src/test/kotlin/parser/ParserTest.kt +++ b/src/test/kotlin/parser/ParserTest.kt @@ -4,7 +4,6 @@ import kotlin.test.Test import kotlin.test.assertContains import kotlin.test.assertEquals import kotlin.test.assertNotNull -import kotlin.test.assertNull import kotlin.test.assertTrue import runner.Runner import technology.idlab.parser.Parser @@ -17,51 +16,95 @@ abstract class ParserTest { @Test fun processors() { val processors = parser.processors() - assertEquals(2, processors.size, "There should be two processors.") + assertEquals(1, processors.size, "There should be one processor.") - // Get alpha processor and check its values. - val alpha = processors.find { it.uri.endsWith("alpha") } - assertNotNull(alpha, "Alpha processor should exist.") - assertContains(alpha.metadata.keys, "class", "Alpha processor should have a class key.") + // Get processor and check its values. + val processor = processors.find { it.uri.endsWith("processor") } + assertNotNull(processor, "Processor should exist.") + assertContains(processor.metadata.keys, "class", "processor processor should have a class key.") assertEquals( - "Alpha", - alpha.metadata["class"], - "Alpha processor should have a class key with value Alpha.") - assertEquals(Runner.Target.JVM, alpha.target, "Alpha processor should target JVM.") - assertEquals(2, alpha.parameters.size, "Alpha processor should have two parameters.") + "MyProcessor", + processor.metadata["class"], + "Processor should have a 'class' key with value 'MyProcessor'.") + assertEquals(Runner.Target.JVM, processor.target, "processor processor should target JVM.") + assertEquals(3, processor.parameters.size, "processor processor should have two parameters.") // Check its arguments. - val one = alpha.parameters["one"] - assertNotNull(one, "Parameter one should exist.") - assertEquals(IRParameter.Type.STRING, one.type, "Parameter one should be of type string.") - assertEquals(IRParameter.Count.SINGLE, one.count, "Parameter one should be a single value.") - assertEquals(IRParameter.Presence.REQUIRED, one.presence, "Parameter one should be required.") - - val two = alpha.parameters["two"] - assertNotNull(two, "Parameter two should exist.") - assertEquals(IRParameter.Type.INT, two.type, "Parameter two should be of type integer.") - assertEquals(IRParameter.Count.LIST, two.count, "Parameter two should be an array.") - assertEquals(IRParameter.Presence.OPTIONAL, two.presence, "Parameter two should optional.") + val arg1 = processor.parameters["arg1"] + assertNotNull(arg1, "Parameter arg1 should exist.") + assertEquals( + IRParameter.Type.STRING, arg1.getSimple(), "Parameter arg1 should be of type string.") + assertEquals(IRParameter.Count.SINGLE, arg1.count, "Parameter arg1 should be a single value.") + assertEquals(IRParameter.Presence.REQUIRED, arg1.presence, "Parameter arg1 should be required.") + + val arg2 = processor.parameters["arg2"] + assertNotNull(arg2, "Parameter arg2 should exist.") + assertEquals( + IRParameter.Type.INT, arg2.getSimple(), "Parameter arg2 should be of type integer.") + assertEquals(IRParameter.Count.LIST, arg2.count, "Parameter arg2 should be an array.") + assertEquals(IRParameter.Presence.OPTIONAL, arg2.presence, "Parameter arg2 should optional.") + + val arg3 = processor.parameters["arg3"] + assertNotNull(arg3, "Parameter arg3 should exist.") + assertEquals(2, arg3.getComplex().size, "Parameter arg3 should have two values.") + assertEquals(IRParameter.Count.SINGLE, arg3.count, "Parameter arg3 should be a single value.") + assertEquals(IRParameter.Presence.REQUIRED, arg3.presence, "Parameter arg3 should be required.") + + val arg4 = arg3["arg4"] + assertNotNull(arg4, "Parameter arg4 should exist.") + assertEquals( + IRParameter.Type.STRING, arg4.getSimple(), "Parameter arg4 should be of type string.") + assertEquals(IRParameter.Count.SINGLE, arg4.count, "Parameter arg4 should be a single value.") + assertEquals(IRParameter.Presence.REQUIRED, arg4.presence, "Parameter arg4 should be required.") + + val arg5 = arg3["arg5"] + assertNotNull(arg5, "Parameter arg5 should exist.") + assertEquals( + IRParameter.Type.INT, arg5.getSimple(), "Parameter arg5 should be of type integer.") + assertEquals(IRParameter.Count.SINGLE, arg5.count, "Parameter arg5 should be a single value.") + assertEquals(IRParameter.Presence.OPTIONAL, arg5.presence, "Parameter arg5 should be optional.") } @Test fun stages() { val stages = parser.stages() - // Get the first alpha stage. - val alphaOne = stages.find { it.uri.endsWith("alpha_one") } - assertNotNull(alphaOne, "alpha_one stage should exist.") - assertTrue( - alphaOne.processor.uri.endsWith("alpha"), "alpha_one stage should use the alpha processor.") + // Get the stage. + assertEquals(1, stages.size, "There should be one stage.") + val stage = stages[0] + assertTrue(stage.processor.uri.endsWith("processor"), "Stage should use the correct processor.") // Parse first argument. - val one = alphaOne.arguments["one"] - assertNotNull(one, "alpha_one::one should exist.") - assertEquals(1, one.value.size, "alpha_one::one should have one value.") - assertEquals("Hello, World!", one.value[0], "alpha_one::one should be 'Hello, World!'.") + val arg1 = stage.arguments["arg1"] + assertNotNull(arg1, "arg1 should exist.") + assertEquals(1, arg1.getSimple().size, "arg1 should have one value") + assertEquals("Hello, World!", arg1.getSimple()[0], "arg1 should be 'Hello, World!'.") // Parse second argument. - val two = alphaOne.arguments["two"] - assertNull(two, "alpha_one::two should not exist.") + val arg2 = stage.arguments["arg2"] + assertNotNull(arg2, "arg2 should exist.") + val arg2Values = arg2.getSimple().sorted() + assertEquals(3, arg2.getSimple().size, "arg2 should have three values") + assertEquals("1", arg2Values[0], "arg2 should be '1'.") + assertEquals("2", arg2Values[1], "arg2 should be '2'.") + assertEquals("3", arg2Values[2], "arg2 should be '3'.") + + // Parse third argument. + val arg3 = stage.arguments["arg3"] + assertNotNull(arg3, "arg3 should exist.") + assertEquals(1, arg3.getComplex().size, "arg3 should have one instance") + val arg = arg3.getComplex()[0] + + // Parse fourth argument. + val arg4 = arg["arg4"] + assertNotNull(arg4, "arg4 should exist.") + assertEquals(1, arg4.getSimple().size, "arg4 should have one value") + assertEquals("Hello, World!", arg4.getSimple()[0], "arg4 should be 'Hello, World!'.") + + // Parse fifth argument. + val arg5 = arg["arg5"] + assertNotNull(arg5, "arg5 should exist.") + assertEquals(1, arg5.getSimple().size, "arg5 should have one value") + assertEquals("1", arg5.getSimple()[0], "arg5 should be '1'.") } } diff --git a/src/test/kotlin/parser/impl/TomlParserTest.kt b/src/test/kotlin/parser/impl/TomlParserTest.kt deleted file mode 100644 index 946a9f8..0000000 --- a/src/test/kotlin/parser/impl/TomlParserTest.kt +++ /dev/null @@ -1,16 +0,0 @@ -package parser.impl - -import java.io.File -import parser.ParserTest -import technology.idlab.parser.impl.TomlParser - -class TomlParserTest : ParserTest() { - // Location of the TOML file. - private val url = this::class.java.getResource("/pipelines/basic.toml") - - // File object for the TOML file. - private val file = File(url!!.toURI()) - - // TOML parser. - override val parser = TomlParser(file) -} diff --git a/src/test/kotlin/processors/NodeTransparent.kt b/src/test/kotlin/processors/NodeTransparent.kt index 3bf6df6..882dde4 100644 --- a/src/test/kotlin/processors/NodeTransparent.kt +++ b/src/test/kotlin/processors/NodeTransparent.kt @@ -16,14 +16,14 @@ class NodeTransparent { "input" to IRParameter( IRParameter.Type.READER, - IRParameter.Presence.REQUIRED, - IRParameter.Count.SINGLE, + presence = IRParameter.Presence.REQUIRED, + count = IRParameter.Count.SINGLE, ), "output" to IRParameter( IRParameter.Type.WRITER, - IRParameter.Presence.REQUIRED, - IRParameter.Count.SINGLE, + presence = IRParameter.Presence.REQUIRED, + count = IRParameter.Count.SINGLE, ), ), mapOf("import" to "../std/transparent.js"), @@ -34,8 +34,8 @@ class NodeTransparent { "transparent_stage", processor, mapOf( - "input" to IRArgument(processor.parameters["input"]!!, listOf(channelInURI)), - "output" to IRArgument(processor.parameters["output"]!!, listOf(channelOutURI))), + "input" to IRArgument(listOf(channelInURI)), + "output" to IRArgument(listOf(channelOutURI))), ) } } diff --git a/src/test/kotlin/processors/TappedReader.kt b/src/test/kotlin/processors/TappedReader.kt index 85563bc..3c19bb2 100644 --- a/src/test/kotlin/processors/TappedReader.kt +++ b/src/test/kotlin/processors/TappedReader.kt @@ -38,8 +38,8 @@ class TappedReader(args: Map) : Processor(args) { "input" to IRParameter( IRParameter.Type.READER, - IRParameter.Presence.REQUIRED, - IRParameter.Count.SINGLE, + presence = IRParameter.Presence.REQUIRED, + count = IRParameter.Count.SINGLE, ), ), mapOf("class" to "processors.TappedReader"), @@ -47,9 +47,7 @@ class TappedReader(args: Map) : Processor(args) { fun stage(channelURI: String): IRStage { return IRStage( - "tapped_reader_stage", - processor, - mapOf("input" to IRArgument(processor.parameters["input"]!!, listOf(channelURI)))) + "tapped_reader_stage", processor, mapOf("input" to IRArgument(listOf(channelURI)))) } } } diff --git a/src/test/kotlin/processors/TappedWriter.kt b/src/test/kotlin/processors/TappedWriter.kt index c99e0dc..4d02808 100644 --- a/src/test/kotlin/processors/TappedWriter.kt +++ b/src/test/kotlin/processors/TappedWriter.kt @@ -38,8 +38,8 @@ class TappedWriter(args: Map) : Processor(args) { "output" to IRParameter( IRParameter.Type.WRITER, - IRParameter.Presence.REQUIRED, - IRParameter.Count.SINGLE, + presence = IRParameter.Presence.REQUIRED, + count = IRParameter.Count.SINGLE, ), ), mapOf("class" to "processors.TappedWriter"), @@ -47,9 +47,7 @@ class TappedWriter(args: Map) : Processor(args) { fun stage(channelURI: String): IRStage { return IRStage( - "tapped_writer_stage", - processor, - mapOf("output" to IRArgument(processor.parameters["output"]!!, listOf(channelURI)))) + "tapped_writer_stage", processor, mapOf("output" to IRArgument(listOf(channelURI)))) } } } diff --git a/src/test/kotlin/runner/RunnerTest.kt b/src/test/kotlin/runner/RunnerTest.kt index 2b3c137..626109d 100644 --- a/src/test/kotlin/runner/RunnerTest.kt +++ b/src/test/kotlin/runner/RunnerTest.kt @@ -55,16 +55,16 @@ abstract class RunnerTest { private val paramInput = IRParameter( - IRParameter.Type.READER, - IRParameter.Presence.REQUIRED, - IRParameter.Count.SINGLE, + simple = IRParameter.Type.READER, + presence = IRParameter.Presence.REQUIRED, + count = IRParameter.Count.SINGLE, ) private val paramOutput = IRParameter( - IRParameter.Type.WRITER, - IRParameter.Presence.REQUIRED, - IRParameter.Count.SINGLE, + simple = IRParameter.Type.WRITER, + presence = IRParameter.Presence.REQUIRED, + count = IRParameter.Count.SINGLE, ) private fun createProcessor(): IRProcessor { @@ -84,8 +84,8 @@ abstract class RunnerTest { "transparent_stage", this.createProcessor(), mapOf( - "input" to IRArgument(paramInput, listOf("channel_in_uri")), - "output" to IRArgument(paramOutput, listOf("channel_out_uri"))), + "input" to IRArgument(simple = listOf("channel_in_uri")), + "output" to IRArgument(simple = listOf("channel_out_uri"))), ) } } diff --git a/src/test/resources/pipelines/basic.toml b/src/test/resources/pipelines/basic.toml deleted file mode 100644 index 0656f4d..0000000 --- a/src/test/resources/pipelines/basic.toml +++ /dev/null @@ -1,58 +0,0 @@ -###################################################################### -# We define two processors, alpha and beta, each with two arguments. # -###################################################################### - -[processors.alpha] -target = "JVM" -class = "Alpha" - -[[processors.alpha.parameters]] -name = "one" -type = "string" -presence = "required" -count = "single" - -[[processors.alpha.parameters]] -name = "two" -type = "integer" -presence = "optional" -count = "list" - -[processors.beta] -target = "NodeJS - -[[processors.beta.parameters]] -name = "one" -type = "string" -presence = "required" -count = "single" - -[[processors.beta.parameters]] -name = "two" -type = "integer" -presence = "optional" -count = "list" - -########################################### -# Each processor has two concrete stages. # -########################################### - -[stages.alpha_one] -processor = "alpha" -arguments.one = "Hello, World!" -# arguments.two = null - -[stages.alpha_two] -processor = "alpha" -arguments.one = "Hello, World!" -arguments.two = [1, 2, 3] - -[stages.beta_one] -processor = "beta" -arguments.one = "Hello, World!" -# arguments.two = null - -[stages.beta_two] -processor = "beta" -arguments.one = "Hello, World!" -arguments.two = [1, 2, 3] diff --git a/src/test/resources/pipelines/basic.ttl b/src/test/resources/pipelines/basic.ttl index dd126f9..0546180 100644 --- a/src/test/resources/pipelines/basic.ttl +++ b/src/test/resources/pipelines/basic.ttl @@ -1,64 +1,87 @@ -@prefix conn: . +@prefix conn: . @prefix rdf: . @prefix sh: . @prefix xsd: . - a ; - "Hello, World!" . +############################ +### DEFINE THE PROCESSOR ### +############################ - a ; - "Hello, World!" ; - "1"^^xsd:int, - "2"^^xsd:int, - "3"^^xsd:int . + a conn:Processor ; + conn:target conn:Kotlin ; + conn:metadata "class:MyProcessor" . - a ; - "Hello, World!" . +######################## +### DEFINE A CHANNEL ### +######################## - a ; - "Hello, World!" ; - "1"^^xsd:int, - "2"^^xsd:int, - "3"^^xsd:int . + a conn:Channel . - a conn:Channel . +######################## +### DEFINE THE STAGE ### +######################## - a conn:Channel . - - a conn:Processor ; - conn:target "JVM" ; - conn:class "Alpha". - - a conn:Processor ; - conn:target "NodeJS" . - -[] a sh:NodeShape ; - sh:closed true ; - sh:ignoredProperties ( rdf:type ) ; - sh:property [ - sh:datatype xsd:string ; - sh:maxCount 1 ; - sh:minCount 1 ; - sh:path - ], [ - sh:datatype xsd:int ; - sh:minCount 0 ; - sh:path + a ; + conn:arguments [ + + "Hello, World!" ; + + "1"^^xsd:int , + "2"^^xsd:int , + "3"^^xsd:int ; + [ + "Hello, World!" ; + "1"^^xsd:int ] ; - sh:targetClass . + ]. + +########################################### +### DEFINE THE SHAPE OF THE PARAMETERS. ### +########################################### [] a sh:NodeShape ; - sh:closed true ; - sh:ignoredProperties ( rdf:type ) ; - sh:property [ - sh:datatype xsd:string ; - sh:maxCount 1 ; - sh:minCount 1 ; - sh:path - ], [ - sh:datatype xsd:int ; - sh:minCount 0 ; - sh:path + sh:closed true ; + sh:ignoredProperties ( rdf:type ) ; + sh:targetClass ; + sh:property [ + sh:maxCount 1 ; + sh:minCount 1 ; + sh:name "arguments" ; + sh:path conn:arguments ; + sh:node [ + sh:property [ + sh:maxCount 1 ; + sh:minCount 1 ; + sh:name "arg1" ; + sh:path ; + sh:datatype xsd:string ; + ], [ + sh:minCount 0 ; + sh:name "arg2" ; + sh:path ; + sh:datatype xsd:int ; + ], [ + sh:maxCount 1 ; + sh:minCount 1 ; + sh:name "arg3" ; + sh:path ; + sh:node [ + sh:closed true ; + sh:ignoredProperties ( rdf:type ) ; + sh:property [ + sh:datatype xsd:string ; + sh:maxCount 1 ; + sh:minCount 1 ; + sh:name "arg4" ; + sh:path ; + ], [ + sh:datatype xsd:int ; + sh:maxCount 1 ; + sh:minCount 0 ; + sh:name "arg5" ; + sh:path ; + ] ; + ] ; + ] ; ] ; - sh:targetClass . - + ].