Skip to content

Commit

Permalink
Copy files between endpoints with the File app
Browse files Browse the repository at this point in the history
  • Loading branch information
pajama-coder committed Oct 16, 2024
1 parent 73c25d6 commit cdd3f59
Show file tree
Hide file tree
Showing 3 changed files with 239 additions and 4 deletions.
165 changes: 165 additions & 0 deletions agent/apps/ztm/file/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,167 @@ export default function ({ app, mesh }) {
})
)

var transfers = {}

function allTransfers(ep, username) {
if (ep === app.endpoint.id) {
return Promise.resolve(
Object.entries(transfers).filter(([k, v]) => canRead(k, username) && v.state === 'working')
)
} else {
return mesh.request(ep, new Message({
method: 'GET',
path: '/api/transfers',
})).then(res => res?.head?.status === 200 ? JSON.decode(res.body) : null)
}
}

function getTransfer(ep, pathname, username) {
if (ep === app.endpoint.id) {
pathname = os.path.normalize(pathname)
if (canRead(pathname, username)) {
return Promise.resolve(transfers[pathname] || null)
} else {
return null
}
} else {
return mesh.request(ep, new Message({
method: 'GET',
path: os.path.join('/api/transfers', encodePathname(pathname)),
})).then(res => res?.head?.status === 200 ? JSON.decode(res.body) : null)
}
}

function startTransfer(ep, pathname, source, sourcePath, username) {
if (ep === app.endpoint.id) {
sourcePath = os.path.normalize(sourcePath)
pathname = os.path.normalize(pathname)
if (!canWrite(pathname, username)) return Promise.resolve(false)
if (transfers[pathname]?.state === 'working') return Promise.resolve(false)
var queue = []
var searches = [sourcePath]
var visits = new Set
var transfer = transfers[pathname] = {
queue,
current: null,
copied: [],
state: 'working',
}
var localBase = pathname
if (os.stat(localBase)?.isDirectory?.()) {
localBase = os.path.join(localBase, os.path.basename(sourcePath))
}
function findFiles() {
var searchFilename = searches.shift()
if (searchFilename) {
return statFile(source, searchFilename, username).then(
stat => {
if (stat?.dir) {
stat.dir.forEach(name => {
var sourceFilename = os.path.join(searchFilename, name)
if (sourceFilename.endsWith('/')) {
sourceFilename = sourceFilename.substring(0, sourceFilename.length - 1)
if (!visits.has(sourceFilename)) {
visits.add(sourceFilename)
searches.push(sourceFilename)
}
} else {
queue.push(sourceFilename)
}
})
} else if (stat) {
queue.push(searchFilename)
}
return findFiles()
}
)
}
}
return findFiles().then(() => {
if (queue.length > 0) {
function transferFile() {
var remoteFilename = queue.shift()
if (remoteFilename) {
transfer.current = remoteFilename
var relativePath = remoteFilename.substring(sourcePath.length)
var localFilename = os.path.join(localBase, relativePath)
app.log(`Downloading ${localFilename} from ep ${source}`)
try {
os.mkdir(os.path.dirname(localFilename), { recursive: true, force: true })
pipeline($=>$
.onStart(new Message({ method: 'GET', path: '/' }))
.pipe(downloadFile, () => ({
endpoint: source,
pathname: remoteFilename,
username,
}))
.pipe(
evt => {
if (evt instanceof MessageStart) {
return evt.head?.status === 200 ? 'ok' : 'error'
}
}, {
'ok': $=>$.tee(localFilename),
'error': $=>$.replaceData(),
}
)
.replaceData()
.replaceMessage(new StreamEnd)
).spawn().then(() => {
transfer.copied.push(localFilename)
transferFile()
})
} catch {
transfer.state = 'error'
}
} else {
transfer.current = null
transfer.state = 'done'
}
}
transferFile()
return Promise.resolve(true)
} else {
delete transfers[pathname]
return Promise.resolve(false)
}
})
} else {
return mesh.request(ep, new Message(
{
method: 'POST',
path: os.path.join('/api/transfers', encodePathname(pathname)) + '?source=' + source,
},
sourcePath
)).then(res => res?.head?.status === 201)
}
}

function abortTransfer(ep, pathname, username) {
if (ep === app.endpoint.id) {
pathname = os.path.normalize(pathname)
if (canWrite(pathname, username)) {
var transfer = transfers[pathname]
if (transfer) {
if (transfer.state === 'working') {
transfer.state = 'abort'
} else {
delete transfers[pathname]
}
return Promise.resolve(true)
}
}
return Promise.resolve(false)
} else {
return mesh.request(ep, new Message(
{
method: 'DELETE',
path: os.path.join('/api/transfers', encodePathname(pathname)),
}
)).then(res => res?.head?.status === 204)
}
}

