From 8c93d55630c6f70fb52700043164a1a7769238ff Mon Sep 17 00:00:00 2001
From: Sam Willis
Date: Mon, 1 Jul 2024 09:38:26 +0100
Subject: [PATCH] Improvements to COPY support, including COPY TO/FROM blobs
 and URLs

---
 packages/pglite/Makefile              |   2 +-
 packages/pglite/README.md             |  32 +++--
 packages/pglite/examples/copy.html    |   8 +-
 packages/pglite/release/postgres.d.ts |   3 +
 packages/pglite/src/interface.ts      |  12 +-
 packages/pglite/src/parse.ts          |  20 ++--
 packages/pglite/src/pglite.ts         | 165 ++++++++++++++++++++++----
 packages/pglite/tests/basic.test.js   |  13 +-
 postgres                              |   2 +-
 9 files changed, 203 insertions(+), 54 deletions(-)

diff --git a/packages/pglite/Makefile b/packages/pglite/Makefile
index ffa9d52d..0a3c8491 100644
--- a/packages/pglite/Makefile
+++ b/packages/pglite/Makefile
@@ -14,7 +14,7 @@ build:
 	EMCC_CFLAGS="-s SIDE_MODULE=1 -sERROR_ON_UNDEFINED_SYMBOLS=0 -sWARN_ON_UNDEFINED_SYMBOLS=0 -sTOTAL_MEMORY=65536000 -sMODULARIZE=1 -sEXPORT_ES6=1 -sEXPORTED_RUNTIME_METHODS='FS'" \
 	emmake make -C src/pl/plpgsql MAKELEVEL=0

-	EMCC_CFLAGS="-sALLOW_MEMORY_GROWTH=1 -sERROR_ON_UNDEFINED_SYMBOLS=0 -sWARN_ON_UNDEFINED_SYMBOLS=0 -sTOTAL_MEMORY=65536000 -sEMULATE_FUNCTION_POINTER_CASTS=1 -sMODULARIZE=1 -sEXPORT_ES6=1 -sEXPORTED_FUNCTIONS=_main,_ExecProtocolMsg,_malloc,_free -sEXPORTED_RUNTIME_METHODS=ccall,cwrap,FS" \
+	EMCC_CFLAGS="-sALLOW_MEMORY_GROWTH=1 -sERROR_ON_UNDEFINED_SYMBOLS=0 -sWARN_ON_UNDEFINED_SYMBOLS=0 -sTOTAL_MEMORY=65536000 -sEMULATE_FUNCTION_POINTER_CASTS=1 -sMODULARIZE=1 -sEXPORT_ES6=1 -sEXPORTED_FUNCTIONS=_main,_ExecProtocolMsg,_malloc,_free -sEXPORTED_RUNTIME_METHODS=ccall,cwrap,FS,stringToNewUTF8" \
 	emmake make -C src/backend MAKELEVEL=0

 	mkdir -p ../packages/pglite/release
diff --git a/packages/pglite/README.md b/packages/pglite/README.md
index 0164a375..d84ac18d 100644
--- a/packages/pglite/README.md
+++ b/packages/pglite/README.md
@@ -196,8 +196,8 @@ The `query` and `exec` methods take an optional `options` objects with the follo
   });
   ```

-- `blob: Blob | File` - Attach a `Blob` or `File` object to the query that can used with a `COPY FROM` command by using the virtual `/dev/blob` device, see [importing and exporting](#importing-and-exporting-with-copy-tofrom).
+- `blobs: {[name: string]: Blob | File}` - Attach `Blob` or `File` objects to the query that can be used with a `COPY FROM` command by using the syntax `COPY table FROM 'blob:name'`, see [importing and exporting](#importing-and-exporting-with-copy-tofrom).

 #### `.exec(query: string, options?: QueryOptions): Promise<Array<Results>>`

@@ -307,7 +307,7 @@ Result objects have the following properties:
 - `rows: Row[]` - The rows retuned by the query
 - `affectedRows?: number` - Count of the rows affected by the query. Note this is *not* the count of rows returned, it is the number or rows in the database changed by the query.
 - `fields: { name: string; dataTypeID: number }[]` - Field name and Postgres data type ID for each field returned.
-- `blob: Blob` - A `Blob` containing the data written to the virtual `/dev/blob/` device by a `COPY TO` command. See [importing and exporting](#importing-and-exporting-with-copy-tofrom).
+- `blobs: {[name: string]: Blob}` - `Blob` objects containing the data written to `blob:name` targets by `COPY TO` commands. See [importing and exporting](#importing-and-exporting-with-copy-tofrom).

 ### Row Objects:

@@ -336,21 +336,33 @@ await pg.exec(`

 ### Importing and exporting with `COPY TO/FROM`

-PGlite has support importing and exporting via `COPY TO/FROM` by using a virtual `/dev/blob` device.
+PGlite supports importing and exporting via `COPY TO/FROM` by attaching blobs to queries and results.

-To import a file pass the `File` or `Blob` in the query options as `blob`, and copy from the `/dev/blob` device.
+To import a file, pass the `File` or `Blob` in the query's `blobs` option:

 ```ts
-await pg.query("COPY my_table FROM '/dev/blob';", [], {
-  blob: MyBlob
+await pg.query("COPY my_table FROM 'blob:my_blob';", [], {
+  blobs: { my_blob: MyBlob },
 })
 ```

-To export a table or query to a file you just have to write to the `/dev/blob` device, the file will be retied as `blob` on the query results:
+To export a table or query to a file, write to `blob:blob_name`; the file is returned as a `Blob` on the query results:

 ```ts
-const ret = await pg.query("COPY my_table TO '/dev/blob';")
-// ret.blob is a `Blob` object with the data from the copy.
+const ret = await pg.query("COPY my_table TO 'blob:my_blob';")
+// ret.blobs['my_blob'] is a `Blob` object with the data from the copy.
+```
+
+It is also possible to copy from a URL:
+
+```ts
+await pg.query("COPY my_table FROM 'https://example.com/my_data.csv';")
+```
+
+and export to a URL; the request is sent as an HTTP PUT:
+
+```ts
+await pg.query("COPY my_table TO 'https://example.com/api/endpoint';")
 ```

 ## Extensions
diff --git a/packages/pglite/examples/copy.html b/packages/pglite/examples/copy.html
index 8ae427d2..4b7e4291 100644
--- a/packages/pglite/examples/copy.html
+++ b/packages/pglite/examples/copy.html
@@ -15,10 +15,10 @@
     // Copy the date to a file:
     console.log('Copying data to file...')
     // 'test.csv
-    const ret = await pg.query("COPY test TO '/dev/blob' WITH (FORMAT binary);");
+    const ret = await pg.query("COPY test TO 'blob:test.csv' WITH (FORMAT binary);");

     console.log('Data copied to blob:')
-    const blob = ret.blob;
+    const blob = ret.blobs['test.csv'];
     console.log(blob);

     // Download the file:
@@ -37,8 +37,8 @@

     // import the data from the file:
     console.log('Importing data from file...')
-    const ret2 = await pg.query("COPY test2 FROM '/dev/blob' WITH (FORMAT binary);", [], {
-      blob: blob
+    const ret2 = await pg.query("COPY test2 FROM 'blob:test.csv' WITH (FORMAT binary);", [], {
+      blobs: {'test.csv': blob}
     });

     console.log('Data imported from file:')
diff --git a/packages/pglite/release/postgres.d.ts b/packages/pglite/release/postgres.d.ts
index d2e1d9d5..4116d24d 100644
--- a/packages/pglite/release/postgres.d.ts
+++ b/packages/pglite/release/postgres.d.ts
@@ -426,6 +426,9 @@ export interface EmPostgres extends EmscriptenModule {
   FS: FS;
   eventTarget: EventTarget;
   Event: typeof CustomEvent;
+  copyFrom: (fileName: string, isProgram: boolean) => string | null;
+  copyTo: (fileName: string, isProgram: boolean) => string | null;
+  copyToEnd: () => string | null;
   onRuntimeInitialized: (Module: EmPostgres) => Promise<void>;
 }

diff --git a/packages/pglite/src/interface.ts b/packages/pglite/src/interface.ts
index d6ba5800..7e571d9d 100644
--- a/packages/pglite/src/interface.ts
+++ b/packages/pglite/src/interface.ts
@@ -14,7 +14,11 @@ export interface ParserOptions {
 export interface QueryOptions {
   rowMode?: RowMode;
   parsers?: ParserOptions;
-  blob?: Blob | File;
+  blobs?: QueryBlobs;
+}
+
+export interface QueryBlobs {
+  [name: string]: Blob | File;
 }

 export interface ExecProtocolOptions {
@@ -98,11 +102,15 @@ export type PGliteInterfaceExtensions<E> = E extends Extensions

 export type Row<T = { [key: string]: any }> = T;

+export type ResultBlobs = {
+  [name: string]: Blob | File;
+};
+
 export type Results<T = { [key: string]: any }> = {
   rows: Row<T>[];
   affectedRows?: number;
   fields: { name: string; dataTypeID: number }[];
-  blob?: Blob; // Only set when a file is returned, such as from a COPY command
+  blobs?: ResultBlobs; // Only set when files are returned, such as from a COPY TO command
 };

 export interface Transaction {
diff --git a/packages/pglite/src/parse.ts b/packages/pglite/src/parse.ts
index de988f97..50a8497c 100644
--- a/packages/pglite/src/parse.ts
+++ b/packages/pglite/src/parse.ts
@@ -4,7 +4,7 @@ import {
   DataRowMessage,
   CommandCompleteMessage,
 } from "pg-protocol/dist/messages.js";
-import type { Results, QueryOptions } from "./interface.js";
+import type { Results, QueryOptions, ResultBlobs } from "./interface.js";
 import { parseType } from "./types.js";

 /**
@@ -14,7 +14,7 @@ import { parseType } from "./types.js";
 export function parseResults(
   messages: Array<BackendMessage>,
   options?: QueryOptions,
-  blob?: Blob
+  blobs?: ResultBlobs,
 ): Array<Results> {
   const resultSets: Results[] = [];
   let currentResultSet: Results = { rows: [], fields: [] };
@@ -24,7 +24,7 @@ export function parseResults(
     (msg) =>
       msg instanceof RowDescriptionMessage ||
       msg instanceof DataRowMessage ||
-      msg instanceof CommandCompleteMessage
+      msg instanceof CommandCompleteMessage,
   );

   filteredMessages.forEach((msg, index) => {
@@ -40,9 +40,9 @@ export function parseResults(
           parseType(
             field,
             currentResultSet!.fields[i].dataTypeID,
-            options?.parsers
-          )
-        )
+            options?.parsers,
+          ),
+        ),
       );
     } else {
       // rowMode === "object"
@@ -53,10 +53,10 @@ export function parseResults(
             parseType(
               field,
               currentResultSet!.fields[i].dataTypeID,
-              options?.parsers
+              options?.parsers,
             ),
-          ])
-        )
+          ]),
+        ),
       );
     }
   } else if (msg instanceof CommandCompleteMessage) {
@@ -66,7 +66,7 @@ export function parseResults(
       resultSets.push({
         ...currentResultSet,
         affectedRows,
-        ...(blob ? { blob } : {}),
+        ...(blobs ? { blobs } : {}),
       });
     else resultSets.push(currentResultSet);

diff --git a/packages/pglite/src/pglite.ts b/packages/pglite/src/pglite.ts
index 3fb352fb..76328f3a 100644
--- a/packages/pglite/src/pglite.ts
+++ b/packages/pglite/src/pglite.ts
@@ -12,6 +12,7 @@ import type {
   Results,
   Transaction,
   QueryOptions,
+  QueryBlobs,
   ExecProtocolOptions,
   PGliteInterfaceExtensions,
   Extensions,
@@ -59,9 +60,13 @@ export class PGlite implements PGliteInterface {

   // These are the current ArrayBuffer that is being read or written to
   // during a query, such as COPY FROM or COPY TO.
+  #queryBlobBuffers?: { [name: string]: ArrayBuffer };
+  #queryResultBlobs?: { [name: string]: Blob };
   #queryReadBuffer?: ArrayBuffer;
   #queryWriteChunks?: Uint8Array[];
-
+  #queryWriteCurrentName?: string;
+  #queryWriteCurrentIsProgram?: boolean;
+
   #notifyListeners = new Map<string, Set<(payload: string) => void>>();
   #globalNotifyListeners = new Set<
     (channel: string, payload: string) => void
@@ -148,7 +153,7 @@ export class PGlite implements PGliteInterface {
     let emscriptenOpts: Partial<EmPostgres> = {
       arguments: [
         "--single", // Single user mode
-        "-F", // Disable fsync (TODO: Only for in-memory mode?)
+        // "-F", // Disable fsync (TODO: Only for in-memory mode?)
         "-O", // Allow the structure of system tables to be modified. This is used by initdb
         "-j", // Single use mode - Use semicolon followed by two newlines, rather than just newline, as the command entry terminator.
         "-c", // Set parameter
@@ -174,7 +179,6 @@ export class PGlite implements PGliteInterface {
     // e.g. COPY mytable TO '/dev/blob' WITH (FORMAT binary)
     // The data is returned by the query as a `blob` property in the results
     const devId = mod.FS.makedev(64, 0);
-    let callCounter = 0;
     const devOpt = {
       open: (stream: any) => {},
       close: (stream: any) => {},
@@ -204,7 +208,6 @@ export class PGlite implements PGliteInterface {
         length: number,
         position: number,
       ) => {
-        callCounter++;
         this.#queryWriteChunks ??= [];
         this.#queryWriteChunks.push(
           buffer.slice(offset, offset + length),
@@ -226,6 +229,15 @@ export class PGlite implements PGliteInterface {
       },
       eventTarget: this.#eventTarget,
       Event: PGEvent,
+      copyFrom: (args, isProgram) => {
+        return this.#copyFrom(args, isProgram);
+      },
+      copyTo: (args, isProgram) => {
+        return this.#copyTo(args, isProgram);
+      },
+      copyToEnd: () => {
+        return this.#copyToFinalize();
+      },
     };

     // Setup extensions
@@ -388,7 +400,7 @@ export class PGlite implements PGliteInterface {
     return await this.#queryMutex.runExclusive(async () => {
       // We need to parse, bind and execute a query with parameters
       this.#log("runQuery", query, params, options);
-      await this.#handleBlob(options?.blob);
+      await this.#handleBlobs(options?.blobs);
       const parsedParams = params?.map((p) => serializeType(p)) || [];
       let results;
       try {
@@ -412,19 +424,15 @@ export class PGlite implements PGliteInterface {
       } finally {
         await this.#execProtocolNoSync(serialize.sync());
       }
-      this.#cleanupBlob();
+      const blobs = this.#queryResultBlobs;
+      this.#cleanupBlobs();
       if (!this.#inTransaction) {
         await this.#syncToFs();
       }
-      let blob: Blob | undefined;
-      if (this.#queryWriteChunks) {
-        blob = new Blob(this.#queryWriteChunks);
-        this.#queryWriteChunks = undefined;
-      }
       return parseResults(
         results.map(([msg]) => msg),
         options,
-        blob,
+        blobs,
       )[0] as Results<T>;
     });
   }
@@ -443,26 +451,22 @@ export class PGlite implements PGliteInterface {
     return await this.#queryMutex.runExclusive(async () => {
       // No params so we can just send the query
       this.#log("runExec", query, options);
-      await this.#handleBlob(options?.blob);
+      await this.#handleBlobs(options?.blobs);
       let results;
       try {
         results = await this.#execProtocolNoSync(serialize.query(query));
       } finally {
         await this.#execProtocolNoSync(serialize.sync());
       }
-      this.#cleanupBlob();
+      const blobs = this.#queryResultBlobs;
+      this.#cleanupBlobs();
       if (!this.#inTransaction) {
         await this.#syncToFs();
       }
-      let blob: Blob | undefined;
-      if (this.#queryWriteChunks) {
-        blob = new Blob(this.#queryWriteChunks);
-        this.#queryWriteChunks = undefined;
-      }
       return parseResults(
         results.map(([msg]) => msg),
         options,
-        blob,
+        blobs,
       ) as Array<Results>;
     });
   }
@@ -531,15 +535,130 @@ export class PGlite implements PGliteInterface {
    * Handle a file attached to the current query
    * @param file The file to handle
    */
-  async #handleBlob(blob?: File | Blob) {
-    this.#queryReadBuffer = blob ? await blob.arrayBuffer() : undefined;
+  async #handleBlobs(blobs?: QueryBlobs) {
+    const buffers: { [name: string]: ArrayBuffer } = {};
+    const promises = Object.entries(blobs ?? {}).map(async ([name, blob]) => {
+      buffers[name] = await blob.arrayBuffer();
+    });
+    await Promise.all(promises);
+    this.#queryBlobBuffers = buffers;
   }

   /**
    * Cleanup the current file
    */
-  #cleanupBlob() {
+  #cleanupBlobs() {
+    this.#queryBlobBuffers = undefined;
+    this.#queryResultBlobs = undefined;
     this.#queryReadBuffer = undefined;
+    this.#queryWriteChunks = undefined;
+    this.#queryWriteCurrentName = undefined;
+    this.#queryWriteCurrentIsProgram = undefined;
+  }
+
+  /**
+   * Copy data from a file, blob or url
+   * The data is made available at /dev/blob
+   * @param args The copy arguments
+   */
+  #copyFrom(args: string, isProgram = false): string | null {
+    if (args.startsWith("blob:")) {
+      const blobName = args.slice(5);
+      this.#queryReadBuffer = this.#queryBlobBuffers?.[blobName];
+      if (!this.#queryReadBuffer) {
+        return "Blob not found";
+      }
+    } else if (args.startsWith("http://") || args.startsWith("https://")) {
+      const xhr = new XMLHttpRequest();
+      xhr.overrideMimeType("text/plain; charset=x-user-defined");
+      xhr.open("GET", args, false); // Synchronous request
+      xhr.send();
+      if (xhr.status !== 200) {
+        return `Failed to fetch url: ${xhr.status}`;
+      }
+      // Convert the response to an ArrayBuffer
+      const buffer = new ArrayBuffer(xhr.response.length);
+      const view = new Uint8Array(buffer);
+      for (let i = 0; i < xhr.response.length; i++) {
+        view[i] = xhr.response.charCodeAt(i);
+      }
+      this.#queryReadBuffer = buffer;
+    } else if (isProgram) {
+      // User provided a function to read from
+      return "Not implemented";
+    } else {
+      // Should not happen - file read is handled by the emscripten FS
+      return "Not implemented";
+    }
+    return null;
+  }
+
+  /**
+   * Copy data to a file, blob or url
+   * The data is written to /dev/blob during the query
+   * and available as this.#queryWriteChunks
+   * @param args The copy arguments
+   */
+  #copyTo(args: string, isProgram = false): string | null {
+    if (args.startsWith("blob:")) {
+      // Save to a blob on the result object
+      this.#queryWriteCurrentName = args;
+      this.#queryWriteCurrentIsProgram = false;
+      this.#queryWriteChunks = [];
+    } else if (args.startsWith("http://") || args.startsWith("https://")) {
+      // PUT the data to the url
+      this.#queryWriteCurrentName = args;
+      this.#queryWriteCurrentIsProgram = false;
+      this.#queryWriteChunks = [];
+    } else if (isProgram) {
+      // User provided a function to write to
+      this.#queryWriteCurrentName = args;
+      this.#queryWriteCurrentIsProgram = true;
+      return "Not implemented";
+    } else {
+      // Should not happen - file write is handled by the emscripten FS
+      return "Not implemented";
+    }
+    return null;
+  }
+
+  /**
+   * Finalize the copy operation
+   * @returns An error message if the copy operation failed
+   */
+  #copyToFinalize() {
+    if (!this.#queryWriteChunks) {
+      return "No data to write";
+    }
+    try {
+      const blobName = this.#queryWriteCurrentName!;
+      const blob = new Blob(this.#queryWriteChunks);
+      this.#queryWriteChunks = undefined;
+      if (blobName.startsWith("blob:")) {
+        this.#queryResultBlobs ??= {};
+        this.#queryResultBlobs[blobName.slice(5)] = blob;
+        return null;
+      } else if (
+        blobName.startsWith("http://") ||
+        blobName.startsWith("https://")
+      ) {
+        const xhr = new XMLHttpRequest();
+        xhr.open("PUT", blobName, false); // Synchronous request
+        xhr.setRequestHeader("Content-Type", "application/octet-stream");
+        xhr.send(blob);
+        if (xhr.status !== 200) {
+          return `Failed to PUT to url: ${xhr.status}`;
+        }
+        return null;
+      } else if (this.#queryWriteCurrentIsProgram) {
+        return "Not implemented";
+      } else {
+        // Should not happen - file write is handled by the emscripten FS
+        return "Not implemented";
+      }
+    } catch (e) {
+      return (e as Error).toString();
+    }
   }

   /**
diff --git a/packages/pglite/tests/basic.test.js b/packages/pglite/tests/basic.test.js
index af2709f4..c45e0c9c 100644
--- a/packages/pglite/tests/basic.test.js
+++ b/packages/pglite/tests/basic.test.js
@@ -331,9 +331,14 @@ test("basic copy to/from blob", async (t) => {
   `);

   // copy to
-  const ret = await db.query("COPY test TO '/dev/blob' WITH (FORMAT csv);");
-  const csv = await ret.blob.text();
+  const ret = await db.exec(`
+    COPY test TO 'blob:test.csv' WITH (FORMAT csv);
+    COPY (SELECT 1) TO 'blob:test2.csv' WITH (FORMAT csv);
+  `);
+  const csv = await ret[1].blobs['test.csv'].text();
+  const csv2 = await ret[1].blobs['test2.csv'].text();
   t.is(csv, "1,test\n2,test2\n");
+  t.is(csv2, "1\n");

   // copy from
   const blob2 = new Blob([csv]);
@@ -343,7 +348,9 @@ test("basic copy to/from blob", async (t) => {
     test TEXT
   );
   `);
-  await db.query("COPY test2 FROM '/dev/blob' WITH (FORMAT csv);", [], { blob: blob2 });
+  await db.query("COPY test2 FROM 'blob:test.csv' WITH (FORMAT csv);", [], {
+    blobs: { "test.csv": blob2 },
+  });
   const res = await db.query(`
     SELECT * FROM test2;
   `);
diff --git a/postgres b/postgres
index 1ef91d2a..9e07a58e 160000
--- a/postgres
+++ b/postgres
@@ -1 +1 @@
-Subproject commit 1ef91d2a9a370c463edaec0f98ac72cfd0b57647
+Subproject commit 9e07a58e511556c8d9f0080225ed9a4e6320d642
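
Usage sketch (not part of the patch): a minimal end-to-end example of the blob-based COPY API this diff introduces, pieced together from the README and test changes above. The `@electric-sql/pglite` import path is the package's published name, and the table and blob names are illustrative only.

```ts
import { PGlite } from "@electric-sql/pglite";

const pg = new PGlite(); // in-memory database

await pg.exec(`
  CREATE TABLE IF NOT EXISTS todo (
    id SERIAL PRIMARY KEY,
    task TEXT
  );
  INSERT INTO todo (task) VALUES ('write docs'), ('review patch');
`);

// Export: COPY TO a 'blob:<name>' target; the written data is surfaced
// on the result's `blobs` map, keyed by the name after the prefix.
const out = await pg.query("COPY todo TO 'blob:todo.csv' WITH (FORMAT csv);");
const csv = await out.blobs!["todo.csv"].text();

// Import: attach a Blob under a name in the query options, then reference
// it as 'blob:<name>' in COPY FROM (here it simply re-imports the rows).
await pg.query("COPY todo FROM 'blob:todo.csv' WITH (FORMAT csv);", [], {
  blobs: { "todo.csv": new Blob([csv]) },
});

// http(s) targets also work: COPY FROM issues a GET, COPY TO a PUT.
// await pg.query("COPY todo FROM 'https://example.com/todo.csv';");
```

`COPY FROM 'blob:<name>'` resolves against the `blobs` map passed in the query options, while `COPY TO 'blob:<name>'` collects the chunks written during the query and exposes them on the result's `blobs` map; URL targets are fetched or uploaded synchronously via XHR inside the copy callbacks.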