From 23f04a29a9114d478a97528288a9d2d114f776dd Mon Sep 17 00:00:00 2001 From: Daniel Rochetti Date: Mon, 22 Jul 2024 19:25:47 -0700 Subject: [PATCH 1/2] feat(client): subscribe to status --- .../app/api/fal/proxy/route.ts | 2 +- libs/client/package.json | 2 +- libs/client/src/function.ts | 257 +++++++++++------- libs/client/src/streaming.ts | 10 + libs/client/src/types.ts | 46 ++-- package-lock.json | 34 +-- package.json | 2 +- 7 files changed, 217 insertions(+), 136 deletions(-) diff --git a/apps/demo-nextjs-app-router/app/api/fal/proxy/route.ts b/apps/demo-nextjs-app-router/app/api/fal/proxy/route.ts index 998ab54..9a14c5c 100644 --- a/apps/demo-nextjs-app-router/app/api/fal/proxy/route.ts +++ b/apps/demo-nextjs-app-router/app/api/fal/proxy/route.ts @@ -1,3 +1,3 @@ import { route } from '@fal-ai/serverless-proxy/nextjs'; -export const { GET, POST } = route; +export const { GET, POST, PUT } = route; diff --git a/libs/client/package.json b/libs/client/package.json index 4601abb..6e979c5 100644 --- a/libs/client/package.json +++ b/libs/client/package.json @@ -1,7 +1,7 @@ { "name": "@fal-ai/serverless-client", "description": "The fal serverless JS/TS client", - "version": "0.14.0-alpha.2", + "version": "0.14.0-alpha.3", "license": "MIT", "repository": { "type": "git", diff --git a/libs/client/src/function.ts b/libs/client/src/function.ts index aa306a0..4d31dfe 100644 --- a/libs/client/src/function.ts +++ b/libs/client/src/function.ts @@ -2,7 +2,12 @@ import { getTemporaryAuthToken } from './auth'; import { dispatchRequest } from './request'; import { storageImpl } from './storage'; import { FalStream } from './streaming'; -import { EnqueueResult, QueueStatus, RequestLog } from './types'; +import { + CompletedQueueStatus, + EnqueueResult, + QueueStatus, + RequestLog, +} from './types'; import { ensureAppIdFormat, isUUIDv4, isValidUrl, parseAppId } from './utils'; /** @@ -110,6 +115,9 @@ export async function send( ); } +export type QueueStatusSubscriptionOptions = QueueStatusOptions & + Omit; + /** * Runs a fal serverless function identified by its `id`. * @@ -127,89 +135,6 @@ type TimeoutId = ReturnType; const DEFAULT_POLL_INTERVAL = 500; -/** - * Subscribes to updates for a specific request in the queue. - * - * @param id - The ID or URL of the function web endpoint. - * @param options - Options to configure how the request is run and how updates are received. - * @returns A promise that resolves to the result of the request once it's completed. - */ -export async function subscribe( - id: string, - options: RunOptions & QueueSubscribeOptions = {} -): Promise { - const { request_id: requestId } = await queue.submit(id, options); - if (options.onEnqueue) { - options.onEnqueue(requestId); - } - const timeout = options.timeout; - let timeoutId: TimeoutId = undefined; - if (timeout) { - timeoutId = setTimeout(() => { - queue.cancel(id, { requestId }).catch(console.warn); - throw new Error( - `Client timed out waiting for the request to complete after ${timeout}ms` - ); - }, timeout); - } - if (options.mode === 'streaming') { - const status = await queue.streamStatus(id, { - requestId, - logs: options.logs, - }); - const logs: RequestLog[] = []; - status.on('message', (data: QueueStatus) => { - if (options.onQueueUpdate) { - // accumulate logs to match previous polling behavior - if ( - 'logs' in data && - Array.isArray(data.logs) && - data.logs.length > 0 - ) { - logs.push(...data.logs); - } - options.onQueueUpdate('logs' in data ? { ...data, logs } : data); - } - }); - await status.done(); - if (timeoutId) { - clearTimeout(timeoutId); - } - return queue.result(id, { requestId }); - } - // default to polling until status streaming is stable and faster - return new Promise((resolve, reject) => { - let timeoutId: ReturnType; - const pollInterval = options.pollInterval ?? DEFAULT_POLL_INTERVAL; - const poll = async () => { - try { - const requestStatus = await queue.status(id, { - requestId, - logs: options.logs ?? false, - }); - if (options.onQueueUpdate) { - options.onQueueUpdate(requestStatus); - } - if (requestStatus.status === 'COMPLETED') { - clearTimeout(timeoutId); - try { - const result = await queue.result(id, { requestId }); - resolve(result); - } catch (error) { - reject(error); - } - return; - } - timeoutId = setTimeout(poll, pollInterval); - } catch (error) { - clearTimeout(timeoutId); - reject(error); - } - }; - poll().catch(reject); - }); -} - /** * Options for subscribing to the request queue. */ @@ -247,6 +172,10 @@ type QueueSubscribeOptions = { * The timeout (in milliseconds) for the request. If the request is not * completed within this time, the subscription will be cancelled. * + * Keep in mind that although the client resolves the function on a timeout, + * and will try to cancel the request on the server, the server might not be + * able to cancel the request if it's already running. + * * Note: currently, the timeout is not enforced and the default is `undefined`. * This behavior might change in the future. */ @@ -326,35 +255,41 @@ interface Queue { status(endpointId: string, options: QueueStatusOptions): Promise; /** - * Retrieves the result of a specific request from the queue. + * Subscribes to updates for a specific request in the queue using HTTP streaming events. * * @param endpointId - The ID of the function web endpoint. - * @param options - Options to configure how the request is run. - * @returns A promise that resolves to the result of the request. + * @param options - Options to configure how the request is run and how updates are received. + * @returns The streaming object that can be used to listen for updates. */ - result( + streamStatus( endpointId: string, - options: BaseQueueOptions - ): Promise; + options: QueueStatusOptions + ): Promise>; /** - * @deprecated Use `fal.subscribe` instead. + * Subscribes to updates for a specific request in the queue using polling or streaming. + * See `options.mode` for more details. + * + * @param endpointId - The ID of the function web endpoint. + * @param options - Options to configure how the request is run and how updates are received. + * @returns A promise that resolves to the final status of the request. */ - subscribe( + subscribeToStatus( endpointId: string, - options: RunOptions & QueueSubscribeOptions - ): Promise; + options: QueueStatusSubscriptionOptions + ): Promise; /** - * Subscribes to updates for a specific request in the queue. + * Retrieves the result of a specific request from the queue. * * @param endpointId - The ID of the function web endpoint. - * @param options - Options to configure how the request is run and how updates are received. + * @param options - Options to configure how the request is run. + * @returns A promise that resolves to the result of the request. */ - streamStatus( + result( endpointId: string, - options: QueueStatusOptions - ): Promise>; + options: BaseQueueOptions + ): Promise; /** * Cancels a request in the queue. @@ -402,6 +337,7 @@ export const queue: Queue = { }, }); }, + async streamStatus( endpointId: string, { requestId, logs = false }: QueueStatusOptions @@ -424,6 +360,108 @@ export const queue: Queue = { method: 'get', }); }, + + async subscribeToStatus(endpointId, options): Promise { + const requestId = options.requestId; + const timeout = options.timeout; + let timeoutId: TimeoutId = undefined; + + const handleCancelError = () => { + // Ignore errors as the client will follow through with the timeout + // regardless of the server response. In case cancelation fails, we + // still want to reject the promise and consider the client call canceled. + }; + if (options.mode === 'streaming') { + const status = await queue.streamStatus(endpointId, { + requestId, + logs: options.logs, + }); + const logs: RequestLog[] = []; + if (timeout) { + timeoutId = setTimeout(() => { + status.abort(); + queue.cancel(endpointId, { requestId }).catch(handleCancelError); + // TODO this error cannot bubble up to the user since it's thrown in + // a closure in the global scope due to setTimeout behavior. + // User will get a platform error instead. We should find a way to + // make this behavior aligned with polling. + throw new Error( + `Client timed out waiting for the request to complete after ${timeout}ms` + ); + }, timeout); + } + status.on('message', (data: QueueStatus) => { + if (options.onQueueUpdate) { + // accumulate logs to match previous polling behavior + if ( + 'logs' in data && + Array.isArray(data.logs) && + data.logs.length > 0 + ) { + logs.push(...data.logs); + } + options.onQueueUpdate('logs' in data ? { ...data, logs } : data); + } + }); + const doneStatus = await status.done(); + if (timeoutId) { + clearTimeout(timeoutId); + } + return doneStatus as CompletedQueueStatus; + } + // default to polling until status streaming is stable and faster + return new Promise((resolve, reject) => { + let pollingTimeoutId: TimeoutId; + // type resolution isn't great in this case, so check for its presence + // and and type so the typechecker behaves as expected + const pollInterval = + 'pollInterval' in options && typeof options.pollInterval === 'number' + ? options.pollInterval ?? DEFAULT_POLL_INTERVAL + : DEFAULT_POLL_INTERVAL; + + const clearScheduledTasks = () => { + if (timeoutId) { + clearTimeout(timeoutId); + } + if (pollingTimeoutId) { + clearTimeout(pollingTimeoutId); + } + }; + if (timeout) { + timeoutId = setTimeout(() => { + clearScheduledTasks(); + queue.cancel(endpointId, { requestId }).catch(handleCancelError); + reject( + new Error( + `Client timed out waiting for the request to complete after ${timeout}ms` + ) + ); + }, timeout); + } + const poll = async () => { + try { + const requestStatus = await queue.status(endpointId, { + requestId, + logs: options.logs ?? false, + }); + if (options.onQueueUpdate) { + options.onQueueUpdate(requestStatus); + } + if (requestStatus.status === 'COMPLETED') { + clearScheduledTasks(); + resolve(requestStatus); + return; + } + pollingTimeoutId = setTimeout(poll, pollInterval); + } catch (error) { + clearScheduledTasks(); + reject(error); + } + }; + poll().catch(reject); + }); + }, + async result( endpointId: string, { requestId }: BaseQueueOptions @@ -436,6 +474,7 @@ export const queue: Queue = { path: `/requests/${requestId}`, }); }, + async cancel( endpointId: string, { requestId }: BaseQueueOptions @@ -448,5 +487,23 @@ export const queue: Queue = { path: `/requests/${requestId}/cancel`, }); }, - subscribe, }; + +/** + * Subscribes to updates for a specific request in the queue. + * + * @param endpointId - The ID of the function web endpoint. + * @param options - Options to configure how the request is run and how updates are received. + * @returns A promise that resolves to the result of the request once it's completed. + */ +export async function subscribe( + endpointId: string, + options: RunOptions & QueueSubscribeOptions = {} +): Promise { + const { request_id: requestId } = await queue.submit(endpointId, options); + if (options.onEnqueue) { + options.onEnqueue(requestId); + } + await queue.subscribeToStatus(endpointId, { requestId, ...options }); + return queue.result(endpointId, { requestId }); +} diff --git a/libs/client/src/streaming.ts b/libs/client/src/streaming.ts index 1ee1df4..cd8d76d 100644 --- a/libs/client/src/streaming.ts +++ b/libs/client/src/streaming.ts @@ -56,6 +56,8 @@ export class FalStream { private streamClosed = false; private donePromise: Promise; + private abortController = new AbortController(); + constructor(url: string, options: StreamOptions) { this.url = url; this.options = options; @@ -93,6 +95,7 @@ export class FalStream { 'content-type': 'application/json', }, body: input && method !== 'get' ? JSON.stringify(input) : undefined, + signal: this.abortController.signal, }); this.handleResponse(response); } catch (error) { @@ -225,6 +228,13 @@ export class FalStream { * @returns the promise that resolves when the request is done. */ public done = async () => this.donePromise; + + /** + * Aborts the streaming request. + */ + public abort = () => { + this.abortController.abort(); + }; } /** diff --git a/libs/client/src/types.ts b/libs/client/src/types.ts index 2b5c8f1..ebade5e 100644 --- a/libs/client/src/types.ts +++ b/libs/client/src/types.ts @@ -17,28 +17,42 @@ export type Metrics = { inference_time: number | null; }; +interface BaseQueueStatus { + status: 'IN_PROGRESS' | 'COMPLETED' | 'IN_QUEUE'; +} + +export interface InProgressQueueStatus extends BaseQueueStatus { + status: 'IN_PROGRESS'; + response_url: string; + logs: RequestLog[]; +} + +export interface CompletedQueueStatus extends BaseQueueStatus { + status: 'COMPLETED'; + response_url: string; + logs: RequestLog[]; + metrics: Metrics; +} + +export interface EnqueuedQueueStatus extends BaseQueueStatus { + status: 'IN_QUEUE'; + queue_position: number; + response_url: string; +} + export type QueueStatus = - | { - status: 'IN_PROGRESS'; - response_url: string; - logs: null | RequestLog[]; - } - | { - status: 'COMPLETED'; - response_url: string; - logs: null | RequestLog[]; - metrics: Metrics; - } - | { - status: 'IN_QUEUE'; - queue_position: number; - response_url: string; - }; + | InProgressQueueStatus + | CompletedQueueStatus + | EnqueuedQueueStatus; export function isQueueStatus(obj: any): obj is QueueStatus { return obj && obj.status && obj.response_url; } +export function isCompletedQueueStatus(obj: any): obj is CompletedQueueStatus { + return isQueueStatus(obj) && obj.status === 'COMPLETED'; +} + export type ValidationErrorInfo = { msg: string; loc: Array; diff --git a/package-lock.json b/package-lock.json index 81f01c7..87ad66c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -105,7 +105,7 @@ "ts-node": "^10.9.1", "ts-protoc-gen": "^0.15.0", "tsconfig-paths": "^4.2.0", - "typescript": "5.1.6" + "typescript": "^5.5.4" } }, "node_modules/@aashutoshrathi/word-wrap": { @@ -2453,19 +2453,6 @@ "node": ">=8" } }, - "node_modules/@commitlint/load/node_modules/typescript": { - "version": "5.2.2", - "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.2.2.tgz", - "integrity": "sha512-mI4WrpHsbCIcwT9cF4FZvr80QUeKvsUsUvKDoR+X/7XHQH98xYD8YHZg7ANtz2GtZt/CBq2QJ0thkGJMHfqc1w==", - "dev": true, - "bin": { - "tsc": "bin/tsc", - "tsserver": "bin/tsserver" - }, - "engines": { - "node": ">=14.17" - } - }, "node_modules/@commitlint/message": { "version": "17.8.1", "resolved": "https://registry.npmjs.org/@commitlint/message/-/message-17.8.1.tgz", @@ -5558,6 +5545,19 @@ } } }, + "node_modules/@nx/linter/node_modules/typescript": { + "version": "5.1.6", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.1.6.tgz", + "integrity": "sha512-zaWCozRZ6DLEWAWFrVDz1H6FVXzUSfTy5FUMWsQlU8Ym5JP9eO4xkTIROFCQvhQf61z6O/G6ugw3SgAnvvm+HA==", + "dev": true, + "bin": { + "tsc": "bin/tsc", + "tsserver": "bin/tsserver" + }, + "engines": { + "node": ">=14.17" + } + }, "node_modules/@nx/next": { "version": "16.10.0", "resolved": "https://registry.npmjs.org/@nx/next/-/next-16.10.0.tgz", @@ -29617,9 +29617,9 @@ } }, "node_modules/typescript": { - "version": "5.1.6", - "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.1.6.tgz", - "integrity": "sha512-zaWCozRZ6DLEWAWFrVDz1H6FVXzUSfTy5FUMWsQlU8Ym5JP9eO4xkTIROFCQvhQf61z6O/G6ugw3SgAnvvm+HA==", + "version": "5.5.4", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.5.4.tgz", + "integrity": "sha512-Mtq29sKDAEYP7aljRgtPOpTvOfbwRWlS6dPRzwjdE+C0R4brX/GUyhHSecbHMFLNBLcJIPt9nl9yG5TZ1weH+Q==", "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" diff --git a/package.json b/package.json index bdf6535..72848b9 100644 --- a/package.json +++ b/package.json @@ -121,6 +121,6 @@ "ts-node": "^10.9.1", "ts-protoc-gen": "^0.15.0", "tsconfig-paths": "^4.2.0", - "typescript": "5.1.6" + "typescript": "^5.5.4" } } From 7c315ba7cd46a90ed68f31f4d01ff80c83e10eeb Mon Sep 17 00:00:00 2001 From: Daniel Rochetti Date: Mon, 22 Jul 2024 19:36:58 -0700 Subject: [PATCH 2/2] fix(client): timeout id type --- libs/client/src/function.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/client/src/function.ts b/libs/client/src/function.ts index 4d31dfe..be10835 100644 --- a/libs/client/src/function.ts +++ b/libs/client/src/function.ts @@ -131,7 +131,7 @@ export async function run( return send(id, options); } -type TimeoutId = ReturnType; +type TimeoutId = ReturnType | undefined; const DEFAULT_POLL_INTERVAL = 500;