Skip to content

Commit

Permalink
Merge #2609
Browse files Browse the repository at this point in the history
2609: correct memory leak r=damip a=damip



Co-authored-by: AurelienFT <[email protected]>
Co-authored-by: damip <[email protected]>
  • Loading branch information
3 people authored May 9, 2022
2 parents 06700d4 + b619854 commit a725017
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 25 deletions.
29 changes: 23 additions & 6 deletions massa-protocol-worker/src/node_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,21 @@ use tokio::time::Instant;
pub(crate) struct NodeInfo {
/// The blocks the node "knows about",
/// defined as the one the node propagated headers to us for.
pub known_blocks: Map<BlockId, (bool, Instant)>,
pub(crate) known_blocks: Map<BlockId, (bool, Instant)>,
/// The blocks the node asked for.
pub wanted_blocks: Map<BlockId, Instant>,
pub(crate) wanted_blocks: Map<BlockId, Instant>,
/// Blocks we asked that node for
pub asked_blocks: Map<BlockId, Instant>,
/// Instant when the node was added
pub connection_instant: Instant,
/// all known operations
pub known_operations: OperationIds,
known_operations: OperationIds,
/// Same as `known_operations` but sorted for a premature optimization :-)
pub known_operations_queue: VecDeque<OperationId>,
known_operations_queue: VecDeque<OperationId>,
/// all known endorsements
pub known_endorsements: Set<EndorsementId>,
known_endorsements: Set<EndorsementId>,
/// Same as `known_endorsements` but sorted for a premature optimization :-)
pub known_endorsements_queue: VecDeque<EndorsementId>,
known_endorsements_queue: VecDeque<EndorsementId>,
}

impl NodeInfo {
Expand Down Expand Up @@ -145,6 +145,23 @@ impl NodeInfo {
}
}

/// Remove a list of operation IDs from the list of operation IDs known by a remote node
///
/// Note: this is INEFFICIENT when an element is actually removed (linear traversal of the deque) and should be used sparingly
pub fn remove_known_ops(&mut self, ops: &Set<OperationId>) {
for op_id in ops.iter() {
if self.known_operations.remove(op_id) {
if let Some(pos) = self
.known_operations_queue
.iter()
.position(|id| id == op_id)
{
self.known_operations_queue.remove(pos);
}
}
}
}

pub fn knows_op(&self, op: &OperationId) -> bool {
self.known_operations.contains(op)
}
Expand Down
26 changes: 7 additions & 19 deletions massa-protocol-worker/src/worker_operations_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use massa_models::{
node::NodeId,
operation::{OperationIds, Operations},
prehash::BuildMap,
signed::Signable,
};
use massa_network_exports::NetworkError;
use massa_protocol_exports::{ProtocolError, ProtocolPoolEvent};
Expand Down Expand Up @@ -70,6 +69,7 @@ impl ProtocolWorker {
OperationIds::with_capacity_and_hasher(op_batch.len(), BuildMap::default());
// exactitude isn't important, we want to have a now for that function call
let now = Instant::now();
let mut count_reask = 0;
for op_id in op_batch {
if self.checked_operations.contains(&op_id) {
continue;
Expand All @@ -92,11 +92,7 @@ impl ProtocolWorker {
.checked_sub(self.protocol_settings.operation_batch_proc_period.into())
.ok_or(TimeError::TimeOverflowError)?
{
debug!(
"re-ask operation {:?} asked for the first time {:?} millis ago.",
op_id,
wish.0.elapsed().as_millis()
);
count_reask += 1;
ask_set.insert(op_id);
wish.0 = now;
wish.1.push(node_id);
Expand All @@ -108,6 +104,9 @@ impl ProtocolWorker {
self.asked_operations.insert(op_id, (now, vec![node_id]));
}
} // EndOf for op_id in op_batch:
if count_reask > 0 {
debug!("re-ask {:#?} operations.", count_reask);
}
if self.op_batch_buffer.len() < self.protocol_settings.operation_batch_buffer_capacity
&& !future_set.is_empty()
{
Expand All @@ -134,16 +133,6 @@ impl ProtocolWorker {
/// `node_info.known_operations`
/// - Notify the operations to he local node, to be propagated
pub(crate) async fn on_operations_received(&mut self, node_id: NodeId, operations: Operations) {
let operation_ids: OperationIds = operations
.iter()
.filter_map(|signed_op| match signed_op.content.compute_id() {
Ok(op_id) => Some(op_id),
_ => None,
})
.collect();
if let Some(node_info) = self.active_nodes.get_mut(&node_id) {
node_info.known_operations.extend(operation_ids.iter());
}
if self
.note_operations_from_node(operations, &node_id, true)
.await
Expand Down Expand Up @@ -214,9 +203,8 @@ impl ProtocolWorker {
op_ids: OperationIds,
) -> Result<(), ProtocolError> {
if let Some(node_info) = self.active_nodes.get_mut(&node_id) {
for op_ids in op_ids.iter() {
node_info.known_operations.remove(op_ids);
}
// remove_known_ops is inefficient when actually removing an entry, but this is almost never the case
node_info.remove_known_ops(&op_ids);
}
let mut operation_ids = OperationIds::default();
for op_id in op_ids.iter() {
Expand Down

0 comments on commit a725017

Please sign in to comment.