Skip to content

Commit

Permalink
compute servers: implement api for compute similar to the one for syn…
Browse files Browse the repository at this point in the history
…c; add ability to execute code

- this was a HUGE gap in functionality
- this does involve code duplication, but it is best to write something
  twice, then refactor, rather than refactor before the code exists and
  works!!
  • Loading branch information
williamstein committed Mar 17, 2024
1 parent a23d2f1 commit 436c8d5
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 9 deletions.
82 changes: 82 additions & 0 deletions src/compute/compute/lib/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ import { pingProjectUntilSuccess, waitUntilFilesystemIsOfType } from "./util";
import { apiCall } from "@cocalc/api-client";
import { get_blob_store as initJupyterBlobStore } from "@cocalc/jupyter/blobs";
import { delay } from "awaiting";
import { executeCode } from "@cocalc/backend/execute-code";

const logger = debug("cocalc:compute:manager");

const STATUS_INTERVAL_MS = 20 * 1000;
const REGISTER_INTERVAL_MS = 30000;

interface Options {
project_id: string;
Expand Down Expand Up @@ -167,6 +169,8 @@ class Manager {
timeout: Math.ceil(STATUS_INTERVAL_MS / 1000 + 3),
});
setInterval(this.reportStatus, STATUS_INTERVAL_MS);

await this.initApiRequestHandler();
};

