diff --git a/node-support/src/action-support.js b/node-support/src/action-support.js index a39ef06ef..1e7e025d1 100644 --- a/node-support/src/action-support.js +++ b/node-support/src/action-support.js @@ -55,7 +55,24 @@ class ActionHandler { debug("%s [%s.%s] - " + msg, ...[this.streamId, this.support.service.name, this.grpcMethod.name].concat(args)); } + /** + * Context for an action command. + * + * @interface module:cloudstate.Action.ActionCommandContext + * @extends module:cloudstate.CommandContext + * @property {boolean} cancelled Whether the client is still connected. + * @property {module:cloudstate.Metadata} metadata The metadata associated with the command. + * @property {module:cloudstate.CloudEvent} cloudevent The CloudEvents metadata associated with the command. + */ createContext(metadata) { + /** + * Write a message. + * + * @function module:cloudstate.Action.ActionCommandContext#write + * @param {Object} message The protobuf message to write. + * @param {module:cloudstate.Metadata} metadata The metadata associated with the message. + */ + const call = this.call; let metadataObject = new Metadata([]); if (metadata && metadata.entries) { @@ -74,6 +91,13 @@ class ActionHandler { } }; + /** + * Register an event handler. + * + * @function module:cloudstate.Action.ActionCommandContext#on + * @param {string} eventType The type of the event. + * @param {function} callback The callback to handle the event. + */ ctx.on = (eventType, callback) => { if (this.supportedEvents.includes(eventType)) { this.callbacks[eventType] = callback; @@ -96,6 +120,12 @@ class ActionHandler { } } + /** + * Context for a unary action command. + * + * @interface module:cloudstate.Action.UnaryCommandContext + * @extends module:cloudstate.Action.ActionCommandContext + */ handleUnary() { this.setupUnaryOutContext(); const deserializedCommand = this.grpcMethod.resolvedRequestType.decode(this.call.request.payload.value); @@ -113,6 +143,13 @@ class ActionHandler { } } + /** + * Context for a streamed in action command. + * + * @interface module:cloudstate.Action.StreamedInCommandContext + * @extends module:cloudstate.Action.StreamedInContext + * @extends module:cloudstate.Action.ActionCommandContext + */ handleStreamedIn() { this.setupUnaryOutContext(); this.setupStreamedInContext(); @@ -130,12 +167,25 @@ class ActionHandler { } } + /** + * Context for a streamed out action command. + * + * @interface module:cloudstate.Action.StreamedOutCommandContext + * @extends module:cloudstate.Action.StreamedOutContext + */ handleStreamedOut() { this.setupStreamedOutContext(); const deserializedCommand = this.grpcMethod.resolvedRequestType.decode(this.call.request.payload.value); this.invokeUserCallback("command", this.commandHandler, deserializedCommand, this.ctx); } + /** + * Context for a streamed action command. + * + * @interface module:cloudstate.Action.StreamedCommandContext + * @extends module:cloudstate.Action.StreamedInContext + * @extends module:cloudstate.Action.StreamedOutContext + */ handleStreamed() { this.setupStreamedInContext(); this.setupStreamedOutContext(); @@ -145,7 +195,7 @@ class ActionHandler { setupUnaryOutContext() { const effects = []; - this.ctx.forward = (method, message, metadata) => { + this.ctx.thenForward = (method, message, metadata) => { this.ensureNotCancelled(); this.streamDebug("Forwarding to %s", method); const forward = this.support.effectSerializer.serializeEffect(method, message, metadata); @@ -192,7 +242,19 @@ class ActionHandler { }; } + /** + * Context for a action command that returns a streamed message out. + * + * @interface module:cloudstate.Action.StreamedOutContext + * @extends module:cloudstate.Action.ActionCommandContext + */ setupStreamedOutContext() { + + /** + * A cancelled event. + * + * @event module:cloudstate.Action.StreamedOutContext#cancelled + */ this.supportedEvents.push("cancelled"); this.call.on("cancelled", () => { @@ -200,6 +262,11 @@ class ActionHandler { this.invokeCallback("cancelled", this.ctx); }); + /** + * Terminate the outgoing stream of messages. + * + * @function module:cloudstate.Action.StreamedOutContext#end + */ this.ctx.end = () => { if (this.call.cancelled) { this.streamDebug("end invoked when already cancelled."); @@ -209,7 +276,7 @@ class ActionHandler { } }; - this.ctx.forward = (method, message, metadata) => { + this.ctx.thenForward = (method, message, metadata) => { this.ensureNotCancelled(); this.streamDebug("Forwarding to %s", method); const forward = this.support.effectSerializer.serializeEffect(method, message, metadata); @@ -257,8 +324,30 @@ class ActionHandler { }; } + /** + * Context for a action command that handles streamed messages in. + * + * @interface module:cloudstate.Action.StreamedInContext + * @extends module:cloudstate.Action.ActionCommandContext + */ setupStreamedInContext() { + /** + * A data event. + * + * Emitted when a new message arrives. + * + * @event module:cloudstate.Action.StreamedInContext#data + * @type {Object} + */ this.supportedEvents.push("data"); + + /** + * A stream end event. + * + * Emitted when the input stream terminates. + * + * @event module:cloudstate.Action.StreamedInContext#end + */ this.supportedEvents.push("end"); this.call.on("data", (data) => { @@ -272,6 +361,11 @@ class ActionHandler { this.invokeCallback("end", this.ctx); }); + /** + * Cancel the incoming stream of messages. + * + * @function module:cloudstate.Action.StreamedInContext#cancel + */ this.ctx.cancel = () => { if (this.call.cancelled) { this.streamDebug("cancel invoked when already cancelled."); @@ -370,6 +464,8 @@ module.exports = class ActionServices { handleStreamed(call) { call.on("data", data => { + // Ignore the remaining data by default + call.on("data", () => {}); const handler = this.createHandler(call, null, data); if (handler) { handler.handleStreamed(); @@ -386,6 +482,8 @@ module.exports = class ActionServices { handleStreamedIn(call, callback) { call.on("data", data => { + // Ignore the remaining data by default + call.on("data", () => {}); const handler = this.createHandler(call, callback, data); if (handler) { handler.handleStreamedIn(); diff --git a/node-support/src/action.js b/node-support/src/action.js index 550d1408f..fff820676 100644 --- a/node-support/src/action.js +++ b/node-support/src/action.js @@ -24,11 +24,52 @@ const CloudState = require("./cloudstate"); const actionServices = new ActionSupport(); /** - * A Cloudstate Action + * A unary action command handler. * - * @namespace module:cloudstate.action + * @callback module:cloudstate.Action~unaryCommandHandler + * @param {Object} command The command message, this will be of the type of the gRPC service call input type. + * @param {module:cloudstate.Action.UnaryCommandContext} context The command context. + * @returns {undefined|Object|Promise} The message to reply with, it must match the gRPC service call output type for + * this command. If replying by using context.write, undefined must be returned. */ +/** + * A streamed in action command handler. + * + * @callback module:cloudstate.Action~streamedInCommandHandler + * @param {module:cloudstate.Action.StreamedInCommandContext} context The command context. + * @returns {undefined|Object|Promise} The message to reply with, it must match the gRPC service call output type for + * this command. If replying by using context.write, undefined must be returned. + */ + +/** + * A streamed out command handler. + * + * @callback module:cloudstate.Action~streamedOutCommandHandler + * @param {Object} command The command message, this will be of the type of the gRPC service call input type. + * @param {module:cloudstate.Action.StreamedOutCommandContext} context The command context. + */ + +/** + * A streamed command handler. + * + * @callback module:cloudstate.Action~streamedCommandHandler + * @param {module:cloudstate.Action.StreamedCommandContext} context The command context. + */ + +/** + * An action command handler. + * + * @typedef module:cloudstate.Action.ActionCommandHandler + * @type {module:cloudstate.Action~unaryCommandHandler|module:cloudstate.Action~streamedInCommandHandler|module:cloudstate.Action~streamedOutCommandHandler|module:cloudstate.Action~streamedCommandHandler} + */ + +/** + * An action. + * + * @memberOf module:cloudstate + * @extends module:cloudstate.Entity + */ class Action { /** @@ -36,7 +77,7 @@ class Action { * * @param {string|string[]} desc A descriptor or list of descriptors to parse, containing the service to serve. * @param {string} serviceName The fully qualified name of the service that provides this entities interface. - * @param {module:cloudstate.EventSourced~options=} options The options for this event sourced entity + * @param {module:cloudstate.Action~options=} options The options for this event sourced entity */ constructor(desc, serviceName, options) { @@ -69,7 +110,7 @@ class Action { * * The names of the properties must match the names of the service calls specified in the gRPC descriptor * - * @type {Object.} + * @type {Object.} */ this.commandHandlers = {}; } diff --git a/node-support/src/command-helper.js b/node-support/src/command-helper.js index 34fd2afed..869cd6b6b 100644 --- a/node-support/src/command-helper.js +++ b/node-support/src/command-helper.js @@ -193,14 +193,20 @@ module.exports = class CommandHelper { accessor.replyMetadata = new Metadata([]); /** - * Effect context. + * Context for an entity. * - * @interface module:cloudstate.EffectContext + * @interface module:cloudstate.EntityContext * @property {string} entityId The id of the entity that the command is for. * @property {Long} commandId The id of the command. + * @property {module:cloudstate.Metadata} replyMetadata The metadata to send with a reply. + */ + + /** + * Effect context. + * + * @interface module:cloudstate.EffectContext * @property {module:cloudstate.Metadata} metadata The metadata associated with the command. * @property {module:cloudstate.CloudEvent} cloudevent The CloudEvents metadata associated with the command. - * @property {module:cloudstate.Metadata} replyMetadata The metadata to send with a reply. */ /** diff --git a/node-support/src/crdt-support.js b/node-support/src/crdt-support.js index 3edfb26e7..7988b7e22 100644 --- a/node-support/src/crdt-support.js +++ b/node-support/src/crdt-support.js @@ -189,6 +189,7 @@ class CrdtHandler { * @interface module:cloudstate.crdt.CrdtCommandContext * @extends module:cloudstate.crdt.StateManagementContext * @extends module:cloudstate.CommandContext + * @extends module:cloudstate.EntityContext */ this.addStateManagementToContext(ctx); @@ -393,6 +394,7 @@ class CrdtHandler { * * @interface module:cloudstate.crdt.StateChangedContext * @extends module:cloudstate.CommandContext + * @extends module:cloudstate.EntityContext */ const ctx = this.commandHelper.createContext(subscriber.commandId, subscriber.metadata); @@ -464,6 +466,7 @@ class CrdtHandler { * * @interface module:cloudstate.crdt.StreamCancelledContext * @extends module:cloudstate.EffectContext + * @extends module:cloudstate.EntityContext * @extends module:cloudstate.crdt.StateManagementContext */ diff --git a/node-support/src/eventsourced-support.js b/node-support/src/eventsourced-support.js index 7a43aba15..0194da19d 100644 --- a/node-support/src/eventsourced-support.js +++ b/node-support/src/eventsourced-support.js @@ -106,6 +106,7 @@ class EventSourcedEntityHandler { * * @interface module:cloudstate.EventSourced.EventSourcedCommandContext * @extends module:cloudstate.CommandContext + * @extends module:cloudstate.EntityContext */ ctx.events = [];