diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b0d5185 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.dfx diff --git a/dfx.json b/dfx.json new file mode 100644 index 0000000..417394b --- /dev/null +++ b/dfx.json @@ -0,0 +1,31 @@ +{ + "canisters": { + + "pipelinify_testRunner": { + "main": "tests/_pipelinifyTest-Runner.mo", + "type": "motoko" + }, + "pipelinify_testConsumer": { + "main": "tests/_pipelinifyTest-Consumer.mo", + "type": "motoko" + }, + "pipelinify_testProcessor": { + "main": "tests/_pipelinifyTest-Processor.mo", + "type": "motoko" + } + }, + "defaults": { + "build": { + "args": "", + "packtool": "vessel sources" + } + }, + "dfx": "0.8.3", + "networks": { + "local": { + "bind": "127.0.0.1:8000", + "type": "ephemeral" + } + }, + "version": 1 +} \ No newline at end of file diff --git a/package-set.dhall b/package-set.dhall new file mode 100644 index 0000000..5fd55d7 --- /dev/null +++ b/package-set.dhall @@ -0,0 +1,26 @@ +let upstream = https://github.com/dfinity/vessel-package-set/releases/download/mo-0.6.4-20210624/package-set.dhall sha256:3f4cffd315d8ee5d2b4b5b00dc03b2e02732345b565340b7cb9cc0001444f525 +let Package = + { name : Text, version : Text, repo : Text, dependencies : List Text } + +let additions = + [ + { name = "candy" + , repo = "https://github.com/aramakme/candy_library.git" + , version = "v0.1.1" + , dependencies = ["base"] + }, + { name = "principal" + , repo = "https://github.com/aviate-labs/principal.mo.git" + , version = "v0.1.1" + , dependencies = ["base"] + }] : List Package + +let overrides = + [{ + name = "base", + repo = "https://github.com/dfinity/motoko-base", + version = "dfx-0.7.2", + dependencies = [] : List Text + }] : List Package + +in upstream # additions # overrides diff --git a/src/hex.mo b/src/hex.mo new file mode 100644 index 0000000..929b919 --- /dev/null +++ b/src/hex.mo @@ -0,0 +1,102 @@ +/** + * Module : Hex.mo + * Description : Hexadecimal encoding and decoding routines. + * Copyright : 2020 Enzo Haussecker + * License : Apache 2.0 with LLVM Exception + * Maintainer : Enzo Haussecker + * Stability : Stable + */ + +import Array "mo:base/Array"; +import Iter "mo:base/Iter"; +import Option "mo:base/Option"; +import Nat8 "mo:base/Nat8"; +import Char "mo:base/Char"; +import Result "mo:base/Result"; + +module { + + private type Result = Result.Result; + + private let base : Nat8 = 0x10; + + private let symbols = [ + '0', '1', '2', '3', '4', '5', '6', '7', + '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', + ]; + + /** + * Define a type to indicate that the decoder has failed. + */ + public type DecodeError = { + #msg : Text; + }; + + /** + * Encode an array of unsigned 8-bit integers in hexadecimal format. + */ + public func encode(array : [Nat8]) : Text { + Array.foldLeft(array, "", func (accum, w8) { + accum # encodeW8(w8); + }); + }; + + /** + * Encode an unsigned 8-bit integer in hexadecimal format. + */ + private func encodeW8(w8 : Nat8) : Text { + let c1 = symbols[Nat8.toNat(w8 / base)]; + let c2 = symbols[Nat8.toNat(w8 % base)]; + Char.toText(c1) # Char.toText(c2); + }; + + /** + * Decode an array of unsigned 8-bit integers in hexadecimal format. + */ + public func decode(text : Text) : Result<[Nat8], DecodeError> { + let next = text.chars().next; + func parse() : Result { + Option.get>( + do ? { + let c1 = next()!; + let c2 = next()!; + Result.chain(decodeW4(c1), func (x1) { + Result.chain(decodeW4(c2), func (x2) { + #ok (x1 * base + x2); + }) + }) + }, + #err (#msg "Not enough input!"), + ); + }; + var i = 0; + let n = text.size() / 2 + text.size() % 2; + let array = Array.init(n, 0); + while (i != n) { + switch (parse()) { + case (#ok w8) { + array[i] := w8; + i += 1; + }; + case (#err err) { + return #err err; + }; + }; + }; + #ok (Array.freeze(array)); + }; + + + /** + * Decode an unsigned 4-bit integer in hexadecimal format. + */ + private func decodeW4(char : Char) : Result { + for (i in Iter.range(0, 15)) { + if (symbols[i] == char) { + return #ok (Nat8.fromNat(i)); + }; + }; + let str = "Unexpected character: " # Char.toText(char); + #err (#msg str); + }; +}; diff --git a/src/lib.mo b/src/lib.mo new file mode 100644 index 0000000..8dcf832 --- /dev/null +++ b/src/lib.mo @@ -0,0 +1,1021 @@ +/////////////////////////////// +// +// ©2021 RIVVIR Tech LLC +// +//Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +// +//The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +// +//THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +/////////////////////////////// + + +import Iter "mo:base/Iter"; +import Array "mo:base/Array"; +import Nat8 "mo:base/Nat8"; +import Nat16 "mo:base/Nat16"; +import Nat32 "mo:base/Nat32"; +import Nat64 "mo:base/Nat64"; +import Nat "mo:base/Nat"; +import Result "mo:base/Result"; +import PipelinifyTypes "types"; +import Candy "mo:candy"; +import Debug "mo:base/Debug"; +import Buffer "mo:base/Buffer"; +import Hash "mo:base/Hash"; +import HashMap "mo:base/HashMap"; +import Time "mo:base/Time"; +import Int "mo:base/Int"; + +module { + + public class Pipelinify(_pipeline : PipelinifyTypes.PipelinifyIntitialization){ + type Result = Result.Result; + + type PipeInstanceID = Hash.Hash; + + var nonce : Nat = 0; + + + func selfHash(_self : Hash.Hash) : Hash.Hash { + _self; + }; + + let workspaceCache = HashMap.HashMap( + 16, + Hash.equal, + selfHash + ); + + let requestCache = HashMap.HashMap( + 16, + Hash.equal, + selfHash + ); + + let processCache = HashMap.HashMap( + 16, + Hash.equal, + selfHash + ); + + func getPipeInstanceID(request: PipelinifyTypes.ProcessRequest) : PipeInstanceID { + nonce += 1; + //todo enable time or some randomish var for the nonce + //Hash.hash(Int.abs(Time.now()) + nonce); + + let thisHash = Hash.hash(nonce); + requestCache.put(thisHash, { + request = { + event = request.event; + dataConfig = switch(request.dataConfig){ + case(#dataIncluded(data)){#internal;}; + case(#local(data)){#local(data);}; + case(#pull(pullRequest)){ + #pull{ + sourceActor= pullRequest.sourceActor; + sourceIdentifier = pullRequest.sourceIdentifier; + mode = pullRequest.mode; + totalChunks = pullRequest.totalChunks; + data = ?[]; + }; + }; + case(#push){#push}; + case(#internal){#internal}; + }; + processConfig = request.processConfig; + executionConfig = request.executionConfig; + responseConfig = request.responseConfig;}; + timestamp = Int.abs(Time.now()); + status = #initialized; + }); + + thisHash + }; + + + let onDataWillBeLoaded : (PipeInstanceID, ?PipelinifyTypes.ProcessRequest) -> PipelinifyTypes.PipelineEventResponse = switch (_pipeline.onDataWillBeLoaded){ + case(null){ + func _onDataReady(_hash : PipeInstanceID, _processRequest : ?PipelinifyTypes.ProcessRequest) : PipelinifyTypes.PipelineEventResponse{ + //just returns what it was given + return #dataNoOp; + } + }; + case(?_onDataWillBeLoaded){ + _onDataWillBeLoaded; + } + }; + + let onDataReady : (PipeInstanceID, PipelinifyTypes.Workspace, ?PipelinifyTypes.ProcessRequest) -> PipelinifyTypes.PipelineEventResponse = switch (_pipeline.onDataReady){ + case(null){ + func _onDataReady(_hash : PipeInstanceID, _data : PipelinifyTypes.Workspace, _processRequest : ?PipelinifyTypes.ProcessRequest) : PipelinifyTypes.PipelineEventResponse{ + //just returns what it was given + return #dataNoOp; + } + }; + case(?_onDataReady){ + _onDataReady; + } + }; + + let onPreProcess : (PipeInstanceID, PipelinifyTypes.Workspace, ?PipelinifyTypes.ProcessRequest, ?Nat) -> PipelinifyTypes.PipelineEventResponse = switch (_pipeline.onPreProcess){ + case(null){ + func _onPreProcess(_hash : PipeInstanceID, _data : PipelinifyTypes.Workspace, _processRequest : ?PipelinifyTypes.ProcessRequest, _step : ?Nat) : PipelinifyTypes.PipelineEventResponse{ + //just returns what it was given + return #dataNoOp; + + } + }; + case(?_onPreProcess){ + _onPreProcess; + } + }; + + let onProcess : (PipeInstanceID, PipelinifyTypes.Workspace, ?PipelinifyTypes.ProcessRequest, ?Nat) -> PipelinifyTypes.PipelineEventResponse = switch (_pipeline.onProcess){ + case(null){ + func _onProcess(_hash : PipeInstanceID, _data : PipelinifyTypes.Workspace, _processRequest : ?PipelinifyTypes.ProcessRequest, _step : ?Nat) : PipelinifyTypes.PipelineEventResponse{ + //just returns what it was given + return #dataNoOp; + + } + }; + case(?_onProcess){ + _onProcess; + } + }; + + let onPostProcess : (PipeInstanceID, PipelinifyTypes.Workspace, ?PipelinifyTypes.ProcessRequest, ?Nat) -> PipelinifyTypes.PipelineEventResponse = switch (_pipeline.onPostProcess){ + case(null){ + func _onPostProcess(_hash : PipeInstanceID, _data : PipelinifyTypes.Workspace, _processRequest : ?PipelinifyTypes.ProcessRequest, _step: ?Nat) : PipelinifyTypes.PipelineEventResponse{ + //just returns what it was given + return #dataNoOp; + + } + }; + case(?_onPostProcess){ + _onPostProcess; + } + }; + + let onDataWillBeReturned : (PipeInstanceID, PipelinifyTypes.Workspace, ?PipelinifyTypes.ProcessRequest) -> PipelinifyTypes.PipelineEventResponse = switch (_pipeline.onDataWillBeReturned){ + case(null){ + func _onDataWillBeReturned(_hash : PipeInstanceID, _data : PipelinifyTypes.Workspace, _processRequest : ?PipelinifyTypes.ProcessRequest) : PipelinifyTypes.PipelineEventResponse{ + //just returns what it was given + return #dataNoOp; + + } + }; + case(?_onDataWillBeReturned){ + _onDataWillBeReturned; + } + }; + + let onDataReturned : (PipeInstanceID, ?PipelinifyTypes.ProcessRequest, ?PipelinifyTypes.ProcessResponse) -> PipelinifyTypes.PipelineEventResponse = switch (_pipeline.onDataReturned){ + case(null){ + func _onDataReturned(_hash : PipeInstanceID, _processRequest : ?PipelinifyTypes.ProcessRequest, _processResponse : ?PipelinifyTypes.ProcessResponse) : PipelinifyTypes.PipelineEventResponse{ + //just returns what it was given + return #dataNoOp; + + } + }; + case(?_onDataReturned){ + _onDataReturned; + } + }; + + let getProcessType : (PipeInstanceID, PipelinifyTypes.Workspace, ?PipelinifyTypes.ProcessRequest) -> PipelinifyTypes.ProcessType = switch (_pipeline.getProcessType){ + case(null){ + func _getProcessType(_hash : PipeInstanceID, _data : PipelinifyTypes.Workspace, _processRequest : ?PipelinifyTypes.ProcessRequest) : PipelinifyTypes.ProcessType{ + //just returns what it was given + return #unconfigured; + + } + }; + case(?_getProcessType){ + _getProcessType; + } + }; + + let getLocalWorkspace : (PipeInstanceID, Nat, ?PipelinifyTypes.ProcessRequest) -> Candy.Workspace = switch (_pipeline.getLocalWorkspace){ + case(null){ + func _getLocalWorkspace(_hash : PipeInstanceID, _id : Nat, _request: ?PipelinifyTypes.ProcessRequest) : Candy.Workspace{ + //just returns what it was given + return Candy.emptyWorkspace(); + + } + }; + case(?_getLocalWorkspace){ + _getLocalWorkspace; + } + }; + + let putLocalWorkspace : (PipeInstanceID, Nat, Candy.Workspace, ?PipelinifyTypes.ProcessRequest) -> Candy.Workspace = switch (_pipeline.putLocalWorkspace){ + case(null){ + func _putLocalWorkspacee(_hash : PipeInstanceID, _id : Nat, _workspace: Candy.Workspace, _request: ?PipelinifyTypes.ProcessRequest) : Candy.Workspace{ + //just returns what it was given + return Candy.emptyWorkspace(); + + } + }; + case(?_putLocalWorkspace){ + _putLocalWorkspace; + } + }; + + + func handleProcessing(pipeInstanceID : Hash.Hash, data: PipelinifyTypes.Workspace, request : ?PipelinifyTypes.ProcessRequest, step: ?Nat) : Result<{data :PipelinifyTypes.Workspace; bFinished: Bool},PipelinifyTypes.ProcessError> { + //process the data + var _request : ?PipelinifyTypes.ProcessRequest = request; + + if(_request == null){ + let currentCache = requestCache.get(pipeInstanceID); + _request := do?{currentCache!.request}; + }; + switch(_request){ + case(null){ + //todo: return error + }; + case(?_request){ + switch(_request.executionConfig.executionMode){ + case(#onLoad){ + Debug.print("handling onLoad - Preprocess"); + let preProcessResponse = onPreProcess(pipeInstanceID, data, ?_request, step); + //todo: handle response from preprocess + Debug.print("handling onLoad - process"); + let response : PipelinifyTypes.PipelineEventResponse = onProcess(pipeInstanceID, data, ?_request, step); + //todo: handle response + switch(response){ + case(#dataUpdated){ + Debug.print("setting processedData"); + //processedData := data.newData; + }; + case(#stepNeeded){ + Debug.print("more processing needed a"); + return #ok{data=data;bFinished=false}; + }; + case(_){ + Debug.print("Not implemnted an error occured" # debug_show(response)) + }; + }; + Debug.print("handling onLoad - postProcess"); + let postProcessResponse = onPostProcess(pipeInstanceID, data, ?_request, step); + //todo: handle response + return #ok({data=data; bFinished=true}); + }; + case(#manual){ + Debug.print("handling manual - Preprocess"); + let preProcessResponse = onPreProcess(pipeInstanceID, data, ?_request, step); + //todo: handle response from preprocess + Debug.print("handling manual - process"); + let response : PipelinifyTypes.PipelineEventResponse = onProcess(pipeInstanceID, data, ?_request, step); + //todo: handle response + switch(response){ + case(#dataUpdated){ + Debug.print("setting processedData"); + //processedData := data.newData; + //todo...we are returing false so that the client can manually finalize + //todo probsbly need to calll on post prcess manually in the finalize function + return #ok{data=data;bFinished=false}; + }; + case(#stepNeeded){ + Debug.print("more processing needed b"); + return #ok{data=data;bFinished=false}; + }; + case(_){ + Debug.print("Not implemnted an error occured" # debug_show(response)) + }; + }; + Debug.print("handling manual - postProcess"); + let postProcessResponse = onPostProcess(pipeInstanceID, data, ?_request, step); + //todo: handle response + return #ok({data=data; bFinished=true;}); + }; + + + }; + }; + }; + return #err({code=77777; text="Not Implemented Execution Pathway"}); + }; + + func handleReturn(pipeInstanceID : Hash.Hash, finalData: PipelinifyTypes.Workspace, request : ?PipelinifyTypes.ProcessRequest) : Result { + //process the data + var _request : ?PipelinifyTypes.ProcessRequest = request; + + if(_request == null){ + let currentCache = requestCache.get(pipeInstanceID); + _request := do?{currentCache!.request}; + }; + switch(_request){ + case(null){ + //todo: return error + }; + case(?_request){ + switch(_request.responseConfig.responseMode){ + case(#push){ + Debug.print("returning the data"); + let dataWillBeReturnedResponse = onDataWillBeReturned(pipeInstanceID, finalData, ?_request); + let processResponse = + #dataIncluded{ + payload = Candy.workspaceToAddressedChunkArray(finalData); + }; + let dataReturnedResponse : PipelinifyTypes.PipelineEventResponse = onDataReturned(pipeInstanceID, ?_request, ?processResponse); + + //todo: dilema: how do we return before we call onData Return! + return #ok(processResponse); + }; + case(#pull){ + Debug.print("waiting to return data"); + //let dataWillBeReturnedResponse = onDataWillBeReturned(pipeInstanceID, processedData, ?_request); + //responseCache.put(pipeInstanceID, {data = processedData} ); + let processResponse = + #outtakeNeeded{ + pipeInstanceID = pipeInstanceID; + }; + //let dataReturnedResponse : PipelinifyTypes.PipelineEventResponse = onDataReturned(pipeInstanceID, ?_request, ?processResponse); + + //todo: dilema: how do we return before we call onData Return! + return #ok(processResponse); + }; + + case(#local(id)){ + Debug.print("returning data locally"); + let dataWillBeReturnedResponse = putLocalWorkspace(pipeInstanceID, id, finalData, ?_request); + //responseCache.put(pipeInstanceID, {data = processedData} ); + let processResponse = + #local(id); + //let dataReturnedResponse : PipelinifyTypes.PipelineEventResponse = onDataReturned(pipeInstanceID, ?_request, ?processResponse); + + //todo: dilema: how do we return before we call onData Return! + return #ok(processResponse); + }; + //case(_){ + // Debug.print("not handled the returning of data") + //}; + + }; + }; + }; + return #err({code=77777; text="Not Implemented Execution Pathway"}); + }; + + public func process(_request : PipelinifyTypes.ProcessRequest) : async Result { + //file the request and alert to new data + //assign a hash + let pipeInstanceID = getPipeInstanceID(_request); + + Debug.print("the process is on"); + var thisWorkspace = Candy.emptyWorkspace(); + + + + //check the data + switch(_request.dataConfig){ + case(#local(_id)){ + let dataWillLoadResponse = onDataWillBeLoaded(pipeInstanceID, ?_request); + //todo: chunk data into workspace + + thisWorkspace := getLocalWorkspace(pipeInstanceID, _id, ?_request); + workspaceCache.put(pipeInstanceID, { + var status = #initialized; + data = thisWorkspace; + }); + Debug.print("workspace included" # debug_show(thisWorkspace.size())); + let dataResponse : PipelinifyTypes.PipelineEventResponse = onDataReady(pipeInstanceID, thisWorkspace, ?_request); + }; + case(#dataIncluded(dataIncludedRequest)){ + //Debug.print("data included" # debug_show(dataIncludedRequest.data)); + //finalData := dataIncludedRequest.data; + let dataWillLoadResponse = onDataWillBeLoaded(pipeInstanceID, ?_request); + //todo: chunk data into workspace + + thisWorkspace := Candy.fromAddressedChunks(dataIncludedRequest.data); + Debug.print("workspace included" # debug_show(thisWorkspace.size())); + let dataResponse : PipelinifyTypes.PipelineEventResponse = onDataReady(pipeInstanceID, thisWorkspace, ?_request); + }; + case(#pull(pullRequest)){ + Debug.print("pump needed "); + var bInitilized : Bool = false; + var bLoading : Bool = true; + var chunkCount : Nat32 = 0; + var dataChunks : [PipelinifyTypes.AddressedChunk] = []; + switch(pullRequest.data){ + case(null){ + //todo: this may be able to be moved + Debug.print("initilizing cache with no data"); + workspaceCache.put(pipeInstanceID, { + var status = #initialized; + data = Candy.emptyWorkspace(); + }); + bInitilized := true; + }; + case(?data){ + Debug.print("initilizing cache with data"); + bInitilized := true; + bLoading := true; + Candy.fileAddressedChunks(thisWorkspace, data); + chunkCount := 1; + }; + }; + + var thisChunk : Nat = Nat32.toNat(chunkCount); + var totalChunks : Nat = 99999; + switch (pullRequest.totalChunks){ + case(?foundChunks){ + totalChunks := Nat32.toNat(foundChunks); + }; + case(_){}; + }; + + + switch(pullRequest.mode, pullRequest.sourceActor){ + case(#pull, null){ + return #err({ + text = "sourcePrincipal is require for pull"; + code = 1;}); + }; + case(#pullQuery, null){ + return #err({ + text = "sourcePrincipal is require for pullQuery"; + code = 1;}); + }; + case(_, ?sourceActor){ + //try to pull in the data + //todo: maybe we try query and catch and fall back? + + Debug.print("Polling sourceActor for chunks " # debug_show(chunkCount)); + + label dataRetrieve while(thisChunk < totalChunks){ + Debug.print("processing chunk" # debug_show(thisChunk)); + let chunkResult = switch(pullRequest.mode){ + case(#pull){ + await sourceActor.requestPipelinifyChunk({ + chunkID = thisChunk; + event = _request.event; + sourceIdentifier = pullRequest.sourceIdentifier; + }); + }; + case(#pullQuery){ + await sourceActor.queryPipelinifyChunk({ + chunkID = thisChunk; + event = _request.event; + sourceIdentifier = pullRequest.sourceIdentifier; + }); + }; + + }; + + + switch(chunkResult){ + case(#ok(response)){ + switch response{ + case(#chunk(chunkData)){ + Debug.print("found some Data"); + //dataChunks := Array.append(dataChunks, Array.make(chunkData)); + Candy.fileAddressedChunks(thisWorkspace, chunkData); + chunkCount := chunkCount + 1; + }; + case(#eof(chunkData)){ + Debug.print("found some eof data"); + Candy.fileAddressedChunks(thisWorkspace, chunkData); + chunkCount := chunkCount + 1; + break dataRetrieve; + }; + case(#parallel(chunkData)){ + Debug.print("found some parallel data"); + Candy.fileAddressedChunks(thisWorkspace, chunkData.2); + chunkCount := chunkCount + 1; + break dataRetrieve; + }; + case(#err(error)){ + Debug.print("Found Error when requesting Chunk"); + return #err{text = error.text; code = error.code;}; + }; + }; + }; + case(#err(error)){ + Debug.print("Found Error when requesting Chunk"); + return #err{text = error.text; code = error.code;}; + }; + }; + thisChunk += 1; + }; + Debug.print("have final data chunks" ); + //todo: how to return + //finalData := Array.flatten(dataChunks); + Debug.print("have final data "); + }; + + + + }; + }; + case(#push){ + //we are basically done here. We need the user to push us data so we can continue. We do need to send the cache key. + Debug.print("this is a push operation"); + //todo: maybe we require initialization + workspaceCache.put(pipeInstanceID, { + var status = #initialized; + data = Candy.emptyWorkspace(); + }); + return #ok(#intakeNeeded{ + pipeInstanceID = pipeInstanceID; + currentChunks = 0; + totalChunks = 0; + chunkMap = []; + }); + }; + case(#internal){ + return #err{text = "Internal should not be used for initial request."; code = 178;}; + } + }; + + + + //process the data + //todo: make sure we skip this for single steps that aren't a push procedure + let processingResult = handleProcessing(pipeInstanceID, thisWorkspace, ?_request, null); + switch(processingResult){ + case(#ok(data)){ + Debug.print("push branch processing results"); + if(handleParallelProcessStepResult(pipeInstanceID,null, data) == true){ + //more processing needed + + return #ok(#stepProcess{ + pipeInstanceID = pipeInstanceID; + status = getProcessType(pipeInstanceID, thisWorkspace,?_request); + }); + }; + }; + + + case(#err(theError)){ + + return #err(theError); + } + }; + + + //return the data + + return handleReturn(pipeInstanceID, thisWorkspace, ?_request); + + }; + + func handleParallelProcessStepResult(pipeInstanceID : Hash.Hash, step: ?Nat, data : {data: PipelinifyTypes.Workspace; bFinished: Bool}) : Bool{ + Debug.print("in parallel proces handling" # debug_show(data.bFinished)); + + //more processing needed + Debug.print("looking for cache"); + var cache = processCache.get(pipeInstanceID); + //switch(cache){ + // case(null){ + // cache := initilizeParallelProcessCache(); + // }; + //}; + switch(cache, step){ + + case(?cache, ?step){ + Debug.print("manipulating cache"); + cache.map[step] := true; + cache.status := #pending(step); + return true; + }; + case(?cache, null){ + //not implemented - a paralle process with unknown steps - shouldn't be here + Debug.print("Hit not implemented parallel with unknown steps"); + + return false; + }; + case(_,_){//initialize? I think we assume this is initialized elsewhere + Debug.print("No Cache and No Step...check finished" # debug_show(data.bFinished)); + if(data.bFinished == false){ + + return true; + }; + return false; + }; + }; + return false; + + }; + + public func singleStep(_request : PipelinifyTypes.StepRequest) : async Result { + //process the data + let thisCache = workspaceCache.get(_request.pipeInstanceID); + let thisRequestCache = requestCache.get(_request.pipeInstanceID); + + + + Debug.print("In single step pipelinify " # debug_show(_request)); + switch(thisCache, thisRequestCache){ + + case(?thisCache, ?thisRequestCache){ + //let thisProcessStatus = getProcessType(_request.pipeInstanceID, thisCache.data, ?thisRequestCache.request); + + Debug.print("processing final chunks " # debug_show(thisCache.data.size())); + //var finalData = Array.flatten(thisCache.data); + //Debug.print("have final data single step " # debug_show(finalData.size())); + let processingResult = handleProcessing(_request.pipeInstanceID, thisCache.data, ?thisRequestCache.request, _request.step); + switch(processingResult){ + case(#ok(data)){ + Debug.print("handling single step process"); + //processedData := data; + let notFinished : Bool = handleParallelProcessStepResult(_request.pipeInstanceID, _request.step, data); + //more processing needed + let status = getProcessType(_request.pipeInstanceID, thisCache.data,?thisRequestCache.request); + switch(status){ + //let user finalize for parallel + case(#parallel(info)){ + Debug.print("inside of the more steps handler"); + return #ok(#stepProcess{ + pipeInstanceID = _request.pipeInstanceID; + status = status; + }); + }; + case(#sequential(info)){ + if(data.bFinished == true){ + //do nothing + } else { + return #ok(#stepProcess{ + pipeInstanceID = _request.pipeInstanceID; + status = status; + }); + }; + }; + case(_){ + return #err({code=948484; text="not configured"}); + } + } + }; + case(#err(theError)){ + + return #err(theError); + } + }; + + + //return the data + Debug.print("shouldn only be here for sequential - return after single step"); + + return handleReturn(_request.pipeInstanceID, thisCache.data, ?thisRequestCache.request); + }; + case (_, _){ + return #err({text="Cannot find intake Cache for ID" # Nat32.toText(_request.pipeInstanceID); code= 8;}); + }; + } + + + }; + + public func getPushStatus(_request : PipelinifyTypes.PushStatusRequest) : Result { + let thisCache = workspaceCache.get(_request.pipeInstanceID); + let thisRequestCache = requestCache.get(_request.pipeInstanceID); + switch(thisCache, thisRequestCache){ + + case(?thisCache, ?thisRequestCache){ + switch(thisCache.status){ + case(#initialized){ + return #ok(#intakeNeeded{ + pipeInstanceID = _request.pipeInstanceID; + currentChunks = 0; + totalChunks = 0; + chunkMap = []; + }); + }; + case(#doneLoading){ + return #ok(#stepProcess({ + pipeInstanceID = _request.pipeInstanceID; + status = getProcessType(_request.pipeInstanceID, thisCache.data, ?thisRequestCache.request); + }))}; + case(#loading(chunkData)){ + return #ok(#intakeNeeded{ + pipeInstanceID = _request.pipeInstanceID; + currentChunks = chunkData.0; + totalChunks = chunkData.1; + chunkMap = chunkData.2; + }); + }; + case(_){ + return #err({code=15; text="done loading"}); + } + } + }; + case (_,_){ + return #err({text="Cannot find intake Cache for ID" # Nat32.toText(_request.pipeInstanceID); code= 8;}); + }; + }; + }; + + public func pushChunk(_request : PipelinifyTypes.ChunkPush) : async Result { + + // pull the intake cache + let thisCache = workspaceCache.get(_request.pipeInstanceID); + switch(thisCache){ + case (null){ + return #err({text="Cannot find intake Cache for ID" # Nat32.toText(_request.pipeInstanceID); code= 8;}); + }; + case(?thisCache){ + //prepare the new cache entry + switch(thisCache.status){ + + case(#doneLoading){return #err({code=15; text="done loading";})}; + case(#processing(val)){return #err({code=15; text="done loading";})}; + case(#doneProcessing){return #err({code=15; text="done loading";})}; + case(#returning(val)){return #err({code=15; text="done loading";})}; + case(#done){return #err({code=15; text="done loading";})}; + case(_){ + //keep going + Debug.print("keep going"); + }; + + }; + switch(_request.chunk){ + case(#chunk(chunkData)){ + + Debug.print("chunk pushed"); + //Debug.print("done"); + + switch(thisCache.status){ + case(#initialized){ + thisCache.status := #loading(1, 1, [true]); + }; + case(#loading(loadingVal)){ + thisCache.status := #loading(loadingVal.0 + 1,loadingVal.1 + 1,Array.freeze(Array.init(loadingVal.0 + 1, true))); + }; + + case(_){ + //todo: need to handle errors + //throw #err({text="Cannot add to intake cached for a 'done' intake cache"; code = 9;}); + return #err({code=15; text="done loading"}); + //Debug.print("done"); + //#done; + }; + }; + + //Debug.print("chunk data " # debug_show(chunkData[1]) # debug_show(chunkData[2]) # debug_show(chunkData[3]) # debug_show(chunkData[4]) # debug_show(chunkData[5])); + Candy.fileAddressedChunks(thisCache.data, chunkData); + }; + + + + + + + case(#eof(chunkData)){ + Debug.print("hit EOF Chunk"); + thisCache.status := #doneLoading; + //Debug.print("eof data " # debug_show(chunkData[1]) # debug_show(chunkData[2]) # debug_show(chunkData[3]) # debug_show(chunkData[4]) # debug_show(chunkData[5]) ); + Candy.fileAddressedChunks(thisCache.data, chunkData); + + }; + case(#parallel(chunkData)){ + switch(thisCache.status){ + case(#initialized){ + Debug.print("in parallel initialized"); + let thisMap = Array.init(chunkData.1,false); + thisMap[chunkData.0] := true; + thisCache.status := #loading(chunkData.0, chunkData.1, Array.freeze(thisMap)); + Candy.fileAddressedChunks(thisCache.data, chunkData.2); + return #ok(#intakeNeeded{ + pipeInstanceID = _request.pipeInstanceID; + currentChunks = chunkData.0; + totalChunks = chunkData.1; + chunkMap = Array.freeze(thisMap); + }); + + }; + case(#loading(loadingVal)){ + Debug.print("in prallel loading"); + let thisMap = Array.thaw(loadingVal.2); + thisMap[chunkData.0] := true; + thisCache.status := #loading(chunkData.0,chunkData.1, Array.freeze(thisMap)); + Candy.fileAddressedChunks(thisCache.data, chunkData.2); + Debug.print("map is " # debug_show(thisMap)); + return #ok(#intakeNeeded{ + pipeInstanceID = _request.pipeInstanceID; + currentChunks = chunkData.0; + totalChunks = chunkData.1; + chunkMap = Array.freeze(thisMap); + }); + }; + case(_){ + //todo: need to handle errors + //throw #err({text="Cannot add to intake cached for a 'done' intake cache"; code = 9;}); + Debug.print("error...already loaded"); + return #err({code = 14; text="pipe already pushed"}); + }; + }; + + + + + + }; + case(#err(theErr)){ + //todo...this should probably be an error 8..not sure how to return it. + return #err({code = 16; text="do not send an error chunk"}); + + }; + }; + + + //load the data into the cache + //todo: maybe only if not done? + //Debug.print("Putting the intake cache."); + //intakeCache.put(_request.pipeInstanceID, newCache); + + + switch(thisCache.status){ + case(#initialized){ + return #err({text="Unloaded Intake Cache. Should Not Be Here."; code = 11;}) + }; + case(#loading(loadingValue)){ + //we are done and will hang out until the next cache push + Debug.print("Done...hanging out for more push."); + return #ok(#intakeNeeded{ + pipeInstanceID = _request.pipeInstanceID; + currentChunks = loadingValue.0; + totalChunks = loadingValue.1; + chunkMap = loadingValue.2; + }); + }; + case(#doneLoading){ + //we need to look at the execution config + Debug.print("we uploaded a chunk and now we are done"); + let thisRequestCache = requestCache.get(_request.pipeInstanceID); + switch(thisRequestCache){ + case(null){ + return #err({text="Request Cache is missing."; code = 12;}); + }; + case(?thisRequestCache){ + switch(thisRequestCache.request.executionConfig.executionMode){ + case(#onLoad){ + //we are going to run the process now + + Debug.print("have final data chunks"); + //var finalData = Array.flatten(newCache.data); + //Debug.print("have final data " # debug_show(finalData.size())); + + let processingResult = handleProcessing(_request.pipeInstanceID, thisCache.data, ?thisRequestCache.request, null); + + Debug.print("got the results "); + switch(processingResult){ + case(#ok(data)){ + Debug.print("got back from the handle process"); + if(handleParallelProcessStepResult(_request.pipeInstanceID, null, data) == true){ + //more processing needed + + return #ok(#stepProcess{ + pipeInstanceID = _request.pipeInstanceID; + status = getProcessType(_request.pipeInstanceID, thisCache.data,?thisRequestCache.request); + }); + }; + //processedData := data; + }; + case(#err(theError)){ + + Debug.print("I don't like that im here" ); + return #err(theError); + } + }; + + return handleReturn(_request.pipeInstanceID, thisCache.data, ?thisRequestCache.request); + + }; + case(#manual){ + return #ok(#stepProcess{ + pipeInstanceID = _request.pipeInstanceID; + status = getProcessType(_request.pipeInstanceID, thisCache.data, ?thisRequestCache.request); + + }); + }; + }; + }; + }; + }; + case(_){ + Debug.print("didnt handle status"); + } + }; + }; + + }; + + + //determine EOF + + //call process if necessary + return #err({text="Not Implemented"; code = 984585;}); + }; + + public func initilizeParallelProcessCache(_pipeInstanceID : PipelinifyTypes.PipeInstanceID, _steps: Nat) : PipelinifyTypes.ProcessCache { + let thisCache : PipelinifyTypes.ProcessCache = { + steps = _steps; + map = Array.init(_steps, false); + var status = #initialized; + }; + processCache.put(_pipeInstanceID, thisCache); + return thisCache; + }; + + public func getProcessCache(_pipeInstanceID : PipelinifyTypes.PipeInstanceID) : ?PipelinifyTypes.ProcessCache { + + let thisCache = processCache.get(_pipeInstanceID); + + return thisCache; + }; + + public func getProcessingStatus(_request : PipelinifyTypes.ProcessingStatusRequest) : Result { + let thisCache = workspaceCache.get(_request.pipeInstanceID); + let thisRequestCache = requestCache.get(_request.pipeInstanceID); + switch(thisCache, thisRequestCache){ + + case(?thisCache, ?thisRequestCache){ + switch(thisCache.status){ + case(#initialized){ + return #err({code=21; text="processing not ready"}); + }; + case(#loading(someData)){ + if(someData.2.size() > 0){ + let total = someData.2.size(); + var tracker = 0; + for(thisItem in someData.2.vals()){ + if(thisItem == true){ + tracker += 1; + }; + }; + if(tracker < total){ + return #err({code=21; text="processing not ready"}); + } else { + return #ok(#stepProcess({ + pipeInstanceID = _request.pipeInstanceID; + status = getProcessType(_request.pipeInstanceID, thisCache.data, ?thisRequestCache.request); + })); + }; + }; + + return #err({code=21; text="processing not ready"}); + }; + + case(#doneLoading){ + return #ok(#stepProcess({ + pipeInstanceID = _request.pipeInstanceID; + status = getProcessType(_request.pipeInstanceID, thisCache.data, ?thisRequestCache.request); + }))}; + case(#processing(chunkData)){ + return #ok(#stepProcess({ + pipeInstanceID = _request.pipeInstanceID; + status = getProcessType(_request.pipeInstanceID, thisCache.data, ?thisRequestCache.request); + }))}; + + case(#doneProcessing){ + return #ok(#outtakeNeeded({ + pipeInstanceID = _request.pipeInstanceID; + //todo: add outtake status + }))}; + + case(_){ + return #err({code=22; text="done loading"}); + } + } + }; + case (_,_){ + return #err({text="Cannot find intake Cache for ID" # Nat32.toText(_request.pipeInstanceID); code= 8;}); + }; + }; + }; + + public func getChunk(_request : PipelinifyTypes.ChunkGet) : Result { + //return #err({code=99999999;text="not imlemented"}); + + // pull the intake cache + Debug.print("made it to get chunk"); + let thisCache = workspaceCache.get(_request.pipeInstanceID); + + switch(thisCache){ + case (null){ + return #err({text="Cannot find response Cache for ID" # Nat32.toText(_request.pipeInstanceID); code= 13;}); + }; + case(?thisCache){ + let result = Candy.getWorkspaceChunk(thisCache.data, _request.chunkID, _request.chunkSize); + switch(result.0){ + case(#eof){ + return #ok(#eof(result.1.toArray())); + }; + case(#chunk){ + return #ok(#chunk(result.1.toArray())); + }; + }; + }; + }; + //call process if necessary + return #err({text="Not Implemented"; code = 984585;}); + + }; + + }; + + + +}; diff --git a/src/types.mo b/src/types.mo new file mode 100644 index 0000000..c32b784 --- /dev/null +++ b/src/types.mo @@ -0,0 +1,278 @@ +/////////////////////////////// +// +// ©2021 RIVVIR Tech LLC +// +//Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +// +//The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +// +//THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +/////////////////////////////// + +import Array "mo:base/Array"; +import Buffer "mo:base/Buffer"; +import Debug "mo:base/Debug"; +import Hash "mo:base/Hash"; +import Iter "mo:base/Iter"; +import Nat "mo:base/Nat"; +import Nat16 "mo:base/Nat16"; +import Nat32 "mo:base/Nat32"; +import Nat8 "mo:base/Nat8"; +import Principal "mo:base/Principal"; +import Result "mo:base/Result"; +import String "mo:base/Text"; +import Text "mo:base/Text"; +import Candy "mo:candy"; + + +module { + + type Hash = Hash.Hash; + type Result = Result.Result; + + public type PipeInstanceID = Hash.Hash; + + public type AddressedChunk = Candy.AddressedChunk; + public type Workspace = Candy.Workspace; + + public type DataChunk = Buffer.Buffer; + public type DataZone = Buffer.Buffer; + + + + + //pipeline types + public type PipelinifyIntitialization = { + onDataWillBeLoaded: ?((Hash, ?ProcessRequest) -> PipelineEventResponse); + onDataReady: ?((Hash, Workspace, ?ProcessRequest) -> PipelineEventResponse); + onPreProcess: ?((Hash, Workspace, ?ProcessRequest, ?Nat) -> PipelineEventResponse); + onProcess: ?((Hash, Workspace, ?ProcessRequest, ?Nat) -> PipelineEventResponse); + onPostProcess: ?((Hash, Workspace, ?ProcessRequest, ?Nat) -> PipelineEventResponse); + onDataWillBeReturned: ?((Hash, Workspace,?ProcessRequest) -> PipelineEventResponse); + onDataReturned: ?((Hash, ?ProcessRequest, ?ProcessResponse) -> PipelineEventResponse); + getProcessType: ?((Hash, Workspace, ?ProcessRequest) -> ProcessType); + getLocalWorkspace: ?((Hash, Nat, ?ProcessRequest) -> Candy.Workspace); + putLocalWorkspace: ?((Hash, Nat, Candy.Workspace, ?ProcessRequest) -> Candy.Workspace); + + }; + + public type ChunkResponse = { + #chunk: [AddressedChunk]; + #eof: [AddressedChunk]; + #parallel: (Nat, Nat,[AddressedChunk]); + #err: ProcessError; + }; + + public type ChunkPush = { + pipeInstanceID: PipeInstanceID; + chunk: ChunkResponse; + }; + + public type StepRequest = { + pipeInstanceID: PipeInstanceID; + step: ?Nat; + }; + + public type DataConfig = { + #dataIncluded : { + data: [AddressedChunk]; //data if small enough to fit in the message + }; + #local : Nat; + + #pull : { + sourceActor: ?DataSource; + sourceIdentifier: ?Hash.Hash; + mode : { #pull; #pullQuery;}; + totalChunks: ?Nat32; + data: ?[AddressedChunk]; + }; + #push; + #internal; + }; + + + public type ProcessRequest = { + event: ?Text; + dataConfig: DataConfig; + processConfig: ?Candy.CandyValue; + executionConfig: ExecutionConfig; + responseConfig: ResponseConfig; + }; + + public type RequestCache = { + request : ProcessRequest; + timestamp : Nat; + status: { + #initialized; + #dataDone; + #responseReady; + #finalized; + }; + }; + + public type ProcessCache = { + map : [var Bool]; + steps : Nat; + var status: { + #initialized; + #done; + #pending: Nat; + }; + }; + + public type ExecutionConfig = { + executionMode: { + #onLoad; + #manual; + }; + }; + + public type ResponseConfig = { + responseMode: { + #push; + #pull; + #local : Nat; + }; + }; + + public type ProcessError = { + text: Text; + code: Nat; + }; + + + public type ProcessResponse = { + #dataIncluded: { + payload: [AddressedChunk]; + }; + #local : Nat; + #intakeNeeded: { + pipeInstanceID: PipeInstanceID; + currentChunks: Nat; + totalChunks: Nat; + chunkMap: [Bool]; + }; + #outtakeNeeded: { + pipeInstanceID: PipeInstanceID; + }; + #stepProcess: { + pipeInstanceID: PipeInstanceID; + status: ProcessType; + }; + + }; + + public type DataReadyResponse = { + #dataIncluded: { + payload: [AddressedChunk]; + }; + #error: { + text: Text; + code: Nat; + }; + }; + + public type PipelineEventResponse = { + #dataNoOp; + #dataUpdated; + #stepNeeded; + #error : ProcessError; + }; + + public type WorkspaceCache = { + var status : { + #initialized; + #loading: (Nat,Nat,[Bool]); //(chunks we've seen, totalChunks, map of recieved items) + #doneLoading; + #processing: Nat; + #doneProcessing; + #returning: Nat; + #done + }; + data: Workspace; + }; + + + + public type ChunkRequest = { + chunkID: Nat; + event: ?Text; + sourceIdentifier: ?Hash.Hash; + }; + + public type ChunkGet = { + chunkID: Nat; + chunkSize: Nat; + pipeInstanceID: PipeInstanceID; + }; + + public type PushStatusRequest = { + pipeInstanceID: PipeInstanceID; + }; + public type ProcessingStatusRequest = { + pipeInstanceID: PipeInstanceID; + }; + + public type DataSource = actor { + requestPipelinifyChunk : (_request : ChunkRequest) -> async Result; + queryPipelinifyChunk : query (_request : ChunkRequest) -> async Result; + }; + + public type ProcessActor = actor { + process : (_request : ProcessRequest) -> async Result; + getChunk : (_request : ChunkGet) -> async Result; + pushChunk: (_request: ChunkPush) -> async Result; + getPushStatus: query (_request: PushStatusRequest) -> async Result; + getProcessingStatus: query (_request: ProcessingStatusRequest) -> async Result; + singleStep: (_request: StepRequest) -> async Result; + }; + + + public type PushToPipeStatus = {#pending : Nat; #finished; #err: ProcessError;}; + public type ProcessingStatus = {#pending : (Nat, Nat, {#parallel;#sequential;}); #finished; #err: ProcessError;}; + + public type ProcessType = { + #unconfigured; + #error; + #sequential: Nat; + #parallel: { + stepMap: [Bool]; + steps: Nat; + }; + + }; + + + + + + + //error list + // + // Intake push errors + // 8 - cannot find intake cache for provided id + // 9 - Cannot add to a 'done' intake cache + // 10 - Do not send an error chunk type to the intake process + // 11 - Unloaded Intake Cache. Should Not Be Here. + // 12 - Request cache is missing. + // 16 - Do not send an error chunk + + // GetChunk errors + // 13 -- cannot find response cache + + //pushChunk + // 14 -- parallel - pipe already pushed + // 15 -- done loading + + //pushChunk Status + //17 -- Pipe is not in intake state + + //single step + //19 -- error with step + //20 -- map missing + //21 -- processing not ready + //22 -- done processing + + + +}; diff --git a/tests/_pipelinifyTest-Consumer.mo b/tests/_pipelinifyTest-Consumer.mo new file mode 100644 index 0000000..da0d298 --- /dev/null +++ b/tests/_pipelinifyTest-Consumer.mo @@ -0,0 +1,362 @@ +/////////////////////////////// +// +// ©2021 RIVVIR Tech LLC +// +//Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +// +//The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +// +//THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +/////////////////////////////// + +import Array "mo:base/Array"; +import Nat "mo:base/Nat"; +import Pipelinify "../src"; +import PipelinifyTypes "../src/types"; +import Principal "mo:base/Principal"; +import Hash "mo:base/Hash"; +import HashMap "mo:base/HashMap"; +import Iter "mo:base/Iter"; +import Result "mo:base/Result"; +import Debug "mo:base/Debug"; +import Time "mo:base/Time"; +import Nat8 "mo:base/Nat8"; +import Text "mo:base/Text"; +import Processor "_pipelinifyTest-Processor"; + + +shared (install) actor class Consumer() = this { + + type Result = Result.Result; + + //var nonce : Nat = 0; + + type Hash = Hash.Hash; + //let thisPrincipal : Principal = Principal.fromActor(this); + //let thisChunkHandler : PipelinifyTypes.DataSource = this; + + let processor : Processor.Processor = actor("ryjl3-tyaaa-aaaaa-aaaba-cai"); + + public func requestPipelinifyChunk(_request : PipelinifyTypes.ChunkRequest) : async Result{ + //Debug.print("Chunk being provided" # debug_show(_request)); + switch(_request.sourceIdentifier){ + case(?sourceIdentifier){ + if(sourceIdentifier == Text.hash("dataPullTest")){ + Debug.print("Returning 0,0,2,2"); + return #ok(#chunk([(0,0,#Bytes(#thawed([0:Nat8,0:Nat8,2:Nat8,2:Nat8])))])); + }; + if(sourceIdentifier == Text.hash("dataPullTestChunk")){ + Debug.print("Returning chunk"); + return #ok(#chunk([(0,_request.chunkID,#Bytes(#thawed([Nat8.fromNat(_request.chunkID + 1),Nat8.fromNat(_request.chunkID + 1),Nat8.fromNat(_request.chunkID + 1),Nat8.fromNat(_request.chunkID + 1)])))])); + }; + if(sourceIdentifier == Text.hash("dataPullTestChunkUnknown")){ + Debug.print("Returning chunk"); + if(_request.chunkID < 5){ + return #ok(#chunk([(0,_request.chunkID,#Bytes(#thawed([Nat8.fromNat(_request.chunkID + 1),Nat8.fromNat(_request.chunkID + 1),Nat8.fromNat(_request.chunkID + 1),Nat8.fromNat(_request.chunkID + 1)])))])); + } else if (_request.chunkID == 5){ + return #ok(#eof([(0,_request.chunkID,#Bytes(#thawed([Nat8.fromNat(_request.chunkID + 1),Nat8.fromNat(_request.chunkID + 1),Nat8.fromNat(_request.chunkID + 1),Nat8.fromNat(_request.chunkID + 1)])))])); + } else if (_request.chunkID > 5){ + return #ok(#eof([(0,_request.chunkID,#Bytes(#thawed([])))])); + }; + }; + }; + case(_){ + return #err{text="Not Implemented"; code = 99899897;}; + }; + }; + return #err{text="Not Implemented"; code = 99899898;}; + }; + + public query func queryPipelinifyChunk(_request : PipelinifyTypes.ChunkRequest) : async Result{ + + Debug.print("Chunk being provided" # debug_show(_request)); + switch(_request.sourceIdentifier){ + case(?sourceIdentifier){ + if(sourceIdentifier == Text.hash("dataPullQueryFull")){ + Debug.print("Returning 32,33,34,35"); + return #ok(#chunk([(0,0,#Bytes(#thawed([32:Nat8,33:Nat8,34:Nat8,35:Nat8])))])); + }; + }; + + case(_){ + return #err{text="Not Implemented"; code = 99899897;}; + }; + }; + return #err{text="Not Implemented"; code = 99899898;}; + }; + + + public func testFullDataSendProcess(_starterArray: [PipelinifyTypes.AddressedChunk]) : async [PipelinifyTypes.AddressedChunk] { + + + Debug.print("==========testFullDataSendProcess=========="); + + + + + let response = await processor.process({ + event = ?"dataIncludedTest"; + dataConfig = #dataIncluded{ + data : [PipelinifyTypes.AddressedChunk] = _starterArray; + + }; + executionConfig = {executionMode = #onLoad}; + responseConfig = {responseMode = #push}; + processConfig = null; + }); + + switch(response){ + case(#ok(data)){ + switch(data){ + case(#dataIncluded(details)){ + return details.payload; + }; + case(_){ + return []; + }; + }; + }; + case(_){ + return []; + }; + }; + + + return []; + }; + + + public func testPullFullProcess() : async [PipelinifyTypes.AddressedChunk] { + + + Debug.print("==========testPullFullProcess=========="); + + + + let response = await processor.process({ + event = ?"dataPullTest"; + dataConfig = #pull{ + sourceActor = ?this; + + sourceIdentifier = ?Text.hash("dataPullTest"); + mode = #pull; + totalChunks = ?1; + data = null; + }; + executionConfig = {executionMode = #onLoad}; + responseConfig = {responseMode = #push}; + processConfig = null; + }); + + switch(response){ + case(#ok(data)){ + switch(data){ + case(#dataIncluded(details)){ + return details.payload; + }; + case(_){ + return []; + }; + }; + }; + case(_){ + return []; + }; + }; + + + return []; + }; + + public func testPullChunkProcess() : async [PipelinifyTypes.AddressedChunk] { + + + Debug.print("==========testPullChunkProcess=========="); + + + + let response = await processor.process({ + event = ?"dataPullTestChunk"; + dataConfig = #pull{ + sourceActor = ?this; + + sourceIdentifier = ?Text.hash("dataPullTestChunk"); + mode = #pull; + totalChunks = ?8; + data = ?[(0,0,#Bytes(#thawed([1,1,1,1])))]; + }; + executionConfig = {executionMode = #onLoad}; + responseConfig = {responseMode = #push}; + processConfig = null; + }); + + Debug.print(debug_show(response)); + + switch(response){ + case(#ok(data)){ + switch(data){ + case(#dataIncluded(details)){ + return details.payload; + }; + case(_){ + return []; + }; + }; + }; + case(_){ + return []; + }; + }; + + + return []; + }; + + + public func testPullChunkUnknownProcess() : async [PipelinifyTypes.AddressedChunk] { + + + Debug.print("==========testPullChunkUnknownProcess=========="); + + + + let response = await processor.process({ + event = ?"dataPullTestChunkUnknown"; + dataConfig = #pull{ + sourceActor = ?this; + + sourceIdentifier = ?Text.hash("dataPullTestChunkUnknown"); + mode = #pull; + totalChunks = null; + data = ?[(0,0,#Bytes(#thawed([1,1,1,1])))]; + }; + executionConfig = {executionMode = #onLoad}; + responseConfig = {responseMode = #push}; + processConfig = null; + }); + + Debug.print(debug_show(response)); + + switch(response){ + case(#ok(data)){ + switch(data){ + case(#dataIncluded(details)){ + return details.payload; + }; + case(_){ + return []; + }; + }; + }; + case(_){ + return []; + }; + }; + + + return []; + }; + + public func testPullFullQueryResponse() : async [PipelinifyTypes.AddressedChunk] { + + + Debug.print("==========testPullFullQueryResponse=========="); + + let response = await processor.process({ + event = ?"dataPullQueryFull"; + dataConfig = #pull{ + sourceActor = ?this; + + sourceIdentifier = ?Text.hash("dataPullQueryFull"); + mode = #pullQuery; + totalChunks = ?1; + data = null; + }; + executionConfig = {executionMode = #onLoad}; + responseConfig = {responseMode = #push}; + processConfig = null; + }); + + Debug.print(debug_show(response)); + + switch(response){ + case(#ok(data)){ + switch(data){ + case(#dataIncluded(details)){ + return details.payload; + }; + case(_){ + return []; + }; + }; + }; + case(_){ + return []; + }; + }; + + + return []; + }; + + + public func testPushFullResponse() : async [PipelinifyTypes.AddressedChunk] { + + + Debug.print("==========testPushFullResponse=========="); + + let response = await processor.process({ + event = ?"dataPush"; + dataConfig = #push; + executionConfig = {executionMode = #onLoad}; + responseConfig = {responseMode = #push}; + processConfig = null; + }); + + Debug.print(debug_show(response)); + switch(response){ + case(#err(errType)){ + return []; + }; + case(#ok(responseType)){ + switch(responseType){ + case(#intakeNeeded(result)){ + let pushFullResponse = await processor.pushChunk({ + pipeInstanceID = result.pipeInstanceID; + chunk = #eof([(0,0,#Bytes(#thawed([10:Nat8,9:Nat8,8:Nat8,7:Nat8])))]);}); + + Debug.print("got a response from the push"); + Debug.print(debug_show(pushFullResponse)); + + switch(pushFullResponse){ + case(#ok(data)){ + switch(data){ + case(#dataIncluded(details)){ + return details.payload; + }; + case(_){ + return []; + }; + }; + }; + case(_){ + return []; + }; + }; + }; + case(_){ + Debug.print("Did not find intake needed"); + return []; + }; + }; + + }; + }; + + + + + return []; + }; + +}; \ No newline at end of file diff --git a/tests/_pipelinifyTest-Processor.mo b/tests/_pipelinifyTest-Processor.mo new file mode 100644 index 0000000..6a4bc04 --- /dev/null +++ b/tests/_pipelinifyTest-Processor.mo @@ -0,0 +1,170 @@ +/////////////////////////////// +// +// ©2021 RIVVIR Tech LLC +// +//Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +// +//The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +// +//THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +/////////////////////////////// + +import Array "mo:base/Array"; +import Nat "mo:base/Nat"; +import Pipelinify "../src"; +import PipelinifyTypes "../src/types"; +import Candy "mo:candy"; +import Hash "mo:base/Hash"; +import HashMap "mo:base/HashMap"; +import Iter "mo:base/Iter"; +import Result "mo:base/Result"; +import Debug "mo:base/Debug"; +import Time "mo:base/Time"; +import Nat8 "mo:base/Nat8"; + +actor class Processor(){ + type Result = Result.Result; + + var nonce : Nat = 0; + + type Hash = Hash.Hash; + + func onProcess(_hash : Hash, _data : PipelinifyTypes.Workspace, _processRequest : ?PipelinifyTypes.ProcessRequest, _step : ?Nat) : PipelinifyTypes.PipelineEventResponse { + + //Debug.print("processing chunk" # debug_show(_processRequest)); + switch(_processRequest){ + case(?_processRequest){ + + + switch(_processRequest.dataConfig, _processRequest.event){ + case(#dataIncluded(data), ?event){ + if(event == "dataIncludedTest"){ + Debug.print("In the test"); + //Debug.print(debug_show( _data.get(0).get(0).toArray())); + if(Array.equal([0:Nat8,1:Nat8,2:Nat8,3:Nat8], Candy.valueUnstableToBytes(_data.get(0).get(0)), Nat8.equal)){ + Candy.valueUnstableToBytesBuffer(_data.get(0).get(0)).put(0,4:Nat8); + Candy.valueUnstableToBytesBuffer(_data.get(0).get(0)).put(1,5:Nat8); + Candy.valueUnstableToBytesBuffer(_data.get(0).get(0)).put(2,6:Nat8); + Candy.valueUnstableToBytesBuffer(_data.get(0).get(0)).put(3,7:Nat8); + Debug.print("done updating"); + return #dataUpdated; + } else { + //Debug.print(debug_show( _data.get(0).get(0).toArray())); + return #error({text = "Not Implemented"; code = 99999}); + }; + }; + + }; + case(#pull(data), ?event){ + if (event == "dataPullTest") { + //Debug.print("should have data" # debug_show(_data.get(0).get(0).toArray())); + if(Array.equal([0:Nat8,0:Nat8,2:Nat8,2:Nat8], Candy.valueUnstableToBytes(_data.get(0).get(0)), Nat8.equal)){ + Candy.valueUnstableToBytesBuffer(_data.get(0).get(0)).put(0,3:Nat8); + Candy.valueUnstableToBytesBuffer(_data.get(0).get(0)).put(1,3:Nat8); + Candy.valueUnstableToBytesBuffer(_data.get(0).get(0)).put(2,4:Nat8); + Candy.valueUnstableToBytesBuffer(_data.get(0).get(0)).put(3,4:Nat8); + return #dataUpdated; + } else { + return #error({text = "Not Implemented"; code = 99999}); + }; + }; + if (event == "dataPullTestChunk") { + //Debug.print("should have data from chunks" # debug_show(_data.get(0).get(0))); + let __data = Candy.valueUnstableToBytes(_data.get(0).get(7)); + if(Array.equal([8:Nat8,8:Nat8,8:Nat8,8:Nat8], [__data[0],__data[1],__data[2],__data[3]], Nat8.equal)){ + Debug.print("returning 8,8,8,8"); + Candy.valueUnstableToBytesBuffer(_data.get(0).get(7)).put(0,8:Nat8); + Candy.valueUnstableToBytesBuffer(_data.get(0).get(7)).put(1,8:Nat8); + Candy.valueUnstableToBytesBuffer(_data.get(0).get(7)).put(2,8:Nat8); + Candy.valueUnstableToBytesBuffer(_data.get(0).get(7)).put(3,8:Nat8); + return #dataUpdated; + } else { + Debug.print("returning error"); + return #error({text = "Not Implemented"; code = 99999}); + }; + }; + if (event == "dataPullTestChunkUnknown") { + //Debug.print("should have data from unknown chunks" # debug_show(_data.get(0).get(0))); + let __data = Candy.valueUnstableToBytes(_data.get(0).get(5)); + if(Array.equal([6:Nat8,6:Nat8,6:Nat8,6:Nat8], [__data[0],__data[1],__data[2],__data[3]], Nat8.equal)){ + Debug.print("returning 6,6,6,6"); + Candy.valueUnstableToBytesBuffer(_data.get(0).get(0)).put(0,6:Nat8); + Candy.valueUnstableToBytesBuffer(_data.get(0).get(0)).put(1,6:Nat8); + Candy.valueUnstableToBytesBuffer(_data.get(0).get(0)).put(2,6:Nat8); + Candy.valueUnstableToBytesBuffer(_data.get(0).get(0)).put(3,6:Nat8); + return #dataUpdated; + } else { + Debug.print("returning error"); + return #error({text = "Not Implemented"; code = 99999}); + }; + }; + if (event == "dataPullQueryFull") { + //Debug.print("should have data from query" # debug_show(_data.get(0).get(0))); + let __data = Candy.valueUnstableToBytes(_data.get(0).get(0)); + if(Array.equal([32:Nat8,33:Nat8,34:Nat8,35:Nat8], [__data[0],__data[1],__data[2],__data[3]], Nat8.equal)){ + Debug.print("returning 22,23,24,25"); + Candy.valueUnstableToBytesBuffer(_data.get(0).get(0)).put(0,22:Nat8); + Candy.valueUnstableToBytesBuffer(_data.get(0).get(0)).put(1,23:Nat8); + Candy.valueUnstableToBytesBuffer(_data.get(0).get(0)).put(2,24:Nat8); + Candy.valueUnstableToBytesBuffer(_data.get(0).get(0)).put(3,25:Nat8); + return #dataUpdated; + } else { + Debug.print("returning error"); + return #error({text = "Not Implemented"; code = 99999}); + }; + } + + }; + case(#push, ?event){ + if (event == "dataPush") { + //Debug.print("should have data" # debug_show(_data.get(0).get(0))); + if(Array.equal([10:Nat8,9:Nat8,8:Nat8,7:Nat8], Candy.valueUnstableToBytes(_data.get(0).get(0)), Nat8.equal)){ + Candy.valueUnstableToBytesBuffer(_data.get(0).get(0)).put(0,5:Nat8); + Candy.valueUnstableToBytesBuffer(_data.get(0).get(0)).put(1,4:Nat8); + Candy.valueUnstableToBytesBuffer(_data.get(0).get(0)).put(2,3:Nat8); + Candy.valueUnstableToBytesBuffer(_data.get(0).get(0)).put(3,2:Nat8); + return #dataUpdated; + } else { + return #error({text = "Not Implemented"; code = 99999}); + }; + }; + }; + + case(_, _){ + return #error({text = "Not Implemented"; code = 99999}); + } + }; + + }; + case(null){ + return #error({text = "process request null"; code = 999991}); + }; + }; + + return #error({text = "process request null"; code = 999991}); + }; + + + + let pipelinify = Pipelinify.Pipelinify({ + onDataWillBeLoaded = null; + onDataReady = null; + onPreProcess = null; + onProcess = ?onProcess; + onPostProcess = null; + onDataWillBeReturned = null; + onDataReturned = null; + getProcessType = null; + getLocalWorkspace = null; + putLocalWorkspace = null; + }); + + + public func process(_request : PipelinifyTypes.ProcessRequest) : async Result{ + return await pipelinify.process(_request); + }; + + public func pushChunk(_chunk : PipelinifyTypes.ChunkPush) : async Result{ + return await pipelinify.pushChunk(_chunk); + }; +}; diff --git a/tests/_pipelinifyTest-Runner.mo b/tests/_pipelinifyTest-Runner.mo new file mode 100644 index 0000000..9f93f6f --- /dev/null +++ b/tests/_pipelinifyTest-Runner.mo @@ -0,0 +1,164 @@ +/////////////////////////////// +// +// ©2021 RIVVIR Tech LLC +// +//Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +// +//The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +// +//THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +/////////////////////////////// + + +import Array "mo:base/Array"; +import Nat "mo:base/Nat"; +import Pipelinify "../src"; +import PipelinifyTypes "../src/types"; +import Candy "mo:candy"; +import Hash "mo:base/Hash"; +import HashMap "mo:base/HashMap"; +import Iter "mo:base/Iter"; +import Result "mo:base/Result"; +import Debug "mo:base/Debug"; +import Time "mo:base/Time"; +import Nat8 "mo:base/Nat8"; +import Consumer "_pipelinifyTest-Consumer"; + +actor class pipelinify_runner(){ + type Result = Result.Result; + + var nonce : Nat = 0; + + type Hash = Hash.Hash; + + public func Test() : async Result{ + ///////////////////////// + // Data Load Test + // fullData - ✓ - Makes a request of the consumer, consuemer sends all data to the processor and gets a response. + // pullData(at Once) - ✓ - Makes a request of the consumer, consumer asks the processor to pull the data and then process + // pullData(chunks) - ✓ - Makes a request of the consumer, consumer asks the processor to pull 7 remaining chunks and then process + // pullDataUnkownNumberOfChunks - ✓ - Makes a request of the consumer, consumer asks the processor to pull remaining chunks ..consumer must respond with eof, and then processor process + // pullDataQuery ✓ - Makes a request of the consumer, consumer asks the processor to pull remaining chunks via query...uses same pathway as pull but switches to query + // push(delayed at once) | push(chunks) + // + // Execution Tests + // onLoad | selfServe(at once) | selfServe(step) | still processing | aget(at once) | agent(step) + // + // Data Retrieval Tests + // included | pullData(at once) | pullData(steps) | pullDataQuery | + ////////////////////////// + + var testStream : Text = "Running Tests \n"; + let consumer : Consumer.Consumer = actor("rrkah-fqaaa-aaaaa-aaaaq-cai"); + + + //////// + //Full Data Test + //////// + + testStream #= "Testing Full Data Send - "; + + let testFullDataSendResponse = await consumer.testFullDataSendProcess([(0,0,#Bytes(#thawed([0,1,2,3])))]); + let tester5 = Candy.valueToBytes(testFullDataSendResponse[0].2); + + if(tester5[0] == 4 and tester5[1] == 5 and tester5[2] == 6 and tester5[3] == 7){ + testStream #= "✓\n"; + } + else{ + testStream #= "x\n"; + testStream #= "Expected " # debug_show(testFullDataSendResponse) # " to be [4,5,6,7]"; + }; + + + //////// + //Pull Full Data + //////// + testStream #= "Testing Pull Full Data - "; + + let testPullFullResponse = await consumer.testPullFullProcess(); + let tester4 = Candy.valueToBytes(testPullFullResponse[0].2); + + if(tester4[0] == 3 and tester4[1] == 3 and tester4[2] == 4 and tester4[3] == 4){ + testStream #= "✓\n"; + } + else{ + testStream #= "x\n"; + testStream #= "Expected " # debug_show(testPullFullResponse) # " to be [3,3,4,4]"; + }; + + //////// + //Pull Chunked Data + //////// + testStream #= "Testing Pull Chunk Data - "; + + let testPullChunkResponse = await consumer.testPullChunkProcess(); + let tester3 = Candy.valueToBytes(testPullChunkResponse[7].2); + + Debug.print(debug_show(testPullChunkResponse)); + + if(tester3[0] == 8 and tester3[1] == 8 and tester3[2] == 8 and tester3[3] == 8){ + testStream #= "✓\n"; + } + else{ + testStream #= "x\n"; + testStream #= "Expected " # debug_show(testPullChunkResponse) # " to be [8,8,8,8]"; + }; + + //////// + //Pull Chunked Data - unkown size + //////// + testStream #= "Testing Pull Chunk Data Unknown Size - "; + + let testPullChunkUnknownResponse = await consumer.testPullChunkUnknownProcess(); + let tester2 = Candy.valueToBytes(testPullChunkUnknownResponse[0].2); + + if(tester2[0] == 6 and tester2[1] == 6 and tester2[2] == 6 and tester2[3] == 6){ + testStream #= "✓\n"; + } + else{ + testStream #= "x\n"; + testStream #= "Expected " # debug_show(testPullChunkUnknownResponse) # " to be [6,6,6,6]"; + }; + + //////// + //Pull Full Data Via Query + //////// + testStream #= "Testing Pull Full Data via Query - "; + + let testPullFullQueryResponse = await consumer.testPullFullQueryResponse(); + let tester1 = Candy.valueToBytes(testPullFullQueryResponse[0].2); + + + if(tester1[0] == 22 and tester1[1] == 23 and tester1[2] == 24 and tester1[3] == 25){ + testStream #= "✓\n"; + } + else{ + testStream #= "x\n"; + testStream #= "Expected " # debug_show(testPullFullQueryResponse) # " to be [22,23,26,25]"; + }; + + //////// + //Push Full Data + //////// + testStream #= "Testing Push Data - Full - "; + + let testPushFullResponse = await consumer.testPushFullResponse(); + let tester = Candy.valueToBytes(testPushFullResponse[0].2); + + if(tester[0] == 5 and tester[1] == 4 and tester[2] == 3 and tester[3] == 2){ + testStream #= "✓\n"; + } + else{ + testStream #= "x\n"; + testStream #= "Expected " # debug_show(testPushFullResponse) # " to be [5,4,3,2]"; + }; + + + + + return #ok(testStream); + + }; + + +}; diff --git a/vessel.dhall b/vessel.dhall new file mode 100644 index 0000000..be5f17a --- /dev/null +++ b/vessel.dhall @@ -0,0 +1,4 @@ +{ + dependencies = [ "base", "candy","principal" ], + compiler = Some "0.6.2" +}