Skip to content

Commit

Permalink
rewrite the read tracking functionality; completely different api and…
Browse files Browse the repository at this point in the history
… much cleaner simpler code and explanation of how it works
  • Loading branch information
williamstein committed Nov 18, 2023
1 parent 1a0c14a commit f2a884c
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 67 deletions.
12 changes: 7 additions & 5 deletions websocketfs/lib/mount.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ interface Options {
cacheLinkTimeout?: number;

// Read Tracking
// write out to path all files explicitly read in the last timeout seconds.
// path is updated once every update seconds.
readTracking?: { path: string; timeout?: number; update?: number };
// write out filenames of files that were explicitly read to this file
// separated by nulls (and it is null terminated). A file is added no more
// than once. Delete the file to reset things. The leading slash is
// removed from the filenames, so they are relative to the mount point.
readTrackingFile?: string;

// Metadata
// If the metadataFile file path is given, we poll it for modification every few seconds.
Expand Down Expand Up @@ -65,7 +67,7 @@ export default async function mount(
cacheStatTimeout,
cacheDirTimeout,
cacheLinkTimeout,
readTracking,
readTrackingFile,
metadataFile,
hidePath,
} = opts;
Expand All @@ -76,7 +78,7 @@ export default async function mount(
cacheStatTimeout,
cacheDirTimeout,
cacheLinkTimeout,
readTracking,
readTrackingFile,
metadataFile,
hidePath,
});
Expand Down
46 changes: 46 additions & 0 deletions websocketfs/lib/read-tracking.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
Each time a file is explicitly read from that we haven't
seen before, we write its name to readTrackingFile
followed by a null byte.
Everything is reset when readTrackingFile gets deleted from
disk by some external process (do that to indicate need for
a reset).
*/

import { appendFile, writeFile, stat } from "fs/promises";
import debug from "debug";
const log = debug("websocketfs:read-tracking");

export default class ReadTracking {
private readTrackingFile: string;
private history = new Set<string>();

constructor(readTrackingFile) {
this.readTrackingFile = readTrackingFile;
this.init();
}

private init = async () => {
try {
await writeFile(this.readTrackingFile, "");
} catch (err) {
log("error clearing read tracking file", this.readTrackingFile, err);
}
};

trackRead = async (filename: string) => {
log(`fileWasRead`, { filename });
try {
await stat(this.readTrackingFile);
} catch (_) {
// file doesn't exist, so reset history
this.history.clear();
}
if (this.history.has(filename)) {
return;
}
await appendFile(this.readTrackingFile, `${filename.slice(1)}\0`);
this.history.add(filename);
};
}
72 changes: 11 additions & 61 deletions websocketfs/lib/sftp-fuse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import Fuse from "@cocalc/fuse-native";
import debug from "debug";
import TTLCache from "@isaacs/ttlcache";
import { dirname, join } from "path";
import { open } from "fs/promises";
import { MetadataFile } from "./metadata-file";
import ReadTracking from "./read-tracking";

export type { IClientOptions };

Expand All @@ -43,13 +43,7 @@ interface Options {
cacheStatTimeout?: number; // in seconds (to match sshfs)
cacheDirTimeout?: number;
cacheLinkTimeout?: number;
readTracking?: {
path: string;
// these are in SECONDS (not ms)!
timeout?: number; // clear entries read this long ago
update?: number; // update the track every this many seconds
modified?: number; // ignore files that were *modified* this recently
};
readTrackingFile?: string;
metadataFile?: string;
// reconnect -- defaults to true; if true, automatically reconnects
// to server when connection breaks.
Expand All @@ -67,13 +61,11 @@ export default class SftpFuse {
private attrCache: TTLCache<string, any> | null = null;
private dirCache: TTLCache<string, string[]> | null = null;
private linkCache: TTLCache<string, string> | null = null;
private readTracking: TTLCache<string, boolean> | null = null;
private readTrackingInterval: ReturnType<typeof setInterval> | null = null;
private readTrackingModified: number = 0;
private connectOptions?: IClientOptions;
private reconnect: boolean;
private hidePath?: string;
private meta?: MetadataFile;
private readTracking?: ReadTracking;

constructor(remote: string, options: Options = {}) {
this.remote = remote;
Expand All @@ -83,7 +75,7 @@ export default class SftpFuse {
cacheDirTimeout,
cacheLinkTimeout,
reconnect = true,
readTracking,
readTrackingFile,
metadataFile,
hidePath,
} = options;
Expand Down Expand Up @@ -118,8 +110,8 @@ export default class SftpFuse {
ttl: (cacheLinkTimeout ?? cacheTimeout) * 1000,
});
}
if (readTracking) {
this.initReadTracking(readTracking);
if (readTrackingFile) {
this.readTracking = new ReadTracking(readTrackingFile);
}
if (metadataFile) {
if (this.attrCache != null && this.dirCache != null && cacheTimeout) {
Expand All @@ -134,30 +126,6 @@ export default class SftpFuse {
bindMethods(this);
}

private initReadTracking = ({
path,
timeout = 15,
update = 5,
modified = 0,
}) => {
if (timeout < 1) {
throw Error("readTracking timeoutMs must be at least 1 second");
}
log("enabling read tracking");
const ttl = timeout * 1000;
this.readTracking = new TTLCache({ ttl });
this.readTrackingModified = modified;
this.readTrackingInterval = setInterval(async () => {
if (this.readTracking == null) return;
log("writing out read tracking");
const out = await open(path, "w");
for (const x of this.readTracking.keys()) {
await out.write(x + "\n");
}
await out.close();
}, update * 1000);
};

async handleConnectionClose(err) {
log("connection closed", err);
// @ts-ignore
Expand Down Expand Up @@ -217,9 +185,6 @@ export default class SftpFuse {
this.sftp?.end();
// @ts-ignore
delete this.sftp;
if (this.readTrackingInterval) {
clearInterval(this.readTrackingInterval);
}
this.meta?.close();
this.state = "closed";
}
Expand Down Expand Up @@ -513,25 +478,11 @@ export default class SftpFuse {
// listxattr(path, cb)
// removexattr(path, name, cb)

private trackRead = (path: string) => {
if (this.readTracking == null || this.attrCache == null) {
return;
}
if (!this.readTrackingModified) {
// always track -- don't worry about mtime
this.readTracking.set(path, true);
}

// only track if at least this.readTrackingModified seconds old.
const x = this.attrCache.get(path)?.attr;
if (x == null) {
return;
}
if (Date.now() - x.mtime.valueOf() <= 1000 * this.readTrackingModified) {
// ignore this -- it was changed too recently
this.readTracking.delete(path);
} else {
this.readTracking.set(path, true);
private trackRead = async (filename: string) => {
try {
await this.readTracking?.trackRead(filename);
} catch (err) {
log("trackRead error -- ", filename, err);
}
};

Expand Down Expand Up @@ -631,7 +582,6 @@ export default class SftpFuse {
//log("write", { path, fd, buffer: buffer.toString(), length, position });
if (this.isNotReady(cb)) return;
log("write", { path, fd, length, position });
this.readTracking?.delete(path);
this.clearCache(path);
if (this.data[fd] == null) {
this.data[fd] = [
Expand Down
2 changes: 1 addition & 1 deletion websocketfs/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "websocketfs",
"version": "0.15.1",
"version": "0.16.0",
"description": "Like sshfs, but over a WebSocket",
"main": "./dist/lib/index.js",
"scripts": {
Expand Down

0 comments on commit f2a884c

Please sign in to comment.