diff --git a/src/orkes/OrkesConductorClient.ts b/src/orkes/OrkesConductorClient.ts index 700f138..aa11495 100644 --- a/src/orkes/OrkesConductorClient.ts +++ b/src/orkes/OrkesConductorClient.ts @@ -4,24 +4,34 @@ import { request as baseRequest } from "./request/request"; import { baseOrkesConductorClient } from "./BaseOrkesConductorClient"; import { FetchFn } from "./types"; import fetch, { Headers, RequestInit, Response } from "node-fetch"; +import http from "http"; +import https from "https"; + +const httpAgent = new http.Agent({ keepAlive: true }); +const httpsAgent = new https.Agent({ keepAlive: true }); + +const agent = (_parsedURL: URL) => + _parsedURL.protocol == "http:" ? httpAgent : httpsAgent; const nodeFetchWrapper: FetchFn = async ( input, options = {} ) => { - const res = await fetch(input.toString(), options as RequestInit); + const res = await fetch(input.toString(), { + ...options, + agent, + } as RequestInit); return res; }; -const fetchCache = - fetchCatchDns<{ headers: Record }, Response>( - nodeFetchWrapper, - { - //@ts-ignore - headerFactory: (headers?: HeadersInit) => - new Headers((headers as Record) || {}), - } - ); +const fetchCache = fetchCatchDns<{ headers: Record }, Response>( + nodeFetchWrapper, + { + //@ts-ignore + headerFactory: (headers?: HeadersInit) => + new Headers((headers as Record) || {}), + } +); const defaultRequestHandler: ConductorHttpRequest = ( __request, diff --git a/src/task/Poller.ts b/src/task/Poller.ts index 15a2c1f..73c3602 100644 --- a/src/task/Poller.ts +++ b/src/task/Poller.ts @@ -1,29 +1,41 @@ import { ConductorLogger, noopLogger } from "../common"; +import { + DEFAULT_POLL_INTERVAL, + DEFAULT_WARN_AT_O, + DEFAULT_CONCURRENCY, +} from "./constants"; interface PollerOptions { pollInterval?: number; concurrency: number; + warnAtO?: number; } -export class Poller { - private concurrentCalls: Array<{ - promise: Promise; - stop: () => Promise; - }> = []; - private pollFunction: () => Promise = async () => {}; +export class Poller { + private timeoutHandler?: NodeJS.Timeout; + private pollFunction: (count: number) => Promise; + private performWorkFunction: (work: T) => Promise = async () => {}; private polling = false; + private _tasksInProcess = 0; + private _counterAtO = 0; + private _pollerId: string = ""; options: PollerOptions = { - pollInterval: 1000, - concurrency: 1, + pollInterval: DEFAULT_POLL_INTERVAL, + concurrency: DEFAULT_CONCURRENCY, + warnAtO: DEFAULT_WARN_AT_O, }; logger: ConductorLogger = noopLogger; constructor( - pollFunction: () => Promise, + pollerId: string, + pollFunction: (count: number) => Promise, + performWorkFunction: (work: T) => Promise, pollerOptions?: Partial, logger?: ConductorLogger ) { + this._pollerId = pollerId; this.pollFunction = pollFunction; + this.performWorkFunction = performWorkFunction; this.options = { ...this.options, ...pollerOptions }; this.logger = logger || noopLogger; } @@ -32,6 +44,10 @@ export class Poller { return this.polling; } + get tasksInProcess() { + return this._tasksInProcess; + } + /** * Starts polling for work */ @@ -39,80 +55,69 @@ export class Poller { if (this.polling) { throw new Error("Runner is already started"); } - - return this.poll(); + this._tasksInProcess = 0; + this.polling = true; + this.poll(); }; /** * Stops Polling for work */ stopPolling = async () => { - await Promise.all(this.concurrentCalls.map((call) => call.stop())); this.polling = false; + clearTimeout(this.timeoutHandler!); }; - /** - * adds or shuts down concurrent calls based on the concurrency setting - * @param concurrency - */ - private updateConcurrency(concurrency: number) { - if (concurrency > 0 && concurrency !== this.options.concurrency) { - if (concurrency < this.options.concurrency) { - const result = this.concurrentCalls.splice( - 0, - this.options.concurrency - concurrency - ); - result.forEach((call) => { - call.stop(); - this.logger.debug("stopping some spawned calls"); - }); - } else { - for (let i = 0; i < concurrency - this.options.concurrency; i++) { - this.concurrentCalls.push(this.singlePoll()); - this.logger.debug("spawning additional poll calls"); - } - } - this.options.concurrency = concurrency; - } - } + private performWork = async (work: T) => { + await this.performWorkFunction(work); + this._tasksInProcess--; + }; updateOptions(options: Partial) { const newOptions = { ...this.options, ...options }; - this.updateConcurrency(newOptions.concurrency); this.options = newOptions; } private poll = async () => { - if (!this.polling) { - this.polling = true; - for (let i = 0; i < this.options.concurrency; i++) { - this.concurrentCalls.push(this.singlePoll()); - } - } - }; - - private singlePoll = () => { - let poll = this.polling; - let timeout: NodeJS.Timeout; - const pollingCall = async () => { - while (poll) { - await this.pollFunction(); - await new Promise( - (r) => - poll ? (timeout = setTimeout(() => r(true), this.options.pollInterval)): r(true) + while (this.isPolling) { + try { + // Concurrency could have been updated. Accounting for that + const count = Math.max( + 0, + this.options.concurrency - this._tasksInProcess ); + + if (count === 0) { + this.logger.debug( + "Max in process reached, Will skip polling for " + this._pollerId + ); + this._counterAtO++; + if (this._counterAtO > (this.options.warnAtO ?? 100)) { + this.logger.info( + `Not polling anything because in process tasks is maxed as concurrency level. ${this._pollerId}` + ); + } + } else { + this._counterAtO = 0; + const tasksResult: T[] = await this.pollFunction(count); + this._tasksInProcess = + this._tasksInProcess + (tasksResult ?? []).length; + + // Don't wait for the tasks to finish only 'listen' to the number of tasks being processes + tasksResult.forEach(this.performWork); + } + } catch (e: any) { + this.logger.error(`Error polling for tasks: ${e.message}`, e); } - }; - return { - promise: pollingCall(), - stop: (): Promise => - new Promise((r) => { - clearTimeout(timeout); - poll = false; - this.logger.debug("stopping single poll call"); - r(true); - }), - }; + await new Promise((r) => + this.isPolling + ? (this.timeoutHandler = setTimeout( + () => r(true), + this.options.pollInterval + )) + : r(true) + ); + } }; } diff --git a/src/task/TaskManager.ts b/src/task/TaskManager.ts index dbaa3ee..697db12 100644 --- a/src/task/TaskManager.ts +++ b/src/task/TaskManager.ts @@ -8,6 +8,11 @@ import { import { ConductorLogger, DefaultLogger } from "../common"; import { ConductorWorker } from "./Worker"; import { ConductorClient } from "../common/open-api"; +import { + DEFAULT_POLL_INTERVAL, + DEFAULT_BATCH_POLLING_TIMEOUT, + DEFAULT_CONCURRENCY, +} from "./constants"; export type TaskManagerOptions = TaskRunnerOptions; @@ -19,9 +24,10 @@ export interface TaskManagerConfig { const defaultManagerOptions: Required = { workerID: "", - pollInterval: 1000, + pollInterval: DEFAULT_POLL_INTERVAL, domain: undefined, - concurrency: 1, + concurrency: DEFAULT_CONCURRENCY, + batchPollingTimeout: DEFAULT_BATCH_POLLING_TIMEOUT, }; function workerId(options: Partial) { @@ -32,7 +38,7 @@ function workerId(options: Partial) { * Responsible for initializing and managing the runners that poll and work different task queues. */ export class TaskManager { - private tasks: Record> = {}; + private workerRunners: Map = new Map(); private readonly client: ConductorClient; private readonly logger: ConductorLogger; private readonly errorHandler: TaskErrorHandler; @@ -68,6 +74,7 @@ export class TaskManager { return { ...this.options, concurrency: worker.concurrency ?? this.options.concurrency, + pollInterval: worker.pollInterval ?? this.options.pollInterval, domain: worker.domain ?? this.options.domain, }; }; @@ -76,6 +83,21 @@ export class TaskManager { return this.polling; } + updatePollingOptionForWorker = ( + workerTaskDefName: string, + options: Partial + ) => { + const maybeRunner = this.workerRunners.get(workerTaskDefName); + + if (maybeRunner != null) { + maybeRunner.updateOptions(options); + } else { + this.logger.info( + `No runner found for worker with taskDefName: ${workerTaskDefName}` + ); + } + }; + /** * new options will get merged to existing options * @param options new options to update polling options @@ -86,26 +108,33 @@ export class TaskManager { ...this.workerManagerWorkerOptions(worker), ...options, }; - const runners = this.tasks[worker.taskDefName]; - runners.forEach((runner) => { - runner.updateOptions(newOptions); - }); + this.updatePollingOptionForWorker(worker.taskDefName, newOptions); }); this.options.concurrency = options.concurrency ?? this.options.concurrency; this.options.pollInterval = options.pollInterval ?? this.options.pollInterval; }; + sanityCheck = () => { + if (this.workers.length === 0) { + throw new Error("No workers supplied to TaskManager"); + } + const workerIDs = new Set(); + for (const item of this.workers) { + if (workerIDs.has(item.taskDefName)) { + throw new Error(`Duplicate worker taskDefName: ${item.taskDefName}`); + } + workerIDs.add(item.taskDefName); + } + }; + /** * Start polling for tasks */ startPolling = () => { + this.sanityCheck(); this.workers.forEach((worker) => { - this.tasks[worker.taskDefName] = []; const options = this.workerManagerWorkerOptions(worker); - this.logger.debug( - `Starting taskDefName=${worker.taskDefName} concurrency=${options.concurrency} domain=${options.domain}` - ); const runner = new TaskRunner({ worker, options, @@ -114,7 +143,7 @@ export class TaskManager { onError: this.errorHandler, }); runner.startPolling(); - this.tasks[worker.taskDefName].push(runner); + this.workerRunners.set(worker.taskDefName, runner); }); this.polling = true; }; @@ -122,11 +151,10 @@ export class TaskManager { * Stops polling for tasks */ stopPolling = async () => { - for (const taskType in this.tasks) { - await Promise.all( - this.tasks[taskType].map((runner) => runner.stopPolling()) - ); - this.tasks[taskType] = []; + for (const [workerTaskDefName, runner] of this.workerRunners) { + this.logger.debug(`Stopping taskDefName=${workerTaskDefName}`); + await runner.stopPolling(); + this.workerRunners.delete(workerTaskDefName); } this.polling = false; }; diff --git a/src/task/TaskRunner.ts b/src/task/TaskRunner.ts index 0cd529a..9ff8da2 100644 --- a/src/task/TaskRunner.ts +++ b/src/task/TaskRunner.ts @@ -2,6 +2,11 @@ import { ConductorLogger, noopLogger } from "../common"; import { ConductorWorker } from "./Worker"; import { Task, TaskResourceService, TaskResult } from "../common/open-api"; import { Poller } from "./Poller"; +import { + DEFAULT_POLL_INTERVAL, + DEFAULT_BATCH_POLLING_TIMEOUT, + DEFAULT_CONCURRENCY, +} from "./constants"; const DEFAULT_ERROR_MESSAGE = "An unknown error occurred"; const MAX_RETRIES = 3; @@ -13,11 +18,12 @@ export interface TaskRunnerOptions { domain: string | undefined; pollInterval?: number; concurrency?: number; + batchPollingTimeout?: number; } export interface RunnerArgs { worker: ConductorWorker; taskResource: TaskResourceService; - options: Required; + options: TaskRunnerOptions; logger?: ConductorLogger; onError?: TaskErrorHandler; concurrency?: number; @@ -26,6 +32,14 @@ export interface RunnerArgs { //eslint-disable-next-line export const noopErrorHandler: TaskErrorHandler = (__error: Error) => {}; +const defaultRunnerOptions: Required = { + workerID: "", + pollInterval: DEFAULT_POLL_INTERVAL, + domain: undefined, + concurrency: DEFAULT_CONCURRENCY, + batchPollingTimeout: DEFAULT_BATCH_POLLING_TIMEOUT, +}; + /** * Responsible for polling and executing tasks from a queue. * @@ -39,9 +53,9 @@ export class TaskRunner { taskResource: TaskResourceService; worker: ConductorWorker; private logger: ConductorLogger; - private options: Required; + private options: TaskRunnerOptions; errorHandler: TaskErrorHandler; - private poller: Poller; + private poller: Poller; constructor({ worker, @@ -53,11 +67,16 @@ export class TaskRunner { this.taskResource = taskResource; this.logger = logger; this.worker = worker; - this.options = options; + this.options = {...defaultRunnerOptions, ...options}; this.errorHandler = errorHandler; this.poller = new Poller( - this.pollAndExecute, - { concurrency: options.concurrency, pollInterval: options.pollInterval }, + worker.taskDefName, + this.batchPoll, + this.executeTask, + { + concurrency: worker.concurrency ?? options.concurrency, + pollInterval: worker.pollInterval ?? options.pollInterval, + }, this.logger ); } @@ -71,6 +90,9 @@ export class TaskRunner { */ startPolling = () => { this.poller.startPolling(); + this.logger.info( + `TaskWorker ${this.worker.taskDefName} initialized with concurrency of ${this.poller.options.concurrency} and poll interval of ${this.poller.options.pollInterval}` + ); }; /** * Stops Polling for work @@ -85,6 +107,9 @@ export class TaskRunner { concurrency: newOptions.concurrency, pollInterval: newOptions.pollInterval, }); + this.logger.info( + `TaskWorker ${this.worker.taskDefName} configuration updated with concurrency of ${this.poller.options.concurrency} and poll interval of ${this.poller.options.pollInterval}` + ); this.options = newOptions; } @@ -92,23 +117,16 @@ export class TaskRunner { return this.options; } - private pollAndExecute = async () => { - try { - const { workerID } = this.options; - const task = await this.taskResource.poll( - this.worker.taskDefName, - workerID, - this.worker.domain ?? this.options.domain - ); - if (task && task.taskId) { - await this.executeTask(task); - } else { - this.logger.debug(`No tasks for ${this.worker.taskDefName}`); - } - } catch (unknownError: unknown) { - this.handleUnknownError(unknownError); - this.errorHandler(unknownError as Error); - } + private batchPoll = async (count: number): Promise => { + const { workerID } = this.options; + const tasks = await this.taskResource.batchPoll( + this.worker.taskDefName, + workerID, + this.worker.domain ?? this.options.domain, + count, + this.options.batchPollingTimeout ?? 100 // default batch poll defined in the method + ); + return tasks; }; updateTaskWithRetry = async (task: Task, taskResult: TaskResult) => { @@ -132,7 +150,7 @@ export class TaskRunner { ); }; - executeTask = async (task: Task) => { + private executeTask = async (task: Task) => { try { const result = await this.worker.execute(task); await this.updateTaskWithRetry(task, { @@ -140,7 +158,7 @@ export class TaskRunner { workflowInstanceId: task.workflowInstanceId!, taskId: task.taskId!, }); - this.logger.debug(`Finished polling for task ${task.taskId}`); + this.logger.debug(`Task has executed successfully ${task.taskId}`); } catch (error: unknown) { await this.updateTaskWithRetry(task, { workflowInstanceId: task.workflowInstanceId!, diff --git a/src/task/Worker.ts b/src/task/Worker.ts index 24b1f42..c99a275 100644 --- a/src/task/Worker.ts +++ b/src/task/Worker.ts @@ -1,4 +1,4 @@ -import {Task, TaskResult} from "../common/open-api" +import { Task, TaskResult } from "../common/open-api"; /** * Functional interface for defining a worker implementation that processes tasks from a conductor queue. @@ -8,11 +8,14 @@ import {Task, TaskResult} from "../common/open-api" * here will be inherited from the `TaskManager` options. */ export interface ConductorWorker { - taskDefName: string - execute: (task: Task) => Promise> - domain?: string + taskDefName: string; + execute: ( + task: Task + ) => Promise>; + domain?: string; /* Number of polling instances to run concurrently */ - concurrency?: number + concurrency?: number; + pollInterval?: number; } diff --git a/src/task/__tests__/Poller.test.ts b/src/task/__tests__/Poller.test.ts index 820ad24..ef5f1fb 100644 --- a/src/task/__tests__/Poller.test.ts +++ b/src/task/__tests__/Poller.test.ts @@ -1,78 +1,322 @@ import { Poller } from "../Poller"; import { expect, describe, test, jest } from "@jest/globals"; +import { mockLogger } from "./mockLogger"; + +type Task = { description: string; id: number }; + +const fakeTaskGenerator = (count: number): Promise => + Promise.resolve( + Array.from({ length: count }, (_, i) => ({ + description: `task${i}`, + id: i, + })) + ); +const BASE_TIME = 100; +const TEST_POLLER_ID = "test-poller-id"; describe("Poller", () => { test("Should run the poll function once for each interval if concurrency is just one", async () => { - const functionToRun = jest.fn(); - const poller = new Poller( - async () => { - functionToRun(); - }, - { concurrency: 1, pollInterval: 1000 } + const mockPoller: (count: number) => Promise = + jest.fn(fakeTaskGenerator); + + const mockPerformWorkFunction: (work: Task) => Promise = jest.fn(() => + Promise.resolve() + ); + + const poller = new Poller( + TEST_POLLER_ID, + mockPoller, + mockPerformWorkFunction, + { + concurrency: 1, + pollInterval: BASE_TIME, + } ); + poller.startPolling(); - await new Promise((r) => setTimeout(() => r(true), 3000)); + + await new Promise((r) => setTimeout(() => r(true), BASE_TIME)); + + // Concurrency is one so work only for one + expect(mockPerformWorkFunction).toBeCalledTimes(1); + + // Work for the first task + expect(mockPerformWorkFunction).toHaveBeenCalledWith({ + description: "task0", + id: 0, + }); + + await new Promise((r) => setTimeout(() => r(true), 3 * BASE_TIME)); poller.stopPolling(); - expect(functionToRun).toHaveBeenCalledTimes(3); + + // Should have been called 3 times one for each interval + expect(mockPoller).toHaveBeenCalledTimes(4); + + // Handling one concurrent event + expect(mockPoller).toHaveBeenCalledWith(1); }); - test("Should run the poll function twice as much. with two times the concurrency", async () => { - const functionToRun = jest.fn(); - const poller = new Poller( - async () => { - functionToRun(); - }, - { concurrency: 2, pollInterval: 1000 } + test("Should run the poll function 1 for each task. but handle two tasks concurrently", async () => { + const mockPoller: (count: number) => Promise = + jest.fn(fakeTaskGenerator); + + const mockPerformWorkFunction: (work: Task) => Promise = jest.fn(() => + Promise.resolve() + ); + + const poller = new Poller( + TEST_POLLER_ID, + mockPoller, + mockPerformWorkFunction, + { + concurrency: 2, + pollInterval: BASE_TIME, + } ); + poller.startPolling(); - await new Promise((r) => setTimeout(() => r(true), 3000)); + await new Promise((r) => setTimeout(() => r(true), BASE_TIME)); + + // Concurrency is two so should have been called twice + expect(mockPerformWorkFunction).toBeCalledTimes(2); + + // Work for the first task + expect(mockPerformWorkFunction).toHaveBeenCalledWith({ + description: "task0", + id: 0, + }); + expect(mockPerformWorkFunction).toHaveBeenCalledWith({ + description: "task1", + id: 1, + }); + + await new Promise((r) => setTimeout(() => r(true), 3 * BASE_TIME)); + + poller.stopPolling(); + + // Should have been called 4 times one for each interval + expect(mockPoller).toHaveBeenCalledTimes(4); + + // Handling one concurrent event + expect(mockPoller).toHaveBeenCalledWith(2); + }); + + test("Should be able to keep the poller with work if tasks take too long", async () => { + const mockPoller: (count: number) => Promise = + jest.fn(fakeTaskGenerator); + + const mockPerformWorkFunction: (work: Task) => Promise = jest.fn( + async (work: Task): Promise => { + if (work.id === 0) { + // Add work load on first task + await new Promise((r) => setTimeout(() => r(), 2 * BASE_TIME)); + } + } + ); + + const poller = new Poller( + TEST_POLLER_ID, + mockPoller, + mockPerformWorkFunction, + { + concurrency: 2, + pollInterval: BASE_TIME, + } + ); + + poller.startPolling(); + await new Promise((r) => setTimeout(() => r(true), BASE_TIME)); + + // Concurrency is two so should have been called twice + expect(mockPerformWorkFunction).toBeCalledTimes(2); + + // Work for the first task + expect(mockPerformWorkFunction).toHaveBeenCalledWith({ + description: "task0", + id: 0, + }); + expect(mockPerformWorkFunction).toHaveBeenCalledWith({ + description: "task1", + id: 1, + }); + + await new Promise((r) => setTimeout(() => r(true), BASE_TIME * 3)); poller.stopPolling(); - expect(functionToRun).toHaveBeenCalledTimes(6); + + // Should have been called 4 times one for each interval + expect(mockPoller).toHaveBeenCalledTimes(4); + expect(mockPoller).toHaveBeenCalledWith(2); + expect(mockPoller).toHaveBeenCalledWith(1); // it could not handle two because of the await + expect(mockPoller).toHaveBeenCalledWith(2); }); + test("Should be able to change add concurrency dynamically", async () => { - const functionToRun = jest.fn(); - const poller = new Poller( - async () => { - functionToRun(); - }, - { concurrency: 1, pollInterval: 1000 } + const mockPoller: (count: number) => Promise = + jest.fn(fakeTaskGenerator); + + const mockPerformWorkFunction: (work: Task) => Promise = jest.fn(() => + Promise.resolve() ); + + const poller = new Poller( + TEST_POLLER_ID, + mockPoller, + mockPerformWorkFunction, + { + concurrency: 2, + pollInterval: BASE_TIME, + } + ); + poller.startPolling(); - await new Promise((r) => setTimeout(() => r(true), 1000)); - poller.updateOptions({ concurrency: 2 }); - await new Promise((r) => setTimeout(() => r(true), 2000)); + await new Promise((r) => setTimeout(() => r(true), BASE_TIME)); + + poller.updateOptions({ concurrency: 3 }); + + // Concurrency is two so should have been called twice + expect(mockPerformWorkFunction).toBeCalledTimes(2); + + // Work for the first task + expect(mockPerformWorkFunction).toHaveBeenCalledWith({ + description: "task0", + id: 0, + }); + expect(mockPerformWorkFunction).toHaveBeenCalledWith({ + description: "task1", + id: 1, + }); + + await new Promise((r) => setTimeout(() => r(true), BASE_TIME * 3)); poller.stopPolling(); - expect(functionToRun).toHaveBeenCalledTimes(5); + + // Should have been called 4 times one for each interval + expect(mockPoller).toHaveBeenCalledTimes(4); + + expect(mockPoller).toHaveBeenCalledWith(2); + expect(mockPoller).toHaveBeenCalledWith(3); // it could not handle two because of the await + expect(mockPoller).toHaveBeenCalledWith(3); }); + test("Should be able to change remove concurrency dynamically", async () => { - const functionToRun = jest.fn(); - const poller = new Poller( - async () => { - functionToRun(); - }, - { concurrency: 2, pollInterval: 1000 } + const mockPoller: (count: number) => Promise = + jest.fn(fakeTaskGenerator); + + const mockPerformWorkFunction: (work: Task) => Promise = jest.fn(() => + Promise.resolve() + ); + + const poller = new Poller( + TEST_POLLER_ID, + mockPoller, + mockPerformWorkFunction, + { + concurrency: 2, + pollInterval: BASE_TIME, + } ); + poller.startPolling(); - await new Promise((r) => setTimeout(() => r(true), 1000)); + await new Promise((r) => setTimeout(() => r(true), BASE_TIME)); + poller.updateOptions({ concurrency: 1 }); - await new Promise((r) => setTimeout(() => r(true), 2000)); + + // Concurrency is two so should have been called twice + expect(mockPerformWorkFunction).toBeCalledTimes(2); + + // Work for the first task + expect(mockPerformWorkFunction).toHaveBeenCalledWith({ + description: "task0", + id: 0, + }); + expect(mockPerformWorkFunction).toHaveBeenCalledWith({ + description: "task1", + id: 1, + }); + + await new Promise((r) => setTimeout(() => r(true), BASE_TIME * 3)); poller.stopPolling(); - expect(functionToRun).toHaveBeenCalledTimes(4); + + // Should have been called 4 times one for each interval + expect(mockPoller).toHaveBeenCalledTimes(4); + + expect(mockPoller).toHaveBeenCalledWith(2); + expect(mockPoller).toHaveBeenCalledWith(1); // it could not handle two because of the await + expect(mockPoller).toHaveBeenCalledWith(1); }); - + test("Should be able to change the pollInterval", async () => { - const functionToRun = jest.fn(); - const poller = new Poller( - async () => { - functionToRun(); + const mockPoller: (count: number) => Promise = + jest.fn(fakeTaskGenerator); + + const mockPerformWorkFunction: (work: Task) => Promise = jest.fn(() => + Promise.resolve() + ); + + const poller = new Poller( + TEST_POLLER_ID, + mockPoller, + mockPerformWorkFunction, + { + concurrency: 2, + pollInterval: BASE_TIME, + } + ); + + poller.startPolling(); + await new Promise((r) => setTimeout(() => r(true), BASE_TIME)); + + poller.updateOptions({ pollInterval: BASE_TIME * 2 }); + + // Concurrency is two so should have been called twice + expect(mockPerformWorkFunction).toBeCalledTimes(2); + + // Work for the first task + expect(mockPerformWorkFunction).toHaveBeenCalledWith({ + description: "task0", + id: 0, + }); + expect(mockPerformWorkFunction).toHaveBeenCalledWith({ + description: "task1", + id: 1, + }); + + await new Promise((r) => setTimeout(() => r(true), BASE_TIME * 3)); + poller.stopPolling(); + + // Should have been called 3 times since interval was updated + expect(mockPoller).toHaveBeenCalledTimes(3); + }); + + test("Should get warning if unable to poll tasks after threshold", async () => { + const mockPoller: (count: number) => Promise = + jest.fn(fakeTaskGenerator); + + const mockPerformWorkFunction: (work: Task) => Promise = jest.fn( + async (work: Task): Promise => { + if (work.id === 0) { + // Add work load on first task takes two times the poll interval + await new Promise((r) => setTimeout(() => r(), BASE_TIME)); + } + } + ); + + const poller = new Poller( + TEST_POLLER_ID, + mockPoller, + mockPerformWorkFunction, + { + concurrency: 1, + pollInterval: BASE_TIME / 2, //pollInterval takes half. so there will be no work for one cycle + warnAtO: 0, // Setting this to zero so that we get the warning with the first non processed task }, - { concurrency: 1, pollInterval: 1000 } + mockLogger ); + poller.startPolling(); - await new Promise((r) => setTimeout(() => r(true), 1000)); - poller.updateOptions({ pollInterval: 500 }); - await new Promise((r) => setTimeout(() => r(true), 2000)); + await new Promise((r) => setTimeout(() => r(true), BASE_TIME * 3)); poller.stopPolling(); - expect(functionToRun).toHaveBeenCalledTimes(5); + expect(mockLogger.info).toHaveBeenCalledWith( + `Not polling anything because in process tasks is maxed as concurrency level. ${TEST_POLLER_ID}` + ); }); }); diff --git a/src/task/__tests__/TaskManager.test.ts b/src/task/__tests__/TaskManager.test.ts index 4cb6968..c4f0a6f 100644 --- a/src/task/__tests__/TaskManager.test.ts +++ b/src/task/__tests__/TaskManager.test.ts @@ -2,6 +2,7 @@ import { expect, describe, test, jest } from "@jest/globals"; import { simpleTask, WorkflowExecutor } from "../../core"; import { OrkesApiConfig, orkesConductorClient } from "../../orkes"; import { TaskManager, ConductorWorker } from "../index"; +import { mockLogger } from "./mockLogger"; const playConfig: Partial = { keyId: `${process.env.KEY_ID}`, @@ -9,7 +10,7 @@ const playConfig: Partial = { serverUrl: `${process.env.SERVER_URL}`, refreshTokenInterval: 0, }; - +const BASE_TIME = 500; describe("TaskManager", () => { const clientPromise = orkesConductorClient(playConfig); @@ -31,7 +32,7 @@ describe("TaskManager", () => { }; const manager = new TaskManager(client, [worker], { - options: { pollInterval: 1500 }, + options: { pollInterval: BASE_TIME }, }); manager.startPolling(); @@ -40,7 +41,7 @@ describe("TaskManager", () => { input: {}, version: 1, }); - await new Promise((r) => setTimeout(() => r(true), 3500)); + await new Promise((r) => setTimeout(() => r(true), BASE_TIME * 3)); const workflowStatus = await client.workflowResource.getExecutionStatus( executionId, true @@ -64,7 +65,7 @@ describe("TaskManager", () => { const manager = new TaskManager(client, [worker], { onError: errorHandler, - options: { pollInterval: 1500 }, + options: { pollInterval: BASE_TIME }, }); manager.startPolling(); @@ -74,7 +75,7 @@ describe("TaskManager", () => { input: {}, version: 1, }); - await new Promise((r) => setTimeout(() => r(true), 4500)); + await new Promise((r) => setTimeout(() => r(true), BASE_TIME * 4)); expect(errorHandler).toBeCalledTimes(1); await manager.stopPolling(); }); @@ -91,7 +92,7 @@ describe("TaskManager", () => { }; const manager = new TaskManager(client, [worker], { - options: { pollInterval: 1500 }, + options: { pollInterval: BASE_TIME }, }); manager.startPolling(); @@ -134,7 +135,7 @@ describe("TaskManager", () => { //create the manager with initial configuations const manager = new TaskManager(client, workers, { - options: { pollInterval: 1500, concurrency: 2 }, + options: { pollInterval: BASE_TIME, concurrency: 2 }, // logger: console, }); // start polling @@ -171,7 +172,135 @@ describe("TaskManager", () => { expect(executionId).toBeDefined(); // decrease speed again - manager.updatePollingOptions({ pollInterval: 1000, concurrency: 1 }); + manager.updatePollingOptions({ pollInterval: BASE_TIME, concurrency: 1 }); + + const workflowStatus = await executor.getWorkflow(executionId!, true); + + expect(workflowStatus.status).toEqual("COMPLETED"); + await manager.stopPolling(); + + expect(manager.isPolling).toBeFalsy(); + expect(manager.options.concurrency).toBe(1); + expect(manager.options.pollInterval).toBe(BASE_TIME); + }); + + test("Should not be able to startPolling if TaskManager has no workers", async () => { + const client = await clientPromise; + const manager = new TaskManager(client, [], { + options: { pollInterval: BASE_TIME, concurrency: 2 }, + }); + expect(() => manager.startPolling()).toThrowError( + "No workers supplied to TaskManager" + ); + }); + + test("Should not be able to startPolling if duplicate workers", async () => { + const client = await clientPromise; + + const workerNames: string[] = Array.from({ length: 3 }) + .fill(0) + .map(() => `worker-name`); + + // names to actual workers + const workers: ConductorWorker[] = workerNames.map((name) => ({ + taskDefName: name, + execute: async () => { + return { + outputData: { + hello: "From your worker", + }, + status: "COMPLETED", + }; + }, + })); + + const manager = new TaskManager(client, workers, { + options: { pollInterval: BASE_TIME, concurrency: 2 }, + }); + expect(() => manager.startPolling()).toThrowError( + "Duplicate worker taskDefName: worker-name" + ); + }); + + test("Updates single worker properties", async () => { + const client = await clientPromise; + + const executor = new WorkflowExecutor(client); + // just create a bunch of worker names + const workerNames: string[] = Array.from({ length: 3 }) + .fill(0) + .map((_, i: number) => `taskman-single-worker-update${1 + i}`); + + const candidateWorkerUpdate = "taskman-single-worker-update1"; + const initialCandidateWorkflowOptions = { + concurrency: 1, + pollInterval: BASE_TIME * 3, + }; + + // names to actual workers + const workers: ConductorWorker[] = workerNames.map((name) => ({ + taskDefName: name, + execute: async () => { + return { + outputData: { + hello: "From your worker", + }, + status: "COMPLETED", + }; + }, + ...(name === candidateWorkerUpdate + ? initialCandidateWorkflowOptions + : {}), + })); + + //create the manager with initial configuations + const manager = new TaskManager(client, workers, { + options: { pollInterval: BASE_TIME, concurrency: 2 }, + logger: mockLogger, + }); + // start polling + manager.startPolling(); + + expect(manager.isPolling).toBeTruthy(); + + const workflowName = "TaskManagerTestMultiSingleWorkerUpdate"; + + const updatedWorkerOptions = { + concurrency: 3, + pollInterval: BASE_TIME, + }; + + // change the polling options for a single worker + manager.updatePollingOptionForWorker( + candidateWorkerUpdate, + updatedWorkerOptions + ); + + // create the workflow where we will run the test + await executor.registerWorkflow(true, { + name: workflowName, + version: 1, + ownerEmail: "developers@orkes.io", + tasks: workerNames.map((name) => simpleTask(name, name, {})), + inputParameters: [], + outputParameters: {}, + timeoutSeconds: 0, + }); + + //Start workf + const { workflowId: executionId } = await executor.executeWorkflow( + { + name: workflowName, + version: 1, + }, + workflowName, + 1, + "identifierTaskManMulti" + ); + expect(executionId).toBeDefined(); + + // decrease speed again + manager.updatePollingOptions({ pollInterval: BASE_TIME, concurrency: 1 }); const workflowStatus = await executor.getWorkflow(executionId!, true); @@ -180,6 +309,13 @@ describe("TaskManager", () => { expect(manager.isPolling).toBeFalsy(); expect(manager.options.concurrency).toBe(1); - expect(manager.options.pollInterval).toBe(1000); + expect(manager.options.pollInterval).toBe(BASE_TIME); + expect(mockLogger.info).toBeCalledWith( + `TaskWorker ${candidateWorkerUpdate} initialized with concurrency of ${initialCandidateWorkflowOptions.concurrency} and poll interval of ${initialCandidateWorkflowOptions.pollInterval}` + ); + + expect(mockLogger.info).toBeCalledWith( + `TaskWorker ${candidateWorkerUpdate} configuration updated with concurrency of ${updatedWorkerOptions.concurrency} and poll interval of ${updatedWorkerOptions.pollInterval}` + ); }); }); diff --git a/src/task/__tests__/TaskRunner.test.ts b/src/task/__tests__/TaskRunner.test.ts index 0b7b753..13920c6 100644 --- a/src/task/__tests__/TaskRunner.test.ts +++ b/src/task/__tests__/TaskRunner.test.ts @@ -7,9 +7,9 @@ import { TaskResourceService } from "../../common/open-api"; test("polls tasks", async () => { const taskClientStub: Mocked< - Pick + Pick > = { - poll: jest.fn(), + batchPoll: jest.fn(), updateTask1: jest.fn(), }; const mockTaskClient = taskClientStub as unknown as TaskResourceService; @@ -38,15 +38,17 @@ test("polls tasks", async () => { }; const workflowInstanceId = "fake-workflow-id"; const taskId = "fake-task-id"; - taskClientStub.poll.mockResolvedValue({ - taskId, - workflowInstanceId, - status: "IN_PROGRESS", - reasonForIncompletion: undefined, - inputData: { - input: "from workflow", + taskClientStub.batchPoll.mockResolvedValue([ + { + taskId, + workflowInstanceId, + status: "IN_PROGRESS", + reasonForIncompletion: undefined, + inputData: { + input: "from workflow", + }, }, - }); + ]); const runner = new TaskRunner(args); runner.startPolling(); @@ -66,9 +68,9 @@ test("polls tasks", async () => { test("Should set the task as failed if the task has an error", async () => { const taskClientStub: Mocked< - Pick + Pick > = { - poll: jest.fn(), + batchPoll: jest.fn(), updateTask1: jest.fn(), }; const mockTaskClient = taskClientStub as unknown as TaskResourceService; @@ -92,15 +94,17 @@ test("Should set the task as failed if the task has an error", async () => { }; const workflowInstanceId = "fake-workflow-id"; const taskId = "fake-task-id"; - taskClientStub.poll.mockResolvedValue({ - taskId, - workflowInstanceId, - status: "IN_PROGRESS", - reasonForIncompletion: undefined, - inputData: { - input: "from workflow", + taskClientStub.batchPoll.mockResolvedValue([ + { + taskId, + workflowInstanceId, + status: "IN_PROGRESS", + reasonForIncompletion: undefined, + inputData: { + input: "from workflow", + }, }, - }); + ]); const runner = new TaskRunner(args); runner.startPolling(); @@ -110,8 +114,7 @@ test("Should set the task as failed if the task has an error", async () => { taskId, workflowInstanceId, status: "FAILED", - outputData: { - }, + outputData: {}, reasonForIncompletion: "Error from worker", }; expect(taskClientStub.updateTask1).toHaveBeenCalledWith(expected); diff --git a/src/task/constants.ts b/src/task/constants.ts new file mode 100644 index 0000000..b1d6de2 --- /dev/null +++ b/src/task/constants.ts @@ -0,0 +1,4 @@ +export const DEFAULT_POLL_INTERVAL = 100; +export const DEFAULT_CONCURRENCY = 1; +export const DEFAULT_WARN_AT_O = 100; +export const DEFAULT_BATCH_POLLING_TIMEOUT = 100; \ No newline at end of file diff --git a/workers_sdk.md b/workers_sdk.md index c797a7c..f37be0e 100644 --- a/workers_sdk.md +++ b/workers_sdk.md @@ -69,6 +69,8 @@ const worker: ConductorWorker = { callbackAfterSeconds: 60, }; }, + pollInterval: 100, // optional + concurrency: 2, // optional }; ``` @@ -91,28 +93,41 @@ const clientPromise = orkesConductorClient({ const client = await clientPromise; -const taskRunner = new TaskRunner({ - taskResource: client.taskResource, - worker: { - taskDefName: "MyCustomWorker", - execute: async ({ inputData, taskId }) => { - return { - outputData: { - greeting: "Hello World", - }, - status: "COMPLETED", - }; - }, - }, - options: { - pollInterval: 10, - domain: undefined, - concurrency: 1, - workerID: "", +const taskDefName = "HelloWorldWorker"; + +const customWorker: ConductorWorker = { +taskDefName, + execute: async ({ inputData, taskId }) => { + return { + outputData: { + greeting: "Hello World", + }, + status: "COMPLETED", + }; }, +}; +// Worker Options will take precedence over options defined in the manager + +const manager = new TaskManager(client, [customWorker], { + options: { pollInterval: 100, concurrency: 1 }, }); -taskRunner.startPolling(); +manager.startPolling(); +// You can update all worker settings at once using +manager.updatePollingOptions({ pollInterval: 100, concurrency: 1 }); + +// You can update a single worker setting using : +manager.updatePollingOptionForWorker(taskDefName, { + pollInterval: 100, + concurrency: 1, +}); + +manager.isPolling // Will resolve to true + +await manager.stopPolling(); + +manager.isPolling // Will resolve to false + ``` ## Task Management APIs @@ -153,12 +168,7 @@ executor.updateTaskByRefName( #### Update task by id ```typescript -await executor.updateTask( - taskId, - executionId, - "COMPLETED", - newChange -); +await executor.updateTask(taskId, executionId, "COMPLETED", newChange); ``` ### Next: [Create and Execute Workflows](workflow_sdk.md)