From f2a884ceef2e338cb6f34b9522bf01e56819e906 Mon Sep 17 00:00:00 2001 From: William Stein Date: Sat, 18 Nov 2023 14:54:52 +0000 Subject: [PATCH] rewrite the read tracking functionality; completely different api and much cleaner simpler code and explanation of how it works --- websocketfs/lib/mount.ts | 12 +++--- websocketfs/lib/read-tracking.ts | 46 ++++++++++++++++++++ websocketfs/lib/sftp-fuse.ts | 72 +++++--------------------------- websocketfs/package.json | 2 +- 4 files changed, 65 insertions(+), 67 deletions(-) create mode 100644 websocketfs/lib/read-tracking.ts diff --git a/websocketfs/lib/mount.ts b/websocketfs/lib/mount.ts index 6a38672..1d67a69 100644 --- a/websocketfs/lib/mount.ts +++ b/websocketfs/lib/mount.ts @@ -20,9 +20,11 @@ interface Options { cacheLinkTimeout?: number; // Read Tracking - // write out to path all files explicitly read in the last timeout seconds. - // path is updated once every update seconds. - readTracking?: { path: string; timeout?: number; update?: number }; + // write out filenames of files that were explicitly read to this file + // separated by nulls (and it is null terminated). A file is added no more + // than once. Delete the file to reset things. The leading slash is + // removed from the filenames, so they are relative to the mount point. + readTrackingFile?: string; // Metadata // If the metadataFile file path is given, we poll it for modification every few seconds. 
@@ -65,7 +67,7 @@ export default async function mount(
     cacheStatTimeout,
     cacheDirTimeout,
     cacheLinkTimeout,
-    readTracking,
+    readTrackingFile,
     metadataFile,
     hidePath,
   } = opts;
@@ -76,7 +78,7 @@ export default async function mount(
     cacheStatTimeout,
     cacheDirTimeout,
     cacheLinkTimeout,
-    readTracking,
+    readTrackingFile,
     metadataFile,
     hidePath,
   });
diff --git a/websocketfs/lib/read-tracking.ts b/websocketfs/lib/read-tracking.ts
new file mode 100644
index 0000000..2492dd2
--- /dev/null
+++ b/websocketfs/lib/read-tracking.ts
@@ -0,0 +1,65 @@
+/*
+Each time a file that we haven't seen before is explicitly read,
+we write its name to readTrackingFile followed by a null byte.
+
+Everything is reset when readTrackingFile gets deleted from
+disk by some external process (do that to indicate need for
+a reset).
+*/
+
+import { appendFile, writeFile, stat } from "fs/promises";
+import debug from "debug";
+const log = debug("websocketfs:read-tracking");
+
+export default class ReadTracking {
+  private readTrackingFile: string;
+  // Filenames already written to readTrackingFile since the last reset.
+  private history = new Set<string>();
+  // Resolves once the initial truncation of readTrackingFile has finished.
+  private ready: Promise<void>;
+
+  constructor(readTrackingFile: string) {
+    this.readTrackingFile = readTrackingFile;
+    this.ready = this.init();
+  }
+
+  // Start from an empty tracking file.  Failure is only logged, so
+  // this never rejects.
+  private init = async (): Promise<void> => {
+    try {
+      await writeFile(this.readTrackingFile, "");
+    } catch (err) {
+      log("error clearing read tracking file", this.readTrackingFile, err);
+    }
+  };
+
+  // Record that filename was read.  The name is appended to
+  // readTrackingFile with its first character (the leading slash)
+  // removed and a null byte added; a name already in history is
+  // skipped.  Rejects if appending to readTrackingFile fails.
+  trackRead = async (filename: string): Promise<void> => {
+    log(`fileWasRead`, { filename });
+    // Don't append until the initial truncation is done; otherwise
+    // it could wipe out what we write here.
+    await this.ready;
+    try {
+      await stat(this.readTrackingFile);
+    } catch (_) {
+      // file doesn't exist, so reset history
+      this.history.clear();
+    }
+    if (this.history.has(filename)) {
+      return;
+    }
+    // Claim the filename *before* awaiting the append, so that
+    // concurrent reads of the same file don't all write it out.
+    this.history.add(filename);
+    try {
+      await appendFile(this.readTrackingFile, `${filename.slice(1)}\0`);
+    } catch (err) {
+      // nothing was recorded, so let a later read try again
+      this.history.delete(filename);
+      throw err;
+    }
+  };
+}
diff --git a/websocketfs/lib/sftp-fuse.ts b/websocketfs/lib/sftp-fuse.ts
index 2163697..332ee7a 100644
--- a/websocketfs/lib/sftp-fuse.ts
+++ b/websocketfs/lib/sftp-fuse.ts
@@ -20,8 +20,8 @@ import Fuse from "@cocalc/fuse-native";
 import debug from "debug";
 import TTLCache
