Merge pull request #15 from DIG-Network/release/v0.0.1-alpha.15
Release/v0.0.1 alpha.15
MichaelTaylor3D authored Sep 10, 2024
2 parents 119cede + 36f1f13 commit eaaa87c
Showing 5 changed files with 90 additions and 38 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,13 @@

All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines.

### [0.0.1-alpha.15](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.14...v0.0.1-alpha.15) (2024-09-10)


### Bug Fixes

* write stream in tree ([b10d6a2](https://github.com/DIG-Network/dig-chia-sdk/commit/b10d6a2489fc66ee8c8c51546b0521f39aee3c24))

### [0.0.1-alpha.14](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.10...v0.0.1-alpha.14) (2024-09-10)


4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "@dignetwork/dig-sdk",
"version": "0.0.1-alpha.14",
"version": "0.0.1-alpha.15",
"description": "",
"type": "commonjs",
"main": "./dist/index.js",
86 changes: 61 additions & 25 deletions src/DataIntegrityTree/DataIntegrityTree.ts
@@ -188,46 +188,52 @@ class DataIntegrityTree {
if (!isHexString(key)) {
throw new Error(`key must be a valid hex string: ${key}`);
}

const uncompressedHash = crypto.createHash("sha256");
const gzip = zlib.createGzip();

let sha256: string;
const tempDir = path.join(this.storeDir, "tmp");
if (!fs.existsSync(tempDir)) {
fs.mkdirSync(tempDir, { recursive: true });
}
const tempFilePath = path.join(tempDir, `${crypto.randomUUID()}.gz`);

return new Promise((resolve, reject) => {
const tempWriteStream = fs.createWriteStream(tempFilePath);


// Update the hash with the original data
readStream.on("data", (chunk) => {
uncompressedHash.update(chunk);
});


// Pipe the read stream through gzip into the temporary write stream
readStream.pipe(gzip).pipe(tempWriteStream);

tempWriteStream.on("finish", async () => {
sha256 = uncompressedHash.digest("hex");

const finalWriteStream = this._createWriteStream(sha256);
const finalPath = finalWriteStream.path as string;

// Ensure the directory exists before copying the file
const finalDir = path.dirname(finalPath);
if (!fs.existsSync(finalDir)) {
fs.mkdirSync(finalDir, { recursive: true });
}

let finalWriteStream: fs.WriteStream | undefined;
try {
sha256 = uncompressedHash.digest("hex");

finalWriteStream = this._createWriteStream(sha256);
const finalPath = finalWriteStream.path as string;

// Ensure the directory exists
const finalDir = path.dirname(finalPath);
if (!fs.existsSync(finalDir)) {
fs.mkdirSync(finalDir, { recursive: true });
}

// Copy the temporary gzipped file to the final destination
await this._streamFile(tempFilePath, finalPath);
await unlink(tempFilePath);

await unlink(tempFilePath); // Clean up the temporary file
const combinedHash = crypto
.createHash("sha256")
.update(`${key}/${sha256}`)
.digest("hex");


// Check if the key already exists with the same hash
if (
Array.from(this.files.values()).some(
(file) => file.hash === combinedHash
@@ -236,28 +242,58 @@
console.log(`No changes detected for key: ${key}`);
return resolve();
}


// Delete existing key if present
if (this.files.has(key)) {
this.deleteKey(key);
}


// Insert the new key with the hash
console.log(`Inserted key: ${key}`);
this.files.set(key, {
hash: combinedHash,
sha256: sha256,
});

this._rebuildTree();
resolve();
} catch (err) {
// On error, cleanup the temporary file and reject
await unlink(tempFilePath).catch(() => {});
reject(err);
} finally {
// Always close the final write stream if it exists
if (finalWriteStream) {
finalWriteStream.end();
}
}
});

tempWriteStream.on("error", (err) => {

tempWriteStream.on("error", async (err) => {
// Close streams and clean up in case of error
tempWriteStream.destroy();
gzip.destroy();
readStream.destroy();

await unlink(tempFilePath).catch(() => {}); // Clean up the temp file
reject(err);
});

readStream.on("error", (err) => {

readStream.on("error", async (err) => {
// Close streams and clean up in case of error
tempWriteStream.destroy();
gzip.destroy();
readStream.destroy();

await unlink(tempFilePath).catch(() => {}); // Clean up the temp file
reject(err);
});

gzip.on("error", (err) => {
// Handle errors in the gzip stream
tempWriteStream.destroy();
gzip.destroy();
readStream.destroy();
reject(err);
});
});
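For context, the pattern this commit introduces in upsertKey — hash the uncompressed bytes while piping them through gzip into a temporary file, copy the compressed result into place only on success, and always clean up the temp file — can be sketched in isolation as follows. This is a minimal illustration, not the SDK's code: the helper name `storeCompressed`, the flat `destDir/<sha256>` layout, and the use of `stream/promises.pipeline` are assumptions made for the example.

```ts
import * as crypto from "crypto";
import * as fs from "fs";
import * as path from "path";
import * as zlib from "zlib";
import { Readable } from "stream";
import { pipeline } from "stream/promises";
import { copyFile, mkdir, unlink } from "fs/promises";

// Hypothetical helper: gzip `readStream` into a temp file while hashing the
// original (uncompressed) bytes, then copy the result to a path named after
// that hash -- the same shape as the upsertKey change above.
async function storeCompressed(
  readStream: Readable,
  destDir: string
): Promise<string> {
  const hash = crypto.createHash("sha256");
  const tempPath = path.join(destDir, "tmp", `${crypto.randomUUID()}.gz`);
  await mkdir(path.dirname(tempPath), { recursive: true });

  try {
    // Tap the stream to hash the original bytes while gzip writes the temp
    // file; pipeline() rejects if any of the three streams errors.
    readStream.on("data", (chunk) => hash.update(chunk));
    await pipeline(readStream, zlib.createGzip(), fs.createWriteStream(tempPath));

    const sha256 = hash.digest("hex");
    const finalPath = path.join(destDir, sha256);
    await mkdir(path.dirname(finalPath), { recursive: true });
    await copyFile(tempPath, finalPath);
    return sha256;
  } finally {
    // Best-effort cleanup of the temp file on both success and failure.
    await unlink(tempPath).catch(() => {});
  }
}
```

The actual method additionally de-duplicates by the combined key/content hash and rebuilds the Merkle tree afterwards, which the sketch omits.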
29 changes: 19 additions & 10 deletions src/utils/directoryUtils.ts
@@ -4,7 +4,10 @@ import ignore from "ignore";
import { DataIntegrityTree } from "../DataIntegrityTree";

// Custom concurrency handler
const limitConcurrency = async (concurrencyLimit: number, tasks: (() => Promise<void>)[]) => {
const limitConcurrency = async (
concurrencyLimit: number,
tasks: (() => Promise<void>)[]
) => {
const results = [];
const executing: Promise<void>[] = [];

@@ -61,16 +64,22 @@ export const addDirectory = async (
tasks.push(() => addDirectory(datalayer, filePath, baseDir));
} else {
// Add a task for each file to be processed
tasks.push(() =>
new Promise<void>((resolve, reject) => {
const stream = fs.createReadStream(filePath);
datalayer
.upsertKey(stream, Buffer.from(relativePath).toString("hex"))
.then(resolve)
.catch(reject);
})
tasks.push(
() =>
new Promise<void>((resolve, reject) => {
const stream = fs.createReadStream(filePath);
datalayer
.upsertKey(stream, Buffer.from(relativePath).toString("hex"))
.then(async () => {
await new Promise<void>((resolve) => setTimeout(resolve, 100));
resolve();
})
.catch(reject);
})
);
}

await new Promise<void>((resolve) => setTimeout(resolve, 100));
}

// Run tasks with limited concurrency (set the concurrency limit as needed)
@@ -99,4 +108,4 @@ export const calculateFolderSize = (folderPath: string): bigint => {
}

return totalSize;
};
};
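The `limitConcurrency` helper reformatted at the top of this file caps how many of the queued file tasks run at once. Written here from scratch — so the body differs from the SDK's implementation, though the signature is taken from the diff above — the idea is roughly:

```ts
// Run zero-argument async tasks with at most `concurrencyLimit` in flight.
async function limitConcurrency(
  concurrencyLimit: number,
  tasks: (() => Promise<void>)[]
): Promise<void> {
  const executing = new Set<Promise<void>>();

  for (const task of tasks) {
    // Start the task and drop its promise from the in-flight set once it settles.
    const p = task().finally(() => {
      executing.delete(p);
    });
    executing.add(p);

    // Once the cap is reached, wait for any in-flight task to settle
    // before starting the next one.
    if (executing.size >= concurrencyLimit) {
      await Promise.race(executing);
    }
  }

  // Drain whatever is still running.
  await Promise.all(executing);
}
```

addDirectory builds `tasks` as zero-argument closures (as in the hunk above) and hands them to this limiter, so directory recursion and per-file upserts never exceed the configured cap.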
