diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6d8a95606..2acf5224b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -18,6 +18,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
 
 - sp
   - explict error thrown if SPFx context is null or undefined when needed
+  - getStream method on all readable files
+  - addChunked updated to accept stream as content, new signature with props object
 
 ### Removed
 
@@ -35,6 +37,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
   - PagedItemCollection removed from library
   - removed /items/get-all import, unneeded, use async iterator patterns
   - ./operations.ts methods moved to ./spqueryable.ts
+  - startUpload, continueUpload, finishUpload File protected methods removed
+
+- nodejs
+  - removed stream extensions, moved into sp
 
 ### Changed
 
@@ -55,3 +61,4 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
 
 - sp
   - _Items and IItems now supports async iterator pattern
+  - chunked upload/add progress object shape changed to : { uploadId: string; stage: "starting" | "continue" | "finishing"; offset: number; }
diff --git a/packages/nodejs/behaviors/stream-parse.ts b/packages/nodejs/behaviors/stream-parse.ts
deleted file mode 100644
index fd1f5ea7e..000000000
--- a/packages/nodejs/behaviors/stream-parse.ts
+++ /dev/null
@@ -1,7 +0,0 @@
-import { TimelinePipe } from "@pnp/core";
-import { parseBinderWithErrorCheck, Queryable } from "@pnp/queryable";
-
-export function StreamParse(): TimelinePipe {
-
-    return parseBinderWithErrorCheck(async r => ({ body: r.body, knownLength: parseInt(r?.headers?.get("content-length") || "-1", 10) }));
-}
diff --git a/packages/nodejs/index.ts b/packages/nodejs/index.ts
index f6c7d77a7..7e2fe495c 100644
--- a/packages/nodejs/index.ts
+++ b/packages/nodejs/index.ts
@@ -16,18 +16,7 @@ import * as NodeFetch from "node-fetch";
 
 })(global);
 
-// auto populate all extensions
-import "./sp-extensions/stream.js";
-
-// export extension types as a namespace
-import * as SPNS from "./sp-extensions/stream.js";
-
-export {
-    SPNS,
-};
-
 export * from "./behaviors/msal.js";
 export * from "./behaviors/fetch.js";
-export * from "./behaviors/stream-parse.js";
 export * from "./behaviors/spdefault.js";
 export * from "./behaviors/graphdefault.js";
