diff --git a/CHANGELOG.md b/CHANGELOG.md index 37aaf39..189d6c6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,13 @@ All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines. +### [0.0.1-alpha.136](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.135...v0.0.1-alpha.136) (2024-10-05) + + +### Features + +* more advanced fullnode throttle logic ([5cd3f5a](https://github.com/DIG-Network/dig-chia-sdk/commit/5cd3f5aff442a9e8570c257d0d35d2f998820116)) + ### [0.0.1-alpha.135](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.134...v0.0.1-alpha.135) (2024-10-05) diff --git a/package-lock.json b/package-lock.json index 190f8f8..d7c0553 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@dignetwork/dig-sdk", - "version": "0.0.1-alpha.135", + "version": "0.0.1-alpha.136", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@dignetwork/dig-sdk", - "version": "0.0.1-alpha.135", + "version": "0.0.1-alpha.136", "license": "ISC", "dependencies": { "@dignetwork/datalayer-driver": "^0.1.29", diff --git a/package.json b/package.json index e4f26ea..02adc4d 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@dignetwork/dig-sdk", - "version": "0.0.1-alpha.135", + "version": "0.0.1-alpha.136", "description": "", "type": "commonjs", "main": "./dist/index.js", diff --git a/src/blockchain/FullNodePeer.ts b/src/blockchain/FullNodePeer.ts index 3762eb0..2fbdb0e 100644 --- a/src/blockchain/FullNodePeer.ts +++ b/src/blockchain/FullNodePeer.ts @@ -25,25 +25,17 @@ const CACHE_DURATION = 30000; // in milliseconds const COOLDOWN_DURATION = 300000; // 5 minutes in milliseconds const MAX_PEERS_TO_FETCH = 5; // Maximum number of peers to fetch from DNS const MAX_RETRIES = 3; // Maximum number of retry attempts -const MAX_REQUESTS_PER_MINUTE = 100; // Throttle limit +const MAX_REQUESTS_PER_MINUTE = 100; // Per-peer rate limit /** - * Represents a peer with its reliability weight and address. + * Represents a peer with its reliability weight, address, and rate limiter. */ interface PeerInfo { peer: Peer; weight: number; address: string; isConnected: boolean; // Indicates if the peer is currently connected -} - -/** - * Represents a queued method call. - */ -interface QueuedCall { - execute: () => Promise; - resolve: (value: any) => void; - reject: (reason?: any) => void; + limiter: Bottleneck; // Rate limiter for the peer } /** @@ -71,11 +63,11 @@ export class FullNodePeer { // Cache for fetched peer IPs private static peerIPCache = new NodeCache({ stdTTL: CACHE_DURATION / 1000 }); - // Bottleneck instance for global throttling - private static limiter = new Bottleneck({ - maxConcurrent: 1, // Ensures only one request is processed at a time - minTime: 60000 / MAX_REQUESTS_PER_MINUTE, // Calculated delay between requests - }); + // List of available peers for round-robin + private static availablePeers: string[] = []; + + // Current index for round-robin selection + private static currentPeerIndex: number = 0; // Private constructor for singleton pattern private constructor(private peer: Peer) {} @@ -302,32 +294,16 @@ export class FullNodePeer { } /** - * Selects a peer based on weighted random selection. - * Prioritized peers have higher weights. + * Selects the next peer based on round-robin selection. * @returns {string} The selected peer IP. */ - private static selectPeerByWeight(): string { - const peers = Array.from(FullNodePeer.peerWeights.entries()) - .filter(([ip, _]) => !FullNodePeer.cooldownCache.has(ip)) - .map(([ip, weight]) => ({ ip, weight })); - - const totalWeight = peers.reduce((sum, peer) => sum + peer.weight, 0); - if (totalWeight === 0) { - throw new Error("All peers are in cooldown."); + private static getNextPeerIP(): string { + if (FullNodePeer.availablePeers.length === 0) { + throw new Error("No available peers to select."); } - - const random = Math.random() * totalWeight; - let cumulative = 0; - - for (const peer of peers) { - cumulative += peer.weight; - if (random < cumulative) { - return peer.ip; - } - } - - // Fallback - return peers[peers.length - 1].ip; + const peerIP = FullNodePeer.availablePeers[FullNodePeer.currentPeerIndex]; + FullNodePeer.currentPeerIndex = (FullNodePeer.currentPeerIndex + 1) % FullNodePeer.availablePeers.length; + return peerIP; } /** @@ -348,7 +324,7 @@ export class FullNodePeer { } /** - * Connects to the best available peer based on weighted selection and reliability. + * Connects to the best available peer based on round-robin selection and reliability. * @returns {Promise} The connected Peer instance. */ private static async getBestPeer(): Promise { @@ -369,65 +345,119 @@ export class FullNodePeer { // Setup peer weights with prioritization FullNodePeer.setupPeers(peerIPs); - // Weighted random selection - let selectedPeerIP: string; - try { - selectedPeerIP = FullNodePeer.selectPeerByWeight(); - } catch (error: any) { - throw new Error(`Failed to select a peer: ${error.message}`); - } + // Initialize or update peerInfos and availablePeers + for (const ip of peerIPs) { + if (!FullNodePeer.peerInfos.has(ip)) { + // Create a new Bottleneck limiter for the peer + const limiter = new Bottleneck({ + maxConcurrent: 1, // One request at a time per peer + minTime: 60000 / MAX_REQUESTS_PER_MINUTE, // 600 ms between requests for 100 requests/min + }); - // Attempt to create a peer connection - const sslFolder = path.resolve(os.homedir(), ".dig", "ssl"); - const certFile = path.join(sslFolder, "public_dig.crt"); - const keyFile = path.join(sslFolder, "public_dig.key"); + // Attempt to create a peer connection + const sslFolder = path.resolve(os.homedir(), ".dig", "ssl"); + const certFile = path.join(sslFolder, "public_dig.crt"); + const keyFile = path.join(sslFolder, "public_dig.key"); - if (!fs.existsSync(sslFolder)) { - fs.mkdirSync(sslFolder, { recursive: true }); - } + if (!fs.existsSync(sslFolder)) { + fs.mkdirSync(sslFolder, { recursive: true }); + } - const tls = new Tls(certFile, keyFile); + const tls = new Tls(certFile, keyFile); + + let peer: Peer; + try { + peer = await Peer.new(`${ip}:${FULLNODE_PORT}`, false, tls); + } catch (error: any) { + console.error(`Failed to create peer for IP ${ip}: ${error.message}`); + // Add to cooldown + FullNodePeer.cooldownCache.set(ip, true); + // Decrease weight or remove peer + const currentWeight = FullNodePeer.peerWeights.get(ip) || 1; + if (currentWeight > 1) { + FullNodePeer.peerWeights.set(ip, currentWeight - 1); + } else { + FullNodePeer.peerWeights.delete(ip); + } + continue; // Skip adding this peer + } - let peer: Peer; - try { - peer = await Peer.new(`${selectedPeerIP}:${FULLNODE_PORT}`, false, tls); - } catch (error: any) { - console.error( - `Failed to create peer for IP ${selectedPeerIP}: ${error.message}` - ); - // Add to cooldown - FullNodePeer.cooldownCache.set(selectedPeerIP, true); - // Decrease weight or remove peer - const currentWeight = FullNodePeer.peerWeights.get(selectedPeerIP) || 1; - if (currentWeight > 1) { - FullNodePeer.peerWeights.set(selectedPeerIP, currentWeight - 1); + // Wrap the peer with proxy to handle errors and retries + const proxiedPeer = FullNodePeer.createPeerProxy(peer, ip); + + // Store PeerInfo + FullNodePeer.peerInfos.set(ip, { + peer: proxiedPeer, + weight: FullNodePeer.peerWeights.get(ip) || 1, + address: ip, + isConnected: true, // Mark as connected + limiter, // Assign the limiter + }); + + // Add to availablePeers + FullNodePeer.availablePeers.push(ip); } else { - FullNodePeer.peerWeights.delete(selectedPeerIP); + const peerInfo = FullNodePeer.peerInfos.get(ip)!; + if (!peerInfo.isConnected) { + // Peer is back from cooldown, re-establish connection + const sslFolder = path.resolve(os.homedir(), ".dig", "ssl"); + const certFile = path.join(sslFolder, "public_dig.crt"); + const keyFile = path.join(sslFolder, "public_dig.key"); + + if (!fs.existsSync(sslFolder)) { + fs.mkdirSync(sslFolder, { recursive: true }); + } + + const tls = new Tls(certFile, keyFile); + + let peer: Peer; + try { + peer = await Peer.new(`${ip}:${FULLNODE_PORT}`, false, tls); + } catch (error: any) { + console.error(`Failed to reconnect peer for IP ${ip}: ${error.message}`); + // Re-add to cooldown + FullNodePeer.cooldownCache.set(ip, true); + // Decrease weight or remove peer + const currentWeight = FullNodePeer.peerWeights.get(ip) || 1; + if (currentWeight > 1) { + FullNodePeer.peerWeights.set(ip, currentWeight - 1); + } else { + FullNodePeer.peerWeights.delete(ip); + } + continue; // Skip adding this peer + } + + // Wrap the peer with proxy to handle errors and retries + const proxiedPeer = FullNodePeer.createPeerProxy(peer, ip); + + // Update PeerInfo + peerInfo.peer = proxiedPeer; + peerInfo.isConnected = true; + + // Add back to availablePeers + FullNodePeer.availablePeers.push(ip); + } } - throw new Error(`Unable to connect to peer ${selectedPeerIP}`); } - // Wrap the peer with proxy to handle errors and retries - const proxiedPeer = FullNodePeer.createPeerProxy(peer, selectedPeerIP); + if (FullNodePeer.availablePeers.length === 0) { + throw new Error("No available peers to connect."); + } - // Store PeerInfo - FullNodePeer.peerInfos.set(selectedPeerIP, { - peer: proxiedPeer, - weight: FullNodePeer.peerWeights.get(selectedPeerIP) || 1, - address: selectedPeerIP, - isConnected: true, // Mark as connected - }); + // Select the next peer in round-robin + const selectedPeerIP = FullNodePeer.getNextPeerIP(); + const selectedPeerInfo = FullNodePeer.peerInfos.get(selectedPeerIP)!; // Cache the peer - FullNodePeer.cachedPeer = { peer: proxiedPeer, timestamp: now }; + FullNodePeer.cachedPeer = { peer: selectedPeerInfo.peer, timestamp: now }; console.log(`Using Fullnode Peer: ${selectedPeerIP}`); - return proxiedPeer; + return selectedPeerInfo.peer; } /** - * Creates a proxy for the peer to handle errors, implement retries, and enforce throttling. + * Creates a proxy for the peer to handle errors, implement retries, and enforce per-peer throttling. * @param {Peer} peer - The Peer instance. * @param {string} peerIP - The IP address of the peer. * @param {number} [retryCount=0] - The current retry attempt. @@ -455,21 +485,31 @@ export class FullNodePeer { if (typeof originalMethod === "function") { return (...args: any[]) => { - // Wrap the method call with Bottleneck's scheduling - return FullNodePeer.limiter.schedule(async () => { - const peerInfo = FullNodePeer.peerInfos.get(peerIP); + // Select the next peer in round-robin + let selectedPeerIP: string; + try { + selectedPeerIP = FullNodePeer.getNextPeerIP(); + } catch (error: any) { + return Promise.reject(error); + } + + const selectedPeerInfo = FullNodePeer.peerInfos.get(selectedPeerIP)!; + + // Schedule the method call via the selected peer's limiter + return selectedPeerInfo.limiter.schedule(async () => { + const peerInfo = FullNodePeer.peerInfos.get(selectedPeerIP); if (!peerInfo || !peerInfo.isConnected) { - throw new Error(`Cannot perform operation: Peer ${peerIP} is disconnected.`); + throw new Error(`Cannot perform operation: Peer ${selectedPeerIP} is disconnected.`); } try { - const result = await originalMethod.apply(target, args); + const result = await originalMethod.apply(peerInfo.peer, args); // On successful operation, increase the weight slightly - const currentWeight = FullNodePeer.peerWeights.get(peerIP) || 1; - FullNodePeer.peerWeights.set(peerIP, currentWeight + 0.1); // Increment weight + const currentWeight = FullNodePeer.peerWeights.get(selectedPeerIP) || 1; + FullNodePeer.peerWeights.set(selectedPeerIP, currentWeight + 0.1); // Increment weight return result; } catch (error: any) { - console.error(`Peer ${peerIP} encountered an error: ${error.message}`); + console.error(`Peer ${selectedPeerIP} encountered an error: ${error.message}`); // Check if the error is related to WebSocket or Operation timed out if ( @@ -477,11 +517,11 @@ export class FullNodePeer { error.message.includes("Operation timed out") ) { // Handle the disconnection and mark the peer accordingly - FullNodePeer.handlePeerDisconnection(peerIP); + FullNodePeer.handlePeerDisconnection(selectedPeerIP); // If maximum retries reached, throw the error if (retryCount >= MAX_RETRIES) { - console.error(`Max retries reached for method ${String(prop)} on peer ${peerIP}.`); + console.error(`Max retries reached for method ${String(prop)} on peer ${selectedPeerIP}.`); throw error; } @@ -542,6 +582,16 @@ export class FullNodePeer { FullNodePeer.peerInfos.set(peerIP, peerInfo); } + // Remove from availablePeers if present + const index = FullNodePeer.availablePeers.indexOf(peerIP); + if (index !== -1) { + FullNodePeer.availablePeers.splice(index, 1); + // Adjust currentPeerIndex if necessary + if (FullNodePeer.currentPeerIndex >= FullNodePeer.availablePeers.length) { + FullNodePeer.currentPeerIndex = 0; + } + } + // If the disconnected peer was the cached peer, invalidate the cache if ( FullNodePeer.cachedPeer && @@ -549,6 +599,8 @@ export class FullNodePeer { ) { FullNodePeer.cachedPeer = null; } + + console.warn(`Peer ${peerIP} has been marked as disconnected and is in cooldown.`); } /**