diff --git a/CHANGELOG.md b/CHANGELOG.md index dfdd0c0..36d7e9f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,13 @@ All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines. +### [0.0.1-alpha.15](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.14...v0.0.1-alpha.15) (2024-09-10) + + +### Bug Fixes + +* write stream in tree ([b10d6a2](https://github.com/DIG-Network/dig-chia-sdk/commit/b10d6a2489fc66ee8c8c51546b0521f39aee3c24)) + ### [0.0.1-alpha.14](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.10...v0.0.1-alpha.14) (2024-09-10) diff --git a/package-lock.json b/package-lock.json index 3f19a8f..896bd95 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@dignetwork/dig-sdk", - "version": "0.0.1-alpha.14", + "version": "0.0.1-alpha.15", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@dignetwork/dig-sdk", - "version": "0.0.1-alpha.14", + "version": "0.0.1-alpha.15", "license": "ISC", "dependencies": { "bip39": "^3.1.0", diff --git a/package.json b/package.json index aa01262..e176f75 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@dignetwork/dig-sdk", - "version": "0.0.1-alpha.14", + "version": "0.0.1-alpha.15", "description": "", "type": "commonjs", "main": "./dist/index.js", diff --git a/src/DataIntegrityTree/DataIntegrityTree.ts b/src/DataIntegrityTree/DataIntegrityTree.ts index b618ad0..b9c4e60 100644 --- a/src/DataIntegrityTree/DataIntegrityTree.ts +++ b/src/DataIntegrityTree/DataIntegrityTree.ts @@ -188,46 +188,52 @@ class DataIntegrityTree { if (!isHexString(key)) { throw new Error(`key must be a valid hex string: ${key}`); } + const uncompressedHash = crypto.createHash("sha256"); const gzip = zlib.createGzip(); - + let sha256: string; const tempDir = path.join(this.storeDir, "tmp"); if (!fs.existsSync(tempDir)) { fs.mkdirSync(tempDir, { recursive: true }); } const tempFilePath = path.join(tempDir, `${crypto.randomUUID()}.gz`); - + return new Promise((resolve, reject) => { const tempWriteStream = fs.createWriteStream(tempFilePath); - + + // Update the hash with the original data readStream.on("data", (chunk) => { uncompressedHash.update(chunk); }); - + + // Pipe the read stream through gzip into the temporary write stream readStream.pipe(gzip).pipe(tempWriteStream); - + tempWriteStream.on("finish", async () => { - sha256 = uncompressedHash.digest("hex"); - - const finalWriteStream = this._createWriteStream(sha256); - const finalPath = finalWriteStream.path as string; - - // Ensure the directory exists before copying the file - const finalDir = path.dirname(finalPath); - if (!fs.existsSync(finalDir)) { - fs.mkdirSync(finalDir, { recursive: true }); - } - + let finalWriteStream: fs.WriteStream | undefined; try { + sha256 = uncompressedHash.digest("hex"); + + finalWriteStream = this._createWriteStream(sha256); + const finalPath = finalWriteStream.path as string; + + // Ensure the directory exists + const finalDir = path.dirname(finalPath); + if (!fs.existsSync(finalDir)) { + fs.mkdirSync(finalDir, { recursive: true }); + } + + // Copy the temporary gzipped file to the final destination await this._streamFile(tempFilePath, finalPath); - await unlink(tempFilePath); - + await unlink(tempFilePath); // Clean up the temporary file + const combinedHash = crypto .createHash("sha256") .update(`${key}/${sha256}`) .digest("hex"); - + + // Check if the key already exists with the same hash if ( Array.from(this.files.values()).some( (file) => file.hash === combinedHash @@ -236,28 +242,58 @@ class DataIntegrityTree { console.log(`No changes detected for key: ${key}`); return resolve(); } - + + // Delete existing key if present if (this.files.has(key)) { this.deleteKey(key); } - + + // Insert the new key with the hash console.log(`Inserted key: ${key}`); this.files.set(key, { hash: combinedHash, sha256: sha256, }); + this._rebuildTree(); resolve(); } catch (err) { + // On error, cleanup the temporary file and reject + await unlink(tempFilePath).catch(() => {}); reject(err); + } finally { + // Always close the final write stream if it exists + if (finalWriteStream) { + finalWriteStream.end(); + } } }); - - tempWriteStream.on("error", (err) => { + + tempWriteStream.on("error", async (err) => { + // Close streams and clean up in case of error + tempWriteStream.destroy(); + gzip.destroy(); + readStream.destroy(); + + await unlink(tempFilePath).catch(() => {}); // Clean up the temp file reject(err); }); - - readStream.on("error", (err) => { + + readStream.on("error", async (err) => { + // Close streams and clean up in case of error + tempWriteStream.destroy(); + gzip.destroy(); + readStream.destroy(); + + await unlink(tempFilePath).catch(() => {}); // Clean up the temp file + reject(err); + }); + + gzip.on("error", (err) => { + // Handle errors in the gzip stream + tempWriteStream.destroy(); + gzip.destroy(); + readStream.destroy(); reject(err); }); }); diff --git a/src/utils/directoryUtils.ts b/src/utils/directoryUtils.ts index eea4852..3a9b7d9 100644 --- a/src/utils/directoryUtils.ts +++ b/src/utils/directoryUtils.ts @@ -4,7 +4,10 @@ import ignore from "ignore"; import { DataIntegrityTree } from "../DataIntegrityTree"; // Custom concurrency handler -const limitConcurrency = async (concurrencyLimit: number, tasks: (() => Promise)[]) => { +const limitConcurrency = async ( + concurrencyLimit: number, + tasks: (() => Promise)[] +) => { const results = []; const executing: Promise[] = []; @@ -61,16 +64,22 @@ export const addDirectory = async ( tasks.push(() => addDirectory(datalayer, filePath, baseDir)); } else { // Add a task for each file to be processed - tasks.push(() => - new Promise((resolve, reject) => { - const stream = fs.createReadStream(filePath); - datalayer - .upsertKey(stream, Buffer.from(relativePath).toString("hex")) - .then(resolve) - .catch(reject); - }) + tasks.push( + () => + new Promise((resolve, reject) => { + const stream = fs.createReadStream(filePath); + datalayer + .upsertKey(stream, Buffer.from(relativePath).toString("hex")) + .then(async () => { + await new Promise((resolve) => setTimeout(resolve, 100)); + resolve(); + }) + .catch(reject); + }) ); } + + await new Promise((resolve) => setTimeout(resolve, 100)); } // Run tasks with limited concurrency (set the concurrency limit as needed) @@ -99,4 +108,4 @@ export const calculateFolderSize = (folderPath: string): bigint => { } return totalSize; -}; \ No newline at end of file +};