diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 003344bf..3e944d5e 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -4,8 +4,8 @@ on: [push, pull_request]
 
 jobs:
   build:
-    name: ubuntu-16.04
-    runs-on: ubuntu-16.04
+    name: ubuntu-latest
+    runs-on: ubuntu-latest
     timeout-minutes: 60
    strategy:
diff --git a/deps.ts b/deps.ts
index 0f0e483d..ba083a4b 100644
--- a/deps.ts
+++ b/deps.ts
@@ -1,6 +1,9 @@
 export * as Bson from "./bson/mod.ts";
+export * from "./bson/mod.ts";
+export type { Document } from "./bson/mod.ts";
 export { createHash } from "https://deno.land/std@0.105.0/hash/mod.ts";
-export { pbkdf2Sync } from "https://deno.land/std@0.105.0/node/_crypto/pbkdf2.ts";
+// Switch back to std when std@0.107 lands
+export { pbkdf2Sync } from "https://raw.githubusercontent.com/denoland/deno_std/b7c61a2/node/_crypto/pbkdf2.ts";
 export { HmacSha1 } from "https://deno.land/std@0.105.0/hash/sha1.ts";
 export { HmacSha256 } from "https://deno.land/std@0.105.0/hash/sha256.ts";
 export * from "https://deno.land/x/bytes_formater@v1.4.0/mod.ts";
diff --git a/mod.ts b/mod.ts
index 6c4db608..8593f71b 100644
--- a/mod.ts
+++ b/mod.ts
@@ -3,3 +3,4 @@ export { Database } from "./src/database.ts";
 export { Collection } from "./src/collection/mod.ts";
 export * from "./src/types.ts";
 export { Bson } from "./deps.ts";
+export { GridFSBucket } from "./src/gridfs/bucket.ts";
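With `GridFSBucket` re-exported from `mod.ts`, the feature becomes part of the public API. A minimal upload sketch, assuming a mongod reachable at `127.0.0.1:27017` and a `test` database (the same setup the test suite below uses); the unpinned import URL is illustrative:

```ts
import { GridFSBucket, MongoClient } from "https://deno.land/x/mongo/mod.ts";

const client = new MongoClient();
await client.connect("mongodb://127.0.0.1:27017");

const bucket = new GridFSBucket(client.database("test"));

// openUploadStream() returns a web-standard WritableStream<Uint8Array>.
const upstream = bucket.openUploadStream("hello.txt");
const writer = upstream.getWriter();
await writer.write(new TextEncoder().encode("Hello World!"));
await writer.close(); // flushes the buffered chunk and writes the files document
```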
diff --git a/src/gridfs/bucket.ts b/src/gridfs/bucket.ts
new file mode 100644
index 00000000..75776c0b
--- /dev/null
+++ b/src/gridfs/bucket.ts
@@ -0,0 +1,188 @@
+import { assert, ObjectId } from "../../deps.ts";
+import { Collection } from "../collection/collection.ts";
+import { FindCursor } from "../collection/commands/find.ts";
+import { Database } from "../database.ts";
+import { Filter } from "../types.ts";
+import {
+  Chunk,
+  File,
+  FileId,
+  GridFSBucketOptions,
+  GridFSFindOptions,
+  GridFSUploadOptions,
+} from "../types/gridfs.ts";
+import { checkIndexes } from "./indexes.ts";
+import { createUploadStream } from "./upload.ts";
+
+export class GridFSBucket {
+  #chunksCollection: Collection<Chunk>;
+  #filesCollection: Collection<File>;
+  #chunkSizeBytes: number;
+  #checkedIndexes: boolean = false;
+
+  private readonly getBucketData = () => ({
+    filesCollection: this.#filesCollection,
+    chunksCollection: this.#chunksCollection,
+    chunkSizeBytes: this.#chunkSizeBytes,
+  });
+
+  /**
+   * Create a new GridFSBucket object on @db with the given @options.
+   */
+  constructor(
+    db: Database,
+    options: GridFSBucketOptions = {},
+  ) {
+    const bucketName = options.bucketName ?? "fs";
+    this.#chunksCollection = db.collection<Chunk>(`${bucketName}.chunks`);
+    this.#filesCollection = db.collection<File>(`${bucketName}.files`);
+    this.#chunkSizeBytes = options.chunkSizeBytes ?? 255 * 1024;
+  }
+
+  /**
+   * Opens a Stream that the application can write the contents of the file to.
+   * The driver generates the file id.
+   *
+   * Returns a Stream to which the application will write the contents.
+   *
+   * Note: this method is provided for backward compatibility. In languages
+   * that use generic type parameters, this method may be omitted since
+   * the TFileId type might not be an ObjectId.
+   */
+  openUploadStream(
+    filename: string,
+    options?: GridFSUploadOptions,
+  ) {
+    return this.openUploadStreamWithId(
+      new ObjectId(),
+      filename,
+      options,
+    );
+  }
+
+  /**
+   * Opens a Stream that the application can write the contents of the file to.
+   * The application provides a custom file id.
+   *
+   * Returns a Stream to which the application will write the contents.
+   */
+  openUploadStreamWithId(
+    id: FileId,
+    filename: string,
+    options?: GridFSUploadOptions,
+  ) {
+    if (!this.#checkedIndexes) this.#checkIndexes();
+    return createUploadStream(this.getBucketData(), filename, id, options);
+  }
+
+  /**
+   * Uploads a user file to a GridFS bucket. The driver generates the file id.
+   *
+   * Reads the contents of the user file from the @source Stream and uploads it
+   * as chunks in the chunks collection. After all the chunks have been uploaded,
+   * it creates a files collection document for @filename in the files collection.
+   *
+   * Returns the id of the uploaded file.
+   *
+   * Note: this method is provided for backward compatibility. In languages
+   * that use generic type parameters, this method may be omitted since
+   * the TFileId type might not be an ObjectId.
+   */
+  async uploadFromStream(
+    filename: string,
+    source: ReadableStream<Uint8Array>,
+    options?: GridFSUploadOptions,
+  ): Promise<ObjectId> {
+    const id = new ObjectId();
+    // Wait for the pipe to finish so the returned id refers to a complete file.
+    await source.pipeTo(this.openUploadStreamWithId(id, filename, options));
+    return id;
+  }
+
+  /**
+   * Uploads a user file to a GridFS bucket. The application supplies a custom file id.
+   *
+   * Reads the contents of the user file from the @source Stream and uploads it
+   * as chunks in the chunks collection. After all the chunks have been uploaded,
+   * it creates a files collection document for @filename in the files collection.
+   *
+   * Note: there is no need to return the id of the uploaded file because the application
+   * already supplied it as a parameter.
+   */
+  async uploadFromStreamWithId(
+    id: FileId,
+    filename: string,
+    source: ReadableStream<Uint8Array>,
+    options?: GridFSUploadOptions,
+  ): Promise<void> {
+    await source.pipeTo(this.openUploadStreamWithId(id, filename, options));
+  }
+
+  /** Opens a Stream from which the application can read the contents of the stored file
+   * specified by @id.
+   *
+   * Returns a Stream.
+   */
+  openDownloadStream(id: FileId) {
+    if (!this.#checkedIndexes) this.#checkIndexes();
+
+    return new ReadableStream<Uint8Array>({
+      start: async (controller) => {
+        // Chunks are keyed by `n`; sort so the file is reassembled in order.
+        const cursor = this.#chunksCollection.find(
+          { files_id: id },
+          { sort: { n: 1 } },
+        );
+        await cursor.forEach((chunk) => controller.enqueue(chunk.data.buffer));
+        controller.close();
+      },
+    });
+  }
+
+  /**
+   * Downloads the contents of the stored file specified by @id and writes
+   * the contents to the @destination Stream.
+   */
+  async downloadToStream(
+    id: FileId,
+    destination: WritableStream<Uint8Array>,
+  ) {
+    await this.openDownloadStream(id).pipeTo(destination);
+  }
+
+  /**
+   * Given an @id, delete this stored file’s files collection document and
+   * associated chunks from a GridFS bucket.
+   */
+  async delete(id: FileId) {
+    const deletedFiles = await this.#filesCollection.deleteOne({ _id: id });
+    await this.#chunksCollection.deleteMany({ files_id: id });
+    assert(deletedFiles, `File not found for id ${id}`);
+  }
+
+  /**
+   * Find and return the files collection documents that match @filter.
+   */
+  find(
+    filter: Filter<File>,
+    options: GridFSFindOptions = {},
+  ): FindCursor<File> {
+    return this.#filesCollection.find(filter ?? {}, options);
+  }
+
+  /**
+   * Drops the files and chunks collections associated with
+   * this bucket.
+   */
+  async drop() {
+    await this.#filesCollection.drop();
+    await this.#chunksCollection.drop();
+  }
+
+  #checkIndexes = () =>
+    checkIndexes(
+      this.#filesCollection,
+      this.#chunksCollection,
+      (value) => this.#checkedIndexes = value,
+    );
+}
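Because downloads are plain `ReadableStream`s, they compose directly with other web APIs; the tests below use the `Response` wrapper shown here to collect a stream into text. Continuing the sketch above (names assumed from it):

```ts
// Look the file up by name, then stream its chunks back out.
const [file] = await bucket.find({ filename: "hello.txt" }).toArray();
const text = await new Response(bucket.openDownloadStream(file._id)).text();
console.log(text); // "Hello World!"
```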
diff --git a/src/gridfs/indexes.ts b/src/gridfs/indexes.ts
new file mode 100644
index 00000000..1f64762b
--- /dev/null
+++ b/src/gridfs/indexes.ts
@@ -0,0 +1,90 @@
+import { Document } from "../../deps.ts";
+import { Collection } from "../collection/collection.ts";
+import { Chunk, File } from "../types/gridfs.ts";
+
+export function createFileIndex(collection: Collection<File>) {
+  const index = { filename: 1, uploadDate: 1 };
+
+  return collection.createIndexes({
+    indexes: [{
+      name: "gridFSFiles",
+      key: index,
+      background: false,
+    }],
+  });
+}
+
+export function createChunksIndex(collection: Collection<Chunk>) {
+  const index = { "files_id": 1, n: 1 };
+
+  return collection.createIndexes({
+    indexes: [{
+      name: "gridFSChunks",
+      key: index,
+      unique: true,
+      background: false,
+    }],
+  });
+}
+
+export async function checkIndexes(
+  filesCollection: Collection<File>,
+  chunksCollection: Collection<Chunk>,
+  hasCheckedIndexes: (value: boolean) => void,
+) {
+  const filesCollectionIsEmpty = !await filesCollection.findOne({}, {
+    projection: { _id: 1 },
+  });
+
+  const chunksCollectionIsEmpty = !await chunksCollection.findOne({}, {
+    projection: { _id: 1 },
+  });
+
+  if (filesCollectionIsEmpty || chunksCollectionIsEmpty) {
+    // At least one collection is empty, so create the indexes
+    await createFileIndex(filesCollection);
+    await createChunksIndex(chunksCollection);
+    hasCheckedIndexes(true);
+    return;
+  }
+
+  // Now check that the right indexes are there
+  const fileIndexes = await filesCollection.listIndexes().toArray();
+  let hasFileIndex = false;
+
+  if (fileIndexes) {
+    fileIndexes.forEach((index) => {
+      const keys = Object.keys(index.key);
+      if (
+        keys.length === 2 && index.key.filename === 1 &&
+        index.key.uploadDate === 1
+      ) {
+        hasFileIndex = true;
+      }
+    });
+  }
+
+  if (!hasFileIndex) {
+    await createFileIndex(filesCollection);
+  }
+
+  const chunkIndexes = await chunksCollection.listIndexes().toArray();
+  let hasChunksIndex = false;
+
+  if (chunkIndexes) {
+    chunkIndexes.forEach((index: Document) => {
+      const keys = Object.keys(index.key);
+      if (
+        keys.length === 2 && index.key.files_id === 1 &&
+        index.key.n === 1 && index.unique
+      ) {
+        hasChunksIndex = true;
+      }
+    });
+  }
+
+  if (!hasChunksIndex) {
+    await createChunksIndex(chunksCollection);
+  }
+  hasCheckedIndexes(true);
+}
diff --git a/src/gridfs/upload.ts b/src/gridfs/upload.ts
new file mode 100644
index 00000000..b6e812b0
--- /dev/null
+++ b/src/gridfs/upload.ts
@@ -0,0 +1,68 @@
+import { Binary, ObjectId } from "../../deps.ts";
+import { Collection } from "../../mod.ts";
+import { Chunk, File, GridFSUploadOptions } from "../types/gridfs.ts";
+
+export interface BucketInfo {
+  filesCollection: Collection<File>;
+  chunksCollection: Collection<Chunk>;
+  chunkSizeBytes: number;
+}
+
+export function createUploadStream(
+  { chunkSizeBytes, chunksCollection, filesCollection }: BucketInfo,
+  filename: string,
+  id: ObjectId,
+  options?: GridFSUploadOptions,
+) {
+  const chunkSizeBytesCombined = options?.chunkSizeBytes ?? chunkSizeBytes;
+  const uploadBuffer = new Uint8Array(new ArrayBuffer(chunkSizeBytesCombined));
+  let bufferPosition = 0;
+  let chunksInserted = 0;
+  let fileSizeBytes = 0;
+  return new WritableStream<Uint8Array>({
+    write: async (chunk: Uint8Array) => {
+      let remaining = chunk;
+      while (remaining.byteLength) {
+        const availableBuffer = chunkSizeBytesCombined - bufferPosition;
+        if (remaining.byteLength < availableBuffer) {
+          uploadBuffer.set(remaining, bufferPosition);
+          bufferPosition += remaining.byteLength;
+          fileSizeBytes += remaining.byteLength;
+          break;
+        }
+        const sliced = remaining.slice(0, availableBuffer);
+        remaining = remaining.slice(availableBuffer);
+        uploadBuffer.set(sliced, bufferPosition);
+
+        await chunksCollection.insertOne({
+          files_id: id,
+          n: chunksInserted,
+          data: new Binary(uploadBuffer),
+        });
+
+        bufferPosition = 0;
+        fileSizeBytes += sliced.byteLength;
+        ++chunksInserted;
+      }
+    },
+    close: async () => {
+      // Write the last bytes that are left in the buffer
+      if (bufferPosition) {
+        await chunksCollection.insertOne({
+          files_id: id,
+          n: chunksInserted,
+          data: new Binary(uploadBuffer.slice(0, bufferPosition)),
+        });
+      }
+
+      await filesCollection.insertOne({
+        _id: id,
+        length: fileSizeBytes,
+        chunkSize: chunkSizeBytesCombined,
+        uploadDate: new Date(),
+        filename: filename,
+        metadata: options?.metadata,
+      });
+    },
+  });
+}
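The writer above only ever holds one chunk-sized buffer: full chunks are flushed inside `write()`, and the remainder is flushed in `close()`, which also writes the files document. A quick sanity check of the layout this produces, with hypothetical sizes:

```ts
// Hypothetical file: 600 000 bytes stored with the default 255 KiB chunk size.
const fileSize = 600_000;
const chunkSize = 255 * 1024; // 261 120 bytes

const fullChunks = Math.floor(fileSize / chunkSize); // 2, flushed in write()
const lastChunkBytes = fileSize % chunkSize; // 77 760 bytes, flushed in close()
const totalChunks = fullChunks + (lastChunkBytes ? 1 : 0); // 3 chunk documents, n = 0..2
```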
diff --git a/src/types.ts b/src/types.ts
index d189d115..4acd5ffd 100644
--- a/src/types.ts
+++ b/src/types.ts
@@ -1,4 +1,5 @@
 import { Bson } from "../deps.ts";
+import { WriteConcern } from "./types/readWriteConcern.ts";
 
 export type Document = Bson.Document;
 
@@ -143,35 +144,6 @@ export interface UpdateOptions {
   // session?: ClientSession
 }
 
-/**
- * interface for WriteConcern documents used by MongoDB
- *
- * @see https://docs.mongodb.com/manual/reference/write-concern/
- */
-export interface WriteConcern {
-  /**
-   * The number of instances the write operation needs to be propagated to
-   * before proceeding.
-   *
-   * The string based values are:
-   *
-   * - majority: The calculated majority of nodes in a cluster has accepted the
-   *   the write
-   * - custom write name: Writes have been acknowledged by nodes tagged with the
-   *   custom write concern.
-   */
-  w: number | "majority" | string;
-  /**
-   * If true, the server only returns after the operation has been commited to
-   * disk
-   */
-  j: boolean;
-  /**
-   * An optional timeout value after which to stop the write operation
-   */
-  wtimeout?: number;
-}
-
 /**
  * Options for controlling the collation of strings in a query
  *
@@ -343,7 +315,7 @@ export interface AggregateOptions {
   bypassDocumentValidation?: boolean;
   /**
    * @default false
-   *Return document results as raw BSON buffers.
+   * Return document results as raw BSON buffers.
    */
   raw?: boolean;
   /**
@@ -867,3 +839,11 @@ export interface BuildInfo {
   ok: number;
 }
+
+export const enum ReadPreference {
+  Primary = "primary",
+  PrimaryPreferred = "primaryPreferred",
+  Secondary = "secondary",
+  SecondaryPreferred = "secondaryPreferred",
+  Nearest = "nearest",
+}
diff --git a/src/types/gridfs.ts b/src/types/gridfs.ts
new file mode 100644
index 00000000..fc537efa
--- /dev/null
+++ b/src/types/gridfs.ts
@@ -0,0 +1,108 @@
+import { Binary, ObjectId } from "../../deps.ts";
+import { Document, ReadPreference } from "../types.ts";
+import { ReadConcern, WriteConcern } from "../types/readWriteConcern.ts";
+
+export type FileId = ObjectId;
+
+export interface Chunk {
+  _id: ObjectId;
+  files_id: ObjectId;
+  n: number;
+  data: Binary;
+}
+
+export interface File {
+  _id: ObjectId;
+  length: number;
+  chunkSize: number;
+  uploadDate: Date;
+  filename: string;
+  metadata?: Document;
+}
+
+export interface GridFSBucketOptions {
+  /**
+   * The bucket name. Defaults to 'fs'.
+   */
+  bucketName?: string;
+
+  /**
+   * The chunk size in bytes. Defaults to 255 KiB.
+   */
+  chunkSizeBytes?: number;
+
+  /**
+   * The write concern. Defaults to the write concern of the database.
+   */
+  writeConcern?: WriteConcern;
+
+  /**
+   * The read concern. Defaults to the read concern of the database.
+   */
+  readConcern?: ReadConcern;
+
+  /**
+   * The read preference. Defaults to the read preference of the database.
+   */
+  readPreference?: ReadPreference;
+}
+
+export interface GridFSUploadOptions {
+  /**
+   * The number of bytes per chunk of this file. Defaults to the
+   * chunkSizeBytes in the GridFSBucketOptions.
+   */
+  chunkSizeBytes?: number;
+
+  /**
+   * User data for the 'metadata' field of the files collection document.
+   * If not provided the driver MUST omit the metadata field from the
+   * files collection document.
+   */
+  metadata?: Document;
+}
+
+export interface GridFSFindOptions {
+  /**
+   * Enables writing to temporary files on the server. When set to true, the server
+   * can write temporary data to disk while executing the find operation on the files collection.
+   *
+   * This option is sent only if the caller explicitly provides a value. The default
+   * is to not send a value. For servers < 3.2, this option is ignored and not sent
+   * as allowDiskUse does not exist in the OP_QUERY wire protocol.
+   *
+   * @see https://docs.mongodb.com/manual/reference/command/find/
+   */
+  allowDiskUse?: boolean;
+
+  /**
+   * The number of documents to return per batch.
+   */
+  batchSize?: number;
+
+  /**
+   * The maximum number of documents to return.
+   */
+  limit?: number;
+
+  /**
+   * The maximum amount of time to allow the query to run.
+   */
+  maxTimeMS?: number;
+
+  /**
+   * The server normally times out idle cursors after an inactivity period (10 minutes)
+   * to prevent excess memory use. Set this option to prevent that.
+   */
+  noCursorTimeout?: boolean;
+
+  /**
+   * The number of documents to skip before returning.
+   */
+  skip?: number;
+
+  /**
+   * The order by which to sort results. Defaults to not sorting.
+   */
+  sort?: Document;
+}
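The options types line up with how the tests exercise them: a bucket name, plus an optional per-upload chunk size and metadata. A short sketch (the metadata payload is made up for illustration):

```ts
const logos = new GridFSBucket(client.database("test"), {
  bucketName: "deno_logo",
  chunkSizeBytes: 255 * 8, // deliberately tiny, as in the large-image test below
});

const upstream = logos.openUploadStream("deno_logo.png", {
  metadata: { uploadedBy: "ci" }, // stored on the files document, queryable via find()
});
```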
diff --git a/src/types/readWriteConcern.ts b/src/types/readWriteConcern.ts
new file mode 100644
index 00000000..904f034c
--- /dev/null
+++ b/src/types/readWriteConcern.ts
@@ -0,0 +1,51 @@
+/**
+ * @module @see https://github.com/mongodb/specifications/blob/master/source/read-write-concern/read-write-concern.rst#read-concern
+ */
+
+export const enum ReadConcernLevel {
+  local = "local",
+  majority = "majority",
+  linearizable = "linearizable",
+  available = "available",
+  snapshot = "snapshot",
+}
+
+/**
+ * interface for ReadConcern documents used by MongoDB
+ * @see https://docs.mongodb.com/manual/reference/read-concern/
+ */
+export interface ReadConcern {
+  /**
+   * The level of the read concern.
+   */
+  level?: ReadConcernLevel | string;
+}
+
+/**
+ * interface for WriteConcern documents used by MongoDB
+ *
+ * @see https://docs.mongodb.com/manual/reference/write-concern/
+ */
+export interface WriteConcern {
+  /**
+   * The number of instances the write operation needs to be propagated to
+   * before proceeding.
+   *
+   * The string based values are:
+   *
+   * - majority: The calculated majority of nodes in a cluster has accepted
+   *   the write
+   * - custom write name: Writes have been acknowledged by nodes tagged with the
+   *   custom write concern.
+   */
+  w: number | "majority" | string;
+  /**
+   * If true, the server only returns after the operation has been committed to
+   * disk
+   */
+  j: boolean;
+  /**
+   * An optional timeout value after which to stop the write operation
+   */
+  wtimeout?: number;
+}
diff --git a/tests/cases/06_gridfs.ts b/tests/cases/06_gridfs.ts
new file mode 100644
index 00000000..3aaf0c0e
--- /dev/null
+++ b/tests/cases/06_gridfs.ts
@@ -0,0 +1,118 @@
+import { GridFSBucket } from "../../mod.ts";
+import {
+  assertArrayBufferEquals,
+  assertArrayBufferNotEquals,
+  testWithClient,
+} from "../common.ts";
+import { assert, assertEquals } from "../test.deps.ts";
+
+export default function gridfsTests() {
+  testWithClient("GridFS: Echo small Hello World", async (client) => {
+    const bucket = new GridFSBucket(client.database("test"), {
+      bucketName: "echo",
+    });
+    const upstream = bucket.openUploadStream("test.txt");
+    const writer = upstream.getWriter();
+    await writer.write(new TextEncoder().encode("Hello World! πŸ‘‹"));
+    await writer.close();
+
+    const fileId =
+      (await bucket.find({ filename: "test.txt" }).toArray())[0]._id;
+
+    assert(fileId);
+
+    const text = await new Response(bucket.openDownloadStream(fileId)).text();
+
+    assertEquals(text, "Hello World! πŸ‘‹");
+  });
+
+  testWithClient("GridFS: Echo large Image", async (client) => {
+    const bucket = new GridFSBucket(client.database("test"), {
+      bucketName: "deno_logo",
+    });
+
+    // Set an impractically low chunkSize to test the chunking algorithm
+    const upstream = bucket.openUploadStream("deno_logo.png", {
+      chunkSizeBytes: 255 * 8,
+    });
+
+    await (await fetch("https://deno.land/images/deno_logo.png")).body!.pipeTo(
+      upstream,
+    );
+
+    const fileId =
+      (await bucket.find({ filename: "deno_logo.png" }).toArray())[0]._id;
+
+    const expected = await fetch("https://deno.land/images/deno_logo.png");
+    const actual = await new Response(bucket.openDownloadStream(fileId))
+      .arrayBuffer();
+
+    assertArrayBufferEquals(actual, await expected.arrayBuffer());
+  });
+
+  testWithClient(
+    "GridFS: Echo large Image (compare with different Image)",
+    async (client) => {
+      const bucket = new GridFSBucket(client.database("test"), {
+        bucketName: "deno_logo",
+      });
+
+      const upstream = bucket.openUploadStream("deno_logo.png");
+
+      await (await fetch("https://deno.land/images/deno_logo.png")).body!
+        .pipeTo(
+          upstream,
+        );
+
+      const fileId =
+        (await bucket.find({ filename: "deno_logo.png" }).toArray())[0]._id;
+
+      const expected = await fetch("https://deno.land/images/deno_logo_4.gif");
+      const actual = await new Response(bucket.openDownloadStream(fileId))
+        .arrayBuffer();
+
+      assertArrayBufferNotEquals(actual, await expected.arrayBuffer());
+    },
+  );
+
+  testWithClient(
+    "GridFS: Metadata does get stored correctly",
+    async (client) => {
+      const bucket = new GridFSBucket(client.database("test"), {
+        bucketName: "metadata",
+      });
+      const upstream = bucket.openUploadStream("metadata.txt", {
+        metadata: {
+          helloWorld: "this is a test",
+        },
+      });
+      const writer = upstream.getWriter();
+      await writer.write(new TextEncoder().encode("Hello World! πŸ‘‹"));
+      await writer.close();
+
+      const file =
+        (await bucket.find({ filename: "metadata.txt" }).toArray())[0];
+
+      assertEquals(file.metadata?.helloWorld, "this is a test");
+    },
+  );
+
+  testWithClient(
+    "GridFS: Delete does work as expected",
+    async (client) => {
+      const bucket = new GridFSBucket(client.database("test"), {
+        bucketName: "delete",
+      });
+      const upstream = bucket.openUploadStream("stuff.txt");
+      const writer = upstream.getWriter();
+      await writer.write(new TextEncoder().encode("[redacted]"));
+      await writer.close();
+
+      let file = await bucket.find({ filename: "stuff.txt" }).toArray();
+      assert(file[0]);
+      await bucket.delete(file[0]._id);
+      file = await bucket.find({ filename: "stuff.txt" }).toArray();
+      assert(!file[0]);
+    },
+  );
+}
diff --git a/tests/cases/99_cleanup.ts b/tests/cases/99_cleanup.ts
index fbfd7737..1875e1f4 100644
--- a/tests/cases/99_cleanup.ts
+++ b/tests/cases/99_cleanup.ts
@@ -1,3 +1,4 @@
+import { GridFSBucket } from "../../src/gridfs/bucket.ts";
 import { testWithClient } from "../common.ts";
 
 export default function cleanup() {
@@ -7,6 +8,14 @@ export default function cleanup() {
     await db.collection("mongo_test_users_2").drop().catch((e) => e);
     await db.collection("mongo_test_users").drop().catch((e) => e);
     await db.collection("find_and_modify").drop().catch((e) => e);
+    await new GridFSBucket(db, { bucketName: "deno_logo" })
+      .drop().catch((e) => e);
+    await new GridFSBucket(db, { bucketName: "echo" })
+      .drop().catch((e) => e);
+    await new GridFSBucket(db, { bucketName: "metadata" })
+      .drop().catch((e) => e);
+    await new GridFSBucket(db, { bucketName: "delete" })
+      .drop().catch((e) => e);
   } catch {
   }
 });
diff --git a/tests/common.ts b/tests/common.ts
index 95b17552..b79a7dbf 100644
--- a/tests/common.ts
+++ b/tests/common.ts
@@ -1,4 +1,5 @@
 import { MongoClient } from "../mod.ts";
+import { assertEquals } from "./test.deps.ts";
 
 const hostname = "127.0.0.1";
 
@@ -18,3 +19,39 @@ async function getClient(): Promise<MongoClient> {
   await client.connect(`mongodb://${hostname}:27017`);
   return client;
 }
+
+export function assertArrayBufferEquals(
+  actual: ArrayBuffer,
+  expected: ArrayBuffer,
+) {
+  assertEquals(arrayBufferEquals(actual, expected), true);
+}
+
+export function assertArrayBufferNotEquals(
+  actual: ArrayBuffer,
+  expected: ArrayBuffer,
+) {
+  assertEquals(arrayBufferEquals(actual, expected), false);
+}
+
+function arrayBufferEquals(buf1: ArrayBuffer, buf2: ArrayBuffer): boolean {
+  if (buf1 === buf2) {
+    return true;
+  }
+
+  if (buf1.byteLength !== buf2.byteLength) {
+    return false;
+  }
+
+  const view1 = new DataView(buf1);
+  const view2 = new DataView(buf2);
+
+  let i = buf1.byteLength;
+  while (i--) {
+    if (view1.getUint8(i) !== view2.getUint8(i)) {
+      return false;
+    }
+  }
+
+  return true;
+}
diff --git a/tests/test.ts b/tests/test.ts
index 4dc4482c..609c9926 100644
--- a/tests/test.ts
+++ b/tests/test.ts
@@ -4,6 +4,7 @@ import connectTests from "./cases/02_connect.ts";
 import curdTests from "./cases/03_curd.ts";
 import indexesTests from "./cases/04_indexes.ts";
 import srvTests from "./cases/05_srv.ts";
+import gridfsTests from "./cases/06_gridfs.ts";
 import cleanup from "./cases/99_cleanup.ts";
 
 
@@ -11,6 +12,7 @@
 uriTests();
 authTests();
 connectTests();
 curdTests();
+gridfsTests();
 indexesTests();
 srvTests();