From e9beb231dbedf0392a806dba6cc9d225745f8ee9 Mon Sep 17 00:00:00 2001 From: Michael Taylor Date: Thu, 26 Sep 2024 15:44:54 -0400 Subject: [PATCH 1/4] feat: add inactivity timer to propagation server downloads --- src/DigNetwork/PropagationServer.ts | 83 ++++++++++++++++++++++------- 1 file changed, 65 insertions(+), 18 deletions(-) diff --git a/src/DigNetwork/PropagationServer.ts b/src/DigNetwork/PropagationServer.ts index 06135aa..891aa62 100644 --- a/src/DigNetwork/PropagationServer.ts +++ b/src/DigNetwork/PropagationServer.ts @@ -1,23 +1,23 @@ -// Import necessary modules and dependencies import axios, { AxiosRequestConfig } from "axios"; -import fs from "fs"; -import path from "path"; +import cliProgress from "cli-progress"; import FormData from "form-data"; -import { Wallet, DataStore } from "../blockchain"; -import { getOrCreateSSLCerts } from "../utils/ssl"; -import { promptCredentials } from "../utils/credentialsUtils"; +import fs from "fs"; +import fsExtra from "fs-extra"; import https from "https"; -import cliProgress from "cli-progress"; -import { green, red, blue, yellow, cyan } from "colorette"; -import { STORE_PATH } from "../utils/config"; -import { getFilePathFromSha256 } from "../utils/hashUtils"; -import { createSpinner } from "nanospinner"; +import os from "os"; +import path from "path"; import ProgressStream from "progress-stream"; -import { PassThrough } from "stream"; + import { asyncPool } from "../utils/asyncPool"; -import fsExtra from "fs-extra"; -import os from "os"; +import { createSpinner } from "nanospinner"; +import { getFilePathFromSha256 } from "../utils/hashUtils"; +import { getOrCreateSSLCerts } from "../utils/ssl"; +import { green, red, blue, yellow, cyan } from "colorette"; import { merkleIntegrityCheck } from "../utils/merkle"; +import { PassThrough } from "stream"; +import { promptCredentials } from "../utils/credentialsUtils"; +import { STORE_PATH } from "../utils/config"; +import { Wallet, DataStore } from "../blockchain"; // Helper function to trim long filenames with ellipsis and ensure consistent padding function formatFilename(filename: string | undefined, maxLength = 30): string { @@ -43,6 +43,7 @@ export class PropagationServer { password: string | undefined; private static readonly port = 4159; // Static port used for all requests + private static readonly inactivityTimeout = 5000; // Inactivity timeout in milliseconds (5 seconds) constructor(ipAddress: string, storeId: string) { this.storeId = storeId; @@ -77,6 +78,34 @@ export class PropagationServer { }); } + /** + * Adds a custom inactivity timeout for large file transfers. + */ + addInactivityTimeout(stream: PassThrough, timeoutMs: number) { + let timeoutId: NodeJS.Timeout | undefined; + + const resetTimeout = () => { + clearTimeout(timeoutId); + timeoutId = setTimeout(() => { + stream.destroy( + new Error(`Inactivity timeout after ${timeoutMs / 1000} seconds`) + ); + }, timeoutMs); + }; + + // Reset timeout every time data is received + stream.on("data", resetTimeout); + + // Set the initial timeout + resetTimeout(); + + // Clear the timeout when the stream ends + stream.on("end", () => clearTimeout(timeoutId)); + stream.on("error", () => clearTimeout(timeoutId)); // Handle errors + + return stream; + } + /** * Check if the store and optional root hash exist by making a HEAD request. */ @@ -270,8 +299,11 @@ export class PropagationServer { progressBar!.update(progress.transferred); }); - // Create a PassThrough stream - const passThroughStream = new PassThrough(); + // Add inactivity timeout to the progress stream + const passThroughStream = this.addInactivityTimeout( + new PassThrough(), + PropagationServer.inactivityTimeout + ); // Pipe the read stream through the progress stream into the PassThrough stream fileReadStream.pipe(progressStream).pipe(passThroughStream); @@ -472,7 +504,13 @@ export class PropagationServer { progressBar!.update(progress.transferred); }); - response.data.pipe(progressStream); + // Add inactivity timeout to the progress stream + const passThroughStream = this.addInactivityTimeout( + new PassThrough(), + PropagationServer.inactivityTimeout + ); + + response.data.pipe(progressStream).pipe(passThroughStream); progressStream.on("data", (chunk: Buffer) => { dataBuffers.push(chunk); @@ -592,7 +630,16 @@ export class PropagationServer { progressBar!.update(progress.transferred); }); - response.data.pipe(progressStream).pipe(fileWriteStream); + // Add inactivity timeout to the progress stream + const passThroughStream = this.addInactivityTimeout( + new PassThrough(), + PropagationServer.inactivityTimeout + ); + + response.data + .pipe(progressStream) + .pipe(passThroughStream) + .pipe(fileWriteStream); // Wait for both the file write stream and the progress stream to finish await Promise.all([ From 12c2a44e2d36e4daaea1d036fb98c71a7cbd7637 Mon Sep 17 00:00:00 2001 From: Michael Taylor Date: Thu, 26 Sep 2024 15:53:14 -0400 Subject: [PATCH 2/4] feat: refactor content server driver to use axios --- src/DigNetwork/ContentServer.ts | 326 +++++++------------------------- 1 file changed, 71 insertions(+), 255 deletions(-) diff --git a/src/DigNetwork/ContentServer.ts b/src/DigNetwork/ContentServer.ts index 8241c81..08a5490 100644 --- a/src/DigNetwork/ContentServer.ts +++ b/src/DigNetwork/ContentServer.ts @@ -1,7 +1,7 @@ +import axios, { AxiosRequestConfig } from "axios"; import fs from "fs"; -import http from "http"; +import https from "https"; import { URL } from "url"; -import { Readable } from "stream"; import { getOrCreateSSLCerts } from "../utils/ssl"; export class ContentServer { @@ -10,6 +10,7 @@ export class ContentServer { private static certPath: string; private static keyPath: string; private static readonly port = 4161; + private static readonly inactivityTimeout = 5000; // Inactivity timeout in milliseconds (5 seconds) constructor(ipAddress: string, storeId: string) { this.ipAddress = ipAddress; @@ -28,21 +29,18 @@ export class ContentServer { rootHash: string, challengeHex?: string ): Promise { - // Construct the base URL let url = `https://${this.ipAddress}:${ContentServer.port}/chia.${this.storeId}.${rootHash}/${key}`; - // If a challenge is provided, append it as a query parameter if (challengeHex) { url += `?challenge=${challengeHex}`; } - return this.fetchWithRetries(url); + return this.fetch(url); } // Method to get the payment address from the peer public async getPaymentAddress(): Promise { console.log(`Fetching payment address from peer ${this.ipAddress}...`); - try { const wellKnown = await this.getWellKnown(); return wellKnown.xch_address; @@ -75,11 +73,9 @@ export class ContentServer { // Method to get the index of keys in a store public async getKeysIndex(rootHash?: string): Promise { let udi = `chia.${this.storeId}`; - if (rootHash) { udi += `.${rootHash}`; } - const url = `https://${this.ipAddress}:${ContentServer.port}/${udi}`; return this.fetchJson(url); } @@ -88,13 +84,11 @@ export class ContentServer { public async headKey( key: string, rootHash?: string - ): Promise<{ success: boolean; headers?: http.IncomingHttpHeaders }> { + ): Promise<{ success: boolean; headers?: any }> { let udi = `chia.${this.storeId}`; - if (rootHash) { udi += `.${rootHash}`; } - const url = `https://${this.ipAddress}:${ContentServer.port}/${udi}/${key}`; return this.head(url); } @@ -102,14 +96,12 @@ export class ContentServer { // Method to check if a specific store exists (HEAD request) public async headStore(options?: { hasRootHash: string }): Promise<{ success: boolean; - headers?: http.IncomingHttpHeaders; + headers?: any; }> { let url = `https://${this.ipAddress}:${ContentServer.port}/chia.${this.storeId}`; - if (options?.hasRootHash) { url += `?hasRootHash=${options.hasRootHash}`; } - return this.head(url); } @@ -123,274 +115,98 @@ export class ContentServer { return false; } - public streamKey(key: string, rootHash?: string): Promise { - let udi = `chia.${this.storeId}`; - - if (rootHash) { - udi += `.${rootHash}`; - } - - return new Promise((resolve, reject) => { - const url = `https://${this.ipAddress}:${ContentServer.port}/${udi}/${key}`; - const urlObj = new URL(url); - - const requestOptions = { - hostname: urlObj.hostname, - port: urlObj.port || ContentServer.port, - path: urlObj.pathname + urlObj.search, - method: "GET", + // Helper method to perform HEAD requests using axios + private async head(url: string): Promise<{ success: boolean; headers?: any }> { + try { + const config = this.getAxiosConfig(); + const response = await axios.head(url, config); + return { + success: response.status >= 200 && response.status < 300, + headers: response.headers, }; - - const request = http.request(requestOptions, (response) => { - if (response.statusCode === 200) { - resolve(response); // Resolve with the readable stream - } else if (response.statusCode === 301 || response.statusCode === 302) { - // Handle redirects - const redirectUrl = response.headers.location; - if (redirectUrl) { - this.streamKey(redirectUrl).then(resolve).catch(reject); - } else { - reject(new Error("Redirected without a location header")); - } - } else { - reject( - new Error( - `Failed to retrieve data from ${url}. Status code: ${response.statusCode}` - ) - ); - } - }); - - request.on("error", (error) => { - console.error(`GET Request error for ${url}:`, error); - reject(error); - }); - - request.end(); - }); - } - - // Helper method to perform HEAD requests - private async head( - url: string, - maxRedirects: number = 5 - ): Promise<{ success: boolean; headers?: http.IncomingHttpHeaders }> { - return new Promise((resolve, reject) => { - try { - // Parse the input URL - const urlObj = new URL(url); - - const requestOptions = { - hostname: urlObj.hostname, - port: - urlObj.port || - (urlObj.protocol === "http:" ? 80 : ContentServer.port), - path: urlObj.pathname + urlObj.search, - method: "HEAD", - key: fs.readFileSync(ContentServer.keyPath), - cert: fs.readFileSync(ContentServer.certPath), - rejectUnauthorized: false, - }; - - const request = http.request(requestOptions, (response) => { - const { statusCode, headers } = response; - - // If status code is 2xx, return success - if (statusCode && statusCode >= 200 && statusCode < 300) { - resolve({ success: true, headers }); - } - // Handle 3xx redirection - else if ( - statusCode && - statusCode >= 300 && - statusCode < 400 && - headers.location - ) { - if (maxRedirects > 0) { - let redirectUrl = headers.location; - - // Check if the redirect URL is relative - if (!/^https?:\/\//i.test(redirectUrl)) { - // Resolve the relative URL based on the original URL - redirectUrl = new URL(redirectUrl, url).toString(); - console.log(`Resolved relative redirect to: ${redirectUrl}`); - } else { - console.log(`Redirecting to: ${redirectUrl}`); - } - - // Recursively follow the redirection - this.head(redirectUrl, maxRedirects - 1) - .then(resolve) - .catch(reject); - } else { - reject({ success: false, message: "Too many redirects" }); - } - } else { - // For other status codes, consider it a failure - resolve({ success: false }); - } - }); - - request.on("error", (error) => { - console.error(`HEAD ${url}:`, error.message); - reject({ success: false }); - }); - - request.end(); - } catch (err) { - console.error(`Invalid URL: ${url}`, err); - reject({ success: false, message: "Invalid URL" }); - } - }); + } catch (error) { + return { success: false }; + } } // Helper method to fetch JSON data from a URL private async fetchJson(url: string): Promise { - const response = await this.fetchWithRetries(url); + const response = await this.fetch(url); return JSON.parse(response); } - // Helper method to fetch content with retries and redirection handling - private async fetchWithRetries(url: string): Promise { - let attempt = 0; - const maxRetries = 1; - const initialDelay = 2000; // 2 seconds - const maxDelay = 10000; // 10 seconds - const delayMultiplier = 1.5; - let delay = initialDelay; + // Core method to fetch content from a URL with a 5-second inactivity timeout, handling redirects + private async fetch(url: string): Promise { + const config = this.getAxiosConfig(); - while (attempt < maxRetries) { - try { - return await this.fetch(url); - } catch (error: any) { - if (attempt < maxRetries - 1) { - console.warn( - `Attempt ${attempt + 1} failed: ${error.message}. Retrying in ${ - delay / 1000 - } seconds...` - ); - await new Promise((resolve) => setTimeout(resolve, delay)); - delay = Math.min(maxDelay, delay * delayMultiplier); - } else { - console.error(`Failed to retrieve data from ${url}. Aborting.`); - throw new Error(`Failed to retrieve data: ${error.message}`); - } - } - attempt++; - } - throw new Error( - `Failed to retrieve data from ${url} after ${maxRetries} attempts.` - ); - } + // Create a cancel token for the inactivity timeout + const source = axios.CancelToken.source(); - // Core method to fetch content from a URL with a 5-second inactivity timeout - private async fetch(url: string, maxRedirects: number = 5): Promise { return new Promise((resolve, reject) => { - const urlObj = new URL(url); - const timeoutDuration = 5000; // 5 seconds - - let timeout: NodeJS.Timeout | null = null; // Initialize timeout + let timeout: NodeJS.Timeout | null = null; - const requestOptions = { - hostname: urlObj.hostname, - port: urlObj.port || ContentServer.port, - path: urlObj.pathname + urlObj.search, // Include query params - method: "GET", - key: fs.readFileSync(ContentServer.keyPath), - cert: fs.readFileSync(ContentServer.certPath), - rejectUnauthorized: false, - }; - - const request = http.request(requestOptions, (response) => { - let data = ""; - - // Set timeout for inactivity + const resetTimeout = () => { + if (timeout) { + clearTimeout(timeout); + } timeout = setTimeout(() => { - console.error( - `Request timeout: No data received for ${ - timeoutDuration / 1000 - } seconds.` + source.cancel( + `Request timed out after ${ + ContentServer.inactivityTimeout / 1000 + } seconds of inactivity` ); - request.destroy(); // Use destroy instead of abort - reject( - new Error( - `Request timed out after ${ - timeoutDuration / 1000 - } seconds of inactivity` - ) - ); - }, timeoutDuration); - - const resetTimeout = () => { - if (timeout) { - clearTimeout(timeout); - } - timeout = setTimeout(() => { - console.error( - `Request timeout: No data received for ${ - timeoutDuration / 1000 - } seconds.` - ); - request.destroy(); // Use destroy instead of abort - reject( - new Error( - `Request timed out after ${ - timeoutDuration / 1000 - } seconds of inactivity` - ) - ); - }, timeoutDuration); - }; + }, ContentServer.inactivityTimeout); + }; - if (response.statusCode === 200) { - response.on("data", (chunk) => { + axios + .get(url, { + ...config, + cancelToken: source.token, + responseType: "stream", // Use stream to track data transfer + maxRedirects: 5, // Axios follows up to 5 redirects by default + }) + .then((response) => { + let data = ""; + + response.data.on("data", (chunk: any) => { data += chunk; - resetTimeout(); // Reset the timeout every time data is received + resetTimeout(); // Reset the timeout when data is received }); - response.on("end", () => { + response.data.on("end", () => { if (timeout) { - clearTimeout(timeout); + clearTimeout(timeout); // Clear timeout on stream end } resolve(data); }); - } else if ( - (response.statusCode === 301 || response.statusCode === 302) && - response.headers.location - ) { - // Handle redirects - if (maxRedirects > 0) { - const redirectUrl = new URL(response.headers.location, url); // Resolve relative URLs based on the original URL + + response.data.on("error", (error: any) => { if (timeout) { clearTimeout(timeout); } - this.fetch(redirectUrl.toString(), maxRedirects - 1) - .then(resolve) - .catch(reject); + reject(error); + }); + }) + .catch((error: any) => { + if (axios.isCancel(error)) { + console.error("Request canceled:", error.message); } else { - reject(new Error("Too many redirects")); + console.error(`Failed to retrieve data from ${url}:`, error.message); } - } else { - if (timeout) { - clearTimeout(timeout); - } - reject( - new Error( - `Failed to retrieve data from ${url}. Status code: ${response.statusCode}` - ) - ); - } - }); - - request.on("error", (error: NodeJS.ErrnoException) => { - if (timeout) { - clearTimeout(timeout); - } - console.error(`GET ${url}:`, error.message); - reject(error); - }); - - request.end(); + reject(error); + }); }); } + + // Helper method to configure axios with HTTPS settings + private getAxiosConfig(): AxiosRequestConfig { + return { + httpsAgent: new https.Agent({ + cert: fs.readFileSync(ContentServer.certPath), + key: fs.readFileSync(ContentServer.keyPath), + rejectUnauthorized: false, + }), + timeout: 0, // Disable axios timeout (we're using inactivity timeout) + }; + } } From ffd0c596d2ca29a910443b1d09a56db83d8d80e6 Mon Sep 17 00:00:00 2001 From: Michael Taylor Date: Thu, 26 Sep 2024 16:01:08 -0400 Subject: [PATCH 3/4] feat: add public key to server coin --- src/blockchain/ServerCoin.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/blockchain/ServerCoin.ts b/src/blockchain/ServerCoin.ts index 016de0d..04f8495 100644 --- a/src/blockchain/ServerCoin.ts +++ b/src/blockchain/ServerCoin.ts @@ -55,7 +55,7 @@ export class ServerCoin { publicSyntheticKey, serverCoinCreationCoins, epochBasedHint, - [peerIp], + [peerIp, publicSyntheticKey.toString("hex")], BigInt(serverCoinCollateral), BigInt(1000000) ); @@ -223,6 +223,10 @@ export class ServerCoin { for (const coinState of filteredCoinStates) { const serverCoin = await peer.fetchServerCoin(coinState, maxClvmCost); const peerUrl = serverCoin.memoUrls[0]; + // The second memo URL is the public key + // We will utilize this for future features + const publicKey = serverCoin.memoUrls[1]; + if (!blacklist.includes(peerUrl)) { serverCoinPeers.add(peerUrl); } From 490423e3a743fc980dee20bb83c35834ce93dfb4 Mon Sep 17 00:00:00 2001 From: Michael Taylor Date: Thu, 26 Sep 2024 16:01:44 -0400 Subject: [PATCH 4/4] chore(release): 0.0.1-alpha.99 --- CHANGELOG.md | 9 +++++++++ package-lock.json | 4 ++-- package.json | 2 +- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a03c83..987bc6f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,15 @@ All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines. +### [0.0.1-alpha.99](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.98...v0.0.1-alpha.99) (2024-09-26) + + +### Features + +* add inactivity timer to propagation server downloads ([e9beb23](https://github.com/DIG-Network/dig-chia-sdk/commit/e9beb231dbedf0392a806dba6cc9d225745f8ee9)) +* add public key to server coin ([ffd0c59](https://github.com/DIG-Network/dig-chia-sdk/commit/ffd0c596d2ca29a910443b1d09a56db83d8d80e6)) +* refactor content server driver to use axios ([12c2a44](https://github.com/DIG-Network/dig-chia-sdk/commit/12c2a44e2d36e4daaea1d036fb98c71a7cbd7637)) + ### [0.0.1-alpha.98](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.97...v0.0.1-alpha.98) (2024-09-26) diff --git a/package-lock.json b/package-lock.json index beacc39..7ae5031 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@dignetwork/dig-sdk", - "version": "0.0.1-alpha.98", + "version": "0.0.1-alpha.99", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@dignetwork/dig-sdk", - "version": "0.0.1-alpha.98", + "version": "0.0.1-alpha.99", "license": "ISC", "dependencies": { "@dignetwork/datalayer-driver": "^0.1.25", diff --git a/package.json b/package.json index f90a195..c421bba 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@dignetwork/dig-sdk", - "version": "0.0.1-alpha.98", + "version": "0.0.1-alpha.99", "description": "", "type": "commonjs", "main": "./dist/index.js",