private initListings = async () => {
Expand Down Expand Up @@ -336,4 +340,82 @@ class Manager {
private cwd = (path) => {
return join(this.home, dirname(path));
};

/**********************************************************
*
* project --> compute server api
*
* NOTE: this is very similar to what is in packages/sync-fs/lib/index.ts
* which is a much more complicated version for doing sync.
* There is code duplication, but at least it is good code. I would like
* to refactor these.
* NOTE: there's nothing implemented for closing this properly, which
* doesn't matter since right now the lifetime of this object is the lifetime
* of the process. But for unit testing it would be nice to have a way to close this...
**************************************/
private registerToHandleApi = async (state = "online") => {
if (state != "online") return;
try {
this.log("registerToHandleApi: registering");
const api = await this.client.project_client.api(this.project_id);
await api.computeServerComputeRegister(this.compute_server_id);
this.log("registerToHandleApi: registered");
} catch (err) {
this.log("registerToHandleApi: ERROR -- ", err);
}
};

private initApiRequestHandler = async () => {
this.log("initApiRequestHandler: installing API request handler");
this.websocket.on("data", this.handleApiRequest);
this.log("initSyncRequestHandler: installed handler");
this.registerToHandleApi();
//this.registerToHandleApiInterval =
setInterval(this.registerToHandleApi, REGISTER_INTERVAL_MS);
this.websocket.on("state", this.registerToHandleApi);
};

private handleApiRequest = async (data) => {
if (!data?.event) {
return;
}
try {
this.log("handleApiRequest:", { data });
const resp = await this.doApiRequest(data);
this.log("handleApiRequest: ", { resp });
if (data.id && this.websocket != null) {
this.websocket.write({
id: data.id,
resp,
});
}
} catch (err) {
const error = `${err}`;
if (data.id && this.websocket != null) {
this.log("handleApiRequest: returning error", {
event: data?.event,
error,
});
this.websocket.write({
id: data.id,
error,
});
} else {
this.log("handleApiRequest: ignoring error", {
event: data?.event,
error,
});
}
}
};

private doApiRequest = async (data) => {
this.log("doApiRequest", { data });
switch (data?.event) {
case "exec":
return await executeCode({ ...data.opts, home: this.home });
default:
throw Error(`unknown event '${data?.event}'`);
}
};
}
6 changes: 6 additions & 0 deletions src/packages/comm/websocket/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,11 @@ interface MesgComputeServerSyncRegister {
opts: { compute_server_id: number };
}

interface MesgComputeServerComputeRegister {
cmd: "compute_server_compute_register";
opts: { compute_server_id: number };
}

interface MesgComputeServerSyncRequest {
cmd: "compute_server_sync_request";
opts: { compute_server_id: number };
Expand Down Expand Up @@ -260,6 +265,7 @@ export type Mesg =
| MesgComputeFilesystemCache
| MesgSyncFS
| MesgComputeServerSyncRegister
| MesgComputeServerComputeRegister
| MesgComputeServerSyncRequest
| MesgCopyFromProjectToComputeServer
| MesgCopyFromComputeServerToProject;
12 changes: 7 additions & 5 deletions src/packages/project/browser-websocket/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import handleSyncFsApiCall, {
handleComputeServerDeleteFiles,
handleComputeServerMoveFiles,
handleComputeServerRenameFile,
handleComputeServerComputeRegister,
handleComputeServerComputeExec,
} from "@cocalc/sync-fs/lib/handle-api-call";
import { version } from "@cocalc/util/smc-version";
import { getLogger } from "@cocalc/project/logger";
Expand Down Expand Up @@ -139,11 +141,7 @@ async function handleApiCall(data: Mesg, spark): Promise<any> {
if (data.opts.filesystem) {
return await handleComputeServerFilesystemExec(data.opts);
} else {
throw Error(
`exec on compute server without also setting opts.filesystem=true is not implemented ${JSON.stringify(
data.opts,
)}`,
);
return await handleComputeServerComputeExec(data.opts);
}
} else {
return await exec(data.opts);
Expand Down Expand Up @@ -187,7 +185,11 @@ async function handleApiCall(data: Mesg, spark): Promise<any> {
case "sync_fs":
return await handleSyncFsApiCall(data.opts);
case "compute_server_sync_register":
// register filesystem container
return await handleComputeServerSyncRegister(data.opts, spark);
case "compute_server_compute_register":
// register compute container
return await handleComputeServerComputeRegister(data.opts, spark);
case "compute_server_sync_request":
return await handleSyncFsRequestCall(data.opts);
case "copy_from_project_to_compute_server":
Expand Down
6 changes: 6 additions & 0 deletions src/packages/sync-client/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,10 @@ export default class API implements API_Interface {
15000,
);
}
async computeServerComputeRegister(compute_server_id) {
return await this.call(
{ cmd: "compute_server_compute_register", opts: { compute_server_id } },
15000,
);
}
}
61 changes: 57 additions & 4 deletions src/packages/sync-fs/lib/handle-api-call.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
/* This runs in the project and handles api calls. */
/* This runs in the project and handles api calls from computer servers.
It mainly handles a persistent connection from the filesystem container,
and supports functions including moving files, syncing, executing code,
etc.
*/

import { fromCompressedJSON } from "./compressed-json";
import getLogger from "@cocalc/backend/logger";
Expand Down Expand Up @@ -247,7 +252,7 @@ export async function handleComputeServerSyncRegister(
spark_id: spark.id,
});
// save the connection so we can send a sync_request message later, and also handle the api
// calls for copying files back and forth:
// calls for copying files back and forth, etc.
sparks[compute_server_id] = spark;
const remove = () => {
if (sparks[compute_server_id]?.id == spark.id) {
Expand Down Expand Up @@ -288,8 +293,11 @@ function callComputeServerApi(
compute_server_id,
mesg,
timeoutMs = 30000,
compute = false,
): Promise<any> {
const spark = sparks[compute_server_id];
const spark = compute
? computeSparks[compute_server_id]
: sparks[compute_server_id];
if (spark == null) {
log("callComputeServerApi: no connection");
throw Error(
Expand Down Expand Up @@ -348,7 +356,7 @@ export async function handleSyncFsGetListing({

export async function handleComputeServerFilesystemExec(opts) {
const { compute_server_id } = opts;
log("handleSyncFsGetListing: ", opts);
log("handleComputeServerFilesystemExec: ", opts);
const mesg = { event: "exec", opts };
return await callComputeServerApi(
compute_server_id,
Expand Down Expand Up @@ -385,3 +393,48 @@ export async function handleComputeServerMoveFiles({
const mesg = { event: "move_files", paths, dest };
return await callComputeServerApi(compute_server_id, mesg, 60 * 1000);
}

/*
Similar but for compute instead of filesystem:
*/

const computeSparks: { [compute_server_id: number]: Spark } = {};

export async function handleComputeServerComputeRegister(
{ compute_server_id },
spark,
) {
log("handleComputeServerComputeRegister -- registering ", {
compute_server_id,
spark_id: spark.id,
});
// save the connection so we can send a sync_request message later, and also handle the api
// calls for copying files back and forth, etc.
computeSparks[compute_server_id] = spark;
const remove = () => {
if (computeSparks[compute_server_id]?.id == spark.id) {
log(
"handleComputeServerComputeRegister: removing compute server connection due to disconnect -- ",
{ compute_server_id, spark_id: spark.id },
);
// the spark connection currently cached is this
// one, so we remove it. It could be replaced by
// a new one, in which case we better not remove it.
delete computeSparks[compute_server_id];
}
};
spark.on("end", remove);
spark.on("close", remove);
}

export async function handleComputeServerComputeExec(opts) {
const { compute_server_id } = opts;
log("handleComputeServerComputeExec: ", opts);
const mesg = { event: "exec", opts };
return await callComputeServerApi(
compute_server_id,
mesg,
(opts.timeout ?? 10) * 1000,
true,
);
}

0 comments on commit 436c8d5

Please sign in to comment.