diff --git a/packages/nodejs/sp-extensions/stream.ts b/packages/nodejs/sp-extensions/stream.ts
deleted file mode 100644
index a4f4949c5..000000000
--- a/packages/nodejs/sp-extensions/stream.ts
+++ /dev/null
@@ -1,136 +0,0 @@
-// import { asCancelableScope, CancelAction, headers } from "@pnp/queryable";
-// import { File, Files, IFile, IFileAddResult, IFiles, IFileUploadProgressData } from "@pnp/sp/files/index.js";
-// import { spPost, encodePath } from "@pnp/sp";
-// import { ReadStream } from "fs";
-import { PassThrough } from "stream";
-// import { extendFactory, getGUID, isFunc } from "@pnp/core";
-// import { StreamParse } from "../behaviors/stream-parse.js";
-// import { fileFromServerRelativePath } from "@pnp/sp/files/index.js";
-
-export interface IResponseBodyStream {
-    body: PassThrough;
-    knownLength: number;
-}
-
-// extendFactory(File, {
-
-//     getStream(): Promise<IResponseBodyStream> {
-//         return File(this, "$value").using(StreamParse())(headers({ "binaryStringResponseBody": "true" }));
-//     },
-
-//     /**
-//      * Sets the contents of a file using a chunked upload approach. Not supported in batching.
-//      *
-//      * @param stream The file to upload (as readable stream)
-//      * @param progress A callback function which can be used to track the progress of the upload
-//      */
-//     setStreamContentChunked: asCancelableScope(async function (this: IFile, stream: ReadStream, progress?: (data: IFileUploadProgressData) => void): Promise<IFileAddResult> {
-
-//         if (!isFunc(progress)) {
-//             progress = () => void (0);
-//         }
-
-//         const uploadId = getGUID();
-
-//         const fileRef = File(this).using(CancelAction(() => {
-//             return File(this).cancelUpload(uploadId);
-//         }));
-
-//         let blockNumber = -1;
-//         let pointer = 0;
-
-//         for await (const chunk of stream) {
-//             blockNumber++;
-//             progress({
-//                 uploadId,
-//                 blockNumber,
-//                 chunkSize: chunk.length,
-//                 currentPointer: pointer,
-//                 fileSize: -1,
-//                 stage: blockNumber === 0 ? "starting" : "continue",
-//                 totalBlocks: -1,
-//             });
-//             if (blockNumber === 0) {
-//                 pointer = await fileRef.startUpload(uploadId, chunk);
-//             } else {
-//                 pointer = await fileRef.continueUpload(uploadId, pointer, chunk);
-//             }
-//         }
-
-//         progress({ uploadId, blockNumber, chunkSize: -1, currentPointer: -1, fileSize: -1, stage: "finishing", totalBlocks: -1 });
-//         return await fileRef.finishUpload(uploadId, pointer, Buffer.from([]));
-//     }),
-// });
-
-// extendFactory(Files, {
-
-//     /**
-//      * Uploads a file. Not supported for batching
-//      *
-//      * @param url The folder-relative url of the file
-//      * @param content The Blob file content or File readable stream to add
-//      * @param progress A callback function which can be used to track the progress of the upload
-//      * @param shouldOverWrite Should a file with the same name in the same location be overwritten? (default: true)
-//      * @param chunkSize The size of each file slice, in bytes (default: 10485760)
-//      * @returns The new File and the raw response.
-//      */
-//     addChunked: asCancelableScope(async function (
-//         this: IFiles,
-//         url: string,
-//         content: Blob | ReadStream,
-//         progress?: (data: IFileUploadProgressData) => void,
-//         shouldOverWrite = true,
-//         chunkSize = 10485760
-//     ) {
-
-//         const response = await spPost(Files(this, `add(overwrite=${shouldOverWrite},url='${encodePath(url)}')`));
-
-//         const file = fileFromServerRelativePath(this, response.ServerRelativeUrl);
-
-//         file.using(CancelAction(async () => {
-//             return File(file).delete();
-//         }));
-
-//         if ("function" === typeof (content as ReadStream).read) {
-//             return file.setStreamContentChunked(content as ReadStream, progress);
-//         }
-
-//         return file.setContentChunked(content as Blob, progress, chunkSize);
-//     }),
-// });
-
-// // these are needed to avoid a type/name not found issue where TSC doesn't properly keep
-// // the references used within the module declarations below
-// type ProgressFunc = (data: IFileUploadProgressData) => void;
-// type ChunkedResult = Promise<IFileAddResult>;
-
-// declare module "@pnp/sp/files/types" {
-
-//     interface IFile {
-//         /**
-//          * Gets a PassThrough stream representing the file
-//          */
-//         getStream(): Promise<IResponseBodyStream>;
-
-//         /**
-//          * Sets a file stream content chunk
-//          */
-//         setStreamContentChunked(
-//             stream: ReadStream,
-//             progress?: ProgressFunc,
-//         ): ChunkedResult;
-//     }
-
-//     interface IFiles {
-//         /**
-//          * Adds a file stream in chunks
-//          */
-//         addChunked(
-//             url: string,
-//             content: Blob | ReadStream,
-//             progress?: ProgressFunc,
-//             shouldOverWrite?: boolean,
-//             chunkSize?: number,
-//         ): ChunkedResult;
-//     }
-// }
diff --git a/packages/sp/behaviors/telemetry.ts b/packages/sp/behaviors/telemetry.ts
index ba13012a7..b4e189ca0 100644
--- a/packages/sp/behaviors/telemetry.ts
+++ b/packages/sp/behaviors/telemetry.ts
@@ -1,4 +1,4 @@
-import { TimelinePipe } from "@pnp/core";
+import { TimelinePipe, stringIsNullOrEmpty } from "@pnp/core";
 import { Queryable } from "@pnp/queryable";
 
 export function Telemetry(): TimelinePipe {
@@ -14,10 +14,10 @@ export function Telemetry(): TimelinePipe {
 
             // remove anything before the _api as that is potentially PII and we don't care, just want to get the called path to the REST API
             // and we want to modify any (*) calls at the end such as items(3) and items(344) so we just track "items()"
-            clientTag += pathname
-                .substring(pathname.indexOf("_api/") + 5)
-                .split("/")
-                .map((value, index, arr) => index === arr.length - 1 ? value.replace(/\(.*?$/i, "()") : value[0]).join(".");
+            clientTag = pathname.split("/")
+                .filter((v) => !stringIsNullOrEmpty(v) && ["_api", "v2.1", "v2.0"].indexOf(v) < 0)
+                .map((value, index, arr) => index === arr.length - 1 ? value.replace(/\(.*?$/i, "()") : value[0])
+                .join(".");
 
             if (clientTag.length > 32) {
                 clientTag = clientTag.substring(0, 32);
diff --git a/packages/sp/files/readable-file.ts b/packages/sp/files/readable-file.ts
index 730830355..0ba3c8f16 100644
--- a/packages/sp/files/readable-file.ts
+++ b/packages/sp/files/readable-file.ts
@@ -1,7 +1,26 @@
 import { TimelinePipe } from "@pnp/core";
-import { BlobParse, BufferParse, CacheNever, JSONParse, TextParse } from "@pnp/queryable/index.js";
+import {
+    BlobParse,
+    BufferParse,
+    CacheNever,
+    JSONParse,
+    Queryable,
+    TextParse,
+    headers,
+    parseBinderWithErrorCheck,
+} from "@pnp/queryable";
 import { _SPInstance, SPQueryable } from "../spqueryable.js";
 
+export interface IResponseBodyStream {
+    body: ReadableStream;
+    knownLength: number;
+}
+
+export function StreamParse(): TimelinePipe {
+
+    return parseBinderWithErrorCheck(async r => ({ body: r.body, knownLength: parseInt(r?.headers?.get("content-length") || "-1", 10) }));
+}
+
 export class ReadableFile<T = any> extends _SPInstance<T> {
 
     /**
@@ -34,6 +53,14 @@ export class ReadableFile<T = any> extends _SPInstance<T> {
         return this.getParsed(JSONParse());
     }
 
+    /**
+     * Gets the content of a file as a ReadableStream
+     *
+     */
+    public getStream(): Promise<IResponseBodyStream> {
+        return SPQueryable(this, "$value").using(StreamParse(), CacheNever())(headers({ "binaryStringResponseBody": "true" }));
+    }
+
     private getParsed<T>(parser: TimelinePipe): Promise<T> {
         return SPQueryable(this, "$value").using(parser, CacheNever())();
     }
diff --git a/packages/sp/files/types.ts b/packages/sp/files/types.ts
index 474fda15d..020d6e9cf 100644
--- a/packages/sp/files/types.ts
+++ b/packages/sp/files/types.ts
@@ -1,5 +1,5 @@
 import { body, cancelableScope, CancelAction } from "@pnp/queryable";
-import { getGUID, isFunc, stringIsNullOrEmpty, isUrlAbsolute, combine, noInherit } from "@pnp/core";
+import { getGUID, stringIsNullOrEmpty, isUrlAbsolute, combine, noInherit } from "@pnp/core";
 import {
     _SPCollection,
     spInvokableFactory,
@@ -24,6 +24,7 @@ import { IMoveCopyOptions } from "../types.js";
 import { ReadableFile } from "./readable-file.js";
 import "../context-info/index.js";
 import { BatchNever } from "../batching.js";
+import { PassThrough, Stream } from "stream";
 
 /**
  * Describes a collection of File objects
@@ -86,23 +87,22 @@ export class _Files extends _SPCollection<IFileInfo[]> {
      *
      * @param url The folder-relative url of the file.
      * @param content The Blob file content to add
-     * @param progress A callback function which can be used to track the progress of the upload
-     * @param shouldOverWrite Should a file with the same name in the same location be overwritten? (default: true)
-     * @param chunkSize The size of each file slice, in bytes (default: 10485760)
+     * @param props Set of optional values that control the behavior of the underlying addUsingPath and chunkedUpload feature
     * @returns The new File and the raw response.
      */
     @cancelableScope
-    public async addChunked(url: string, content: Blob, progress?: (data: IFileUploadProgressData) => void, shouldOverWrite = true, chunkSize = 10485760): Promise<IFileAddResult> {
+    public async addChunked(url: string, content: ValidFileContentSource, props?: Partial<IAddUsingPathProps> & Partial<IChunkedOperationProps>): Promise<IFileAddResult> {
 
-        const response = await spPost(Files(this, `add(overwrite=${shouldOverWrite},url='${encodePath(url)}')`));
+        // add an empty stub
+        const response = await this.addUsingPath(url, null, props);
 
-        const file = fileFromServerRelativePath(this, response.ServerRelativeUrl);
+        const file = fileFromServerRelativePath(this, response.data.ServerRelativeUrl);
 
         file.using(CancelAction(() => {
             return File(file).delete();
         }));
 
-        return file.setContentChunked(content, progress, chunkSize);
+        return file.setContentChunked(content, props);
     }
 
     /**
@@ -404,95 +404,46 @@ export class _File extends ReadableFile<IFileInfo> {
      * @param chunkSize The size of each file slice, in bytes (default: 10485760)
      */
     @cancelableScope
-    public async setContentChunked(file: Blob, progress?: (data: IFileUploadProgressData) => void, chunkSize = 10485760): Promise<IFileAddResult> {
+    public async setContentChunked(file: ValidFileContentSource, props: Partial<IChunkedOperationProps>): Promise<IFileAddResult> {
 
-        if (!isFunc(progress)) {
-            progress = () => null;
-        }
+        const { progress } = applyChunckedOperationDefaults(props);
 
-        const fileSize = file?.size || (file).length;
-        const totalBlocks = parseInt((fileSize / chunkSize).toString(), 10) + ((fileSize % chunkSize === 0) ? 1 : 0);
         const uploadId = getGUID();
+        let first = true;
+        let chunk: { done: boolean; value?: any };
+        let offset = 0;
 
         const fileRef = File(this).using(CancelAction(() => {
             return File(fileRef).cancelUpload(uploadId);
         }));
 
-        // report that we are starting
-        progress({ uploadId, blockNumber: 1, chunkSize, currentPointer: 0, fileSize, stage: "starting", totalBlocks });
-        let currentPointer = await fileRef.startUpload(uploadId, file.slice(0, chunkSize));
+        const contentStream = sourceToReadableStream(file);
+        const reader = contentStream.getReader();
 
-        // skip the first and last blocks
-        for (let i = 2; i < totalBlocks; i++) {
-            progress({ uploadId, blockNumber: i, chunkSize, currentPointer, fileSize, stage: "continue", totalBlocks });
-            currentPointer = await fileRef.continueUpload(uploadId, currentPointer, file.slice(currentPointer, currentPointer + chunkSize));
-        }
+        while ((chunk = await reader.read())) {
 
-        progress({ uploadId, blockNumber: totalBlocks, chunkSize, currentPointer, fileSize, stage: "finishing", totalBlocks });
-        return fileRef.finishUpload(uploadId, currentPointer, file.slice(currentPointer));
-    }
+            if (chunk.done) {
 
-    /**
-     * Starts a new chunk upload session and uploads the first fragment.
-     * The current file content is not changed when this method completes.
-     * The method is idempotent (and therefore does not change the result) as long as you use the same values for uploadId and stream.
-     * The upload session ends either when you use the CancelUpload method or when you successfully
-     * complete the upload session by passing the rest of the file contents through the ContinueUpload and FinishUpload methods.
-     * The StartUpload and ContinueUpload methods return the size of the running total of uploaded data in bytes,
-     * so you can pass those return values to subsequent uses of ContinueUpload and FinishUpload.
-     * This method is currently available only on Office 365.
-     *
-     * @param uploadId The unique identifier of the upload session.
-     * @param fragment The file contents.
-     * @returns The size of the total uploaded data in bytes.
-     */
-    protected async startUpload(uploadId: string, fragment: ArrayBuffer | Blob): Promise<number> {
-        let n = await spPost(File(this, `startUpload(uploadId=guid'${uploadId}')`), { body: fragment });
-        if (typeof n === "object") {
-            // When OData=verbose the payload has the following shape:
-            // { StartUpload: "10485760" }
-            n = (n as any).StartUpload;
-        }
-        return parseFloat(n);
-    }
+                progress({ offset, stage: "finishing", uploadId });
+                const data = await spPost(File(fileRef, `finishUpload(uploadId=guid'${uploadId}',fileOffset=${offset})`), { body: chunk?.value || "" });
 
-    /**
-     * Continues the chunk upload session with an additional fragment.
-     * The current file content is not changed.
-     * Use the uploadId value that was passed to the StartUpload method that started the upload session.
-     * This method is currently available only on Office 365.
-     *
-     * @param uploadId The unique identifier of the upload session.
-     * @param fileOffset The size of the offset into the file where the fragment starts.
-     * @param fragment The file contents.
-     * @returns The size of the total uploaded data in bytes.
-     */
-    protected async continueUpload(uploadId: string, fileOffset: number, fragment: ArrayBuffer | Blob): Promise<number> {
-        let n = await spPost(File(this, `continueUpload(uploadId=guid'${uploadId}',fileOffset=${fileOffset})`), { body: fragment });
-        if (typeof n === "object") {
-            // When OData=verbose the payload has the following shape:
-            // { ContinueUpload: "20971520" }
-            n = (n as any).ContinueUpload;
-        }
-        return parseFloat(n);
-    }
+                return {
+                    data,
+                    file: fileFromServerRelativePath(this, data.ServerRelativeUrl),
+                };
 
-    /**
-     * Uploads the last file fragment and commits the file. The current file content is changed when this method completes.
-     * Use the uploadId value that was passed to the StartUpload method that started the upload session.
-     * This method is currently available only on Office 365.
-     *
-     * @param uploadId The unique identifier of the upload session.
-     * @param fileOffset The size of the offset into the file where the fragment starts.
-     * @param fragment The file contents.
-     * @returns The newly uploaded file.
-     */
-    protected async finishUpload(uploadId: string, fileOffset: number, fragment: ArrayBuffer | Blob): Promise<IFileAddResult> {
-        const response: IFileInfo = await spPost(File(this, `finishUpload(uploadId=guid'${uploadId}',fileOffset=${fileOffset})`), { body: fragment });
-        return {
-            data: response,
-            file: fileFromServerRelativePath(this, response.ServerRelativeUrl),
-        };
+            } else if (first) {
+
+                progress({ offset, stage: "starting", uploadId });
+                offset = await spPost(File(fileRef, `startUpload(uploadId=guid'${uploadId}')`), { body: chunk.value });
+                first = false;
+
+            } else {
+
+                progress({ offset, stage: "continue", uploadId });
+                offset = await spPost(File(fileRef, `continueUpload(uploadId=guid'${uploadId}',fileOffset=${offset})`), { body: chunk.value });
+            }
+        }
     }
 
     protected moveCopyImpl(destUrl: string, options: Partial<IMoveCopyOptions>, overwrite: boolean, methodName: string): Promise<void> {
@@ -696,11 +647,7 @@ export enum TemplateFileType {
 export interface IFileUploadProgressData {
     uploadId: string;
     stage: "starting" | "continue" | "finishing";
-    blockNumber: number;
-    totalBlocks: number;
-    chunkSize: number;
-    currentPointer: number;
-    fileSize: number;
+    offset: number;
 }
 
 export interface IAddUsingPathProps {
@@ -777,3 +724,67 @@ export interface IFileDeleteParams {
      */
     ETagMatch: string;
 }
+
+export interface IChunkedOperationProps {
+    progress: (data: IFileUploadProgressData) => void;
+}
+
+export type ValidFileContentSource = Blob | ReadableStream | TransformStream | Stream | PassThrough;
+
+function applyChunckedOperationDefaults(props: Partial<IChunkedOperationProps>): IChunkedOperationProps {
+    return {
+        progress: () => null,
+        ...props,
+    };
+}
+
+function sourceToReadableStream(source: ValidFileContentSource): ReadableStream {
+
+    if (isBlob(source)) {
+
+        return source.stream();
+
+        // eslint-disable-next-line @typescript-eslint/dot-notation
+    } else if (isPassThrough(source)) {
+
+        // we probably have a passthrough stream from NodeFetch or some other type that supports "on(data)"
+        return new ReadableStream({
+            start(controller) {
+
+                source.on("data", (chunk) => {
+                    controller.enqueue(chunk);
+                });
+
+                source.on("end", () => {
+                    controller.close();
+                });
+            },
+        });
+
+    } else {
+
+        return source;
+    }
+}
+
+const NAME = Symbol.toStringTag;
+
+function isPassThrough(object): object is PassThrough {
+    // eslint-disable-next-line @typescript-eslint/dot-notation
+    return typeof object["on"] === "function";
+}
+
+// FROM: node-fetch source code
+function isBlob(object): object is Blob {
+    return (
+        typeof object === "object" &&
+        typeof object.arrayBuffer === "function" &&
+        typeof object.type === "string" &&
+        typeof object.stream === "function" &&
+        typeof object.constructor === "function" &&
+        (
+            /^(Blob|File)$/.test(object[NAME]) ||
+            /^(Blob|File)$/.test(object.constructor.name)
+        )
+    );
+}
diff --git a/tools/buildsystem/package-lock.json b/tools/buildsystem/package-lock.json
index 9a437bc6d..9363097d1 100644
--- a/tools/buildsystem/package-lock.json
+++ b/tools/buildsystem/package-lock.json
@@ -1,15 +1,15 @@
 {
   "name": "@pnp/buildsystem",
-  "version": "4.0.0-beta9",
+  "version": "4.0.1",
   "lockfileVersion": 2,
   "requires": true,
   "packages": {
     "": {
       "name": "@pnp/buildsystem",
-      "version": "4.0.0-beta9",
+      "version": "4.0.1",
       "license": "MIT",
       "dependencies": {
-        "@pnp/core": "^4.0.0-alpha0-v4nightly.20240105",
+        "@pnp/core": "^4.0.0-alpha0-v4nightly.20240228",
         "globby": "^14.0.0",
         "liftoff": "^4.0.0",
         "webpack": "^5.89.0",
@@ -153,9 +153,9 @@
       }
     },
     "node_modules/@pnp/core": {
-      "version": "4.0.0-alpha0-v4nightly.20240105",
-      "resolved": "https://registry.npmjs.org/@pnp/core/-/core-4.0.0-alpha0-v4nightly.20240105.tgz",
-      "integrity": "sha512-AdjLkyHZu5+ynwdxHwtD/hJzi6MIoSkmbh8MfIsf7FQ6hq/87YQhJjOKHPkt8nw2TUjHp7F4+nLoVUUouT/E9g==",
+      "version": "4.0.0-alpha0-v4nightly.20240228",
+      "resolved": "https://registry.npmjs.org/@pnp/core/-/core-4.0.0-alpha0-v4nightly.20240228.tgz",
+      "integrity": "sha512-MsTCCL5HXpsZ5m/ooMJiYJW6PoPrmVPOJfr9UJuchrS1WW2OmWe0HyM5IBhOn8VzhRTnuuELiOv4wD55xBpN0A==",
       "dependencies": {
         "tslib": "2.4.1"
       },
@@ -2749,9 +2749,9 @@
      }
    },
    "@pnp/core": {
-      "version": "4.0.0-alpha0-v4nightly.20240105",
-      "resolved": "https://registry.npmjs.org/@pnp/core/-/core-4.0.0-alpha0-v4nightly.20240105.tgz",
-      "integrity": "sha512-AdjLkyHZu5+ynwdxHwtD/hJzi6MIoSkmbh8MfIsf7FQ6hq/87YQhJjOKHPkt8nw2TUjHp7F4+nLoVUUouT/E9g==",
+      "version": "4.0.0-alpha0-v4nightly.20240228",
+      "resolved": "https://registry.npmjs.org/@pnp/core/-/core-4.0.0-alpha0-v4nightly.20240228.tgz",
+      "integrity": "sha512-MsTCCL5HXpsZ5m/ooMJiYJW6PoPrmVPOJfr9UJuchrS1WW2OmWe0HyM5IBhOn8VzhRTnuuELiOv4wD55xBpN0A==",
      "requires": {
        "tslib": "2.4.1"
      }
diff --git a/tools/buildsystem/package.json b/tools/buildsystem/package.json
index 4f1214930..3acffc603 100644
--- a/tools/buildsystem/package.json
+++ b/tools/buildsystem/package.json
@@ -9,7 +9,7 @@
     "type": "module",
     "typings": "./index",
     "dependencies": {
-        "@pnp/core": "^4.0.0-alpha0-v4nightly.20240105",
+        "@pnp/core": "^4.0.0-alpha0-v4nightly.20240228",
         "globby": "^14.0.0",
         "liftoff": "^4.0.0",
         "webpack": "^5.89.0",