From bce7653be20ef90948b55c6f98974816b5e53f6d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sven=20H=C3=B6ffler?= Date: Fri, 22 Mar 2024 15:48:46 +0100 Subject: [PATCH] Added error handling that reacts to continueOnError flag --- templates/lib/actions/action.js | 138 +++++++++-------- templates/lib/triggers/trigger.js | 241 ++++++++++++++++-------------- 2 files changed, 201 insertions(+), 178 deletions(-) diff --git a/templates/lib/actions/action.js b/templates/lib/actions/action.js index d1131f9..6c82349 100644 --- a/templates/lib/actions/action.js +++ b/templates/lib/actions/action.js @@ -19,78 +19,90 @@ async function processAction(msg, cfg, snapshot, incomingMessageHeaders, tokenDa let logger = this.logger; const { logLevel } = cfg.nodeSettings; - if (["fatal", "error", "warn", "info", "debug", "trace"].includes(logLevel)) { - logger = this.logger.child({}); - logger.level && logger.level(logLevel); - } + let continueOnError = false; + if (cfg && cfg.nodeSettings && cfg.nodeSettings.continueOnError) continueOnError = true; - logger.debug("Incoming message: %j", msg); - logger.trace("Incoming configuration: %j", cfg); - logger.debug("Incoming snapshot: %j", snapshot); - logger.debug("Incoming message headers: %j", incomingMessageHeaders); - logger.debug("Incoming token data: %j", tokenData); - - const actionFunction = tokenData["function"]; - logger.info("Starting to execute action '%s'", actionFunction); - - const action = componentJson.actions[actionFunction]; - const { operationId, pathName, method, requestContentType } = action.callParams; - logger.info( - "Found spec callParams: 'pathName': %s, 'method': %s, 'requestContentType': %s", - pathName, - method, - requestContentType - ); - - const specPath = spec.paths[pathName]; - const specPathParameters = specPath[method].parameters ? specPath[method].parameters.map(({ name }) => name) : []; - - let body = msg.data; - mapFieldNames(body); - if (requestContentType === "multipart/form-data") { - logger.info("requestContentType multipart/form-data is defined"); - body = await mapFormDataBody.call(this, action, body); - } + try { - let parameters = {}; - for (let param of specPathParameters) { - parameters[param] = body[param]; - } - logger.debug("Parameters were populated from configuration: %j", parameters); + if (["fatal", "error", "warn", "info", "debug", "trace"].includes(logLevel)) { + logger = this.logger.child({}); + logger.level && logger.level(logLevel); + } - $SECURITIES; + logger.debug("Incoming message: %j", msg); + logger.trace("Incoming configuration: %j", cfg); + logger.debug("Incoming snapshot: %j", snapshot); + logger.debug("Incoming message headers: %j", incomingMessageHeaders); + logger.debug("Incoming token data: %j", tokenData); + + const actionFunction = tokenData["function"]; + logger.info("Starting to execute action '%s'", actionFunction); + + const action = componentJson.actions[actionFunction]; + const { operationId, pathName, method, requestContentType } = action.callParams; + logger.info( + "Found spec callParams: 'pathName': %s, 'method': %s, 'requestContentType': %s", + pathName, + method, + requestContentType + ); + + const specPath = spec.paths[pathName]; + const specPathParameters = specPath[method].parameters ? specPath[method].parameters.map(({ name }) => name) : []; + + let body = msg.data; + mapFieldNames(body); + if (requestContentType === "multipart/form-data") { + logger.info("requestContentType multipart/form-data is defined"); + body = await mapFormDataBody.call(this, action, body); + } - if (cfg.otherServer) { - if (!spec.servers) { - spec.servers = []; + let parameters = {}; + for (let param of specPathParameters) { + parameters[param] = body[param]; } - spec.servers.push({ url: cfg.otherServer }); - logger.debug("Server: %s was added to spec servers array", cfg.otherServer); - } + logger.debug("Parameters were populated from configuration: %j", parameters); - const callParams = { - spec: spec, - operationId: operationId, - pathName: pathName, - method: method, - parameters: parameters, - requestContentType: requestContentType, - requestBody: body, - securities: { authorized: securities }, - server: spec.servers[cfg.server] || cfg.otherServer, - }; - if (callParams.method === "get") { - delete callParams.requestBody; - } + $SECURITIES; + if (cfg.otherServer) { + if (!spec.servers) { + spec.servers = []; + } + spec.servers.push({ url: cfg.otherServer }); + logger.debug("Server: %s was added to spec servers array", cfg.otherServer); + } + + const callParams = { + spec: spec, + operationId: operationId, + pathName: pathName, + method: method, + parameters: parameters, + requestContentType: requestContentType, + requestBody: body, + securities: { authorized: securities }, + server: spec.servers[cfg.server] || cfg.otherServer, + }; + if (callParams.method === "get") { + delete callParams.requestBody; + } - const resp = await executeCall.call(this, callParams); - const newElement = {}; - newElement.metadata = getMetadata(msg.metadata); - newElement.data = resp.body; - this.emit("data", newElement); - this.logger.info("Execution finished"); + const resp = await executeCall.call(this, callParams); + + const newElement = {}; + newElement.metadata = getMetadata(msg.metadata); + newElement.data = resp.body; + this.emit("data", newElement); + this.logger.info("Execution finished"); + } catch (e) { + if (continueOnError === true) { + this.emit('data', {}); + } + logger.error(e); + this.emit('error', e); + } } module.exports = { process: processAction }; diff --git a/templates/lib/triggers/trigger.js b/templates/lib/triggers/trigger.js index 1a08631..7cff8bb 100644 --- a/templates/lib/triggers/trigger.js +++ b/templates/lib/triggers/trigger.js @@ -18,137 +18,148 @@ const componentJson = require("../../component.json"); async function processTrigger(msg, cfg, snapshot, incomingMessageHeaders, tokenData) { let logger = this.logger; - const { snapshotKey, arraySplittingKey, syncParam, skipSnapshot, logLevel } = cfg.nodeSettings; + let continueOnError = false; + if (cfg && cfg.nodeSettings && cfg.nodeSettings.continueOnError) continueOnError = true; - if (["fatal", "error", "warn", "info", "debug", "trace"].includes(logLevel)) { - logger = this.logger.child({}); - logger.level && logger.level(logLevel); - } + try { + const { snapshotKey, arraySplittingKey, syncParam, skipSnapshot, logLevel } = cfg.nodeSettings; - logger.debug("Incoming message: %j", msg); - logger.trace("Incoming configuration: %j", cfg); - logger.debug("Incoming message headers: %j", incomingMessageHeaders); - logger.debug("Incoming token data: %j", tokenData); - - const triggerFunction = tokenData["function"]; - logger.info('Starting to execute trigger "%s"', triggerFunction); - - logger.info("Incoming snapshot: %j", snapshot); - - snapshot.lastUpdated = getInitialSnapshotValue(cfg, snapshot); - - logger.info("Using snapshot: %j", snapshot); - - logger.info( - 'Trigger settings - "snapshotKey": %s, "arraySplittingKey": %s, "syncParam": %s, "skipSnapshot": %s', - snapshotKey, - arraySplittingKey, - syncParam, - skipSnapshot - ); - - const trigger = componentJson.triggers[triggerFunction]; - const { operationId, pathName, method, requestContentType } = trigger.callParams; - logger.info( - 'Found spec callParams: "pathName": %s, "method": %s, "requestContentType": %s', - pathName, - method, - requestContentType - ); - - const specPath = spec.paths[pathName]; - const specPathParameters = specPath[method].parameters ? specPath[method].parameters.map(({ name }) => name) : []; - - let triggerParams = cfg.triggerParams; - if (!triggerParams) { - logger.debug("Trigger params was not found in cfg.triggerParams, going to look into cfg"); - triggerParams = cfg; - } else { - logger.info("Found incoming trigger params: %j", triggerParams); - } + if (["fatal", "error", "warn", "info", "debug", "trace"].includes(logLevel)) { + logger = this.logger.child({}); + logger.level && logger.level(logLevel); + } - let parameters = {}; - for (let param of specPathParameters) { - parameters[param] = triggerParams[param]; - } + logger.debug("Incoming message: %j", msg); + logger.trace("Incoming configuration: %j", cfg); + logger.debug("Incoming message headers: %j", incomingMessageHeaders); + logger.debug("Incoming token data: %j", tokenData); - if (syncParam && snapshot.lastUpdated) { - if (syncParam === "$FILTER") { - if (!snapshotKey) { - throw new Error("snapshotKey params should be specified!"); - } - parameters[syncParam] = `${snapshotKey} gt datetime'${snapshot.lastUpdated}'`; + const triggerFunction = tokenData["function"]; + logger.info('Starting to execute trigger "%s"', triggerFunction); + + logger.info("Incoming snapshot: %j", snapshot); + + snapshot.lastUpdated = getInitialSnapshotValue(cfg, snapshot); + + logger.info("Using snapshot: %j", snapshot); + + logger.info( + 'Trigger settings - "snapshotKey": %s, "arraySplittingKey": %s, "syncParam": %s, "skipSnapshot": %s', + snapshotKey, + arraySplittingKey, + syncParam, + skipSnapshot + ); + + const trigger = componentJson.triggers[triggerFunction]; + const { operationId, pathName, method, requestContentType } = trigger.callParams; + logger.info( + 'Found spec callParams: "pathName": %s, "method": %s, "requestContentType": %s', + pathName, + method, + requestContentType + ); + + const specPath = spec.paths[pathName]; + const specPathParameters = specPath[method].parameters ? specPath[method].parameters.map(({ name }) => name) : []; + + let triggerParams = cfg.triggerParams; + if (!triggerParams) { + logger.debug("Trigger params was not found in cfg.triggerParams, going to look into cfg"); + triggerParams = cfg; } else { - parameters[syncParam] = snapshot.lastUpdated; + logger.info("Found incoming trigger params: %j", triggerParams); } - } - logger.debug("Parameters were populated from configuration: %j", parameters); - $SECURITIES; + let parameters = {}; + for (let param of specPathParameters) { + parameters[param] = triggerParams[param]; + } - if (cfg.otherServer) { - if (!spec.servers) { - spec.servers = []; + if (syncParam && snapshot.lastUpdated) { + if (syncParam === "$FILTER") { + if (!snapshotKey) { + throw new Error("snapshotKey params should be specified!"); + } + parameters[syncParam] = `${snapshotKey} gt datetime'${snapshot.lastUpdated}'`; + } else { + parameters[syncParam] = snapshot.lastUpdated; + } } - spec.servers.push({ url: cfg.otherServer }); - logger.debug("Server: %s was added to spec servers array", cfg.otherServer); - } + logger.debug("Parameters were populated from configuration: %j", parameters); - const paginationConfig = $PAGINATION_CONFIG; + $SECURITIES; - // if there is user provided pageSize - if (paginationConfig?.pageSizeOption?.fieldName) { - // if user specified pageSize - we take that - if (parameters[paginationConfig.pageSizeOption.fieldName]) { - paginationConfig.strategy.pageSize = parseInt(parameters[paginationConfig.pageSizeOption.fieldName]); - } - // otherwise we use a configured pageSize - else { - parameters[paginationConfig.pageSizeOption.fieldName] = paginationConfig.strategy.pageSize; + if (cfg.otherServer) { + if (!spec.servers) { + spec.servers = []; + } + spec.servers.push({ url: cfg.otherServer }); + logger.debug("Server: %s was added to spec servers array", cfg.otherServer); } - } - logger.info("Pagination config %j", paginationConfig); - - let callParams = { - spec: spec, - operationId: operationId, - pathName: pathName, - method: method, - parameters: parameters, - requestContentType: requestContentType, - securities: { authorized: securities }, - server: spec.servers[cfg.server] || cfg.otherServer, - }; - - const paginator = createPaginator(paginationConfig); - - let hasMorePages = true; - do { - const { body, headers } = await executeCall.call(this, callParams); - - const newElement = {}; - newElement.metadata = getMetadata(msg.metadata); - newElement.data = getElementDataFromResponse.call(this, arraySplittingKey, body); - if (skipSnapshot) { - logger.info("Option skipSnapshot enabled, just going to return found data, pagination is disabled"); - return newElement.data; //no pagination if skipping snapshot - } else { - await dataAndSnapshot.call(this, newElement, snapshot, snapshotKey, "", this); + const paginationConfig = $PAGINATION_CONFIG; + + // if there is user provided pageSize + if (paginationConfig?.pageSizeOption?.fieldName) { + // if user specified pageSize - we take that + if (parameters[paginationConfig.pageSizeOption.fieldName]) { + paginationConfig.strategy.pageSize = parseInt(parameters[paginationConfig.pageSizeOption.fieldName]); + } + // otherwise we use a configured pageSize + else { + parameters[paginationConfig.pageSizeOption.fieldName] = paginationConfig.strategy.pageSize; + } } - // pagination - if (paginator.hasNextPage({ body, headers })) { - callParams = { ...callParams, parameters: { ...callParams.parameters } }; - callParams.parameters[paginationConfig.pageTokenOption.fieldName] = paginator.getNextPageToken({ body, headers }); - logger.info("Found the next page, going to request..."); - hasMorePages = true; - } else { - logger.info("All pages have been received"); - hasMorePages = false; + logger.info("Pagination config %j", paginationConfig); + + let callParams = { + spec: spec, + operationId: operationId, + pathName: pathName, + method: method, + parameters: parameters, + requestContentType: requestContentType, + securities: { authorized: securities }, + server: spec.servers[cfg.server] || cfg.otherServer, + }; + + const paginator = createPaginator(paginationConfig); + + let hasMorePages = true; + do { + const { body, headers } = await executeCall.call(this, callParams); + + const newElement = {}; + newElement.metadata = getMetadata(msg.metadata); + newElement.data = getElementDataFromResponse.call(this, arraySplittingKey, body); + if (skipSnapshot) { + logger.info("Option skipSnapshot enabled, just going to return found data, pagination is disabled"); + return newElement.data; //no pagination if skipping snapshot + } else { + await dataAndSnapshot.call(this, newElement, snapshot, snapshotKey, "", this); + } + + // pagination + if (paginator.hasNextPage({ body, headers })) { + callParams = { ...callParams, parameters: { ...callParams.parameters } }; + callParams.parameters[paginationConfig.pageTokenOption.fieldName] = paginator.getNextPageToken({ body, headers }); + logger.info("Found the next page, going to request..."); + hasMorePages = true; + } else { + logger.info("All pages have been received"); + hasMorePages = false; + } + } while (hasMorePages); + logger.info("Execution finished"); + } catch (e) { + if (continueOnError === true) { + this.emit('data', {}) } - } while (hasMorePages); - logger.info("Execution finished"); + logger.error(e); + this.emit('error', e); + } } module.exports = { process: processTrigger };