From 97b886abf0ba2ab74deb9a0dff0beca3db00b3a9 Mon Sep 17 00:00:00 2001 From: Anirban Kar Date: Thu, 12 Dec 2024 03:13:52 +0530 Subject: [PATCH] fix: added more controlled rate for code streaming --- app/lib/stores/workbench.ts | 17 ++++++++++--- app/utils/sampler.ts | 49 +++++++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 3 deletions(-) create mode 100644 app/utils/sampler.ts diff --git a/app/lib/stores/workbench.ts b/app/lib/stores/workbench.ts index 15482c2a8..0d46057db 100644 --- a/app/lib/stores/workbench.ts +++ b/app/lib/stores/workbench.ts @@ -16,6 +16,7 @@ import * as nodePath from 'node:path'; import { extractRelativePath } from '~/utils/diff'; import { description } from '~/lib/persistence'; import Cookies from 'js-cookie'; +import { createSampler } from '~/utils/sampler'; export interface ArtifactState { id: string; @@ -262,9 +263,9 @@ export class WorkbenchStore { this.artifacts.setKey(messageId, { ...artifact, ...state }); } addAction(data: ActionCallbackData) { - this._addAction(data); + // this._addAction(data); - // this.addToExecutionQueue(()=>this._addAction(data)) + this.addToExecutionQueue(() => this._addAction(data)); } async _addAction(data: ActionCallbackData) { const { messageId } = data; @@ -280,7 +281,7 @@ export class WorkbenchStore { runAction(data: ActionCallbackData, isStreaming: boolean = false) { if (isStreaming) { - this._runAction(data, isStreaming); + this.actionStreamSampler(data, isStreaming); } else { this.addToExecutionQueue(() => this._runAction(data, isStreaming)); } @@ -294,6 +295,12 @@ export class WorkbenchStore { unreachable('Artifact not found'); } + const action = artifact.runner.actions.get()[data.actionId]; + + if (!action || action.executed) { + return; + } + if (data.action.type === 'file') { const wc = await webcontainer; const fullPath = nodePath.join(wc.workdir, data.action.filePath); @@ -323,6 +330,10 @@ export class WorkbenchStore { } } + actionStreamSampler = createSampler(async (data: ActionCallbackData, isStreaming: boolean = false) => { + return await this._runAction(data, isStreaming); + }, 100); // TODO: remove this magic number to have it configurable + #getArtifact(id: string) { const artifacts = this.artifacts.get(); return artifacts[id]; diff --git a/app/utils/sampler.ts b/app/utils/sampler.ts new file mode 100644 index 000000000..963990904 --- /dev/null +++ b/app/utils/sampler.ts @@ -0,0 +1,49 @@ +/** + * Creates a function that samples calls at regular intervals and captures trailing calls. + * - Drops calls that occur between sampling intervals + * - Takes one call per sampling interval if available + * - Captures the last call if no call was made during the interval + * + * @param fn The function to sample + * @param sampleInterval How often to sample calls (in ms) + * @returns The sampled function + */ +export function createSampler any>(fn: T, sampleInterval: number): T { + let lastArgs: Parameters | null = null; + let lastTime = 0; + let timeout: NodeJS.Timeout | null = null; + + // Create a function with the same type as the input function + const sampled = function (this: any, ...args: Parameters) { + const now = Date.now(); + lastArgs = args; + + // If we're within the sample interval, just store the args + if (now - lastTime < sampleInterval) { + // Set up trailing call if not already set + if (!timeout) { + timeout = setTimeout( + () => { + timeout = null; + lastTime = Date.now(); + + if (lastArgs) { + fn.apply(this, lastArgs); + lastArgs = null; + } + }, + sampleInterval - (now - lastTime), + ); + } + + return; + } + + // If we're outside the interval, execute immediately + lastTime = now; + fn.apply(this, args); + lastArgs = null; + } as T; + + return sampled; +}