From 436c8d56e99e17f69602d5d8746ed624226c641a Mon Sep 17 00:00:00 2001 From: William Stein Date: Sun, 17 Mar 2024 16:10:45 +0000 Subject: [PATCH] compute servers: implement api for compute similar to the one for sync; add ability to execute code - this was a HUGE gap in functionality - this does involve code duplication, but it is best to write something twice, then refactor, rather than refactor before the code exists and works!! --- src/compute/compute/lib/manager.ts | 82 +++++++++++++++++++ src/packages/comm/websocket/types.ts | 6 ++ src/packages/project/browser-websocket/api.ts | 12 +-- src/packages/sync-client/lib/api.ts | 6 ++ src/packages/sync-fs/lib/handle-api-call.ts | 61 +++++++++++++- 5 files changed, 158 insertions(+), 9 deletions(-) diff --git a/src/compute/compute/lib/manager.ts b/src/compute/compute/lib/manager.ts index 8a71a6064b..ac00347939 100644 --- a/src/compute/compute/lib/manager.ts +++ b/src/compute/compute/lib/manager.ts @@ -27,10 +27,12 @@ import { pingProjectUntilSuccess, waitUntilFilesystemIsOfType } from "./util"; import { apiCall } from "@cocalc/api-client"; import { get_blob_store as initJupyterBlobStore } from "@cocalc/jupyter/blobs"; import { delay } from "awaiting"; +import { executeCode } from "@cocalc/backend/execute-code"; const logger = debug("cocalc:compute:manager"); const STATUS_INTERVAL_MS = 20 * 1000; +const REGISTER_INTERVAL_MS = 30000; interface Options { project_id: string; @@ -167,6 +169,8 @@ class Manager { timeout: Math.ceil(STATUS_INTERVAL_MS / 1000 + 3), }); setInterval(this.reportStatus, STATUS_INTERVAL_MS); + + await this.initApiRequestHandler(); }; private initListings = async () => { @@ -336,4 +340,82 @@ class Manager { private cwd = (path) => { return join(this.home, dirname(path)); }; + + /********************************************************** + * + * project --> compute server api + * + * NOTE: this is very similar to what is in packages/sync-fs/lib/index.ts + * which is a much more complicated version 
for doing sync. + * There is code duplication, but at least it is good code. I would like + * to refactor these. + * NOTE: there's nothing implemented for closing this properly, which + * doesn't matter since right now the lifetime of this object is the lifetime + * of the process. But for unit testing it would be nice to have a way to close this... + **************************************/ + private registerToHandleApi = async (state = "online") => { + if (state != "online") return; + try { + this.log("registerToHandleApi: registering"); + const api = await this.client.project_client.api(this.project_id); + await api.computeServerComputeRegister(this.compute_server_id); + this.log("registerToHandleApi: registered"); + } catch (err) { + this.log("registerToHandleApi: ERROR -- ", err); + } + }; + + private initApiRequestHandler = async () => { + this.log("initApiRequestHandler: installing API request handler"); + this.websocket.on("data", this.handleApiRequest); + this.log("initApiRequestHandler: installed handler"); + this.registerToHandleApi(); + //this.registerToHandleApiInterval = + setInterval(this.registerToHandleApi, REGISTER_INTERVAL_MS); + this.websocket.on("state", this.registerToHandleApi); + }; + + private handleApiRequest = async (data) => { + if (!data?.event) { + return; + } + try { + this.log("handleApiRequest:", { data }); + const resp = await this.doApiRequest(data); + this.log("handleApiRequest: ", { resp }); + if (data.id && this.websocket != null) { + this.websocket.write({ + id: data.id, + resp, + }); + } + } catch (err) { + const error = `${err}`; + if (data.id && this.websocket != null) { + this.log("handleApiRequest: returning error", { + event: data?.event, + error, + }); + this.websocket.write({ + id: data.id, + error, + }); + } else { + this.log("handleApiRequest: ignoring error", { + event: data?.event, + error, + }); + } + } + }; + + private doApiRequest = async (data) => { + this.log("doApiRequest", { data }); + switch (data?.event) { 
+ case "exec": + return await executeCode({ ...data.opts, home: this.home }); + default: + throw Error(`unknown event '${data?.event}'`); + } + }; } diff --git a/src/packages/comm/websocket/types.ts b/src/packages/comm/websocket/types.ts index bb9645250f..0da94b305b 100644 --- a/src/packages/comm/websocket/types.ts +++ b/src/packages/comm/websocket/types.ts @@ -203,6 +203,11 @@ interface MesgComputeServerSyncRegister { opts: { compute_server_id: number }; } +interface MesgComputeServerComputeRegister { + cmd: "compute_server_compute_register"; + opts: { compute_server_id: number }; +} + interface MesgComputeServerSyncRequest { cmd: "compute_server_sync_request"; opts: { compute_server_id: number }; @@ -260,6 +265,7 @@ export type Mesg = | MesgComputeFilesystemCache | MesgSyncFS | MesgComputeServerSyncRegister + | MesgComputeServerComputeRegister | MesgComputeServerSyncRequest | MesgCopyFromProjectToComputeServer | MesgCopyFromComputeServerToProject; diff --git a/src/packages/project/browser-websocket/api.ts b/src/packages/project/browser-websocket/api.ts index bf76af41b7..855892e735 100644 --- a/src/packages/project/browser-websocket/api.ts +++ b/src/packages/project/browser-websocket/api.ts @@ -45,6 +45,8 @@ import handleSyncFsApiCall, { handleComputeServerDeleteFiles, handleComputeServerMoveFiles, handleComputeServerRenameFile, + handleComputeServerComputeRegister, + handleComputeServerComputeExec, } from "@cocalc/sync-fs/lib/handle-api-call"; import { version } from "@cocalc/util/smc-version"; import { getLogger } from "@cocalc/project/logger"; @@ -139,11 +141,7 @@ async function handleApiCall(data: Mesg, spark): Promise { if (data.opts.filesystem) { return await handleComputeServerFilesystemExec(data.opts); } else { - throw Error( - `exec on compute server without also setting opts.filesystem=true is not implemented ${JSON.stringify( - data.opts, - )}`, - ); + return await handleComputeServerComputeExec(data.opts); } } else { return await exec(data.opts); @@ 
-187,7 +185,11 @@ async function handleApiCall(data: Mesg, spark): Promise { case "sync_fs": return await handleSyncFsApiCall(data.opts); case "compute_server_sync_register": + // register filesystem container return await handleComputeServerSyncRegister(data.opts, spark); + case "compute_server_compute_register": + // register compute container + return await handleComputeServerComputeRegister(data.opts, spark); case "compute_server_sync_request": return await handleSyncFsRequestCall(data.opts); case "copy_from_project_to_compute_server": diff --git a/src/packages/sync-client/lib/api.ts b/src/packages/sync-client/lib/api.ts index 26426c9352..d3823e74e3 100644 --- a/src/packages/sync-client/lib/api.ts +++ b/src/packages/sync-client/lib/api.ts @@ -160,4 +160,10 @@ export default class API implements API_Interface { 15000, ); } + async computeServerComputeRegister(compute_server_id) { + return await this.call( + { cmd: "compute_server_compute_register", opts: { compute_server_id } }, + 15000, + ); + } } diff --git a/src/packages/sync-fs/lib/handle-api-call.ts b/src/packages/sync-fs/lib/handle-api-call.ts index 7cee990a5c..2af426c8e8 100644 --- a/src/packages/sync-fs/lib/handle-api-call.ts +++ b/src/packages/sync-fs/lib/handle-api-call.ts @@ -1,4 +1,9 @@ -/* This runs in the project and handles api calls. */ +/* This runs in the project and handles api calls from compute servers. + +It mainly handles a persistent connection from the filesystem container, +and supports functions including moving files, syncing, executing code, +etc. +*/ import { fromCompressedJSON } from "./compressed-json"; import getLogger from "@cocalc/backend/logger"; @@ -247,7 +252,7 @@ export async function handleComputeServerSyncRegister( spark_id: spark.id, }); // save the connection so we can send a sync_request message later, and also handle the api - // calls for copying files back and forth: + // calls for copying files back and forth, etc. 
sparks[compute_server_id] = spark; const remove = () => { if (sparks[compute_server_id]?.id == spark.id) { @@ -288,8 +293,11 @@ function callComputeServerApi( compute_server_id, mesg, timeoutMs = 30000, + compute = false, ): Promise { - const spark = sparks[compute_server_id]; + const spark = compute + ? computeSparks[compute_server_id] + : sparks[compute_server_id]; if (spark == null) { log("callComputeServerApi: no connection"); throw Error( @@ -348,7 +356,7 @@ export async function handleSyncFsGetListing({ export async function handleComputeServerFilesystemExec(opts) { const { compute_server_id } = opts; - log("handleSyncFsGetListing: ", opts); + log("handleComputeServerFilesystemExec: ", opts); const mesg = { event: "exec", opts }; return await callComputeServerApi( compute_server_id, @@ -385,3 +393,48 @@ export async function handleComputeServerMoveFiles({ const mesg = { event: "move_files", paths, dest }; return await callComputeServerApi(compute_server_id, mesg, 60 * 1000); } + +/* +Similar but for compute instead of filesystem: +*/ + +const computeSparks: { [compute_server_id: number]: Spark } = {}; + +export async function handleComputeServerComputeRegister( + { compute_server_id }, + spark, +) { + log("handleComputeServerComputeRegister -- registering ", { + compute_server_id, + spark_id: spark.id, + }); + // save the connection so the project can later send api requests (e.g., "exec") + // to the compute container over it (see callComputeServerApi with compute=true). + computeSparks[compute_server_id] = spark; + const remove = () => { + if (computeSparks[compute_server_id]?.id == spark.id) { + log( + "handleComputeServerComputeRegister: removing compute server connection due to disconnect -- ", + { compute_server_id, spark_id: spark.id }, + ); + // the spark connection currently cached is this + // one, so we remove it. It could be replaced by + // a new one, in which case we better not remove it. 
+ delete computeSparks[compute_server_id]; + } + }; + spark.on("end", remove); + spark.on("close", remove); +} + +export async function handleComputeServerComputeExec(opts) { + const { compute_server_id } = opts; + log("handleComputeServerComputeExec: ", opts); + const mesg = { event: "exec", opts }; + return await callComputeServerApi( + compute_server_id, + mesg, + (opts.timeout ?? 10) * 1000, + true, + ); +}