From cdd3f59fe852cedb6b5d58b92788410e1042b689 Mon Sep 17 00:00:00 2001 From: pajama-coder Date: Wed, 16 Oct 2024 17:49:28 +0800 Subject: [PATCH] Copy files between endpoints with the File app --- agent/apps/ztm/file/api.js | 165 ++++++++++++++++++++++++++++++++++++ agent/apps/ztm/file/main.js | 70 +++++++++++++++ agent/mesh.js | 8 +- 3 files changed, 239 insertions(+), 4 deletions(-) diff --git a/agent/apps/ztm/file/api.js b/agent/apps/ztm/file/api.js index 2dfee47f..427e8a3e 100644 --- a/agent/apps/ztm/file/api.js +++ b/agent/apps/ztm/file/api.js @@ -143,6 +143,167 @@ export default function ({ app, mesh }) { }) ) + var transfers = {} + + function allTransfers(ep, username) { + if (ep === app.endpoint.id) { + return Promise.resolve( + Object.entries(transfers).filter(([k, v]) => canRead(k, username) && v.state === 'working') + ) + } else { + return mesh.request(ep, new Message({ + method: 'GET', + path: '/api/transfers', + })).then(res => res?.head?.status === 200 ? JSON.decode(res.body) : null) + } + } + + function getTransfer(ep, pathname, username) { + if (ep === app.endpoint.id) { + pathname = os.path.normalize(pathname) + if (canRead(pathname, username)) { + return Promise.resolve(transfers[pathname] || null) + } else { + return null + } + } else { + return mesh.request(ep, new Message({ + method: 'GET', + path: os.path.join('/api/transfers', encodePathname(pathname)), + })).then(res => res?.head?.status === 200 ? JSON.decode(res.body) : null) + } + } + + function startTransfer(ep, pathname, source, sourcePath, username) { + if (ep === app.endpoint.id) { + sourcePath = os.path.normalize(sourcePath) + pathname = os.path.normalize(pathname) + if (!canWrite(pathname, username)) return Promise.resolve(false) + if (transfers[pathname]?.state === 'working') return Promise.resolve(false) + var queue = [] + var searches = [sourcePath] + var visits = new Set + var transfer = transfers[pathname] = { + queue, + current: null, + copied: [], + state: 'working', + } + var localBase = pathname + if (os.stat(localBase)?.isDirectory?.()) { + localBase = os.path.join(localBase, os.path.basename(sourcePath)) + } + function findFiles() { + var searchFilename = searches.shift() + if (searchFilename) { + return statFile(source, searchFilename, username).then( + stat => { + if (stat?.dir) { + stat.dir.forEach(name => { + var sourceFilename = os.path.join(searchFilename, name) + if (sourceFilename.endsWith('/')) { + sourceFilename = sourceFilename.substring(0, sourceFilename.length - 1) + if (!visits.has(sourceFilename)) { + visits.add(sourceFilename) + searches.push(sourceFilename) + } + } else { + queue.push(sourceFilename) + } + }) + } else if (stat) { + queue.push(searchFilename) + } + return findFiles() + } + ) + } + } + return findFiles().then(() => { + if (queue.length > 0) { + function transferFile() { + var remoteFilename = queue.shift() + if (remoteFilename) { + transfer.current = remoteFilename + var relativePath = remoteFilename.substring(sourcePath.length) + var localFilename = os.path.join(localBase, relativePath) + app.log(`Downloading ${localFilename} from ep ${source}`) + try { + os.mkdir(os.path.dirname(localFilename), { recursive: true, force: true }) + pipeline($=>$ + .onStart(new Message({ method: 'GET', path: '/' })) + .pipe(downloadFile, () => ({ + endpoint: source, + pathname: remoteFilename, + username, + })) + .pipe( + evt => { + if (evt instanceof MessageStart) { + return evt.head?.status === 200 ? 'ok' : 'error' + } + }, { + 'ok': $=>$.tee(localFilename), + 'error': $=>$.replaceData(), + } + ) + .replaceData() + .replaceMessage(new StreamEnd) + ).spawn().then(() => { + transfer.copied.push(localFilename) + transferFile() + }) + } catch { + transfer.state = 'error' + } + } else { + transfer.current = null + transfer.state = 'done' + } + } + transferFile() + return Promise.resolve(true) + } else { + delete transfers[pathname] + return Promise.resolve(false) + } + }) + } else { + return mesh.request(ep, new Message( + { + method: 'POST', + path: os.path.join('/api/transfers', encodePathname(pathname)) + '?source=' + source, + }, + sourcePath + )).then(res => res?.head?.status === 201) + } + } + + function abortTransfer(ep, pathname, username) { + if (ep === app.endpoint.id) { + pathname = os.path.normalize(pathname) + if (canWrite(pathname, username)) { + var transfer = transfers[pathname] + if (transfer) { + if (transfer.state === 'working') { + transfer.state = 'abort' + } else { + delete transfers[pathname] + } + return Promise.resolve(true) + } + } + return Promise.resolve(false) + } else { + return mesh.request(ep, new Message( + { + method: 'DELETE', + path: os.path.join('/api/transfers', encodePathname(pathname)), + } + )).then(res => res?.head?.status === 204) + } + } + function canRead(pathname, username) { return username === app.username } @@ -158,6 +319,10 @@ export default function ({ app, mesh }) { deleteFile, downloadFile, uploadFile, + allTransfers, + getTransfer, + startTransfer, + abortTransfer, } } diff --git a/agent/apps/ztm/file/main.js b/agent/apps/ztm/file/main.js index 79e40f06..f1b4e01f 100644 --- a/agent/apps/ztm/file/main.js +++ b/agent/apps/ztm/file/main.js @@ -106,6 +106,43 @@ export default function ({ app, mesh, utils }) { }), }, + '/api/endpoints/{ep}/transfers': { + 'GET': responder((params) => { + var ep = params.ep + return api.allTransfers(ep, app.username).then( + ret => response(200, ret) + ) + }) + }, + + '/api/endpoints/{ep}/transfers/*': { + 'GET': responder((params) => { + var ep = params.ep + var pathname = URL.decodeComponent(params['*']) + return api.getTransfer(ep, pathname, app.username).then( + ret => ret ? response(200, ret) : response(404) + ) + }), + + 'POST': responder((params, req) => { + var ep = params.ep + var pathname = URL.decodeComponent(params['*']) + var source = new URL(req.head.path).searchParams.get('source') + var sourcePath = req.body.toString() + return api.startTransfer(ep, pathname, source, sourcePath, app.username).then( + ret => response(ret ? 201 : 404) + ) + }), + + 'DELETE': responder((params) => { + var ep = params.ep + var pathname = URL.decodeComponent(params['*']) + return api.abortTransfer(ep, pathname, app.username).then( + ret => response(ret ? 204 : 404) + ) + }), + }, + '*': { 'GET': responder((_, req) => { return Promise.resolve(gui.serve(req) || response(404)) @@ -179,6 +216,39 @@ export default function ({ app, mesh, utils }) { ) }), }, + + '/api/transfers': { + 'GET': responder(() => { + return api.allTransfers(app.endpoint.id, $ctx.peer.username).then( + ret => response(200, ret) + ) + }) + }, + + '/api/transfers/*': { + 'GET': responder((params) => { + var pathname = URL.decodeComponent(params['*']) + return api.getTransfer(app.endpoint.id, pathname, $ctx.peer.username).then( + ret => ret ? response(200, ret) : response(404) + ) + }), + + 'POST': responder((params, req) => { + var pathname = URL.decodeComponent(params['*']) + var source = new URL(req.head.path).searchParams.get('source') + var sourcePath = req.body.toString() + return api.startTransfer(app.endpoint.id, pathname, source, sourcePath, $ctx.peer.username).then( + ret => response(ret ? 201 : 404) + ) + }), + + 'DELETE': responder((params) => { + var pathname = URL.decodeComponent(params['*']) + return api.abortTransfer(app.endpoint.id, pathname, $ctx.peer.username).then( + ret => response(ret ? 204 : 404) + ) + }), + }, }) return pipeline($=>$ diff --git a/agent/mesh.js b/agent/mesh.js index cd164a8b..777ed29c 100644 --- a/agent/mesh.js +++ b/agent/mesh.js @@ -598,7 +598,7 @@ export default function (rootDir, config) { } return connectApp(params.provider, $requestedApp, username).then(p => { if (p) { - logInfo(`Proxy to local app ${$requestedApp}`) + // logInfo(`Proxy to local app ${$requestedApp}`) $requestedAppPipeline = p return response200 } @@ -612,7 +612,7 @@ export default function (rootDir, config) { } ).to($=>$ .pipe(() => $requestedAppPipeline, () => ({ source: 'peer', peer: $requestedAppPeer })) - .onEnd(() => logInfo(`Proxy to local app ${$requestedApp} ended`)) + // .onEnd(() => logInfo(`Proxy to local app ${$requestedApp} ended`)) ) ) @@ -634,7 +634,7 @@ export default function (rootDir, config) { }) .pipe(() => $selectedHub ? 'proxy' : 'deny', { 'proxy': ($=>$ - .onStart(() => logInfo(`Proxy to ${app} at endpoint ${ep} via ${$selectedHub}`)) + // .onStart(() => logInfo(`Proxy to ${app} at endpoint ${ep} via ${$selectedHub}`)) .connectHTTPTunnel(() => { var q = `?src=${config.agent.id}` return new Message({ @@ -651,7 +651,7 @@ export default function (rootDir, config) { ) ) ) - .onEnd(() => logInfo(`Proxy to ${app} at endpoint ${ep} via ${$selectedHub} ended`)) + // .onEnd(() => logInfo(`Proxy to ${app} at endpoint ${ep} via ${$selectedHub} ended`)) ), 'deny': ($=>$ .onStart(() => logError(`No route to endpoint ${ep}`))