function canRead(pathname, username) {
return username === app.username
}
Expand All @@ -158,6 +319,10 @@ export default function ({ app, mesh }) {
deleteFile,
downloadFile,
uploadFile,
allTransfers,
getTransfer,
startTransfer,
abortTransfer,
}
}

Expand Down
70 changes: 70 additions & 0 deletions agent/apps/ztm/file/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,43 @@ export default function ({ app, mesh, utils }) {
}),
},

'/api/endpoints/{ep}/transfers': {
'GET': responder((params) => {
var ep = params.ep
return api.allTransfers(ep, app.username).then(
ret => response(200, ret)
)
})
},

'/api/endpoints/{ep}/transfers/*': {
'GET': responder((params) => {
var ep = params.ep
var pathname = URL.decodeComponent(params['*'])
return api.getTransfer(ep, pathname, app.username).then(
ret => ret ? response(200, ret) : response(404)
)
}),

'POST': responder((params, req) => {
var ep = params.ep
var pathname = URL.decodeComponent(params['*'])
var source = new URL(req.head.path).searchParams.get('source')
var sourcePath = req.body.toString()
return api.startTransfer(ep, pathname, source, sourcePath, app.username).then(
ret => response(ret ? 201 : 404)
)
}),

'DELETE': responder((params) => {
var ep = params.ep
var pathname = URL.decodeComponent(params['*'])
return api.abortTransfer(ep, pathname, app.username).then(
ret => response(ret ? 204 : 404)
)
}),
},

'*': {
'GET': responder((_, req) => {
return Promise.resolve(gui.serve(req) || response(404))
Expand Down Expand Up @@ -179,6 +216,39 @@ export default function ({ app, mesh, utils }) {
)
}),
},

'/api/transfers': {
'GET': responder(() => {
return api.allTransfers(app.endpoint.id, $ctx.peer.username).then(
ret => response(200, ret)
)
})
},

'/api/transfers/*': {
'GET': responder((params) => {
var pathname = URL.decodeComponent(params['*'])
return api.getTransfer(app.endpoint.id, pathname, $ctx.peer.username).then(
ret => ret ? response(200, ret) : response(404)
)
}),

'POST': responder((params, req) => {
var pathname = URL.decodeComponent(params['*'])
var source = new URL(req.head.path).searchParams.get('source')
var sourcePath = req.body.toString()
return api.startTransfer(app.endpoint.id, pathname, source, sourcePath, $ctx.peer.username).then(
ret => response(ret ? 201 : 404)
)
}),

'DELETE': responder((params) => {
var pathname = URL.decodeComponent(params['*'])
return api.abortTransfer(app.endpoint.id, pathname, $ctx.peer.username).then(
ret => response(ret ? 204 : 404)
)
}),
},
})

return pipeline($=>$
Expand Down
8 changes: 4 additions & 4 deletions agent/mesh.js
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ export default function (rootDir, config) {
}
return connectApp(params.provider, $requestedApp, username).then(p => {
if (p) {
logInfo(`Proxy to local app ${$requestedApp}`)
// logInfo(`Proxy to local app ${$requestedApp}`)
$requestedAppPipeline = p
return response200
}
Expand All @@ -612,7 +612,7 @@ export default function (rootDir, config) {
}
).to($=>$
.pipe(() => $requestedAppPipeline, () => ({ source: 'peer', peer: $requestedAppPeer }))
.onEnd(() => logInfo(`Proxy to local app ${$requestedApp} ended`))
// .onEnd(() => logInfo(`Proxy to local app ${$requestedApp} ended`))
)
)

Expand All @@ -634,7 +634,7 @@ export default function (rootDir, config) {
})
.pipe(() => $selectedHub ? 'proxy' : 'deny', {
'proxy': ($=>$
.onStart(() => logInfo(`Proxy to ${app} at endpoint ${ep} via ${$selectedHub}`))
// .onStart(() => logInfo(`Proxy to ${app} at endpoint ${ep} via ${$selectedHub}`))
.connectHTTPTunnel(() => {
var q = `?src=${config.agent.id}`
return new Message({
Expand All @@ -651,7 +651,7 @@ export default function (rootDir, config) {
)
)
)
.onEnd(() => logInfo(`Proxy to ${app} at endpoint ${ep} via ${$selectedHub} ended`))
// .onEnd(() => logInfo(`Proxy to ${app} at endpoint ${ep} via ${$selectedHub} ended`))
),
'deny': ($=>$
.onStart(() => logError(`No route to endpoint ${ep}`))
Expand Down

0 comments on commit cdd3f59

Please sign in to comment.