-
Notifications
You must be signed in to change notification settings - Fork 2
/
r2file.ts
146 lines (136 loc) · 3.52 KB
/
r2file.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
/**
 * A logical file whose bytes are spread over an ordered list of R2 objects
 * ("parts"). Reads are stitched together from the parts into one stream.
 */
export class R2File {
  /**
   * @param key - Identifier of the logical file.
   * @param size - Total size of the file; `slice` and `selectPartRanges`
   *   treat it as the sum of the part sizes, in bytes.
   * @param parts - Parts in file order; each part's `key` is the R2 key read
   *   by `getPart` and removed by `delete`.
   * @param metadata - Arbitrary metadata carried alongside the file; not
   *   interpreted by this class.
   * @param bucket - Bucket (or stand-in exposing `get`/`delete`) that holds
   *   the parts.
   */
  constructor(
    readonly key: string,
    readonly size: number,
    readonly parts: Readonly<Part[]>,
    readonly metadata: Record<string, unknown>,
    private readonly bucket: Pick<R2Bucket, 'get'|'delete'>,
  ) {}

  /** Stream over the entire file; equivalent to `slice(0, size)`. */
  get body(): ReadableStream {
    return this.slice(0, this.size);
  }

  /**
   * Streams `length` bytes of the file starting at byte `offset`.
   *
   * `length` is clamped to the bytes available after `offset`; when `offset`
   * lies at or beyond the end of the file the result is an empty stream.
   * Parts are fetched and piped one after another in the background. Any
   * failure (missing part, failed fetch, failed pipe or close) aborts the
   * stream so the consumer of the returned readable sees the error.
   *
   * @param offset - Zero-based byte offset to start reading from.
   * @param length - Maximum number of bytes to read.
   * @returns The readable side of a `FixedLengthStream` of the clamped length.
   */
  slice(
    offset: number,
    length: number,
  ): ReadableStream {
    // Never request more than the file holds, and never a negative length.
    length = Math.max(0, Math.min(this.size - offset, length));
    const parts = selectPartRanges(this, offset, length);
    const { readable, writable } = new FixedLengthStream(length);

    const pump = async (): Promise<void> => {
      for (const {part, range} of parts) {
        if (range?.length === 0) {
          continue; // skip placeholder records
        }
        const record = await this.getPart(part.key, range);
        if (!record) {
          await writable.abort(`Failed to read R2 key: ${part.key}`);
          return;
        }
        // Keep the writable open so the next part can be appended to it.
        await record.body.pipeTo(writable, {preventClose: true});
      }
      await writable.close();
    };

    // The pump runs detached from this call, so its failures can only be
    // observed through the promise: a synchronous try/catch around an
    // un-awaited async call never sees the rejection.
    void pump().catch((err: unknown) => {
      // The stream may already be errored; a failing abort adds nothing new.
      writable.abort(err).catch(() => undefined);
    });

    return readable;
  }

  /**
   * Fetches a single part from the bucket.
   *
   * @param which - Index into `parts`, or an R2 key used as-is.
   * @param range - Optional byte range within the part.
   * @returns The object with its body, or `null` when the index is out of
   *   range or the bucket returned nothing / an object without a body.
   */
  async getPart(
    which: number | string,
    range?: { offset: number; length: number; },
  ): Promise<R2ObjectBody | null> {
    const key = (typeof which === 'number') ? this.parts[which]?.key : which;
    if (key === undefined) {
      return null; // no part at that index
    }
    const result = await this.bucket.get(key, { range });
    return result && isR2ObjectBody(result) ? result : null;
  }

  /**
   * Deletes all related R2 objects (one per part).
   *
   * @returns The keys that were deleted.
   * @throws The error reported by `deleteR2Records` when a batch fails; keys
   *   of earlier, successful batches have already been removed at that point.
   */
  async delete(): Promise<string[]> {
    const keys = this.parts.map(({key}) => key);
    const {error, deleted} = await deleteR2Records(this.bucket, keys);
    if (error) {
      throw error;
    }
    return deleted;
  }
}
/** One stored object that holds a contiguous piece of a file. */
export interface Part {
  /** Key under which this piece is stored in the bucket. */
  key: string;
  /** Number of bytes this piece contributes to the file; may be 0. */
  size: number;
}
/** A part selected for reading, optionally narrowed to a byte range. */
export interface RangedPart {
  /** The part to read from. */
  part: Part;
  /**
   * Byte window inside the part (`offset` is relative to the part's start).
   * Left out when the part is to be read in full.
   */
  range?: {offset: number; length: number};
}
/**
 * Works out which parts, and which byte window of each, cover the slice
 * `[offset, offset + length)` of a file.
 *
 * Zero-size parts never appear in the result. A part that is covered
 * completely is returned without a `range`; a partially covered part carries
 * the window to read, relative to the start of that part.
 *
 * @param file - The file's total size and its parts in file order.
 * @param offset - Byte offset into the file where the slice begins.
 * @param length - Number of bytes in the slice.
 * @returns The parts to read, in order.
 */
