From 988d6a1548e7d32ec2f29496c1531a861d60135c Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Mon, 9 May 2022 11:42:50 +0200 Subject: [PATCH 1/9] Add debug print. --- massa-protocol-worker/src/node_info.rs | 5 +++++ massa-protocol-worker/src/protocol_worker.rs | 4 ++++ 2 files changed, 9 insertions(+) diff --git a/massa-protocol-worker/src/node_info.rs b/massa-protocol-worker/src/node_info.rs index 5ab37c904d7..499d119f6aa 100644 --- a/massa-protocol-worker/src/node_info.rs +++ b/massa-protocol-worker/src/node_info.rs @@ -12,6 +12,7 @@ use massa_models::{ }; use massa_models::{BlockId, EndorsementId, OperationId}; use massa_protocol_exports::ProtocolSettings; +use tracing::{debug, info}; use std::collections::VecDeque; use tokio::time::Instant; @@ -133,8 +134,11 @@ impl NodeInfo { } pub fn insert_known_ops(&mut self, ops: Set, max_ops_nb: usize) { + info!("MASSA DEBUG insert_known_ops called with: {:?} operations.", ops.len()); + let mut i = 0; for operation_id in ops.into_iter() { if self.known_operations.insert(operation_id) { + i += 1; self.known_operations_queue.push_back(operation_id); while self.known_operations_queue.len() > max_ops_nb { if let Some(op_id) = self.known_operations_queue.pop_front() { @@ -143,6 +147,7 @@ impl NodeInfo { } } } + info!("MASSA DEBUG insert_known_ops inserted: {:?} operations in known_operations. The len of known_operations is now {:#?} and the queue is {:#?} (must be the same). This number should be inferior than {:#?}.", i, self.known_operations.len(), self.known_operations_queue.len(), max_ops_nb); } pub fn knows_op(&self, op: &OperationId) -> bool { diff --git a/massa-protocol-worker/src/protocol_worker.rs b/massa-protocol-worker/src/protocol_worker.rs index 929b81827c8..c0f66faa27f 100644 --- a/massa-protocol-worker/src/protocol_worker.rs +++ b/massa-protocol-worker/src/protocol_worker.rs @@ -1100,6 +1100,8 @@ impl ProtocolWorker { let mut seen_ops = vec![]; let mut new_operations = Map::with_capacity_and_hasher(length, BuildMap::default()); let mut received_ids = Map::with_capacity_and_hasher(length, BuildMap::default()); + info!("DEBUG MASSA: note_operations_from_node: operations.len(): {:#?}", length); + info!("DEBUG MASSA: note_operations_from_node: self.checked_operations.len(): {:#?}", self.checked_operations.len()); for (idx, operation) in operations.into_iter().enumerate() { let operation_id = operation.content.compute_id()?; seen_ops.push(operation_id); @@ -1127,6 +1129,7 @@ impl ProtocolWorker { // add to known ops if let Some(node_info) = self.active_nodes.get_mut(source_node_id) { + info!("DEBUG MASSA: note_operations_from_node: call insert known ops with ids of length: {:#?}", received_ids.len()); node_info.insert_known_ops( received_ids.keys().copied().collect(), self.protocol_settings.max_known_ops_size, @@ -1134,6 +1137,7 @@ impl ProtocolWorker { } if !new_operations.is_empty() { + info!("DEBUG MASSA: called received operation protocol event with a list of {:#?} operations.", new_operations.len()); // Add to pool, propagate when received outside of a header. self.send_protocol_pool_event(ProtocolPoolEvent::ReceivedOperations { operations: new_operations, From 5543ca72c5ad2cda2ddda30917860e0ceb06c120 Mon Sep 17 00:00:00 2001 From: damip Date: Mon, 9 May 2022 11:50:16 +0200 Subject: [PATCH 2/9] correct memory leak --- massa-protocol-worker/src/node_info.rs | 25 ++++++++++++++----- .../src/worker_operations_impl.rs | 16 ++---------- 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/massa-protocol-worker/src/node_info.rs b/massa-protocol-worker/src/node_info.rs index 5ab37c904d7..ab285a8fa84 100644 --- a/massa-protocol-worker/src/node_info.rs +++ b/massa-protocol-worker/src/node_info.rs @@ -21,21 +21,21 @@ use tokio::time::Instant; pub(crate) struct NodeInfo { /// The blocks the node "knows about", /// defined as the one the node propagated headers to us for. - pub known_blocks: Map, + known_blocks: Map, /// The blocks the node asked for. - pub wanted_blocks: Map, + wanted_blocks: Map, /// Blocks we asked that node for pub asked_blocks: Map, /// Instant when the node was added pub connection_instant: Instant, /// all known operations - pub known_operations: OperationIds, + known_operations: OperationIds, /// Same as `known_operations` but sorted for a premature optimization :-) - pub known_operations_queue: VecDeque, + known_operations_queue: VecDeque, /// all known endorsements - pub known_endorsements: Set, + known_endorsements: Set, /// Same as `known_endorsements` but sorted for a premature optimization :-) - pub known_endorsements_queue: VecDeque, + known_endorsements_queue: VecDeque, } impl NodeInfo { @@ -145,6 +145,19 @@ impl NodeInfo { } } + /// Remove a list of operation IDs from the list of operation IDs known by a remote node + /// + /// Note: this is INEFFICIENT (linear traversal of the deque) and should be used sparingly + pub fn remove_known_ops(&mut self, ops: &Set) { + for op_id in ops.iter() { + if self.known_operations.remove(op_id) { + if let Some(pos) = self.known_operations.iter().position(|id| id == op_id) { + self.known_operations_queue.remove(pos); + } + } + } + } + pub fn knows_op(&self, op: &OperationId) -> bool { self.known_operations.contains(op) } diff --git a/massa-protocol-worker/src/worker_operations_impl.rs b/massa-protocol-worker/src/worker_operations_impl.rs index 3fb45196dd4..fbf32587252 100644 --- a/massa-protocol-worker/src/worker_operations_impl.rs +++ b/massa-protocol-worker/src/worker_operations_impl.rs @@ -15,7 +15,6 @@ use massa_models::{ node::NodeId, operation::{OperationIds, Operations}, prehash::BuildMap, - signed::Signable, }; use massa_network_exports::NetworkError; use massa_protocol_exports::{ProtocolError, ProtocolPoolEvent}; @@ -134,16 +133,6 @@ impl ProtocolWorker { /// `node_info.known_operations` /// - Notify the operations to he local node, to be propagated pub(crate) async fn on_operations_received(&mut self, node_id: NodeId, operations: Operations) { - let operation_ids: OperationIds = operations - .iter() - .filter_map(|signed_op| match signed_op.content.compute_id() { - Ok(op_id) => Some(op_id), - _ => None, - }) - .collect(); - if let Some(node_info) = self.active_nodes.get_mut(&node_id) { - node_info.known_operations.extend(operation_ids.iter()); - } if self .note_operations_from_node(operations, &node_id, true) .await @@ -214,9 +203,8 @@ impl ProtocolWorker { op_ids: OperationIds, ) -> Result<(), ProtocolError> { if let Some(node_info) = self.active_nodes.get_mut(&node_id) { - for op_ids in op_ids.iter() { - node_info.known_operations.remove(op_ids); - } + // remove_known_ops is inefficient when actually removing an entry, but this is almost never the case + node_info.remove_known_ops(&op_ids); } let mut operation_ids = OperationIds::default(); for op_id in op_ids.iter() { From 4a65e9937823b9d56d1e3e0e0e9c5d2a56449873 Mon Sep 17 00:00:00 2001 From: damip Date: Mon, 9 May 2022 11:51:13 +0200 Subject: [PATCH 3/9] update doc --- massa-protocol-worker/src/node_info.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/massa-protocol-worker/src/node_info.rs b/massa-protocol-worker/src/node_info.rs index ab285a8fa84..c9e83657ea5 100644 --- a/massa-protocol-worker/src/node_info.rs +++ b/massa-protocol-worker/src/node_info.rs @@ -147,7 +147,7 @@ impl NodeInfo { /// Remove a list of operation IDs from the list of operation IDs known by a remote node /// - /// Note: this is INEFFICIENT (linear traversal of the deque) and should be used sparingly + /// Note: this is INEFFICIENT when an element is actually removed (linear traversal of the deque) and should be used sparingly pub fn remove_known_ops(&mut self, ops: &Set) { for op_id in ops.iter() { if self.known_operations.remove(op_id) { From 2ec9ed90b1ee697a51b5039463268b2e0ce5516f Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Mon, 9 May 2022 12:02:36 +0200 Subject: [PATCH 4/9] Fix errors formatting. --- massa-protocol-worker/src/node_info.rs | 7 +++++-- massa-protocol-worker/src/protocol_worker.rs | 10 ++++++++-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/massa-protocol-worker/src/node_info.rs b/massa-protocol-worker/src/node_info.rs index eaf6450768c..14ba0a29025 100644 --- a/massa-protocol-worker/src/node_info.rs +++ b/massa-protocol-worker/src/node_info.rs @@ -12,9 +12,9 @@ use massa_models::{ }; use massa_models::{BlockId, EndorsementId, OperationId}; use massa_protocol_exports::ProtocolSettings; -use tracing::{debug, info}; use std::collections::VecDeque; use tokio::time::Instant; +use tracing::info; /// Information about a node we are connected to, /// essentially our view of its state. @@ -134,7 +134,10 @@ impl NodeInfo { } pub fn insert_known_ops(&mut self, ops: Set, max_ops_nb: usize) { - info!("MASSA DEBUG insert_known_ops called with: {:?} operations.", ops.len()); + info!( + "MASSA DEBUG insert_known_ops called with: {:?} operations.", + ops.len() + ); let mut i = 0; for operation_id in ops.into_iter() { if self.known_operations.insert(operation_id) { diff --git a/massa-protocol-worker/src/protocol_worker.rs b/massa-protocol-worker/src/protocol_worker.rs index c0f66faa27f..25f00366112 100644 --- a/massa-protocol-worker/src/protocol_worker.rs +++ b/massa-protocol-worker/src/protocol_worker.rs @@ -1100,8 +1100,14 @@ impl ProtocolWorker { let mut seen_ops = vec![]; let mut new_operations = Map::with_capacity_and_hasher(length, BuildMap::default()); let mut received_ids = Map::with_capacity_and_hasher(length, BuildMap::default()); - info!("DEBUG MASSA: note_operations_from_node: operations.len(): {:#?}", length); - info!("DEBUG MASSA: note_operations_from_node: self.checked_operations.len(): {:#?}", self.checked_operations.len()); + info!( + "DEBUG MASSA: note_operations_from_node: operations.len(): {:#?}", + length + ); + info!( + "DEBUG MASSA: note_operations_from_node: self.checked_operations.len(): {:#?}", + self.checked_operations.len() + ); for (idx, operation) in operations.into_iter().enumerate() { let operation_id = operation.content.compute_id()?; seen_ops.push(operation_id); From 9c74ef698f2972dbe233844527e7fd46121ec977 Mon Sep 17 00:00:00 2001 From: damip Date: Mon, 9 May 2022 13:00:39 +0200 Subject: [PATCH 5/9] correct queue removal --- massa-protocol-worker/src/node_info.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/massa-protocol-worker/src/node_info.rs b/massa-protocol-worker/src/node_info.rs index 14ba0a29025..5edbdc940e0 100644 --- a/massa-protocol-worker/src/node_info.rs +++ b/massa-protocol-worker/src/node_info.rs @@ -159,7 +159,11 @@ impl NodeInfo { pub fn remove_known_ops(&mut self, ops: &Set) { for op_id in ops.iter() { if self.known_operations.remove(op_id) { - if let Some(pos) = self.known_operations.iter().position(|id| id == op_id) { + if let Some(pos) = self + .known_operations_queue + .iter() + .position(|id| id == op_id) + { self.known_operations_queue.remove(pos); } } From 21367b55a266c4cd364432920daab68ed72c0cb8 Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Mon, 9 May 2022 13:10:37 +0200 Subject: [PATCH 6/9] Remove some re-ask logs. --- massa-protocol-worker/src/worker_operations_impl.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/massa-protocol-worker/src/worker_operations_impl.rs b/massa-protocol-worker/src/worker_operations_impl.rs index fbf32587252..e843174ea87 100644 --- a/massa-protocol-worker/src/worker_operations_impl.rs +++ b/massa-protocol-worker/src/worker_operations_impl.rs @@ -69,6 +69,7 @@ impl ProtocolWorker { OperationIds::with_capacity_and_hasher(op_batch.len(), BuildMap::default()); // exactitude isn't important, we want to have a now for that function call let now = Instant::now(); + let mut count_reask = 0; for op_id in op_batch { if self.checked_operations.contains(&op_id) { continue; @@ -91,11 +92,7 @@ impl ProtocolWorker { .checked_sub(self.protocol_settings.operation_batch_proc_period.into()) .ok_or(TimeError::TimeOverflowError)? { - debug!( - "re-ask operation {:?} asked for the first time {:?} millis ago.", - op_id, - wish.0.elapsed().as_millis() - ); + count_reask += 1; ask_set.insert(op_id); wish.0 = now; wish.1.push(node_id); @@ -107,6 +104,7 @@ impl ProtocolWorker { self.asked_operations.insert(op_id, (now, vec![node_id])); } } // EndOf for op_id in op_batch: + debug!("re-ask {:#?} operations.", count_reask); if self.op_batch_buffer.len() < self.protocol_settings.operation_batch_buffer_capacity && !future_set.is_empty() { From bd797df4f592b6a2613d7ece0d4c295632ed6862 Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Mon, 9 May 2022 14:54:06 +0200 Subject: [PATCH 7/9] Remove debug print. --- massa-protocol-worker/src/node_info.rs | 8 -------- massa-protocol-worker/src/worker_operations_impl.rs | 4 +++- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/massa-protocol-worker/src/node_info.rs b/massa-protocol-worker/src/node_info.rs index 5edbdc940e0..bffa033f196 100644 --- a/massa-protocol-worker/src/node_info.rs +++ b/massa-protocol-worker/src/node_info.rs @@ -14,7 +14,6 @@ use massa_models::{BlockId, EndorsementId, OperationId}; use massa_protocol_exports::ProtocolSettings; use std::collections::VecDeque; use tokio::time::Instant; -use tracing::info; /// Information about a node we are connected to, /// essentially our view of its state. @@ -134,14 +133,8 @@ impl NodeInfo { } pub fn insert_known_ops(&mut self, ops: Set, max_ops_nb: usize) { - info!( - "MASSA DEBUG insert_known_ops called with: {:?} operations.", - ops.len() - ); - let mut i = 0; for operation_id in ops.into_iter() { if self.known_operations.insert(operation_id) { - i += 1; self.known_operations_queue.push_back(operation_id); while self.known_operations_queue.len() > max_ops_nb { if let Some(op_id) = self.known_operations_queue.pop_front() { @@ -150,7 +143,6 @@ impl NodeInfo { } } } - info!("MASSA DEBUG insert_known_ops inserted: {:?} operations in known_operations. The len of known_operations is now {:#?} and the queue is {:#?} (must be the same). This number should be inferior than {:#?}.", i, self.known_operations.len(), self.known_operations_queue.len(), max_ops_nb); } /// Remove a list of operation IDs from the list of operation IDs known by a remote node diff --git a/massa-protocol-worker/src/worker_operations_impl.rs b/massa-protocol-worker/src/worker_operations_impl.rs index e843174ea87..2befd93db53 100644 --- a/massa-protocol-worker/src/worker_operations_impl.rs +++ b/massa-protocol-worker/src/worker_operations_impl.rs @@ -104,7 +104,9 @@ impl ProtocolWorker { self.asked_operations.insert(op_id, (now, vec![node_id])); } } // EndOf for op_id in op_batch: - debug!("re-ask {:#?} operations.", count_reask); + if count_reask > 0 { + debug!("re-ask {:#?} operations.", count_reask); + } if self.op_batch_buffer.len() < self.protocol_settings.operation_batch_buffer_capacity && !future_set.is_empty() { From 73ee63ba67485d2d54e23f94078444d6dc55f631 Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Mon, 9 May 2022 14:59:41 +0200 Subject: [PATCH 8/9] Fix tests. --- massa-protocol-worker/src/node_info.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/massa-protocol-worker/src/node_info.rs b/massa-protocol-worker/src/node_info.rs index bffa033f196..d86c7c4c9c1 100644 --- a/massa-protocol-worker/src/node_info.rs +++ b/massa-protocol-worker/src/node_info.rs @@ -21,9 +21,9 @@ use tokio::time::Instant; pub(crate) struct NodeInfo { /// The blocks the node "knows about", /// defined as the one the node propagated headers to us for. - known_blocks: Map, + pub(crate) known_blocks: Map, /// The blocks the node asked for. - wanted_blocks: Map, + pub(crate) wanted_blocks: Map, /// Blocks we asked that node for pub asked_blocks: Map, /// Instant when the node was added From b6198541a7f84156cc1b46f7e6849b605ea93f64 Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Mon, 9 May 2022 15:03:55 +0200 Subject: [PATCH 9/9] Remove debug print. --- massa-protocol-worker/src/protocol_worker.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/massa-protocol-worker/src/protocol_worker.rs b/massa-protocol-worker/src/protocol_worker.rs index 25f00366112..929b81827c8 100644 --- a/massa-protocol-worker/src/protocol_worker.rs +++ b/massa-protocol-worker/src/protocol_worker.rs @@ -1100,14 +1100,6 @@ impl ProtocolWorker { let mut seen_ops = vec![]; let mut new_operations = Map::with_capacity_and_hasher(length, BuildMap::default()); let mut received_ids = Map::with_capacity_and_hasher(length, BuildMap::default()); - info!( - "DEBUG MASSA: note_operations_from_node: operations.len(): {:#?}", - length - ); - info!( - "DEBUG MASSA: note_operations_from_node: self.checked_operations.len(): {:#?}", - self.checked_operations.len() - ); for (idx, operation) in operations.into_iter().enumerate() { let operation_id = operation.content.compute_id()?; seen_ops.push(operation_id); @@ -1135,7 +1127,6 @@ impl ProtocolWorker { // add to known ops if let Some(node_info) = self.active_nodes.get_mut(source_node_id) { - info!("DEBUG MASSA: note_operations_from_node: call insert known ops with ids of length: {:#?}", received_ids.len()); node_info.insert_known_ops( received_ids.keys().copied().collect(), self.protocol_settings.max_known_ops_size, @@ -1143,7 +1134,6 @@ impl ProtocolWorker { } if !new_operations.is_empty() { - info!("DEBUG MASSA: called received operation protocol event with a list of {:#?} operations.", new_operations.len()); // Add to pool, propagate when received outside of a header. self.send_protocol_pool_event(ProtocolPoolEvent::ReceivedOperations { operations: new_operations,