from "@isaacs/ttlcache"; import { dirname, join } from "path"; -import { open } from "fs/promises"; import { MetadataFile } from "./metadata-file"; +import ReadTracking from "./read-tracking"; export type { IClientOptions }; @@ -43,13 +43,7 @@ interface Options { cacheStatTimeout?: number; // in seconds (to match sshfs) cacheDirTimeout?: number; cacheLinkTimeout?: number; - readTracking?: { - path: string; - // these are in SECONDS (not ms)! - timeout?: number; // clear entries read this long ago - update?: number; // update the track every this many seconds - modified?: number; // ignore files that were *modified* this recently - }; + readTrackingFile?: string; metadataFile?: string; // reconnect -- defaults to true; if true, automatically reconnects // to server when connection breaks. @@ -67,13 +61,11 @@ export default class SftpFuse { private attrCache: TTLCache | null = null; private dirCache: TTLCache | null = null; private linkCache: TTLCache | null = null; - private readTracking: TTLCache | null = null; - private readTrackingInterval: ReturnType | null = null; - private readTrackingModified: number = 0; private connectOptions?: IClientOptions; private reconnect: boolean; private hidePath?: string; private meta?: MetadataFile; + private readTracking?: ReadTracking; constructor(remote: string, options: Options = {}) { this.remote = remote; @@ -83,7 +75,7 @@ export default class SftpFuse { cacheDirTimeout, cacheLinkTimeout, reconnect = true, - readTracking, + readTrackingFile, metadataFile, hidePath, } = options; @@ -118,8 +110,8 @@ export default class SftpFuse { ttl: (cacheLinkTimeout ?? 
cacheTimeout) * 1000, }); } - if (readTracking) { - this.initReadTracking(readTracking); + if (readTrackingFile) { + this.readTracking = new ReadTracking(readTrackingFile); } if (metadataFile) { if (this.attrCache != null && this.dirCache != null && cacheTimeout) { @@ -134,30 +126,6 @@ export default class SftpFuse { bindMethods(this); } - private initReadTracking = ({ - path, - timeout = 15, - update = 5, - modified = 0, - }) => { - if (timeout < 1) { - throw Error("readTracking timeoutMs must be at least 1 second"); - } - log("enabling read tracking"); - const ttl = timeout * 1000; - this.readTracking = new TTLCache({ ttl }); - this.readTrackingModified = modified; - this.readTrackingInterval = setInterval(async () => { - if (this.readTracking == null) return; - log("writing out read tracking"); - const out = await open(path, "w"); - for (const x of this.readTracking.keys()) { - await out.write(x + "\n"); - } - await out.close(); - }, update * 1000); - }; - async handleConnectionClose(err) { log("connection closed", err); // @ts-ignore @@ -217,9 +185,6 @@ export default class SftpFuse { this.sftp?.end(); // @ts-ignore delete this.sftp; - if (this.readTrackingInterval) { - clearInterval(this.readTrackingInterval); - } this.meta?.close(); this.state = "closed"; } @@ -513,25 +478,11 @@ export default class SftpFuse { // listxattr(path, cb) // removexattr(path, name, cb) - private trackRead = (path: string) => { - if (this.readTracking == null || this.attrCache == null) { - return; - } - if (!this.readTrackingModified) { - // always track -- don't worry about mtime - this.readTracking.set(path, true); - } - - // only track if at least this.readTrackingModified seconds old. 
- const x = this.attrCache.get(path)?.attr; - if (x == null) { - return; - } - if (Date.now() - x.mtime.valueOf() <= 1000 * this.readTrackingModified) { - // ignore this -- it was changed too recently - this.readTracking.delete(path); - } else { - this.readTracking.set(path, true); + private trackRead = async (filename: string) => { + try { + await this.readTracking?.trackRead(filename); + } catch (err) { + log("trackRead error -- ", filename, err); } }; @@ -631,7 +582,6 @@ export default class SftpFuse { //log("write", { path, fd, buffer: buffer.toString(), length, position }); if (this.isNotReady(cb)) return; log("write", { path, fd, length, position }); - this.readTracking?.delete(path); this.clearCache(path); if (this.data[fd] == null) { this.data[fd] = [ diff --git a/websocketfs/package.json b/websocketfs/package.json index cf9792e..b145546 100644 --- a/websocketfs/package.json +++ b/websocketfs/package.json @@ -1,6 +1,6 @@ { "name": "websocketfs", - "version": "0.15.1", + "version": "0.16.0", "description": "Like sshfs, but over a WebSocket", "main": "./dist/lib/index.js", "scripts": {