export function selectPartRanges(
  file: Readonly<{size: number; parts: Readonly<Part[]>}>,
  offset: number,
  length: number,
): RangedPart[] {
  // Shortcut: the slice is the whole file, so every non-empty part is read in full.
  if (offset === 0 && length === file.size) {
    const everything: RangedPart[] = [];
    for (const part of file.parts) {
      if (part.size > 0) {
        everything.push({part});
      }
    }
    return everything;
  }

  const parts = file.parts;
  let index = 0;
  let bytesBefore = 0; // bytes held by the parts preceding parts[index]

  // Step over the parts that end at or before the requested offset.
  while (index < parts.length && !(bytesBefore + parts[index].size > offset)) {
    bytesBefore += parts[index].size;
    index++;
  }

  const picked: RangedPart[] = [];
  let remaining = length;
  while (index < parts.length && remaining > 0) {
    const part = parts[index];
    index++;
    if (part.size === 0) {
      continue;
    }
    // Only the first selected part can start mid-way; later ones start at 0.
    const begin = Math.max(0, offset - bytesBefore);
    const take = Math.min(part.size - begin, remaining);
    const coversWholePart = begin === 0 && take === part.size;
    picked.push(
      coversWholePart
        ? {part}
        : {part, range: {offset: begin, length: take}},
    );
    remaining -= take;
    bytesBefore += part.size;
  }
  return picked;
}
/**
 * Type guard telling an object that carries a `body` apart from one that
 * does not (or from `null`). Only the presence of the `body` property is
 * checked, not its value.
 */
function isR2ObjectBody(
  obj: Readonly<R2Object|R2ObjectBody|null>
): obj is R2ObjectBody {
  if (obj === null) {
    return false;
  }
  return 'body' in obj;
}
/**
 * Deletes `keys` from `bucket` in consecutive batches of at most
 * `maxKeysPerDelete` keys, stopping at the first batch that fails.
 *
 * Failures are reported through the returned `error` field rather than by
 * rejecting, so the caller always learns which keys were already removed.
 *
 * @param bucket - Bucket (or stand-in exposing `delete`) to remove keys from.
 * @param keys - Keys to delete, processed in the given order.
 * @param maxKeysPerDelete - Upper bound on keys per `bucket.delete` call;
 *   must be at least 1 (fractions are rounded down). The default of 1000 is
 *   presumably R2's per-call limit — confirm against the R2 documentation.
 * @returns `deleted`: keys of every batch that completed. `error`: set when
 *   a batch failed (non-`Error` throwables are wrapped) or when
 *   `maxKeysPerDelete` is invalid (a `RangeError`, nothing is deleted).
 */
export async function deleteR2Records(
  bucket: Pick<R2Bucket, 'delete'>,
  keys: string[],
  maxKeysPerDelete = 1000,
): Promise<{deleted: string[], error?: Error}> {
  const deleted: string[] = [];
  if (keys.length === 0) {
    return {deleted}; // nothing to do
  }

  // A batch size of 0, a negative value or NaN would either never advance
  // through `keys` (endless loop) or skip them silently; refuse it up front.
  if (!(maxKeysPerDelete >= 1)) {
    return {
      error: new RangeError(
        `maxKeysPerDelete must be at least 1, got ${maxKeysPerDelete}`,
      ),
      deleted,
    };
  }
  const batchSize = Math.floor(maxKeysPerDelete);

  for (let i = 0; i < keys.length; i += batchSize) {
    const batch = keys.slice(i, i + batchSize);
    try {
      await bucket.delete(batch);
    } catch (err: unknown) {
      const error = err instanceof Error
        ? err
        : new Error(typeof err === 'string' ? err : 'unknown error deleting R2 keys');
      return {
        error,
        deleted,
      };
    }
    // Append in place instead of re-copying the accumulated list per batch.
    for (const key of batch) {
      deleted.push(key);
    }
  }
  return {deleted};
}