
NCL Protocol #4734

Merged · 17 commits · Dec 11, 2024
Conversation

wdbaruni (Member) commented Dec 9, 2024

NCL Protocol Documentation

The NCL (NATS Client Library) Protocol manages reliable bidirectional communication between compute nodes and orchestrators in the Bacalhau network. It provides ordered async message delivery, connection health monitoring, and automatic recovery from failures.

Table of Contents

  1. Definitions & Key Concepts
  2. Architecture Overview
  3. Message Sequencing
  4. Connection Lifecycle
  5. Message Contracts
  6. Communication Flows
  7. Component Dependencies
  8. Configuration
  9. Glossary

Definitions & Key Concepts

Events and Messages

  • Event: An immutable record of a state change in the local system
  • Message: A communication packet derived from events and sent between nodes
  • Sequence Number: A monotonically increasing identifier for ordering events and messages

Node Information

  • Node ID: Unique identifier for each compute node
  • Resources: Computational resources like CPU, Memory, GPU
  • Available Capacity: Currently free resources on a node
  • Queue Used Capacity: Resources allocated to queued jobs

Connection States

  • Disconnected: No active connection, no message processing
  • Connecting: Attempting to establish connection
  • Connected: Active message processing and health monitoring

Transitions between states occur based on:

  • Successful/failed handshakes
  • Missing heartbeats
  • Network failures
  • Explicit disconnection
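
A minimal Go sketch of how these states can be modeled (the concrete ConnectionState enumeration and its String() method live in pkg/transport/nclprotocol/types.go; the iota ordering and string values here are illustrative assumptions):

package nclprotocol

type ConnectionState int

const (
    Disconnected ConnectionState = iota
    Connecting
    Connected
)

func (s ConnectionState) String() string {
    switch s {
    case Disconnected:
        return "disconnected"
    case Connecting:
        return "connecting"
    case Connected:
        return "connected"
    default:
        return "unknown"
    }
}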

Architecture Overview

The protocol consists of two main planes:

Control Plane

  • Handles connection establishment and health monitoring
  • Manages periodic heartbeats and node info updates
  • Maintains connection state and health metrics
  • Handles checkpointing for recovery

Data Plane

  • Provides reliable, ordered message delivery
  • Manages event watching and dispatching
  • Tracks message sequences for both sides
  • Handles recovery from network failures

NATS Subject Structure

bacalhau.global.compute.<nodeID>.in.msgs  - Messages to compute node
bacalhau.global.compute.<nodeID>.out.msgs - Messages from compute node
bacalhau.global.compute.<nodeID>.out.ctrl - Control messages from compute
bacalhau.global.compute.*.out.ctrl        - Global control channel
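
These subjects are generated by helper functions in pkg/transport/nclprotocol/subjects.go. The first helper below is quoted later in this review; the second is a hypothetical counterpart named by analogy for the compute node's inbound subject:

package nclprotocol

import "fmt"

// The orchestrator reads a compute node's outgoing messages from
// that node's "out" subject.
func NatsSubjectOrchestratorInMsgs(computeNodeID string) string {
    return fmt.Sprintf("bacalhau.global.compute.%s.out.msgs", computeNodeID)
}

// Hypothetical counterpart (named by analogy) for messages delivered
// to the compute node on its "in" subject.
func NatsSubjectComputeInMsgs(computeNodeID string) string {
    return fmt.Sprintf("bacalhau.global.compute.%s.in.msgs", computeNodeID)
}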

Message Sequencing

Overview

The NCL protocol integrates with a local event watcher system to decouple event processing from message delivery. Each node maintains its own ordered ledger of events that the protocol watches and selectively publishes. This decoupling provides several benefits:

  • Clean separation between business logic and message transport
  • Reliable local event ordering
  • Simple checkpointing and recovery
  • Built-in replay capabilities

Event Flow Architecture

Local Event Store          NCL Protocol              Remote Node
┌──────────────┐    ┌─────────────────────┐    ┌──────────────┐
│              │    │  1. Watch Events     │    │              │
│  Ordered     │◄───┤  2. Filter Relevant  │    │              │
│  Event       │    │  3. Create Messages  │───►│   Receive    │
│  Ledger      │    │  4. Track Sequences  │    │   Process    │
│              │    │  5. Checkpoint       │    │              │
└──────────────┘    └─────────────────────┘    └──────────────┘

Key Components

  1. Event Store

    • Maintains ordered sequence of all local events
    • Each event has unique monotonic sequence number
    • Supports seeking and replay from any position
  2. Event Watcher

    • Watches event store for new entries
    • Filters events relevant for transport
    • Supports resuming from checkpoint
  3. Message Dispatcher

    • Creates messages from events
    • Manages reliable delivery
    • Tracks publish acknowledgments
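
A hedged sketch of how these components fit together on the publish path. Event, Message, and the publish callback are simplified stand-ins for the real watcher and NCL envelope types; only the nil-filtering behavior of MessageCreator is taken from the protocol's documented interface:

package dispatcher

// Simplified stand-ins for the real watcher and envelope types.
type Event struct {
    SeqNum  uint64
    Payload any
}

type Message struct {
    SeqNum uint64
    Body   any
}

// MessageCreator mirrors the nclprotocol interface in spirit: a nil
// message means the event is not relevant for transport.
type MessageCreator interface {
    CreateMessage(ev Event) (*Message, error)
}

// dispatch turns one ledger event into at most one published message,
// carrying the event's sequence number for ordering on the remote side.
func dispatch(ev Event, creator MessageCreator, publish func(*Message) error) error {
    msg, err := creator.CreateMessage(ev)
    if err != nil {
        return err
    }
    if msg == nil {
        return nil // filtered: not relevant for transport
    }
    msg.SeqNum = ev.SeqNum
    return publish(msg)
}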

Connection Lifecycle

Initial Connection

  1. Handshake

    • Compute node initiates connection by sending HandshakeRequest
    • Includes node info, start time, and last processed sequence number
    • Orchestrator validates request and accepts/rejects connection
    • On acceptance, orchestrator creates dedicated data plane for node
  2. Data Plane Setup

    • Both sides establish message subscriptions
    • Create ordered publishers for reliable delivery
    • Initialize event watchers and dispatchers
    • Set up sequence tracking

Ongoing Communication

  1. Health Monitoring

    • Compute nodes send periodic heartbeats
    • Include current capacity and last processed sequence
    • Orchestrator tracks node health and connection state
    • Missing heartbeats trigger disconnection
  2. Node Info Updates

    • Compute nodes send updates when configuration changes
    • Includes updated capacity, features, labels
    • Orchestrator maintains current node state
  3. Message Flow

    • Data flows through separate control/data subjects
    • Messages include sequence numbers for ordering
    • Both sides track processed sequences
    • Failed deliveries trigger automatic recovery
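
Both sides track the last processed sequence number. A minimal sketch of such a tracker; the real SequenceTracker in pkg/transport/nclprotocol/tracker.go likewise stores the value atomically:

package nclprotocol

import "sync/atomic"

// SequenceTracker records the highest sequence number processed so
// far, safe for concurrent readers and writers.
type SequenceTracker struct {
    lastSeqNum atomic.Uint64
}

func (s *SequenceTracker) UpdateLastSeqNum(seqNum uint64) {
    s.lastSeqNum.Store(seqNum)
}

func (s *SequenceTracker) GetLastSeqNum() uint64 {
    return s.lastSeqNum.Load()
}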

Message Contracts

Handshake Messages

// Request sent by compute node to initiate connection
type HandshakeRequest struct {
    NodeInfo               models.NodeInfo
    StartTime              time.Time
    LastOrchestratorSeqNum uint64
}

// Response from orchestrator
type HandshakeResponse struct {
    Accepted          bool
    Reason            string // Only set if not accepted
    LastComputeSeqNum uint64
}

Heartbeat Messages

// Periodic heartbeat from compute node
type HeartbeatRequest struct {
    NodeID                 string
    AvailableCapacity      Resources
    QueueUsedCapacity      Resources
    LastOrchestratorSeqNum uint64
}

// Acknowledgment from orchestrator
type HeartbeatResponse struct {
    LastComputeSeqNum uint64
}

Node Info Update Messages

// Node info update notification
type UpdateNodeInfoRequest struct {
    NodeInfo NodeInfo // Same structure as in HandshakeRequest
}

type UpdateNodeInfoResponse struct {
    Accepted bool
    Reason   string // Only set if not accepted
}

Communication Flows

Initial Connection and Handshake

The following sequence shows the initial connection establishment between compute node and orchestrator:

sequenceDiagram
    participant C as Compute Node
    participant O as Orchestrator

    Note over C,O: Connection Establishment
    C->>O: HandshakeRequest(NodeInfo, StartTime, LastSeqNum)
    
    Note over O: Validate Node
    alt Valid Node
        O->>O: Create Data Plane
        O->>O: Setup Message Handlers
        O-->>C: HandshakeResponse(Accepted=true, LastSeqNum)
        
        Note over C: Setup Data Plane
        C->>C: Start Control Plane
        C->>C: Initialize Data Plane
        
        Note over C,O: Begin Regular Communication
        C->>O: Initial Heartbeat
        O-->>C: HeartbeatResponse
    else Invalid Node
        O-->>C: HandshakeResponse(Accepted=false, Reason)
        Note over C: Retry with backoff
    end

Regular Operation Flow

The following sequence shows the ongoing communication pattern between compute node and orchestrator, including periodic health checks and configuration updates:

sequenceDiagram
    participant C as Compute Node
    participant O as Orchestrator

    rect rgb(200, 230, 200)
        Note over C,O: Periodic Health Monitoring
        loop Every HeartbeatInterval
            C->>O: HeartbeatRequest(NodeID, Capacity, LastSeqNum)
            O-->>C: HeartbeatResponse()
        end
    end

    rect rgb(230, 200, 200)
        Note over C,O: Node Info Updates
        C->>C: Detect Config Change
        C->>O: UpdateNodeInfoRequest(NewNodeInfo)
        O-->>C: UpdateNodeInfoResponse(Accepted)
    end

    rect rgb(200, 200, 230)
        Note over C,O: Data Plane Messages
        O->>C: Execution Messages (with SeqNum)
        C->>O: Result Messages (with SeqNum)
        Note over C,O: Both track sequence numbers
    end

During regular operation:

  • Heartbeats occur every HeartbeatInterval (default 15s)
  • Configuration changes trigger immediate updates
  • Data plane messages flow continuously in both directions
  • Both sides maintain sequence tracking and acknowledgments
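
A hedged sketch of the compute-side heartbeat loop. The trimmed HeartbeatRequest here carries only the fields used in the sketch; send, lastSeqNum, and onError are stand-ins for the real control-plane plumbing:

package compute

import (
    "context"
    "time"
)

// Trimmed request for this sketch; see Message Contracts above for
// the full field set.
type HeartbeatRequest struct {
    NodeID                 string
    LastOrchestratorSeqNum uint64
}

func runHeartbeats(
    ctx context.Context,
    interval time.Duration,
    nodeID string,
    lastSeqNum func() uint64,
    send func(context.Context, HeartbeatRequest) error,
    onError func(error),
) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            req := HeartbeatRequest{
                NodeID:                 nodeID,
                LastOrchestratorSeqNum: lastSeqNum(),
            }
            if err := send(ctx, req); err != nil {
                onError(err) // missed responses feed failure detection
            }
        }
    }
}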

Failure Recovery Flow

The protocol provides comprehensive failure recovery through several mechanisms:

sequenceDiagram
    participant C as Compute Node
    participant O as Orchestrator

    rect rgb(240, 200, 200)
        Note over C,O: Network Failure
        C->>O: HeartbeatRequest
        C--xO: Connection Lost
        
        Note over C: Detect Missing Response
        C->>C: Mark Disconnected
        C->>C: Stop Data Plane
        
        Note over O: Detect Missing Heartbeats
        O->>O: Mark Node Disconnected
        O->>O: Cleanup Node Resources
    end

    rect rgb(200, 240, 200)
        Note over C,O: Recovery
        loop Until Connected
            Note over C: Exponential Backoff
            C->>O: HandshakeRequest(LastSeqNum)
            O-->>C: HandshakeResponse(Accepted)
        end

        Note over C,O: Resume from Last Checkpoint
        Note over C: Restart Data Plane
        Note over O: Recreate Node Resources
    end

Failure Detection

  • Missing heartbeats beyond threshold
  • NATS connection failures
  • Message publish failures
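
Heartbeat-miss detection reduces to a time comparison. A sketch assuming the documented defaults (HeartbeatInterval 15s × HeartbeatMissFactor 5 ≈ 75s threshold); the lastHeartbeat bookkeeping stands in for the real HealthTracker:

package orchestrator

import "time"

// isDisconnected reports whether a node has gone quiet for longer
// than missFactor heartbeat intervals.
func isDisconnected(lastHeartbeat, now time.Time, interval time.Duration, missFactor int) bool {
    return now.Sub(lastHeartbeat) > time.Duration(missFactor)*interval
}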

Recovery Process

  1. Both sides independently detect failure
  2. Clean up existing resources
  3. Compute node initiates reconnection
  4. Resume from last checkpoint:
    • Load last checkpoint sequence
    • Resume event watching
    • Rebuild publish state
    • Resend pending messages
  5. Continue normal operation

This process ensures:

  • No events are lost
  • Messages remain ordered
  • Efficient recovery
  • At-least-once delivery
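
The checkpoint step hinges on a small interface. The Checkpointer below matches the one shown later in the review discussion; the resume helper around it is an illustrative sketch:

package nclprotocol

import "context"

// Checkpointer persists and retrieves the last processed sequence
// number under a name (typically one per watcher).
type Checkpointer interface {
    Checkpoint(ctx context.Context, name string, sequenceNumber uint64) error
    GetCheckpoint(ctx context.Context, name string) (uint64, error)
}

// resumeSeqNum returns the ledger position to resume event watching
// from after reconnection; 0 means replay from the beginning.
func resumeSeqNum(ctx context.Context, cp Checkpointer, watcherName string) (uint64, error) {
    return cp.GetCheckpoint(ctx, watcherName)
}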

Component Dependencies

Compute Node Components:

ConnectionManager
├── ControlPlane
│   ├── NodeInfoProvider
│   │   └── Monitors node state changes
│   ├── MessageHandler
│   │   └── Processes control messages
│   └── Checkpointer
│       └── Saves progress state
└── DataPlane
    ├── LogStreamServer
    │   └── Handles job output streaming
    ├── MessageHandler
    │   └── Processes execution messages
    ├── MessageCreator
    │   └── Formats outgoing messages
    └── EventStore
        └── Tracks execution events

Orchestrator Components:

ComputeManager
├── NodeManager
│   ├── Tracks node states
│   └── Manages node lifecycle
├── MessageHandler
│   └── Processes node messages
├── MessageCreatorFactory
│   └── Creates per-node message handlers
└── DataPlane (per node)
    ├── Subscriber
    │   └── Handles incoming messages
    ├── Publisher
    │   └── Sends ordered messages
    └── Dispatcher
        └── Watches and sends events

Configuration

Connection Management

  • HeartbeatInterval: How often compute nodes send heartbeats (default: 15s)
  • HeartbeatMissFactor: Number of missed heartbeats before disconnection (default: 5)
  • NodeInfoUpdateInterval: How often node info updates are checked (default: 60s)
  • RequestTimeout: Timeout for individual requests (default: 10s)

Recovery Settings

  • ReconnectInterval: Base interval between reconnection attempts (default: 10s)
  • BaseRetryInterval: Initial retry delay after failure (default: 5s)
  • MaxRetryInterval: Maximum retry delay (default: 5m)

Data Plane Settings

  • CheckpointInterval: How often sequence progress is saved (default: 30s)
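
Taken together, the defaults above might be collected in a struct like the following sketch. Field names follow the configuration keys; the struct shape is illustrative, while the real compute-side Config lives in pkg/transport/nclprotocol/compute/config.go:

package compute

import "time"

type Config struct {
    HeartbeatInterval      time.Duration
    HeartbeatMissFactor    int
    NodeInfoUpdateInterval time.Duration
    RequestTimeout         time.Duration
    ReconnectInterval      time.Duration
    BaseRetryInterval      time.Duration
    MaxRetryInterval       time.Duration
    CheckpointInterval     time.Duration
}

// DefaultConfig mirrors the documented defaults.
func DefaultConfig() Config {
    return Config{
        HeartbeatInterval:      15 * time.Second,
        HeartbeatMissFactor:    5,
        NodeInfoUpdateInterval: 60 * time.Second,
        RequestTimeout:         10 * time.Second,
        ReconnectInterval:      10 * time.Second,
        BaseRetryInterval:      5 * time.Second,
        MaxRetryInterval:       5 * time.Minute,
        CheckpointInterval:     30 * time.Second,
    }
}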

Glossary

  • Checkpoint: A saved position in the event sequence used for recovery
  • Handshake: Initial connection protocol between compute node and orchestrator
  • Heartbeat: Periodic health check message from compute node to orchestrator
  • Node Info: Current state and capabilities of a compute node
  • Sequence Number: Monotonically increasing identifier used for message ordering

Summary by CodeRabbit

Release Notes

  • New Features

    • Introduced new message types for transport operations.
    • Added a HealthTracker for monitoring connection health.
    • Implemented ControlPlane and DataPlane for managing node operations and message flow.
    • Created a ConnectionManager for handling compute node connections.
    • Added a factory structure for creating NCLMessageCreator instances with improved event filtering.
    • Enhanced the BuildVersionInfo struct with a Copy method for duplicating instances.
    • Introduced a mock implementation for testing ControlPlane and Checkpointer.
    • Added a comprehensive README for the NCL protocol.
  • Deprecations

    • Added deprecation notice for the ResourceUpdateInterval field in the Heartbeat struct.
  • Bug Fixes

    • Enhanced error handling and logging in various components.
  • Refactor

    • Updated imports and modified several method signatures to reflect new package structures.
    • Streamlined watcher and connection management processes.
    • Adjusted checkpointing behavior and logging levels for improved clarity.
  • Chores

    • Removed legacy NCL-related components and tests, consolidating functionality under the new transport layer.
    • Added comprehensive test suites for various components to ensure robust functionality.
    • Introduced utility functions for managing NATS server in a testing environment.

@wdbaruni wdbaruni self-assigned this Dec 9, 2024
coderabbitai bot (Contributor) commented Dec 9, 2024

Walkthrough

The changes in this pull request involve significant refactoring and enhancements across various packages in the codebase. Key modifications include the introduction of new structures and methods for managing connections, message creation, and health tracking within the NCL protocol. The transport layer has been restructured, with a shift from the previous NCL-based implementations to a new transport mechanism. Additionally, deprecated fields and constants have been noted, and several files related to the old transport architecture have been removed.

Changes

File Path · Change Summary
pkg/compute/watchers/ncl_message_creator.go Updated import for MessageCreator interface from transport to nclprotocol. Enhanced CreateMessage method to filter events based on nodeID. Added factory structure NCLMessageCreatorFactory for creating NCLMessageCreator instances.
pkg/config/types/compute.go Added deprecation notice for ResourceUpdateInterval in Heartbeat struct, indicating future removal.
pkg/models/messages/constants.go Introduced new constants for transport operations: HandshakeRequestMessageType, HeartbeatRequestMessageType, NodeInfoUpdateRequestMessageType, HandshakeResponseType, HeartbeatResponseType, NodeInfoUpdateResponseType.
pkg/models/node_info.go Added HasNodeInfoChanged function to check if node information has changed, currently returns false.
pkg/node/compute.go Removed LogstreamServer from Compute struct. Updated connection management to use transportcompute. Simplified watcher setup by removing NCL publisher and dispatcher. Updated error handling in connection manager startup and cleanup.
pkg/node/constants.go Removed constants related to NCL dispatcher: computeNCLDispatcherWatcherID, orchestratorNCLDispatcherWatcherID.
pkg/node/ncl.go Deleted file containing functions for message serialization and subscription management.
pkg/node/requester.go Removed NCL-related components and integrated new connection manager. Updated error handling and simplified watcher setup.
pkg/orchestrator/watchers/event_logger.go Modified HandleEvent method to include sequence_number in logging for enhanced detail.
pkg/orchestrator/watchers/ncl_message_creator.go Introduced NCLMessageCreatorFactory and updated NCLMessageCreator to include nodeID.
pkg/transport/bprotocol/compute/transport.go Added logging for error handling in Start method of ConnectionManager.
pkg/transport/bprotocol/errors.go Introduced ErrUpgradeAvailable error variable to signal protocol upgrade availability.
pkg/transport/bprotocol/orchestrator/server.go Updated Register method to check if a node supports NCLv1 protocol, modifying the registration response accordingly.
pkg/transport/forwarder/forwarder.go Deleted file containing the Forwarder implementation.
pkg/transport/forwarder/forwarder_e2e_test.go Deleted end-to-end test suite for the Forwarder component.
pkg/transport/forwarder/forwarder_test.go Deleted unit test suite for the Forwarder component.
pkg/transport/mocks.go Deleted file containing mock implementations for MessageCreator.
pkg/transport/nclprotocol/compute/config.go Introduced Config struct for compute node configuration settings, including validation methods.
pkg/transport/nclprotocol/compute/controlplane.go Introduced ControlPlane struct for managing periodic control operations between compute nodes and orchestrators.
pkg/transport/nclprotocol/compute/dataplane.go Introduced DataPlane struct for managing data transfer operations between compute nodes and orchestrators.
pkg/transport/nclprotocol/compute/errors.go Added errComponent constant for error handling within compute operations.
pkg/transport/nclprotocol/compute/health_tracker.go Introduced HealthTracker struct for monitoring connection health with thread-safe mechanisms.
pkg/transport/nclprotocol/compute/manager.go Introduced ConnectionManager struct for managing compute node connections, including lifecycle and health monitoring.
pkg/transport/nclprotocol/dispatcher/config.go Updated defaultCheckpointInterval from 5 seconds to 30 seconds and modified validation logic for CheckpointInterval.
pkg/transport/nclprotocol/dispatcher/dispatcher.go Updated Dispatcher struct to use nclprotocol.MessageCreator and streamlined background goroutine management.
pkg/transport/nclprotocol/dispatcher/dispatcher_e2e_test.go Updated import path for dispatcher package to reflect new organizational structure.
pkg/transport/nclprotocol/dispatcher/dispatcher_test.go Updated import path for MockMessageCreator to reflect new package structure.
pkg/transport/nclprotocol/dispatcher/handler.go Updated import for MessageCreator in messageHandler struct to nclprotocol.
pkg/transport/nclprotocol/dispatcher/handler_test.go Updated HandlerTestSuite to use MockMessageCreator from nclprotocol.
pkg/transport/nclprotocol/dispatcher/state.go Changed logging level for lastCheckpoint from Debug to Trace.
pkg/transport/nclprotocol/mocks.go Introduced mock implementations for interfaces in the nclprotocol package.
pkg/transport/nclprotocol/orchestrator/config.go Introduced Config struct for orchestrator transport layer configuration settings.
pkg/transport/nclprotocol/orchestrator/dataplane.go Introduced DataPlane struct for managing message flow between orchestrators and compute nodes.
pkg/transport/nclprotocol/orchestrator/manager.go Introduced ComputeManager struct for managing the lifecycle and state of compute nodes.
pkg/transport/nclprotocol/registry.go Introduced functions for creating a message registry within the nclprotocol package.
pkg/transport/nclprotocol/subjects.go Introduced functions for generating NATS messaging subjects for compute nodes.
pkg/transport/nclprotocol/tracker.go Introduced SequenceTracker type for managing and tracking message sequence numbers.
pkg/transport/nclprotocol/types.go Introduced types and interfaces for connection management and message handling, including ConnectionState, Checkpointer, MessageCreator, and MessageCreatorFactory.
pkg/transport/types.go Deleted file containing definitions related to the transport layer, including the MessageCreator interface.

Possibly related issues

  • bacalhau-project/bacalhau#4735: The addition of the HasNodeInfoChanged function in pkg/models/node_info.go aligns with the objective to implement this function to compare fields of the NodeInfo struct.

Possibly related PRs

  • fix swagger generation #4699: This PR addresses updates to the Swagger documentation, including changes to the types.Bacalhau definition, which now includes a new property DisableAnalytics. While it does not directly relate to the changes in the NCLMessageCreator, it reflects ongoing modifications in the API documentation that may indirectly connect to the overall restructuring of the transport layer, including the nclprotocol changes.

Suggested reviewers

  • markkuhn


coderabbitai bot (Contributor) left a comment


Actionable comments posted: 16

🧹 Outside diff range and nitpick comments (23)
pkg/transport/bprotocol/errors.go (1)

7-8: LGTM with minor suggestion for error message clarity

The error definition is well-structured and follows Go conventions. Consider making the error message slightly more actionable:

-var ErrUpgradeAvailable = fmt.Errorf("node supports NCLv1 protocol - legacy protocol disabled")
+var ErrUpgradeAvailable = fmt.Errorf("node supports NCLv1 protocol - please upgrade to use the new protocol (legacy protocol disabled)")
pkg/transport/bprotocol/compute/transport.go (1)

Line range hint 1-4: Consider enhancing deprecation notice

The package documentation clearly indicates this is legacy code, but could be more specific about migration path.

 // Package compute provides transport layer implementation for compute nodes using
 // the legacy bprotocol over NATS. This package will be deprecated in future releases
-// in favor of a new transport implementation.
+// in favor of the NCLv1 protocol transport implementation.
 package compute
pkg/transport/nclprotocol/compute/config.go (1)

89-89: Avoid magic numbers: Define constants for backoff intervals

The use of literal values 10*time.Second and 2*time.Minute for ReconnectBackoff can be improved by defining named constants. This enhances readability and makes future adjustments easier.

Apply this diff to define and use constants:

84     return Config{
85         HeartbeatMissFactor: 5, // allow up to 5 missed heartbeats before marking a node as disconnected
86         RequestTimeout:      10 * time.Second,
87         ReconnectInterval:   10 * time.Second,
88         CheckpointInterval:  30 * time.Second,
-89         ReconnectBackoff:    backoff.NewExponential(10*time.Second, 2*time.Minute),
+89         ReconnectBackoff:    backoff.NewExponential(defaultReconnectInitialInterval, defaultReconnectMaxInterval),
90         MessageSerializer:   envelope.NewSerializer(),
91         MessageRegistry:     nclprotocol.MustCreateMessageRegistry(),
92         DispatcherConfig:    dispatcher.DefaultConfig(),
93         Clock:               clock.New(),
94     }
95 }
96 
+const (
+    defaultReconnectInitialInterval = 10 * time.Second
+    defaultReconnectMaxInterval     = 2 * time.Minute
+)
🧰 Tools
🪛 GitHub Check: lint / go-lint (ubuntu-latest)

[failure] 89-89:
Magic number: 10, in detected (mnd)

pkg/transport/nclprotocol/orchestrator/config.go (1)

72-74: Avoid magic numbers: Define constants for default timeouts

The use of literal values for timeouts can be replaced with named constants to improve code clarity and maintainability.

Apply this diff to define and use constants:

70     return Config{
71         // Default timeouts and intervals
-72         HeartbeatTimeout:      2 * time.Minute,  // Time before node considered disconnected
-73         NodeCleanupInterval:   30 * time.Second, // Check for disconnected nodes every 30s
-74         RequestHandlerTimeout: 2 * time.Second,  // Individual request timeout
+72         HeartbeatTimeout:      defaultHeartbeatTimeout,      // Time before node considered disconnected
+73         NodeCleanupInterval:   defaultNodeCleanupInterval,   // Check for disconnected nodes every 30s
+74         RequestHandlerTimeout: defaultRequestHandlerTimeout,  // Individual request timeout
75 
+// Default configuration constants
+const (
+    defaultHeartbeatTimeout      = 2 * time.Minute
+    defaultNodeCleanupInterval   = 30 * time.Second
+    defaultRequestHandlerTimeout = 2 * time.Second
+)
pkg/orchestrator/watchers/ncl_message_creator.go (1)

17-41: Add documentation comments for new types and methods

The newly introduced NCLMessageCreatorFactory, NCLMessageCreatorFactoryParams, and associated methods lack documentation comments. Adding comments will improve code readability and help other developers understand the purpose and usage of these components.

pkg/transport/nclprotocol/compute/dataplane.go (1)

161-181: Simplify error aggregation in cleanup method

In the cleanup method, errors are aggregated using errors.Join, which could be replaced with more straightforward error handling for better readability.

Consider using an error slice to collect errors and then combining them at the end:

var errs []error

// ... accumulate errors into errs slice ...

if len(errs) > 0 {
    return fmt.Errorf("failed to cleanup data plane: %v", errs)
}
pkg/transport/nclprotocol/compute/controlplane.go (3)

104-108: Simplify shutdown by combining select cases

The select statement in the run method handles both ctx.Done() and <-cp.stopCh separately, but both cases result in returning from the function. Consider combining these cases to simplify the shutdown logic.

Modify the select statement:

select {
case <-ctx.Done(), <-cp.stopCh:
    return
// other cases...
}

Line range hint 260-262: Handle potential nil error when creating heartbeat response

In the heartbeat method, the response from requester.Request is used without checking for a possible nil value or error in deserialization. Ensure that the payload is valid before proceeding.


217-239: Ensure proper goroutine shutdown in Stop method

In the Stop method, a goroutine is launched at line 229 to wait for cp.wg.Done(), but there may be a risk of a goroutine leak if the wait group doesn't complete. Consider adding a timeout or ensuring that all started goroutines will eventually finish.

pkg/transport/nclprotocol/orchestrator/dataplane.go (1)

226-256: Simplify error aggregation in cleanup method

Similar to previous files, the cleanup method aggregates errors using errors.Join. Consider using an error slice for clarity.

pkg/transport/nclprotocol/subjects.go (1)

1-29: Consider extracting subject components as constants

The subject pattern contains several static components that could be defined as constants to improve maintainability and reduce the risk of typos.

+const (
+	subjectRoot    = "bacalhau.global"
+	subjectCompute = "compute"
+	subjectInDir   = "in"
+	subjectOutDir  = "out"
+	subjectMsgs    = "msgs"
+	subjectCtrl    = "ctrl"
+)

 func NatsSubjectOrchestratorInMsgs(computeNodeID string) string {
-	return fmt.Sprintf("bacalhau.global.compute.%s.out.msgs", computeNodeID)
+	return fmt.Sprintf("%s.%s.%s.%s.%s", subjectRoot, subjectCompute, computeNodeID, subjectOutDir, subjectMsgs)
 }
pkg/transport/nclprotocol/registry.go (1)

14-20: Document the purpose of each message type registration

Add comments explaining the purpose and usage of each message type to improve maintainability.

 err := errors.Join(
+		// Bid-related messages for job allocation
 		reg.Register(messages.AskForBidMessageType, messages.AskForBidRequest{}),
 		reg.Register(messages.BidAcceptedMessageType, messages.BidAcceptedRequest{}),
 		reg.Register(messages.BidRejectedMessageType, messages.BidRejectedRequest{}),
+		// Execution control messages
 		reg.Register(messages.CancelExecutionMessageType, messages.CancelExecutionRequest{}),
+		// Result reporting messages
 		reg.Register(messages.BidResultMessageType, messages.BidResult{}),
 		reg.Register(messages.RunResultMessageType, messages.RunResult{}),
 		reg.Register(messages.ComputeErrorMessageType, messages.ComputeError{}),
pkg/transport/nclprotocol/tracker.go (2)

31-34: Consider overflow handling for sequence numbers

The UpdateLastSeqNum method should handle potential overflow scenarios for uint64 values.

 func (s *SequenceTracker) UpdateLastSeqNum(seqNum uint64) {
+	// Handle potential overflow
+	current := s.GetLastSeqNum()
+	if seqNum < current {
+		log.Warn().Uint64("current", current).Uint64("new", seqNum).Msg("Sequence number overflow detected")
+	}
 	s.lastSeqNum.Store(seqNum)
 }

43-49: Consider utilizing context in OnProcessed

The ctx parameter is not used in the OnProcessed method. Consider using it for cancellation or logging correlation.

 func (s *SequenceTracker) OnProcessed(ctx context.Context, message *envelope.Message) {
 	if message.Metadata.Has(KeySeqNum) {
 		s.UpdateLastSeqNum(message.Metadata.GetUint64(KeySeqNum))
 	} else {
-		log.Trace().Msgf("No sequence number found in message metadata %v", message.Metadata)
+		log.Ctx(ctx).Trace().Msgf("No sequence number found in message metadata %v", message.Metadata)
 	}
 }
pkg/transport/nclprotocol/types.go (3)

37-40: Consider adding method documentation

The Checkpointer interface methods lack documentation explaining their purpose and expected behavior.

Add documentation for better maintainability:

 type Checkpointer interface {
+    // Checkpoint stores the sequence number for the given name.
+    // Returns an error if the checkpoint operation fails.
     Checkpoint(ctx context.Context, name string, sequenceNumber uint64) error
+    // GetCheckpoint retrieves the last stored sequence number for the given name.
+    // Returns the sequence number and any error encountered during retrieval.
     GetCheckpoint(ctx context.Context, name string) (uint64, error)
 }

45-53: Consider adding validation for time fields

The ConnectionHealth struct tracks various timestamps but lacks validation to ensure logical time ordering (e.g., StartTime should be before LastSuccessfulUpdate).

Consider adding a validation method:

func (h *ConnectionHealth) Validate() error {
    if h.StartTime.After(h.LastSuccessfulUpdate) {
        return fmt.Errorf("start time cannot be after last successful update")
    }
    if h.ConnectedSince.Before(h.StartTime) {
        return fmt.Errorf("connected since cannot be before start time")
    }
    return nil
}

74-76: Consider more robust message ID generation

The current message ID format only uses sequence number, which might not guarantee uniqueness across different nodes or restarts.

Consider incorporating node ID and timestamp:

-func GenerateMsgID(event watcher.Event) string {
-    return fmt.Sprintf("seq-%d", event.SeqNum)
+func GenerateMsgID(event watcher.Event, nodeID string) string {
+    return fmt.Sprintf("%s-seq-%d-%d", nodeID, event.SeqNum, time.Now().UnixNano())
}
pkg/transport/nclprotocol/dispatcher/handler.go (2)

Line range hint 31-44: Consider adding retry mechanism for message creation

The HandleEvent method fails immediately if message creation fails, which might be too strict for transient errors.

Consider adding retry logic:

 func (h *messageHandler) HandleEvent(ctx context.Context, event watcher.Event) error {
+    var message *envelope.Message
+    var err error
+    for retries := 0; retries < 3; retries++ {
+        message, err = h.creator.CreateMessage(event)
+        if err == nil || !isTransientError(err) {
+            break
+        }
+        time.Sleep(time.Second * time.Duration(retries+1))
+    }
-    message, err := h.creator.CreateMessage(event)
     if err != nil {
         return newPublishError(fmt.Errorf("create message: %w", err))
     }

Line range hint 46-71: Add timeout for async publish operations

The enrichAndPublish method publishes asynchronously but doesn't set a timeout for the operation.

Add context timeout:

 func (h *messageHandler) enrichAndPublish(ctx context.Context, message *envelope.Message, event watcher.Event) error {
+    // Set timeout for publish operation
+    ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
+    defer cancel()
+
     // Add metadata
     message.WithMetadataValue(ncl.KeyMessageID, generateMsgID(event))
pkg/config/types/compute.go (1)

35-37: Enhance deprecation notice with migration guidance

While the deprecation notice is good, it should provide guidance on the migration path.

Enhance the deprecation comment:

-    // Deprecated: only used by legacy transport, will be removed in the future.
+    // Deprecated: only used by legacy transport, will be removed in v1.0.0.
+    // Migration: Use InfoUpdateInterval for all update types in the new transport layer.
+    // The new transport implements smart batching and throttling internally.
     ResourceUpdateInterval Duration `yaml:"ResourceUpdateInterval,omitempty" json:"ResourceUpdateInterval,omitempty"`
pkg/orchestrator/watchers/event_logger.go (1)

44-46: LGTM: Enhanced logging with sequence numbers

The addition of sequence numbers to log events improves traceability and debugging capabilities, which is particularly valuable for the new transport layer implementation.

Consider adding structured logging for transport-specific fields (e.g., protocol version, transport type) to further enhance observability of the new nclprotocol transport layer.

pkg/transport/nclprotocol/dispatcher/config.go (1)

33-33: Document rationale for checkpoint configuration changes

The comment indicates that negative values disable checkpointing, but it would be helpful to document:

  1. Why the default interval was increased to 30 seconds
  2. Use cases for disabling checkpointing
pkg/transport/nclprotocol/dispatcher/dispatcher_test.go (1)

Line range hint 42-93: Add test cases for checkpoint configuration

While the existing test cases cover basic dispatcher functionality, consider adding tests for:

  1. Behavior with negative checkpoint intervals (disabled checkpointing)
  2. Behavior with the new 30s default interval

Example test case:

func (suite *DispatcherTestSuite) TestDisabledCheckpointing() {
    // Configure negative checkpoint interval
    suite.config.CheckpointInterval = -1
    
    suite.watcher.EXPECT().SetHandler(gomock.Any()).Return(nil)
    suite.watcher.EXPECT().Start(gomock.Any()).Return(nil)
    
    d, err := New(suite.publisher, suite.watcher, suite.creator, suite.config)
    suite.Require().NoError(err)
    
    err = d.Start(suite.ctx)
    suite.NoError(err)
    
    // Verify no checkpointing occurs
    // Add appropriate assertions
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between f49e65a and fba2fb5.

📒 Files selected for processing (39)
  • pkg/compute/watchers/ncl_message_creator.go (2 hunks)
  • pkg/config/types/compute.go (1 hunks)
  • pkg/models/messages/constants.go (1 hunks)
  • pkg/models/node_info.go (1 hunks)
  • pkg/node/compute.go (4 hunks)
  • pkg/node/constants.go (0 hunks)
  • pkg/node/ncl.go (0 hunks)
  • pkg/node/requester.go (6 hunks)
  • pkg/orchestrator/watchers/event_logger.go (1 hunks)
  • pkg/orchestrator/watchers/ncl_message_creator.go (3 hunks)
  • pkg/transport/bprotocol/compute/transport.go (2 hunks)
  • pkg/transport/bprotocol/errors.go (1 hunks)
  • pkg/transport/bprotocol/orchestrator/server.go (1 hunks)
  • pkg/transport/forwarder/forwarder.go (0 hunks)
  • pkg/transport/forwarder/forwarder_e2e_test.go (0 hunks)
  • pkg/transport/forwarder/forwarder_test.go (0 hunks)
  • pkg/transport/mocks.go (0 hunks)
  • pkg/transport/nclprotocol/compute/config.go (1 hunks)
  • pkg/transport/nclprotocol/compute/controlplane.go (1 hunks)
  • pkg/transport/nclprotocol/compute/dataplane.go (1 hunks)
  • pkg/transport/nclprotocol/compute/errors.go (1 hunks)
  • pkg/transport/nclprotocol/compute/health_tracker.go (1 hunks)
  • pkg/transport/nclprotocol/compute/manager.go (1 hunks)
  • pkg/transport/nclprotocol/dispatcher/config.go (2 hunks)
  • pkg/transport/nclprotocol/dispatcher/dispatcher.go (3 hunks)
  • pkg/transport/nclprotocol/dispatcher/dispatcher_e2e_test.go (1 hunks)
  • pkg/transport/nclprotocol/dispatcher/dispatcher_test.go (3 hunks)
  • pkg/transport/nclprotocol/dispatcher/handler.go (1 hunks)
  • pkg/transport/nclprotocol/dispatcher/handler_test.go (2 hunks)
  • pkg/transport/nclprotocol/dispatcher/state.go (1 hunks)
  • pkg/transport/nclprotocol/mocks.go (1 hunks)
  • pkg/transport/nclprotocol/orchestrator/config.go (1 hunks)
  • pkg/transport/nclprotocol/orchestrator/dataplane.go (1 hunks)
  • pkg/transport/nclprotocol/orchestrator/manager.go (1 hunks)
  • pkg/transport/nclprotocol/registry.go (1 hunks)
  • pkg/transport/nclprotocol/subjects.go (1 hunks)
  • pkg/transport/nclprotocol/tracker.go (1 hunks)
  • pkg/transport/nclprotocol/types.go (1 hunks)
  • pkg/transport/types.go (0 hunks)
💤 Files with no reviewable changes (7)
  • pkg/node/constants.go
  • pkg/transport/types.go
  • pkg/transport/forwarder/forwarder_e2e_test.go
  • pkg/transport/mocks.go
  • pkg/transport/forwarder/forwarder_test.go
  • pkg/transport/forwarder/forwarder.go
  • pkg/node/ncl.go
✅ Files skipped from review due to trivial changes (4)
  • pkg/transport/nclprotocol/compute/errors.go
  • pkg/transport/nclprotocol/dispatcher/state.go
  • pkg/transport/nclprotocol/dispatcher/dispatcher_e2e_test.go
  • pkg/transport/nclprotocol/mocks.go
🧰 Additional context used
🪛 GitHub Check: lint / go-lint (ubuntu-latest)
pkg/transport/bprotocol/compute/transport.go

[failure] 148-148:
errorsas: second argument to errors.As should not be *error (govet)

pkg/transport/nclprotocol/compute/config.go

[failure] 89-89:
Magic number: 10, in detected (mnd)

🔇 Additional comments (25)
pkg/transport/bprotocol/orchestrator/server.go (1)

55-63: Protocol upgrade check implementation looks good

The early protocol check effectively enforces the transition to NCLv1. The implementation:

  • Correctly checks for NCLv1 protocol support
  • Returns appropriate response with clear reason
  • Follows early return pattern for clean control flow
pkg/transport/bprotocol/compute/transport.go (1)

148-152: Error handling for protocol upgrade looks good

The implementation gracefully handles the upgrade scenario by:

  • Detecting the upgrade availability
  • Logging informative message
  • Cleaning up resources
  • Continuing without error
🧰 Tools
🪛 GitHub Check: lint / go-lint (ubuntu-latest)

[failure] 148-148:
errorsas: second argument to errors.As should not be *error (govet)

pkg/orchestrator/watchers/ncl_message_creator.go (1)

79-84: Confirm event filtering logic aligns with intended behavior

The added event filtering ensures that messages are only created for events related to the specific nodeID. Please verify that this logic correctly filters events and does not inadvertently skip messages that should be processed.

pkg/transport/nclprotocol/compute/dataplane.go (1)

134-135: Properly handle errors when starting the dispatcher

If dp.dispatcher.Start(ctx) returns an error at line 134, the deferred cleanup may not stop the dispatcher correctly since dp.dispatcher is already non-nil. Ensure that the dispatcher is properly stopped in case of an error during startup.

Confirm that the deferred dp.cleanup(ctx) will correctly stop the dispatcher even if dp.dispatcher.Start(ctx) fails.

pkg/transport/nclprotocol/compute/controlplane.go (1)

96-100: Timers may not be stopped if function exits early

In the run method, the deferred function at line 96 stops the timers. However, if the function exits via the return statements at lines 105 or 107, the deferred function will still be called, so the timers will be stopped. No action is needed.

pkg/transport/nclprotocol/orchestrator/dataplane.go (1)

114-116: ⚠️ Potential issue

Return error if subscriber setup fails

In setupSubscriber, the error returned by dp.subscriber.Subscribe(ctx, subject) is not checked. Ensure that you handle the error properly.

Modify setupSubscriber to return the error from Subscribe:

152     return dp.subscriber.Subscribe(ctx, subject)
+153     if err != nil {
+154         return fmt.Errorf("subscribe to subject %s: %w", subject, err)
+155     }
+156     return nil

Likely invalid or redundant comment.

pkg/node/compute.go (2)

31-32: Imports updated to reflect new transport layer

The imports have been updated to use the new nclprotocol transport layer, aligning with the refactoring efforts.


200-215: Verify the necessity of LogStreamServer in connection manager configuration

Given that the LogstreamServer field has been removed from the Compute struct, please verify whether the LogStreamServer parameter is still required in the transportcompute.NewConnectionManager configuration.

Run the following script to check if LogStreamServer is a required field in transportcompute.Config:

pkg/node/requester.go (2)

266-277: Initialize new connection manager for orchestrator

The new connectionManager is initialized with the new transport layer configuration, aligning with the shift away from the NCL-based protocols.


299-300: Ensure proper shutdown of connection manager during cleanup

The connectionManager is correctly stopped in the cleanup function, handling any potential errors during shutdown.

pkg/models/messages/constants.go (1)

13-19: Addition of transport-related message types

The new message types for handshake, heartbeat, and node info updates have been added appropriately, enhancing the messaging capabilities.

pkg/transport/nclprotocol/registry.go (1)

11-33: Verify complete message type coverage

Ensure all required message types are registered to prevent runtime errors.

✅ Verification successful

Let me try a different approach to find the message types and structs.


Let me search specifically for the message types and their corresponding structs in the messages package.


Based on the results, I can now generate the final response as I have found all the message types and their corresponding struct definitions.

All required message types are properly registered

Comparing the message types registered in CreateMessageRegistry() with the message types defined in pkg/models/messages/constants.go and their corresponding struct implementations in the messages package, all required message types are properly registered with their correct struct implementations. The registration includes:

  • Bidding messages (AskForBid, BidAccepted, BidRejected, BidResult)
  • Execution messages (CancelExecution, RunResult, ComputeError)
  • Control plane messages (HandshakeRequest, HeartbeatRequest, NodeInfoUpdateRequest)
  • Control plane responses (HandshakeResponse, HeartbeatResponse, NodeInfoUpdateResponse)
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Find all defined message types
echo "Searching for message type definitions..."
rg "Type\s+=\s+\".*\"" -g "messages/*.go"

# Find all message struct definitions
echo "Searching for message structs..."
ast-grep --pattern 'type $name struct {
  $$$
}' -l go

Length of output: 415


Script:

#!/bin/bash
# First, let's find the messages directory
fd "messages$"

# Then, let's look for all message type constants and structs in the messages directory
echo "Message type constants:"
rg "const.*MessageType|const.*Type\s+=" --type go

echo -e "\nMessage structs:"
rg "type.*struct" --type go

Length of output: 65727


Script:

#!/bin/bash
# Find message type constants in messages package
echo "Message type constants:"
rg "Type\s+=\s+\".*\"" pkg/models/messages/

# Find message structs in messages package
echo -e "\nMessage structs:"
rg "type.*struct" pkg/models/messages/

Length of output: 3864

pkg/transport/nclprotocol/tracker.go (1)

44-45: ⚠️ Potential issue

Define or import KeySeqNum constant

The KeySeqNum constant is used but not defined in this file. Either define it here or document the import source.

pkg/transport/nclprotocol/types.go (1)

14-35: LGTM: Well-defined connection state enumeration

The ConnectionState enumeration and String() method are well-implemented with clear states and proper string representations.

pkg/transport/nclprotocol/dispatcher/handler.go (1)

15-18: LGTM: Clean struct definition with proper field types

The messageHandler struct is well-defined with clear dependencies.

pkg/config/types/compute.go (1)

35-37: Verify usage of deprecated field

Let's check if there are any remaining uses of this field that need to be updated.

✅ Verification successful

Let me gather more context about the legacy transport usage to better understand if this field is still actively used.


The ResourceUpdateInterval field is still actively used in the legacy transport system

The field is currently used in the legacy transport system's ManagementClient implementation (pkg/transport/bprotocol/compute/management_client.go) for sending resource updates to the requester node. Since this is part of the legacy transport that will be removed in the future, the deprecation notice is correct, but the field cannot be removed until the legacy transport system is fully deprecated.

Key usage locations:

  • pkg/transport/bprotocol/compute/management_client.go: Used to create a ticker for resource updates
  • pkg/config/defaults.go: Default value is set to 30 seconds
  • Several test files still rely on this configuration
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for ResourceUpdateInterval usage
echo "Searching for ResourceUpdateInterval usage..."
rg "ResourceUpdateInterval" --type go

# Search for specific transport-related imports
echo "Checking for legacy transport imports..."
rg "\"github.com/bacalhau-project/bacalhau/pkg/transport\"" --type go

Length of output: 1700


Script:

#!/bin/bash
# Check for bprotocol usage and imports
echo "Checking for bprotocol imports and usage..."
rg "\"github.com/bacalhau-project/bacalhau/pkg/transport/bprotocol\"" --type go

# Check the management client implementation
echo "Checking management client implementation..."
cat pkg/transport/bprotocol/compute/management_client.go

Length of output: 6261

pkg/compute/watchers/ncl_message_creator.go (2)

11-11: LGTM: Import path updated to reflect new transport layer

The import path change aligns with the PR objective of introducing the nclprotocol transport layer.


70-70: LGTM: Interface implementation check updated

The compile-time check correctly verifies that NCLMessageCreator implements the new nclprotocol.MessageCreator interface.

pkg/transport/nclprotocol/dispatcher/config.go (1)

12-12: Verify impact of increased checkpoint interval

The checkpoint interval has been increased from 5s to 30s. While this reduces IO operations, it also increases the potential data loss window in case of failures.

✅ Verification successful

Let me gather more context about the checkpoint usage and any potential performance considerations.


Let me gather more specific information about the checkpoint usage in the codebase.


Based on the gathered information, I can now provide a final response:

Checkpoint interval change is safe and intentional

The increase in checkpoint interval from 5s to 30s is a deliberate design choice with acceptable trade-offs:

  • The checkpoint mechanism is used to track message processing progress, allowing resumption after node restarts
  • The code shows proper error handling and timeout mechanisms to handle checkpoint failures
  • The change aligns with other timing configurations in the system (e.g., defaultStallCheckInterval is also 30s)
  • The increased interval reduces IO operations while maintaining a reasonable recovery window
  • Checkpoint failures are logged and don't block message processing
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for any existing performance or reliability issues related to checkpointing
rg -i "checkpoint.*(\btimeout\b|\bfail|\berror|\bslow\b)" 

Length of output: 3852


Script:

#!/bin/bash
# Look for checkpoint interval usage and related comments
rg -A 3 -B 3 "checkpointInterval|checkpoint.*interval" 

# Look for any performance or reliability related comments
rg -A 2 "TODO|FIXME|NOTE|WARNING.*checkpoint"

Length of output: 66418


Script:

#!/bin/bash
# Look for checkpoint implementation and related code
rg -A 5 "func.*checkpoint.*Progress|checkpointProgress" 

# Look for any checkpoint interval usage in the code
rg -A 5 "defaultCheckpointInterval|CheckpointInterval.*="

Length of output: 4794

pkg/transport/nclprotocol/dispatcher/dispatcher_test.go (1)

16-16: LGTM: Import path changes are consistent

The changes correctly update the import path and type references for MockMessageCreator to align with the new nclprotocol package structure.

Also applies to: 25-25, 35-35

pkg/transport/nclprotocol/dispatcher/dispatcher.go (5)

17-17: LGTM: Import change aligns with the new transport layer architecture.

The change from transport to nclprotocol package is consistent with the PR's objective of introducing the nclprotocol transport layer.


95-97: LGTM: Core publish results processing goroutine.

The essential goroutine for processing publish results is properly initialized.


101-104: Good optimization: Conditional checkpoint goroutine.

Making the checkpoint goroutine conditional based on CheckpointInterval > 0 is a good optimization that prevents unnecessary resource usage when checkpointing is disabled.


45-45: Verify MessageCreator interface implementations.

The change from transport.MessageCreator to nclprotocol.MessageCreator looks good, but we should verify all implementations have been updated.

✅ Verification successful

All MessageCreator implementations have been properly updated

The verification shows that all implementations are using the new nclprotocol.MessageCreator interface:

  • The interface is defined in pkg/transport/nclprotocol/types.go
  • All implementations are correctly using the new package:
    • NCLMessageCreator in pkg/compute/watchers
    • NCLMessageCreator in pkg/orchestrator/watchers
    • Mock implementations in pkg/transport/nclprotocol/mocks.go
    • Test implementations in dispatcher tests

No references to the old transport.MessageCreator were found, confirming the change has been properly propagated throughout the codebase.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for any remaining references to the old transport.MessageCreator
rg "transport\.MessageCreator" 

# Search for implementations of the new interface
ast-grep --pattern 'type $_ struct {
  $$$
}

func ($v $_) CreateMessage($$$) {
  $$$
}'

Length of output: 123


Script:

#!/bin/bash
# Search for MessageCreator interface definition
rg "type MessageCreator interface" -A 5

# Search for CreateMessage method implementations
rg "func.*CreateMessage"

# Search for any struct implementing CreateMessage
ast-grep --pattern 'type $name struct {
  $$$
}'

# Find imports of nclprotocol package
rg "nclprotocol"

Length of output: 13056


98-100: Verify synchronization with stalled message checking.

While the stalled message checking is important, we should ensure it doesn't interfere with the publish results processing.

✅ Verification successful

Let me gather more information about the synchronization mechanisms used in the dispatcher.


Let me check one more aspect about the message handling and mutex protection.


The stalled message checking is properly synchronized with publish results processing

The code analysis shows proper synchronization mechanisms are in place:

  • All state access is protected by mutexes (sync.RWMutex)
  • The pendingMessageStore uses mutex protection for all operations (Add, RemoveUpTo, Clear)
  • The dispatcherState methods are thread-safe with proper locking
  • The routinesWg ensures clean shutdown coordination
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for potential race conditions in message handling
rg -A 5 "msg\." pkg/transport/nclprotocol/dispatcher/

Length of output: 3493


Script:

#!/bin/bash
# Search for mutex and synchronization primitives
rg -A 5 "sync\." pkg/transport/nclprotocol/dispatcher/

# Look for state access patterns
ast-grep --pattern 'd.state.$_'

Length of output: 2830


Script:

#!/bin/bash
# Look for the implementation of state methods
ast-grep --pattern 'func (s *dispatcherState) $_($$$) {
  $$$
}'

# Check pendingMessageStore implementation
ast-grep --pattern 'func (s *pendingMessageStore) $_($$$) {
  $$$
}'

Length of output: 3483

Review comment threads:
  • pkg/transport/bprotocol/compute/transport.go (outdated, resolved)
  • pkg/transport/nclprotocol/compute/health_tracker.go (outdated, resolved)
  • pkg/transport/nclprotocol/compute/dataplane.go (outdated, resolved)
  • pkg/transport/nclprotocol/compute/manager.go (outdated, resolved)
  • pkg/transport/nclprotocol/subjects.go (two threads, resolved)
  • pkg/models/node_info.go (outdated, resolved)
@wdbaruni changed the title from "introduce nclprotocol transport layer" to "nclprotocol transport layer" on Dec 10, 2024
wdbaruni and others added 3 commits December 10, 2024 08:42
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (2)
pkg/transport/bprotocol/compute/transport.go (2)

Line range hint 1-4: Enhance deprecation notice with timeline and tracking information.

Since this package is marked for deprecation, it would be helpful to:

  1. Add a timeline or version when this will be deprecated
  2. Reference the tracking issue/PR for the new transport implementation
  3. Add a link to documentation about migrating to the new transport layer

Would you like me to help create a GitHub issue to track the deprecation timeline and migration documentation?


Line range hint 1-215: Provide context about the transport layer migration strategy.

Since this package is being deprecated in favor of a new transport implementation, it would be helpful to understand:

  1. The architectural differences between the old and new transport layers
  2. The migration strategy and timeline
  3. Any potential breaking changes that users should be aware of

Consider documenting:

  • Key differences between the old and new transport layers
  • Migration guide for users
  • Breaking changes and compatibility notes
  • Performance implications, if any

Would you like me to help create documentation templates for these aspects?

🧰 Tools
🪛 GitHub Check: build / Build Binary (windows, amd64)

[failure] 148-148:
bprotocol.ErrUpgradeAvailable (variable of type error) is not a type

🪛 GitHub Check: build / Build Binary (darwin, arm64)

[failure] 148-148:
bprotocol.ErrUpgradeAvailable (variable of type error) is not a type

🪛 GitHub Check: build / Build Binary (darwin, amd64)

[failure] 148-148:
bprotocol.ErrUpgradeAvailable (variable of type error) is not a type

🪛 GitHub Check: build / Build Binary (linux, armv6)

[failure] 148-148:
bprotocol.ErrUpgradeAvailable (variable of type error) is not a type

🪛 GitHub Check: build / Build Binary (linux, armv7)

[failure] 148-148:
bprotocol.ErrUpgradeAvailable (variable of type error) is not a type

🪛 GitHub Check: build / Build Binary (linux, arm64)

[failure] 148-148:
bprotocol.ErrUpgradeAvailable (variable of type error) is not a type

🪛 GitHub Check: build / Build Binary (linux, amd64)

[failure] 148-148:
bprotocol.ErrUpgradeAvailable (variable of type error) is not a type

🪛 GitHub Check: lint / go-lint (ubuntu-latest)

[failure] 148-148:
bprotocol.ErrUpgradeAvailable (variable of type error) is not a type (typecheck)


[failure] 148-148:
bprotocol.ErrUpgradeAvailable (variable of type error) is not a type) (typecheck)

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between fba2fb5 and ff764c2.

📒 Files selected for processing (3)
  • pkg/transport/bprotocol/compute/transport.go (2 hunks)
  • pkg/transport/nclprotocol/compute/dataplane.go (1 hunks)
  • pkg/transport/nclprotocol/compute/health_tracker.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • pkg/transport/nclprotocol/compute/health_tracker.go
  • pkg/transport/nclprotocol/compute/dataplane.go
🧰 Additional context used
🪛 GitHub Check: build / Build Binary (windows, amd64)
pkg/transport/bprotocol/compute/transport.go

[failure] 148-148:
bprotocol.ErrUpgradeAvailable (variable of type error) is not a type

🪛 GitHub Check: build / Build Binary (darwin, arm64)
pkg/transport/bprotocol/compute/transport.go

[failure] 148-148:
bprotocol.ErrUpgradeAvailable (variable of type error) is not a type

🪛 GitHub Check: build / Build Binary (darwin, amd64)
pkg/transport/bprotocol/compute/transport.go

[failure] 148-148:
bprotocol.ErrUpgradeAvailable (variable of type error) is not a type

🪛 GitHub Check: build / Build Binary (linux, armv6)
pkg/transport/bprotocol/compute/transport.go

[failure] 148-148:
bprotocol.ErrUpgradeAvailable (variable of type error) is not a type

🪛 GitHub Check: build / Build Binary (linux, armv7)
pkg/transport/bprotocol/compute/transport.go

[failure] 148-148:
bprotocol.ErrUpgradeAvailable (variable of type error) is not a type

🪛 GitHub Check: build / Build Binary (linux, arm64)
pkg/transport/bprotocol/compute/transport.go

[failure] 148-148:
bprotocol.ErrUpgradeAvailable (variable of type error) is not a type

🪛 GitHub Check: build / Build Binary (linux, amd64)
pkg/transport/bprotocol/compute/transport.go

[failure] 148-148:
bprotocol.ErrUpgradeAvailable (variable of type error) is not a type

🪛 GitHub Check: lint / go-lint (ubuntu-latest)
pkg/transport/bprotocol/compute/transport.go

[failure] 148-148:
bprotocol.ErrUpgradeAvailable (variable of type error) is not a type (typecheck)


[failure] 148-148:
bprotocol.ErrUpgradeAvailable (variable of type error) is not a type) (typecheck)

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (3)
pkg/transport/nclprotocol/compute/manager.go (3)

19-19: Document the rationale for buffer size

Consider adding a comment explaining why 32 was chosen as the buffer size for state changes. This helps future maintainers understand if this value needs adjustment based on load characteristics.


288-289: Consider making handshake timeout configurable

The handshake timeout is hardcoded to use config.RequestTimeout. In high-latency environments or under heavy load, this might need to be adjusted independently of other request timeouts.

Consider adding a separate HandshakeTimeout configuration option.
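
A hedged sketch of what that option could look like, falling back to RequestTimeout when unset (the field and helper below are hypothetical, not existing code):

package nclprotocol

import "time"

type Config struct {
	RequestTimeout time.Duration
	// HandshakeTimeout bounds only the initial handshake; zero means
	// fall back to the general RequestTimeout.
	HandshakeTimeout time.Duration
}

// handshakeTimeout resolves the effective timeout for the handshake call.
func (c Config) handshakeTimeout() time.Duration {
	if c.HandshakeTimeout > 0 {
		return c.HandshakeTimeout
	}
	return c.RequestTimeout
}
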


21-28: Consider adding metrics for monitoring

The connection manager handles critical functionality but lacks metrics exposure. Consider adding metrics for:

  • Connection state transitions
  • Heartbeat latency
  • Reconnection attempts
  • Dropped state changes
  • Message processing latency

This would help with monitoring and debugging in production environments.
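
As a starting point, the counters could be plain atomics on the manager, exported to whatever metrics backend the project adopts; the struct below is a sketch, not existing code:

package compute

import "sync/atomic"

// managerMetrics is a hypothetical holder for the counters listed above.
type managerMetrics struct {
	stateTransitions    atomic.Int64
	reconnectAttempts   atomic.Int64
	droppedStateChanges atomic.Int64
}

func (m *managerMetrics) recordStateTransition()    { m.stateTransitions.Add(1) }
func (m *managerMetrics) recordReconnectAttempt()   { m.reconnectAttempts.Add(1) }
func (m *managerMetrics) recordDroppedStateChange() { m.droppedStateChanges.Add(1) }
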

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between ff764c2 and 40931b6.

📒 Files selected for processing (4)
  • pkg/transport/bprotocol/compute/transport.go (2 hunks)
  • pkg/transport/nclprotocol/compute/manager.go (1 hunks)
  • pkg/transport/nclprotocol/orchestrator/dataplane.go (1 hunks)
  • pkg/transport/nclprotocol/orchestrator/manager.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • pkg/transport/bprotocol/compute/transport.go
  • pkg/transport/nclprotocol/orchestrator/dataplane.go
  • pkg/transport/nclprotocol/orchestrator/manager.go
🔇 Additional comments (1)
pkg/transport/nclprotocol/compute/manager.go (1)

452-461: ⚠️ Potential issue

Unresolved potential deadlock in state transitions

The current implementation still has potential for missed state changes when the channel is full. While logging the error is good, dropping state changes could lead to inconsistent state management.

Consider implementing one of these solutions (a sketch of option 2 follows the list):

  1. Use a larger buffer size based on expected peak load
  2. Implement a ring buffer with overflow handling
  3. Add metrics to track dropped state changes
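
The option 2 sketch mentioned above, assuming a single sender goroutine feeds cm.stateChanges (with concurrent senders the eviction would need extra coordination):

// queueStateChange keeps the most recent state by evicting the oldest
// pending notification when the buffered channel is full.
func (cm *ConnectionManager) queueStateChange(sc stateChange) {
	for {
		select {
		case cm.stateChanges <- sc:
			return
		default:
			// Channel full: drop the oldest entry and retry the send,
			// so the latest state change is never lost.
			select {
			case <-cm.stateChanges:
			default:
			}
		}
	}
}
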

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (9)
pkg/lib/validate/general.go (1)

14-19: Consider performance implications of reflection usage

While the enhanced nil checking is more comprehensive, the use of reflection might impact performance in hot paths. Consider:

  1. Caching the reflection results if the function is called frequently with the same types
  2. Adding a fast path for common cases before using reflection

Consider this optimization:

 func NotNil(value any, msg string, args ...any) error {
 	if value == nil {
 		return createError(msg, args...)
 	}
+
+	// Fast path for common pointer types. Each pointer type needs its own
+	// case: a combined `case *string, *int, *bool` would type v as `any`,
+	// and a typed nil stored in an interface never compares equal to nil.
+	switch v := value.(type) {
+	case *string:
+		if v == nil {
+			return createError(msg, args...)
+		}
+		return nil
+	case *int:
+		if v == nil {
+			return createError(msg, args...)
+		}
+		return nil
+	case *bool:
+		if v == nil {
+			return createError(msg, args...)
+		}
+		return nil
+	}
+
 	// Use reflection for other types
 	val := reflect.ValueOf(value)
 	switch val.Kind() {
 	case reflect.Ptr, reflect.Interface, reflect.Map, reflect.Slice, reflect.Func:
 		if val.IsNil() {
 			return createError(msg, args...)
 		}
 	default:
 	}
 	return nil
 }
pkg/lib/validate/general_test.go (2)

7-13: Consider adding exported type documentation

The new doer type and Doer interface should include documentation comments explaining their purpose in the test context.

Add documentation:

+// doer is a test implementation of the Doer interface
 type doer struct{}
 
 func (d doer) Do() {}
 
+// Doer is a test interface for validating interface nil checks
 type Doer interface {
 	Do()
 }

47-119: Consider adding edge case tests

The test coverage is comprehensive but could benefit from additional edge cases:

  1. Nested nil values (e.g., map of pointers)
  2. Custom types implementing interfaces
  3. Channel types

Would you like me to provide examples of these additional test cases?

pkg/orchestrator/watchers/ncl_message_creator_test.go (2)

60-122: LGTM! Comprehensive test coverage for NCLMessageCreator initialization.

Well-structured table-driven tests covering both valid and invalid scenarios. Consider adding a brief comment block explaining the test's purpose for better documentation.

Add a documentation comment:

+// TestNewNCLMessageCreator validates the initialization of NCLMessageCreator
+// with various parameter combinations, ensuring proper error handling for invalid inputs.
 func (s *NCLMessageCreatorTestSuite) TestNewNCLMessageCreator() {

124-133: Consider adding edge cases to factory tests.

While the basic factory test is good, consider adding test cases for:

  • Invalid node IDs
  • Canceled contexts
  • Factory initialization with nil parameters

Example addition:

 func (s *NCLMessageCreatorTestSuite) TestMessageCreatorFactory() {
+    tests := []struct {
+        name        string
+        nodeID      string
+        ctx         context.Context
+        expectError bool
+    }{
+        {
+            name:   "valid case",
+            nodeID: "test-node",
+            ctx:    context.Background(),
+        },
+        {
+            name:        "empty node ID",
+            nodeID:      "",
+            ctx:         context.Background(),
+            expectError: true,
+        },
+        {
+            name:        "canceled context",
+            nodeID:      "test-node",
+            ctx:         func() context.Context {
+                ctx, cancel := context.WithCancel(context.Background())
+                cancel()
+                return ctx
+            }(),
+            expectError: true,
+        },
+    }
+
+    for _, tc := range tests {
+        s.Run(tc.name, func() {
+            creator, err := factory.CreateMessageCreator(tc.ctx, tc.nodeID)
+            if tc.expectError {
+                s.Error(err)
+                s.Nil(creator)
+            } else {
+                s.NoError(err)
+                s.NotNil(creator)
+            }
+        })
+    }
 }
pkg/node/compute.go (4)

199-214: Consider validating heartbeat and node info update intervals

While the configuration is comprehensive, it's recommended to validate that the intervals are reasonable (not too short or too long) to prevent potential performance issues.

Consider adding validation like:

+// Validate intervals before constructing the connection manager;
+// validation statements cannot live inside the struct literal itself.
+if cfg.BacalhauConfig.Compute.Heartbeat.Interval.AsTimeDuration() < time.Second {
+    return nil, fmt.Errorf("heartbeat interval must be at least 1 second")
+}
+if cfg.BacalhauConfig.Compute.Heartbeat.InfoUpdateInterval.AsTimeDuration() < time.Second {
+    return nil, fmt.Errorf("node info update interval must be at least 1 second")
+}
 connectionManager, err := transportcompute.NewConnectionManager(transportcompute.Config{
     NodeID:                  cfg.NodeID,
     // ... rest of the config
 })

220-221: Enhance error context for connection manager startup

While the error is properly wrapped, consider providing more context about what specifically failed during startup.

-    if err = connectionManager.Start(ctx); err != nil {
-        return nil, fmt.Errorf("failed to start connection manager: %w", err)
+    if err = connectionManager.Start(ctx); err != nil {
+        return nil, fmt.Errorf("failed to start connection manager (node: %s): %w", cfg.NodeID, err)
     }

236-238: Enhance error handling in connection manager cleanup

Consider providing more context in the error message and potentially logging the node ID.

-    if err = connectionManager.Close(ctx); err != nil {
-        log.Error().Err(err).Msg("failed to stop connection manager")
+    if err = connectionManager.Close(ctx); err != nil {
+        log.Error().
+            Err(err).
+            Str("node_id", cfg.NodeID).
+            Msg("failed to stop connection manager during cleanup")
     }

Line range hint 340-367: Maintain consistency in error handling style

While error handling is present, consider maintaining consistency with the error wrapping style used elsewhere in the file.

-    return nil, fmt.Errorf("failed to setup execution logger watcher: %w", err)
+    return nil, bacerrors.Wrap(err, "failed to setup execution logger watcher")

-    return nil, fmt.Errorf("failed to setup execution handler watcher: %w", err)
+    return nil, bacerrors.Wrap(err, "failed to setup execution handler watcher")
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 40931b6 and 4a523a8.

📒 Files selected for processing (11)
  • pkg/lib/validate/general.go (1 hunks)
  • pkg/lib/validate/general_test.go (2 hunks)
  • pkg/models/protocol.go (1 hunks)
  • pkg/node/compute.go (4 hunks)
  • pkg/orchestrator/watchers/ncl_message_creator.go (3 hunks)
  • pkg/orchestrator/watchers/ncl_message_creator_test.go (7 hunks)
  • pkg/orchestrator/watchers/protocol_router_test.go (0 hunks)
  • pkg/transport/bprotocol/compute/transport.go (2 hunks)
  • pkg/transport/nclprotocol/dispatcher/config_test.go (0 hunks)
  • pkg/transport/nclprotocol/orchestrator/dataplane.go (1 hunks)
  • pkg/transport/nclprotocol/types.go (1 hunks)
💤 Files with no reviewable changes (2)
  • pkg/orchestrator/watchers/protocol_router_test.go
  • pkg/transport/nclprotocol/dispatcher/config_test.go
🚧 Files skipped from review as they are similar to previous changes (4)
  • pkg/transport/bprotocol/compute/transport.go
  • pkg/transport/nclprotocol/types.go
  • pkg/orchestrator/watchers/ncl_message_creator.go
  • pkg/transport/nclprotocol/orchestrator/dataplane.go
🔇 Additional comments (5)
pkg/models/protocol.go (1)

19-19: Verify the impact of protocol preference order change

The reordering of protocols with ProtocolBProtocolV2 after ProtocolNCLV1 represents a significant change in protocol selection behavior. This could affect existing clients and services.

Let's verify the protocol usage across the codebase:

✅ Verification successful

Let me gather more specific information about the protocol selection and usage patterns.


Based on the comprehensive results, I can now provide the final response:

Protocol order change is safe and backward compatible

The protocol preference order change is well-supported by the codebase's design:

  • The ProtocolRouter implementation dynamically negotiates protocols between nodes based on mutual support
  • Both protocols (NCLv1 and BProtocolV2) remain fully supported across the system
  • Tests verify protocol negotiation works correctly with different protocol combinations
  • Node registration and communication paths handle both protocols appropriately

The change only affects protocol selection when both protocols are available, and the system has proper fallback mechanisms in place.
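
For intuition, negotiation of this kind reduces to scanning the local preference list in order and taking the first mutually supported entry; a self-contained sketch, not the actual ProtocolRouter code:

package main

import "fmt"

type Protocol string

// preferredProtocol returns the first locally preferred protocol that the
// remote node also supports.
func preferredProtocol(preferred, supported []Protocol) (Protocol, error) {
	supportedSet := make(map[Protocol]struct{}, len(supported))
	for _, p := range supported {
		supportedSet[p] = struct{}{}
	}
	for _, p := range preferred {
		if _, ok := supportedSet[p]; ok {
			return p, nil
		}
	}
	return "", fmt.Errorf("no mutually supported protocol")
}

func main() {
	// With NCLv1 ordered first, it wins whenever both sides support it;
	// otherwise the node falls back to BProtocolV2.
	p, _ := preferredProtocol(
		[]Protocol{"ncl/v1", "bprotocol/v2"},
		[]Protocol{"bprotocol/v2"},
	)
	fmt.Println(p) // bprotocol/v2
}
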

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check protocol usage and potential impacts

# Search for direct protocol references
echo "=== Direct Protocol References ==="
rg -l "ProtocolNCLV1|ProtocolBProtocolV2"

# Search for GetPreferredProtocol usage
echo "=== GetPreferredProtocol Usage ==="
ast-grep --pattern 'GetPreferredProtocol($$$)'

Length of output: 965


Script:

#!/bin/bash
# Check protocol selection logic and usage patterns

# Look for protocol selection/preference logic
echo "=== Protocol Selection Logic ==="
rg -A 5 "preferredProtocols" 

# Check protocol router implementation
echo "=== Protocol Router Implementation ==="
rg -A 10 "type ProtocolRouter" 

# Check test cases related to protocol selection
echo "=== Protocol Selection Tests ==="
rg -A 5 "TestGetPreferredProtocol|TestProtocolRouter" 

# Look for protocol version checks or compatibility code
echo "=== Protocol Version Checks ==="
rg -A 3 "SupportedProtocols|SupportsProtocol"

Length of output: 15024

pkg/orchestrator/watchers/ncl_message_creator_test.go (2)

48-53: LGTM! Well-structured error handling in test setup.

The addition of error handling for NewNCLMessageCreator initialization follows good testing practices.


144-148: LGTM! Consistent NodeID field additions across test cases.

The NodeID field has been consistently added across all test cases, maintaining test integrity and properly validating the new requirements.

Also applies to: 156-166, 173-173, 209-209, 242-242, 274-274, 305-305

pkg/node/compute.go (2)

31-32: LGTM: New imports align with transport layer changes

The added imports properly support the new NCL protocol transport layer implementation.


224-225: Verify removal of nclPublisher parameter impact

The removal of the nclPublisher parameter from setupComputeWatchers needs verification to ensure no functionality is lost.

✅ Verification successful

Based on the context gathered from the shell scripts, I can now generate the final response:

Removal of nclPublisher parameter is safe

The removal of the nclPublisher parameter from setupComputeWatchers appears to be part of a larger refactoring where publisher functionality is now handled through a more generic PublisherProvider interface. The code shows that publishers are properly initialized and passed through the compute stack via the configuration:

  • Publishers are obtained through DependencyInjector.PublishersFactory
  • The publisher functionality is properly maintained through the PublisherProvider
  • No direct usage of nclPublisher is found in the codebase
  • The watcher setup is focused on execution and bidding monitoring, not publishing
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for any remaining references to nclPublisher in watcher-related code
rg -l "nclPublisher" --type go | while read -r file; do
  echo "=== $file ==="
  rg "nclPublisher" "$file" -C 2
done

# Check for any potential message publishing in watcher implementations
ast-grep --pattern 'func $_($$_) {
  $$$
  Publish($$$)
  $$$
}'

Length of output: 112


Script:

#!/bin/bash
# Let's check the setupComputeWatchers function implementation and its history
ast-grep --pattern 'func setupComputeWatchers($$$) {
  $$$
}'

# Search for any watcher-related files and their content
fd -e go watcher | while read -r file; do
  echo "=== $file ==="
  cat "$file"
done

# Look for any publisher-related interfaces or types that might be relevant
rg -A 3 "type.*Publisher" --type go

Length of output: 66293


Script:

#!/bin/bash
# Let's check the setupComputeWatchers implementation and its usage
ast-grep --pattern 'func setupComputeWatchers($$$) {
  $$$
}'

# Search for any references to nclPublisher in compute-related code
rg -A 3 "nclPublisher" --type go

# Look for any publisher-related code in compute package
rg -A 3 "Publish" pkg/node/compute.go

Length of output: 1226


Script:

#!/bin/bash
# Let's check the history of the file to understand the changes
git log -p pkg/node/compute.go | grep -A 10 "setupComputeWatchers"

# Let's also check for any watcher-related tests that might show the usage
rg -A 5 "setupComputeWatchers" --type go

# Check for any publisher usage in watcher setup
fd -e go -x grep -l "setupComputeWatchers" {} \; -exec cat {} \;

Length of output: 802

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (5)
pkg/config/defaults.go (1)

50-51: LGTM! Consider documenting the migration path.

The consolidation of update intervals simplifies the configuration. The chosen intervals (1 minute for info updates, 15 seconds for heartbeat) are reasonable defaults.

Consider adding a comment in the code or updating the documentation to guide users migrating from the old configuration that used ResourceUpdateInterval.
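
The migration note in code could be as simple as the following doc-comment sketch; the wording is illustrative, and the field names mirror those used elsewhere in this review:

// Interval controls how often the compute node heartbeats (default 15s).
// InfoUpdateInterval controls how often full node info is re-sent
// (default 1m). Users migrating from configs that set
// ResourceUpdateInterval should set Interval instead; the old field is
// deprecated and retained only for compatibility.
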

pkg/transport/bprotocol/compute/management_client.go (1)

116-116: LGTM! Consider adding debug logging.

The consolidation of resource and heartbeat intervals simplifies the timing mechanism. The existing comment block explains the timing considerations well.

Consider adding debug logging when the resource ticker fires, similar to how other operations in this file are logged, to help with troubleshooting:

 case <-resourceTicker.C:
+    log.Ctx(ctx).Debug().Msgf("Resource update ticker fired with interval: %v", m.heartbeatConfig.Interval)
     // Send the latest resource info
     m.updateResources(ctx)
pkg/transport/nclprotocol/compute/config.go (2)

87-87: Document the rationale for HeartbeatMissFactor value

The magic number 5 for HeartbeatMissFactor would benefit from a comment explaining why this specific value was chosen and what trade-offs it represents.
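
For context, the factor appears to multiply the heartbeat interval to derive the disconnect threshold; a small worked example under that assumption:

package main

import (
	"fmt"
	"time"
)

func main() {
	heartbeatMissFactor := 5
	heartbeatInterval := 15 * time.Second
	// A node is treated as disconnected once this many intervals pass
	// without a heartbeat being received.
	disconnectedAfter := time.Duration(heartbeatMissFactor) * heartbeatInterval
	fmt.Println(disconnectedAfter) // 1m15s
}
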


88-91: Consider using named constants for timing configurations

The default timing values would be more maintainable as named constants, making it easier to understand and adjust them across the codebase.

+const (
+    defaultRequestTimeout      = 10 * time.Second
+    defaultReconnectInterval   = 10 * time.Second
+    defaultCheckpointInterval  = 30 * time.Second
+)

 func DefaultConfig() Config {
     return Config{
         HeartbeatMissFactor: 5,
-        RequestTimeout:      10 * time.Second,
-        ReconnectInterval:   10 * time.Second,
-        CheckpointInterval:  30 * time.Second,
+        RequestTimeout:      defaultRequestTimeout,
+        ReconnectInterval:   defaultReconnectInterval,
+        CheckpointInterval:  defaultCheckpointInterval,
pkg/transport/nclprotocol/compute/manager.go (1)

159-189: Optimize cleanup process

Consider implementing parallel cleanup with error aggregation for faster and more robust shutdown.

 func (cm *ConnectionManager) cleanup(ctx context.Context) {
+    var wg sync.WaitGroup
+    var mu sync.Mutex
+    var errors []error
+
+    // Helper to safely append errors
+    appendError := func(err error) {
+        mu.Lock()
+        errors = append(errors, err)
+        mu.Unlock()
+    }
+
     // Clean up data plane subscriber
     if cm.subscriber != nil {
-        if err := cm.subscriber.Close(ctx); err != nil {
-            log.Error().Err(err).Msg("Failed to close subscriber")
-        }
-        cm.subscriber = nil
+        wg.Add(1)
+        go func(s ncl.Subscriber) {
+            defer wg.Done()
+            if err := s.Close(ctx); err != nil {
+                appendError(fmt.Errorf("failed to close subscriber: %w", err))
+            }
+        }(cm.subscriber)
     }

     // Clean up data plane
     if cm.dataPlane != nil {
-        if err := cm.dataPlane.Stop(ctx); err != nil {
-            log.Error().Err(err).Msg("Failed to stop data plane")
-        }
-        cm.dataPlane = nil
+        wg.Add(1)
+        go func(dp *DataPlane) {
+            defer wg.Done()
+            if err := dp.Stop(ctx); err != nil {
+                appendError(fmt.Errorf("failed to stop data plane: %w", err))
+            }
+        }(cm.dataPlane)
     }

     // Clean up control plane
     if cm.controlPlane != nil {
-        if err := cm.controlPlane.Stop(ctx); err != nil {
-            log.Error().Err(err).Msg("Failed to stop control plane")
-        }
-        cm.controlPlane = nil
+        wg.Add(1)
+        go func(cp *ControlPlane) {
+            defer wg.Done()
+            if err := cp.Stop(ctx); err != nil {
+                appendError(fmt.Errorf("failed to stop control plane: %w", err))
+            }
+        }(cm.controlPlane)
     }

+    wg.Wait()
+
+    // Log aggregated errors
+    if len(errors) > 0 {
+        log.Error().Errs("errors", errors).Msg("Errors during cleanup")
+    }
+
+    // Clear references after parallel cleanup
+    cm.subscriber = nil
+    cm.dataPlane = nil
+    cm.controlPlane = nil
+
     // Clean up NATS connection last
     if cm.natsConn != nil {
         cm.natsConn.Close()
         cm.natsConn = nil
     }
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 4a523a8 and 7da0a2e.

⛔ Files ignored due to path filters (1)
  • webui/lib/api/generated/types.gen.ts is excluded by !**/generated/**
📒 Files selected for processing (12)
  • .cspell/custom-dictionary.txt (1 hunks)
  • pkg/config/defaults.go (1 hunks)
  • pkg/config/migrate.go (1 hunks)
  • pkg/config/types/compute.go (1 hunks)
  • pkg/swagger/docs.go (1 hunks)
  • pkg/swagger/swagger.json (1 hunks)
  • pkg/test/compute/resourcelimits_test.go (1 hunks)
  • pkg/test/devstack/oversubscription_test.go (1 hunks)
  • pkg/transport/bprotocol/compute/management_client.go (1 hunks)
  • pkg/transport/nclprotocol/compute/config.go (1 hunks)
  • pkg/transport/nclprotocol/compute/manager.go (1 hunks)
  • webui/lib/api/schema/swagger.json (1 hunks)
✅ Files skipped from review due to trivial changes (3)
  • webui/lib/api/schema/swagger.json
  • .cspell/custom-dictionary.txt
  • pkg/swagger/docs.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • pkg/config/types/compute.go
🔇 Additional comments (7)
pkg/test/devstack/oversubscription_test.go (1)

70-70: LGTM! Test configuration updated correctly.

The test configuration has been properly updated to use the consolidated Interval field, maintaining the test's validity for verifying over-subscription behavior.

pkg/config/migrate.go (1)

41-42: Verify the heartbeat interval migration logic

The migration of heartbeat intervals appears correct, but we should ensure backward compatibility for systems still using the deprecated ResourceUpdateInterval.

✅ Verification successful

Let me gather more specific information about the usage of ResourceUpdateInterval to understand if there are any compatibility concerns.


Let me check one more thing to see how the heartbeat intervals are being used in the actual implementation.


Based on the comprehensive analysis of the codebase, I can now provide the final response:

Migration of heartbeat intervals is correctly implemented with proper backward compatibility

The codebase shows that (a sketch of the implied mapping follows the list):

  • The deprecated ResourceUpdateInterval is properly marked with a // Deprecated: use Interval instead comment
  • The migration logic correctly maps the new Interval and InfoUpdateInterval fields
  • The system continues to maintain the deprecated field while using the new fields in the actual implementation
  • Default values and validation logic properly handle both old and new interval configurations
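
A sketch of the mapping implied by these findings; the struct shapes below are simplified stand-ins, and the actual rules live in pkg/config/migrate.go:

package config

import "time"

type legacyHeartbeat struct {
	ResourceUpdateInterval time.Duration // Deprecated: use Interval instead
}

type heartbeat struct {
	Interval           time.Duration
	InfoUpdateInterval time.Duration
}

// migrateHeartbeat shows one plausible way the old single interval could
// seed the new fields while leaving the deprecated field readable.
func migrateHeartbeat(old legacyHeartbeat, current heartbeat) heartbeat {
	if current.Interval == 0 {
		current.Interval = old.ResourceUpdateInterval
	}
	if current.InfoUpdateInterval == 0 {
		current.InfoUpdateInterval = time.Minute
	}
	return current
}
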
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for any remaining usages of ResourceUpdateInterval in the codebase
# that might need to be updated.

# Search for ResourceUpdateInterval references
echo "Searching for ResourceUpdateInterval references..."
rg "ResourceUpdateInterval" -l

# Search for Heartbeat struct initialization
echo "Searching for Heartbeat struct initialization..."
ast-grep --pattern 'Heartbeat: types.Heartbeat{$$$}'

Length of output: 530


Script:

#!/bin/bash
# Check the actual usage context of ResourceUpdateInterval
echo "Checking ResourceUpdateInterval in compute.go..."
rg "ResourceUpdateInterval" pkg/config/types/compute.go -C 5

echo "\nChecking for any migration-related code handling ResourceUpdateInterval..."
rg "ResourceUpdateInterval" pkg/config/migrate.go -C 5

echo "\nChecking for any deprecated markers or comments..."
rg "ResourceUpdateInterval" -g "*.go" --with-filename --line-number

Length of output: 1668


Script:

#!/bin/bash
# Check for actual usage of heartbeat intervals in the code
echo "Checking for heartbeat interval usage..."
rg "Interval" -g "*.go" -C 5 | rg -A 5 -B 5 "Heartbeat|heartbeat"

# Check for any tests related to heartbeat intervals
echo "\nChecking for heartbeat-related tests..."
fd -e go -x rg -l "TestHeart|test.*Heartbeat"

Length of output: 29999

pkg/test/compute/resourcelimits_test.go (1)

308-308: LGTM! Test configuration updated correctly

The test configuration has been properly updated to use the new Interval field instead of the deprecated ResourceUpdateInterval.

pkg/swagger/swagger.json (1)

2559-2559: API documentation properly reflects deprecation

The Swagger documentation correctly marks ResourceUpdateInterval as deprecated and directs users to use Interval instead. This provides clear guidance for API consumers.

pkg/transport/nclprotocol/compute/manager.go (3)

401-435: LGTM: Race condition fixed

The implementation properly handles concurrent access to health metrics and connection state.


116-116: ⚠️ Potential issue

Replace context.TODO() with parent context

Using context.TODO() can lead to resource leaks as it won't be cancelled when the parent context is cancelled.
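
The shape of the fix is straightforward; a hedged sketch in which cm.run and cm.cancel are placeholders for whatever the manager actually starts at the flagged line:

// Derive long-lived work from the caller's context instead of
// context.TODO(), so cancelling the parent also stops background work.
func (cm *ConnectionManager) Start(ctx context.Context) error {
	runCtx, cancel := context.WithCancel(ctx)
	cm.cancel = cancel // invoked from Close to unwind the goroutine
	go cm.run(runCtx)
	return nil
}
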


453-462: ⚠️ Potential issue

Implement retry mechanism for state change notifications

The current implementation logs and drops state changes when the channel is full. This could lead to inconsistent state tracking and missed notifications.

Consider implementing a retry mechanism:

-// Queue state change notification
-select {
-case cm.stateChanges <- stateChange{state: newState, err: err}:
-    log.Debug().
-        Str("oldState", oldState.String()).
-        Str("newState", newState.String()).
-        Err(err).
-        Msg("Connection state changed")
-default:
-    log.Error().Msg("State change notification channel full")
-}
+// Queue state change notification with retry
+const maxRetries = 3
+change := stateChange{state: newState, err: err}
+for i := 0; i < maxRetries; i++ {
+    select {
+    case cm.stateChanges <- change:
+        log.Debug().
+            Str("oldState", oldState.String()).
+            Str("newState", newState.String()).
+            Err(err).
+            Msg("Connection state changed")
+        return
+    case <-time.After(100 * time.Millisecond):
+        log.Warn().
+            Int("retry", i+1).
+            Msg("State change notification channel full, retrying")
+        continue
+    }
+}
+log.Error().Msg("Failed to queue state change after retries")

Likely invalid or redundant comment.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 10

🧹 Outside diff range and nitpick comments (40)
pkg/test/utils/nats.go (5)

17-25: Add error handling and improve documentation.

While the implementation is correct, consider these improvements:

  1. Add error handling for RunServer as it could potentially fail.
  2. Enhance the documentation to explain the DefaultTestOptions being used.
-// startNatsOnPort will start a NATS server on a specific port and return a server and client instances
+// startNatsOnPort starts a NATS server on a specific port using default test options.
+// The server is configured with minimal settings suitable for testing.
 func startNatsOnPort(t *testing.T, port int) *natsserver.Server {
 	t.Helper()
 	opts := &natstest.DefaultTestOptions
 	opts.Port = port

-	natsServer := natstest.RunServer(opts)
+	natsServer := natstest.RunServer(opts)
+	require.NotNil(t, natsServer, "Failed to start NATS server")
 	return natsServer
 }

35-43: Consider extracting magic numbers into named constants.

While the implementation is solid, the timeout values would be more maintainable as named constants.

+const (
+	// Test-specific timeouts for NATS client
+	natsReconnectWait    = 200 * time.Millisecond
+	natsFlusherTimeout   = 100 * time.Millisecond
+)

 func CreateNatsClient(t *testing.T, url string) *nats.Conn {
 	nc, err := nats.Connect(url,
 		nats.ReconnectBufSize(-1),                 // disable reconnect buffer so client fails fast if disconnected
-		nats.ReconnectWait(200*time.Millisecond),  //nolint:mnd // reduce reconnect wait to fail fast in tests
-		nats.FlusherTimeout(100*time.Millisecond), //nolint:mnd // reduce flusher timeout to speed up tests
+		nats.ReconnectWait(natsReconnectWait),     // reduce reconnect wait to fail fast in tests
+		nats.FlusherTimeout(natsFlusherTimeout),   // reduce flusher timeout to speed up tests
 	)
 	require.NoError(t, err)
 	return nc
 }

45-49: Add validation for server's client URL.

Consider validating the server's client URL before creating the client connection.

 func StartNats(t *testing.T) (*natsserver.Server, *nats.Conn) {
 	natsServer := StartNatsServer(t)
+	clientURL := natsServer.ClientURL()
+	require.NotEmpty(t, clientURL, "NATS server client URL is empty")
-	return natsServer, CreateNatsClient(t, natsServer.ClientURL())
+	return natsServer, CreateNatsClient(t, clientURL)
 }

51-64: Consider adding shutdown verification.

While the implementation is solid, it would be beneficial to verify the server has completely shut down before restarting.

 func RestartNatsServer(t *testing.T, natsServer *natsserver.Server) (*natsserver.Server, *nats.Conn) {
 	t.Helper()
+	require.NotNil(t, natsServer, "NATS server is nil")
+
+	clientURL := natsServer.ClientURL()
+	require.NotEmpty(t, clientURL, "NATS server client URL is empty")
+
 	natsServer.Shutdown()
+	// Wait for the server to fully shutdown
+	time.Sleep(100 * time.Millisecond)
 
-	u, err := url.Parse(natsServer.ClientURL())
-	require.NoError(t, err, "Failed to parse NATS server URL %s", natsServer.ClientURL())
+	u, err := url.Parse(clientURL)
+	require.NoError(t, err, "Failed to parse NATS server URL %s", clientURL)

1-64: Consider adding integration tests for the NATS utilities.

These utilities are crucial for testing NATS-dependent components. Consider adding integration tests that verify these utilities work together correctly, especially in edge cases like network failures or server restarts.

Would you like me to help create integration tests for these utilities?

pkg/models/resource.go (1)

129-145: LGTM! Clean implementation of the Less method.

The implementation provides a clear and correct lexicographical comparison for GPU instances, following a well-defined order of fields.

Consider the following improvements:

  1. Add unit tests to verify sorting behavior with different GPU combinations
  2. Document the rationale behind the chosen field comparison order
  3. Consider implementing a corresponding Equal method for completeness

Example test cases to consider:

func TestGPU_Less(t *testing.T) {
    tests := []struct {
        name     string
        gpu1     GPU
        gpu2     GPU
        expected bool
    }{
        {
            name: "different indices",
            gpu1: GPU{Index: 1},
            gpu2: GPU{Index: 2},
            expected: true,
        },
        // Add more test cases covering each field
    }
    // ... test implementation
}
pkg/node/compute.go (5)

199-214: Consider adding parameter validation and improving configuration structure.

The connection manager configuration could benefit from:

  1. Input validation for intervals (ensure they're positive and reasonable)
  2. Separating LogStreamServer configuration into its own setup step

Consider refactoring like this:

 connectionManager, err := nclprotocolcompute.NewConnectionManager(nclprotocolcompute.Config{
     NodeID:                  cfg.NodeID,
     ClientFactory:           clientFactory,
     NodeInfoProvider:        nodeInfoProvider,
-    HeartbeatInterval:       cfg.BacalhauConfig.Compute.Heartbeat.Interval.AsTimeDuration(),
-    NodeInfoUpdateInterval:  cfg.BacalhauConfig.Compute.Heartbeat.InfoUpdateInterval.AsTimeDuration(),
+    HeartbeatInterval:       validateInterval(cfg.BacalhauConfig.Compute.Heartbeat.Interval.AsTimeDuration()),
+    NodeInfoUpdateInterval:  validateInterval(cfg.BacalhauConfig.Compute.Heartbeat.InfoUpdateInterval.AsTimeDuration()),
     DataPlaneMessageHandler: compute.NewMessageHandler(executionStore),
     DataPlaneMessageCreator: watchers.NewNCLMessageCreator(),
     EventStore:              executionStore.GetEventStore(),
     Checkpointer:            executionStore,
     DispatcherConfig:        dispatcher.DefaultConfig(),
-    LogStreamServer: logstream.NewServer(logstream.ServerParams{
-        ExecutionStore: executionStore,
-        Executors:      executors,
-    }),
 })

Add a helper function:

func validateInterval(d time.Duration) time.Duration {
    if d <= 0 {
        return defaultInterval // Define appropriate default
    }
    return d
}

220-221: Enhance error context for connection manager startup failure.

The error wrapping could provide more specific details about what aspect of the startup failed.

 if err = connectionManager.Start(ctx); err != nil {
-    return nil, fmt.Errorf("failed to start connection manager: %w", err)
+    return nil, fmt.Errorf("failed to start NCL protocol connection manager (node: %s): %w", cfg.NodeID, err)
 }

Line range hint 340-367: Maintain consistent error wrapping pattern.

The error wrapping pattern varies between different error cases in the watcher setup. Consider using consistent error wrapping with bacerrors.Wrap as used elsewhere in the codebase.

 ) (watcher.Manager, error) {
     watcherRegistry := watcher.NewManager(executionStore.GetEventStore())
 
     _, err := watcherRegistry.Create(ctx, computeExecutionLoggerWatcherID,
         watcher.WithHandler(watchers.NewExecutionLogger(log.Logger)),
         watcher.WithEphemeral(),
         watcher.WithAutoStart(),
         watcher.WithInitialEventIterator(watcher.LatestIterator()))
     if err != nil {
-        return nil, fmt.Errorf("failed to setup execution logger watcher: %w", err)
+        return nil, bacerrors.Wrap(err, "failed to setup execution logger watcher")
     }
 
     // Set up execution handler watcher
     _, err = watcherRegistry.Create(ctx, computeExecutionHandlerWatcherID,
         watcher.WithHandler(watchers.NewExecutionUpsertHandler(bufferRunner, bidder)),
         watcher.WithAutoStart(),
         watcher.WithFilter(watcher.EventFilter{
             ObjectTypes: []string{compute.EventObjectExecutionUpsert},
         }),
         watcher.WithRetryStrategy(watcher.RetryStrategySkip),
         watcher.WithMaxRetries(3),
         watcher.WithInitialEventIterator(watcher.LatestIterator()))
     if err != nil {
-        return nil, fmt.Errorf("failed to setup execution handler watcher: %w", err)
+        return nil, bacerrors.Wrap(err, "failed to setup execution handler watcher")
     }
 
     return watcherRegistry, nil
 }

236-238: Consider grouping related cleanup operations.

The cleanup operations could be better organized by grouping related operations and using a helper function to handle error logging consistently.

+func logCleanupError(operation string, err error) {
+    if err != nil {
+        log.Error().Err(err).Msgf("failed to %s during cleanup", operation)
+    }
+}

 cleanupFunc := func(ctx context.Context) {
-    if err = watcherRegistry.Stop(ctx); err != nil {
-        log.Error().Err(err).Msg("failed to stop watcher registry")
-    }
-    legacyConnectionManager.Stop(ctx)
-    if err = connectionManager.Close(ctx); err != nil {
-        log.Error().Err(err).Msg("failed to stop connection manager")
-    }
+    // Stop watchers and connection managers
+    logCleanupError("stop watcher registry", watcherRegistry.Stop(ctx))
+    legacyConnectionManager.Stop(ctx)
+    logCleanupError("stop connection manager", connectionManager.Close(ctx))
+
+    // Cleanup storage and resources
+    logCleanupError("close execution store", executionStore.Close(ctx))
+    logCleanupError("close results path", resultsPath.Close())
 }

199-214: Consider documenting the transition strategy for connection managers.

The code currently maintains both legacy and new NCL protocol connection managers. It would be beneficial to:

  1. Document the transition timeline
  2. Add TODO comments indicating when the legacy connection manager can be removed
  3. Consider feature flags to control the rollout

Add documentation comments:

+    // TODO(transition): Legacy connection manager is maintained for backward compatibility
+    // and should be removed once all nodes are upgraded to use the NCL protocol (target: Q2 2024)
     legacyConnectionManager, err := bprotocolcompute.NewConnectionManager(...)

+    // New NCL protocol connection manager that will eventually replace the legacy implementation
     connectionManager, err := nclprotocolcompute.NewConnectionManager(...)

Also applies to: 220-221

pkg/models/buildversion.go (1)

18-25: LGTM! Consider adding documentation.

The Copy implementation is correct as it properly handles nil checks and copying of primitive types.

Consider adding a doc comment to describe the method:

+// Copy returns a deep copy of BuildVersionInfo.
+// Returns nil if the receiver is nil.
 func (b *BuildVersionInfo) Copy() *BuildVersionInfo {
pkg/models/node_info_test.go (1)

19-322: LGTM! Consider adding nil field test cases.

The test suite is comprehensive and well-structured. However, consider adding test cases for:

  • Nil ComputeNodeInfo
  • Nil BacalhauVersion
  • Nil Resources fields in ComputeNodeInfo

Example additional test case:

{
    name: "nil compute node info",
    changeFunction: func(info *NodeInfo) *NodeInfo {
        info = info.Copy()
        info.ComputeNodeInfo = ComputeNodeInfo{} // or nil if the type is a pointer
        return info
    },
    expectChanged: true,
},
go.mod (1)

Line range hint 3-3: Invalid Go version specified

The specified Go version 1.23 does not exist. The latest stable version is Go 1.22.

Apply this change:

-go 1.23
+go 1.22
pkg/test/utils/watcher.go (2)

19-21: Consider a more descriptive constant name.

The constant TypeString could be more descriptive to indicate its purpose in event store type registration, such as StringEventType or StringSerializationType.

const (
-	TypeString = "string"
+	StringEventType = "string"
)

23-40: Add function documentation and consider extracting bucket names.

The function implementation is solid, but could benefit from:

  1. Adding godoc documentation explaining its purpose and return value
  2. Extracting bucket names as constants to avoid string duplication across functions
+// CreateComputeEventStore creates a new event store for compute-related events using BoltDB.
+// It registers necessary types for execution upserts and events.
+// The returned EventStore must be closed by the caller when no longer needed.
 func CreateComputeEventStore(t *testing.T) watcher.EventStore {

Consider extracting bucket names:

+const (
+	eventsBucketName     = "events"
+	checkpointBucketName = "checkpoints"
+)
pkg/transport/nclprotocol/test/message_handling.go (2)

13-18: Consider adding SetShouldProcess method for test configuration

The shouldProcess field can only be set during initialization. Adding a setter method would improve test flexibility.

+// SetShouldProcess configures whether messages should be processed
+func (m *MockMessageHandler) SetShouldProcess(should bool) {
+    m.mu.Lock()
+    m.shouldProcess = should
+    m.mu.Unlock()
+}

47-53: Consider adding method to clear received messages

For test cleanup and state reset scenarios, it would be helpful to have a method to clear the message history.

+// ClearMessages removes all stored messages
+func (m *MockMessageHandler) ClearMessages() {
+    m.mu.Lock()
+    m.messages = make([]envelope.Message, 0)
+    m.mu.Unlock()
+}
pkg/transport/nclprotocol/compute/health_tracker.go (2)

41-49: Consider adding maximum failure threshold

The ConsecutiveFailures counter increments without limit. Consider adding a maximum threshold and corresponding behavior.

+const maxConsecutiveFailures = 3
+
 func (ht *HealthTracker) MarkDisconnected(err error) {
     ht.mu.Lock()
     defer ht.mu.Unlock()
 
     ht.health.CurrentState = nclprotocol.Disconnected
     ht.health.LastError = err
     ht.health.ConsecutiveFailures++
+    if ht.health.ConsecutiveFailures >= maxConsecutiveFailures {
+        // Consider triggering reconnection logic or alerting
+    }
 }

66-71: Add error tracking for update failures

While success is tracked for updates, there's no tracking of update failures. Consider adding error tracking for symmetry.

+// UpdateFailed records failed node info update
+func (ht *HealthTracker) UpdateFailed(err error) {
+    ht.mu.Lock()
+    defer ht.mu.Unlock()
+    ht.health.LastUpdateError = err
+}
pkg/transport/nclprotocol/test/message_creation.go (1)

25-36: Consider adding message validation in CreateMessage

The default message creation could benefit from validation to ensure required metadata is present.

 func (c *MockMessageCreator) CreateMessage(event watcher.Event) (*envelope.Message, error) {
     if c.Error != nil {
         return nil, c.Error
     }
     if c.Message != nil {
+        if c.Message.Metadata == nil || c.Message.Metadata[envelope.KeyMessageType] == "" {
+            return nil, fmt.Errorf("mock message missing required metadata")
+        }
         return c.Message, nil
     }
     // Return default message if no specific behavior configured
     return envelope.NewMessage(messages.BidResult{}).
         WithMetadataValue(envelope.KeyMessageType, messages.BidResultMessageType), nil
 }
pkg/transport/nclprotocol/compute/health_tracker_test.go (3)

31-42: Consider using table-driven tests for field validations.

While the test is thorough, consider refactoring to a table-driven approach for better maintainability and clarity:

 func (s *HealthTrackerTestSuite) TestInitialState() {
 	startTime := s.clock.Now()
 	health := s.tracker.GetHealth()
 
-	s.Require().Equal(nclprotocol.Disconnected, health.CurrentState)
-	s.Require().Equal(startTime, health.StartTime)
-	s.Require().True(health.LastSuccessfulHeartbeat.IsZero())
-	s.Require().True(health.LastSuccessfulUpdate.IsZero())
-	s.Require().Equal(0, health.ConsecutiveFailures)
-	s.Require().Nil(health.LastError)
-	s.Require().True(health.ConnectedSince.IsZero())
+	tests := []struct {
+		name     string
+		actual   interface{}
+		expected interface{}
+	}{
+		{"CurrentState", health.CurrentState, nclprotocol.Disconnected},
+		{"StartTime", health.StartTime, startTime},
+		{"LastSuccessfulHeartbeat", health.LastSuccessfulHeartbeat.IsZero(), true},
+		{"LastSuccessfulUpdate", health.LastSuccessfulUpdate.IsZero(), true},
+		{"ConsecutiveFailures", health.ConsecutiveFailures, 0},
+		{"LastError", health.LastError, nil},
+		{"ConnectedSince", health.ConnectedSince.IsZero(), true},
+	}
+
+	for _, tt := range tests {
+		s.Run(tt.name, func() {
+			s.Require().Equal(tt.expected, tt.actual)
+		})
+	}
 }

44-76: Add documentation for test scenarios.

While the tests are well-structured, adding documentation would improve clarity about the test scenarios and expected behaviors.

+// TestMarkConnected verifies that:
+// 1. The connection state is properly updated
+// 2. Timestamps are correctly set
+// 3. Error state is cleared
 func (s *HealthTrackerTestSuite) TestMarkConnected() {
     // ... existing code ...
 }

+// TestMarkDisconnected verifies that:
+// 1. The connection state changes to disconnected
+// 2. Error information is properly stored
+// 3. Consecutive failures are tracked correctly
 func (s *HealthTrackerTestSuite) TestMarkDisconnected() {
     // ... existing code ...
 }

78-97: Add edge cases to TestSuccessfulOperations.

Consider adding test cases for important edge scenarios:

 func (s *HealthTrackerTestSuite) TestSuccessfulOperations() {
     // ... existing code ...
+
+    // Test rapid consecutive updates
+    for i := 0; i < 3; i++ {
+        s.clock.Add(time.Millisecond)
+        s.tracker.HeartbeatSuccess()
+    }
+    health = s.tracker.GetHealth()
+    s.Require().Equal(s.clock.Now(), health.LastSuccessfulHeartbeat)
+
+    // Test updates while disconnected
+    s.tracker.MarkDisconnected(fmt.Errorf("test error"))
+    s.clock.Add(time.Second)
+    s.tracker.UpdateSuccess()
+    health = s.tracker.GetHealth()
+    s.Require().Equal(s.clock.Now(), health.LastSuccessfulUpdate)
 }
pkg/transport/nclprotocol/compute/config.go (3)

21-49: Enhance struct documentation with field descriptions.

While the struct is well-organized, adding detailed field descriptions would improve maintainability.

 type Config struct {
+    // NodeID uniquely identifies this compute node in the network
     NodeID           string
+    // ClientFactory creates new NATS client connections
     ClientFactory    nats.ClientFactory
+    // NodeInfoProvider supplies information about the compute node's capabilities and state
     NodeInfoProvider models.NodeInfoProvider
     // ... add similar documentation for other fields
 }

80-97: Document rationale behind default values.

While the default values are reasonable, documenting the reasoning behind these choices would help future maintainers understand the implications of changing them.

 func DefaultConfig() Config {
-    // defaults for heartbeatInterval and nodeInfoUpdateInterval are provided by BacalhauConfig,
-    // and equal to 15 seconds and 1 minute respectively
+    // Default values are chosen based on the following considerations:
+    // - HeartbeatMissFactor: 5 missed heartbeats provide a balance between quick failure detection
+    //   and tolerance for network jitter
+    // - RequestTimeout: 10s allows for network latency while preventing hung operations
+    // - ReconnectInterval: 10s provides reasonable time for transient issues to resolve
+    // - CheckpointInterval: 30s balances resource usage with data consistency needs
     return Config{
         // ... existing code ...
     }
 }

99-128: Add error handling for invalid states.

Consider adding validation to prevent setting defaults on partially initialized configs that might lead to invalid states.

 func (c *Config) SetDefaults() {
+    // Validate that required fields are set before applying defaults
+    if c.NodeID == "" {
+        panic("NodeID must be set before applying defaults")
+    }
     defaults := DefaultConfig()
     // ... existing code ...
 }
pkg/transport/nclprotocol/compute/config_test.go (3)

49-73: Add helper methods for common test scenarios.

Consider extracting common configuration modifications into helper methods to improve test readability and maintainability.

+func (s *ConfigTestSuite) getConfigWithInvalidInterval() nclprotocolcompute.Config {
+    cfg := s.getValidConfig()
+    cfg.HeartbeatInterval = 0
+    return cfg
+}
+
+func (s *ConfigTestSuite) getConfigWithMissingDependencies() nclprotocolcompute.Config {
+    cfg := s.getValidConfig()
+    cfg.ClientFactory = nil
+    cfg.NodeInfoProvider = nil
+    return cfg
+}

75-128: Add edge cases to validation tests.

Consider adding test cases for additional scenarios:

     testCases := []struct {
         // ... existing code ...
     }{
         // ... existing cases ...
+        {
+            name: "zero heartbeat miss factor",
+            mutate: func(c *nclprotocolcompute.Config) {
+                c.HeartbeatMissFactor = 0
+            },
+            expectError: "heartbeat miss factor must be positive",
+        },
+        {
+            name: "negative intervals",
+            mutate: func(c *nclprotocolcompute.Config) {
+                c.RequestTimeout = -1 * time.Second
+            },
+            expectError: "must be positive",
+        },
     }

130-149: Enhance default value testing.

Consider adding more comprehensive verifications:

 func (s *ConfigTestSuite) TestSetDefaults() {
     // ... existing code ...
+    
+    // Verify all time-based defaults
+    s.Equal(defaults.ReconnectInterval, emptyConfig.ReconnectInterval)
+    s.Equal(defaults.CheckpointInterval, emptyConfig.CheckpointInterval)
+    
+    // Verify proper initialization of complex types
+    s.NotNil(emptyConfig.Clock)
+    s.IsType(&envelope.Serializer{}, emptyConfig.MessageSerializer)
+    
+    // Verify that partially initialized configs are handled correctly
+    partialConfig := nclprotocolcompute.Config{
+        MessageSerializer: envelope.NewSerializer(),
+        Clock: clock.New(),
+    }
+    partialConfig.SetDefaults()
+    s.Equal(defaults.HeartbeatMissFactor, partialConfig.HeartbeatMissFactor)
 }
pkg/transport/nclprotocol/compute/dataplane.go (1)

171-196: Add context timeout handling in cleanup

The cleanup method should respect context cancellation to prevent hanging during shutdown.

 func (dp *DataPlane) cleanup(ctx context.Context) error {
+	if ctx.Err() != nil {
+		return fmt.Errorf("context error during cleanup: %w", ctx.Err())
+	}
 	var errs error
 
 	// Stop dispatcher first to prevent new messages
 	if dp.Dispatcher != nil {
 		if err := dp.Dispatcher.Stop(ctx); err != nil {
 			errs = errors.Join(errs, err)
 		}
 		dp.Dispatcher = nil
 	}
 
 	// Then close the publisher
 	if dp.Publisher != nil {
 		if err := dp.Publisher.Close(ctx); err != nil {
 			errs = errors.Join(errs, err)
 		}
 		dp.Publisher = nil
 	}
 
 	if errs != nil {
 		return fmt.Errorf("failed to cleanup data plane: %w", errs)
 	}
 	return nil
 }
pkg/transport/nclprotocol/orchestrator/dataplane_test.go (2)

173-210: Add timeout context in TestIncomingMessageProcessing

The test should use a timeout context to prevent hanging in case of test failures.

 func (s *DataPlaneTestSuite) TestIncomingMessageProcessing() {
+	ctx, cancel := context.WithTimeout(s.ctx, 5*time.Second)
+	defer cancel()
-	s.Require().NoError(s.dataPlane.Start(s.ctx))
+	s.Require().NoError(s.dataPlane.Start(ctx))

243-253: Add assertions for checkpoint error cases

The test should verify behavior when checkpointing fails.

 	s.Require().NoError(controlPlane.Start(s.ctx))
 	// Wait for checkpoint to be called
 	s.Eventually(func() bool {
 		return checkpointCalled
 	}, 100*time.Millisecond, 10*time.Millisecond)
 
 	// Verify checkpoint was stored
 	value, err := s.checkpointer.GetStoredCheckpoint("test-checkpoint")
 	s.Require().NoError(err)
 	s.Equal(uint64(42), value)
+
+	// Test checkpoint failure
+	s.checkpointer.OnCheckpointSet(func(name string, value uint64) {
+		s.checkpointer.SetError(fmt.Errorf("checkpoint error"))
+	})
+	s.seqTracker.UpdateLastSeqNum(43)
+	time.Sleep(100 * time.Millisecond)
+	value, err = s.checkpointer.GetStoredCheckpoint("test-checkpoint")
+	s.Require().NoError(err)
+	s.Equal(uint64(42), value) // Should still have old value
pkg/transport/nclprotocol/compute/controlplane_test.go (2)

255-277: Enhance error handling test coverage

The error handling test should verify multiple failure scenarios and recovery.

 func (s *ControlPlaneTestSuite) TestErrorHandling() {
 	// Create control plane with only heartbeat enabled
 	controlPlane := s.createControlPlane(
 		50*time.Millisecond, // heartbeat
 		1*time.Hour,         // node info - disabled
 		1*time.Hour,         // checkpoint - disabled
 	)
 	defer s.Require().NoError(controlPlane.Stop(s.ctx))
 
-	// Setup error response
+	// Setup multiple error responses followed by success
 	s.requester.EXPECT().
 		Request(gomock.Any(), gomock.Any()).
 		Return(nil, fmt.Errorf("network error")).
-		Times(1)
+		Times(2)
+	s.requester.EXPECT().
+		Request(gomock.Any(), gomock.Any()).
+		Return(envelope.NewMessage(messages.HeartbeatResponse{}), nil).
+		Times(1)
 
 	// Start control plane
 	s.Require().NoError(controlPlane.Start(s.ctx))
-	time.Sleep(70 * time.Millisecond)
+	time.Sleep(150 * time.Millisecond)
 
 	// Verify health tracker reflects failure
 	health := s.healthTracker.GetHealth()
-	s.Require().Zero(health.LastSuccessfulHeartbeat)
+	s.Require().NotZero(health.LastSuccessfulHeartbeat)
+	s.Require().Equal(2, health.ConsecutiveFailures)
 }

41-66: Consider parameterizing test intervals

The test intervals are hardcoded. Consider making them configurable for better test flexibility.

+const (
+	defaultHeartbeatInterval      = 50 * time.Millisecond
+	defaultNodeInfoUpdateInterval = 100 * time.Millisecond
+	defaultCheckpointInterval     = 150 * time.Millisecond
+	defaultRequestTimeout         = 50 * time.Millisecond
+)
+
 func (s *ControlPlaneTestSuite) SetupTest() {
 	s.ctrl = gomock.NewController(s.T())
 	s.ctx, s.cancel = context.WithCancel(context.Background())
 	s.clock = clock.New() // tickers didn't work properly with mock clock
 
 	// Create mocks
 	s.requester = ncl.NewMockPublisher(s.ctrl)
 	s.nodeInfoProvider = ncltest.NewMockNodeInfoProvider()
 	s.checkpointer = ncltest.NewMockCheckpointer()
 
 	// Create real components
 	s.healthTracker = nclprotocolcompute.NewHealthTracker(s.clock)
 	s.seqTracker = nclprotocol.NewSequenceTracker()
 
 	// Setup basic config with short intervals for testing
 	s.config = nclprotocolcompute.Config{
 		NodeID:                 "test-node",
 		NodeInfoProvider:       s.nodeInfoProvider,
 		Checkpointer:           s.checkpointer,
-		HeartbeatInterval:      50 * time.Millisecond,
-		NodeInfoUpdateInterval: 100 * time.Millisecond,
-		CheckpointInterval:     150 * time.Millisecond,
-		RequestTimeout:         50 * time.Millisecond,
+		HeartbeatInterval:      defaultHeartbeatInterval,
+		NodeInfoUpdateInterval: defaultNodeInfoUpdateInterval,
+		CheckpointInterval:     defaultCheckpointInterval,
+		RequestTimeout:         defaultRequestTimeout,
 		Clock:                  s.clock,
 	}
pkg/transport/nclprotocol/compute/manager_test.go (5)

74-80: Consider making time-based configuration values more configurable

The hardcoded time intervals could make tests brittle or slow in certain environments. Consider making these configurable or relative to a base test timeout value.

+const (
+    baseTestInterval = 100 * time.Millisecond
+    baseTestTimeout  = 1 * time.Second
+)
 
 		Clock:                  s.clock,
-		HeartbeatInterval:      100 * time.Millisecond,
+		HeartbeatInterval:      baseTestInterval,
 		HeartbeatMissFactor:    3,
-		NodeInfoUpdateInterval: 100 * time.Millisecond,
-		CheckpointInterval:     1 * time.Second,
-		ReconnectInterval:      100 * time.Millisecond,
+		NodeInfoUpdateInterval: baseTestInterval,
+		CheckpointInterval:     baseTestTimeout,
+		ReconnectInterval:      baseTestInterval,

57-59: Consider adding mock validation in setup

The mock objects are created but their initial state isn't validated. Consider adding assertions to ensure mocks are in the expected initial state.

 	s.nodeInfoProvider = ncltest.NewMockNodeInfoProvider()
+	s.Require().NotNil(s.nodeInfoProvider.GetNodeInfo(s.ctx), "Initial node info should not be nil")
 	s.messageHandler = ncltest.NewMockMessageHandler()
+	s.Require().Zero(s.messageHandler.MessageCount(), "Initial message count should be zero")
 	s.checkpointer = ncltest.NewMockCheckpointer()
+	s.Require().Zero(s.checkpointer.CheckpointCount(), "Initial checkpoint count should be zero")

95-111: Consider adding timeout context for cleanup operations

The cleanup operations could hang indefinitely. Consider using a timeout context for Close operations.

 func (s *ConnectionManagerTestSuite) TearDownTest() {
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+
 	if s.manager != nil {
-		s.Require().NoError(s.manager.Close(context.Background()))
+		s.Require().NoError(s.manager.Close(ctx))
 	}
 	if s.mockResponder != nil {
-		s.Require().NoError(s.mockResponder.Close(context.Background()))
+		s.Require().NoError(s.mockResponder.Close(ctx))
 	}

121-123: Consider adding timeout constants for Eventually assertions

The timeout and polling intervals in Eventually assertions are hardcoded. Consider extracting these to named constants for better maintainability.

+const (
+    eventuallyTimeout  = time.Second
+    eventuallyInterval = 10 * time.Millisecond
+)
+
-	}, time.Second, 10*time.Millisecond, "handshake not received")
+	}, eventuallyTimeout, eventuallyInterval, "handshake not received")

Also applies to: 132-135, 145-147


213-235: Enhance TestHeartbeatFailure with additional assertions

The test could benefit from additional assertions to verify the exact nature of the failure and the state transitions.

 	time.Sleep(s.config.HeartbeatInterval * time.Duration(s.config.HeartbeatMissFactor+1))
 
+	// Verify intermediate state
+	health := s.manager.GetHealth()
+	s.Require().Greater(health.ConsecutiveFailures, uint64(0), "Should have recorded failures")
+	s.Require().Contains(health.LastError.Error(), "heartbeat failed", "Should contain original error")
+
 	// Should disconnect after missing heartbeats
 	s.Require().Eventually(func() bool {
 		health := s.manager.GetHealth()
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 7da0a2e and 8c56ee7.

📒 Files selected for processing (28)
  • .cspell/custom-dictionary.txt (2 hunks)
  • go.mod (1 hunks)
  • pkg/models/buildversion.go (1 hunks)
  • pkg/models/node_info.go (3 hunks)
  • pkg/models/node_info_test.go (1 hunks)
  • pkg/models/resource.go (1 hunks)
  • pkg/node/compute.go (4 hunks)
  • pkg/test/utils/nats.go (1 hunks)
  • pkg/test/utils/utils.go (0 hunks)
  • pkg/test/utils/watcher.go (1 hunks)
  • pkg/transport/nclprotocol/compute/config.go (1 hunks)
  • pkg/transport/nclprotocol/compute/config_test.go (1 hunks)
  • pkg/transport/nclprotocol/compute/controlplane.go (1 hunks)
  • pkg/transport/nclprotocol/compute/controlplane_test.go (1 hunks)
  • pkg/transport/nclprotocol/compute/dataplane.go (1 hunks)
  • pkg/transport/nclprotocol/compute/dataplane_test.go (1 hunks)
  • pkg/transport/nclprotocol/compute/health_tracker.go (1 hunks)
  • pkg/transport/nclprotocol/compute/health_tracker_test.go (1 hunks)
  • pkg/transport/nclprotocol/compute/manager.go (1 hunks)
  • pkg/transport/nclprotocol/compute/manager_test.go (1 hunks)
  • pkg/transport/nclprotocol/dispatcher/dispatcher_e2e_test.go (3 hunks)
  • pkg/transport/nclprotocol/mocks.go (1 hunks)
  • pkg/transport/nclprotocol/orchestrator/dataplane_test.go (1 hunks)
  • pkg/transport/nclprotocol/test/control_plane.go (1 hunks)
  • pkg/transport/nclprotocol/test/message_creation.go (1 hunks)
  • pkg/transport/nclprotocol/test/message_handling.go (1 hunks)
  • pkg/transport/nclprotocol/test/nodes.go (1 hunks)
  • pkg/transport/nclprotocol/test/utils.go (1 hunks)
💤 Files with no reviewable changes (1)
  • pkg/test/utils/utils.go
🚧 Files skipped from review as they are similar to previous changes (3)
  • pkg/transport/nclprotocol/dispatcher/dispatcher_e2e_test.go
  • pkg/transport/nclprotocol/compute/controlplane.go
  • pkg/transport/nclprotocol/compute/manager.go
🔇 Additional comments (20)
.cspell/custom-dictionary.txt (1)

23-23: Verify the coexistence of old and new protocol terms.

The dictionary includes terms for both the old protocol (bprotocolcompute, bprotocolorchestrator) and the new NCL protocol (nclprotocol, nclprotocolcompute). Please confirm if maintaining both sets of terms is intentional during the transition period.

Also applies to: 445-447

✅ Verification successful

Both old and new protocol implementations are actively used in the codebase

The coexistence of both protocol terms in the dictionary is intentional and necessary:

  • The old protocol (bprotocol) is actively used in multiple components:

    • Core transport implementation: pkg/transport/bprotocol/
    • Compute and orchestrator specific code
    • Watchers and dispatchers in both compute and orchestrator packages
  • The new NCL protocol (nclprotocol) is also extensively implemented:

    • Core transport implementation: pkg/transport/nclprotocol/
    • Compute and orchestrator components
    • Test utilities and mocks

Some files like pkg/node/requester.go and pkg/node/compute.go use both protocols, suggesting a controlled transition or parallel support strategy.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check if old protocol terms are still in use

# Search for old protocol usage
echo "Checking usage of old protocol terms..."
rg -l "bprotocol" --type go

# Search for new protocol usage
echo "Checking usage of new protocol terms..."
rg -l "nclprotocol" --type go

Length of output: 2383

pkg/transport/nclprotocol/mocks.go (4)

1-14: LGTM! Note: This is an auto-generated file.

The file header and imports are correctly structured. As this is an auto-generated file by MockGen, manual modifications should be avoided.


16-66: Verify the MockCheckpointer interface implementation completeness.

The MockCheckpointer implementation looks correct and follows gomock's patterns.

✅ Verification successful

MockCheckpointer correctly implements all methods of the Checkpointer interface

The verification confirms that the mock implementation matches the interface definition exactly:

  • Checkpoint(ctx context.Context, name string, sequenceNumber uint64) error
  • GetCheckpoint(ctx context.Context, name string) (uint64, error)
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify that MockCheckpointer implements all methods of the Checkpointer interface
# and that the interface hasn't changed.

# Search for the Checkpointer interface definition
ast-grep --pattern 'type Checkpointer interface {
  $$$
}'

Length of output: 384


106-142: Verify the MockMessageCreatorFactory interface implementation completeness.

The MockMessageCreatorFactory implementation looks correct and follows gomock's patterns.

✅ Verification successful

The MockMessageCreatorFactory implementation is complete and correct

The interface verification confirms that MessageCreatorFactory only requires the implementation of a single method CreateMessageCreator(ctx context.Context, nodeID string) (MessageCreator, error), which is properly implemented in the mock.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify that MockMessageCreatorFactory implements all methods of the MessageCreatorFactory interface
# and that the interface hasn't changed.

# Search for the MessageCreatorFactory interface definition
ast-grep --pattern 'type MessageCreatorFactory interface {
  $$$
}'

Length of output: 306


68-104: Verify the MockMessageCreator interface implementation completeness.

The MockMessageCreator implementation looks correct and follows gomock's patterns.

✅ Verification successful

The MockMessageCreator implementation correctly matches the interface

The verification confirms that the MessageCreator interface only requires a single method CreateMessage(event watcher.Event) (*envelope.Message, error), which is fully implemented in the mock with matching signature and return types.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify that MockMessageCreator implements all methods of the MessageCreator interface
# and that the interface hasn't changed.

# Search for the MessageCreator interface definition
ast-grep --pattern 'type MessageCreator interface {
  $$$
}'

Length of output: 563
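
For quick reference, here are the three interfaces verified in this section, reconstructed from the signatures reported above (doc comments omitted, so treat surrounding details as indicative):

type Checkpointer interface {
	Checkpoint(ctx context.Context, name string, sequenceNumber uint64) error
	GetCheckpoint(ctx context.Context, name string) (uint64, error)
}

type MessageCreator interface {
	CreateMessage(event watcher.Event) (*envelope.Message, error)
}

type MessageCreatorFactory interface {
	CreateMessageCreator(ctx context.Context, nodeID string) (MessageCreator, error)
}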

pkg/test/utils/nats.go (2)

3-15: LGTM! Well-organized imports.

The imports are properly organized with standard library packages first, followed by third-party dependencies, and then internal packages.


27-33: LGTM! Well-implemented server initialization.

The function properly handles port allocation and error cases, making it reliable for test scenarios.

pkg/models/resource.go (1)

129-131: Verify the sorting implementation's usage in the transport layer.

Since this is part of a transport layer refactoring, let's verify how this sorting implementation is used and ensure it meets the requirements.

✅ Verification successful

GPU sorting implementation is correctly used for stable comparisons

The Less method is used appropriately in the codebase:

  • It's primarily used in NodeInfo.HasStaticConfigChanged() with cmpopts.SortSlices to ensure stable comparisons of GPU arrays
  • The implementation follows a clear, deterministic ordering (Index, Name, Vendor, Memory, PCIAddress)
  • The sorting is used for comparison stability, not for transport layer ordering requirements

The verification shows this is a proper implementation for its intended use case of providing stable comparisons between NodeInfo instances, regardless of the original GPU array ordering.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for GPU sorting usage in the codebase

# Look for slice operations on GPU types that might use sorting
echo "Checking for GPU slice operations..."
rg -l "sort\..*\[\]GPU" 

# Look for direct usage of the Less method
echo "Checking for Less method usage..."
rg "\.Less\(" --type go

# Look for transport layer code that might use GPU ordering
echo "Checking transport layer usage..."
rg "transport.*GPU" --type go

Length of output: 1008


Script:

#!/bin/bash
# Let's get more context about the GPU sorting implementation and its usage

# Check the full implementation of GPU struct and Less method
echo "Checking GPU struct and Less implementation..."
ast-grep --pattern 'type GPU struct {
  $$$
}

func (g GPU) Less($_) bool {
  $$$
}'

# Check for Sort or other slice operations on GPU types
echo "Checking for GPU slice operations with context..."
rg "\[\]GPU" -B 2 -A 2

# Check the node_info.go file where Less is used
echo "Content of relevant section in node_info.go..."
rg -B 5 -A 5 "cmpopts.SortSlices.*GPU.*Less" pkg/models/node_info.go

Length of output: 3357


Script:

#!/bin/bash
# Let's get the full implementation details of the Less method and its usage context

# Get the full GPU struct and Less method implementation
echo "GPU struct and Less implementation from resource.go..."
rg -B 10 -A 15 "func \(g GPU\) Less" pkg/models/resource.go

# Check Equal implementation which might be related
echo "Checking if there's an Equal method for GPU..."
ast-grep --pattern 'func (g GPU) Equal($_) bool {
  $$$
}'

# Get the full context of the comparison usage in node_info.go
echo "Full context of GPU comparison in node_info.go..."
rg -B 20 -A 20 "cmpopts.SortSlices.*GPU.*Less" pkg/models/node_info.go

Length of output: 2997
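
For reference, a minimal sketch of the deterministic ordering summarized above (Index, Name, Vendor, Memory, PCIAddress); the field names come from the review discussion rather than verbatim from resource.go:

// Lexicographic comparison over the fields listed above; ties fall
// through to the next field, so the ordering is total and deterministic.
func (g GPU) Less(other GPU) bool {
	if g.Index != other.Index {
		return g.Index < other.Index
	}
	if g.Name != other.Name {
		return g.Name < other.Name
	}
	if g.Vendor != other.Vendor {
		return g.Vendor < other.Vendor
	}
	if g.Memory != other.Memory {
		return g.Memory < other.Memory
	}
	return g.PCIAddress < other.PCIAddress
}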

pkg/models/node_info.go (1)

129-148: LGTM! Well-implemented comparison logic.

The HasStaticConfigChanged implementation is thorough:

  • Properly ignores dynamic operational fields
  • Handles slice ordering correctly
  • Uses appropriate comparison options (a sketch of the pattern follows below)
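
A sketch of how this pattern is typically wired up with go-cmp, using the ignored-field names quoted later in this review; this illustrates the approach and is not the verbatim implementation:

func (n NodeInfo) HasStaticConfigChanged(other NodeInfo) bool {
	opts := []cmp.Option{
		// Skip fields that change during normal operation
		cmpopts.IgnoreFields(ComputeNodeInfo{},
			"QueueUsedCapacity", "AvailableCapacity",
			"RunningExecutions", "EnqueuedExecutions",
		),
		// Sort GPU slices so ordering differences don't register as changes
		cmpopts.SortSlices(func(a, b GPU) bool { return a.Less(b) }),
	}
	return !cmp.Equal(n, other, opts...)
}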
go.mod (2)

24-24: LGTM: Addition of go-cmp package

The addition of github.com/google/go-cmp v0.6.0 is appropriate for implementing deep comparisons, particularly useful for the new functionality mentioned in the AI summary.


Line range hint 186-189: Verify the necessity of package version overrides

The replace directives are forcing specific versions of Google packages. Let's verify if these versions are still required or if we can use newer versions.

pkg/test/utils/watcher.go (2)

1-17: LGTM! Well-structured package and imports.

The package structure follows Go conventions, and all imports are necessary and well-organized.


31-31: Verify test database cleanup.

The BoltDB databases created by watchertest.CreateBoltDB should be properly cleaned up after tests. Let's verify the cleanup implementation:

Also applies to: 50-50, 65-65

✅ Verification successful

Database cleanup is properly implemented

The verification shows that CreateBoltDB in pkg/lib/watcher/test/utils.go uses t.TempDir() to create temporary database files, which are automatically cleaned up by the Go testing framework after each test. This is a safe and recommended approach for test database management.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check if CreateBoltDB handles cleanup
ast-grep --pattern 'func CreateBoltDB($params) {
  $$$
}'

Length of output: 59


Script:

#!/bin/bash
# Search for CreateBoltDB function with ripgrep for better context
rg "CreateBoltDB" -A 10

# Also search for any defer cleanup patterns in test files
rg "defer.*Clean|defer.*Close" --type go

Length of output: 10538
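
For illustration, the cleanup-free pattern the verification found looks roughly like this (the bbolt import and helper shape are assumed, not copied from the source):

func CreateBoltDB(t *testing.T) *bbolt.DB {
	// t.TempDir() is deleted automatically when the test completes,
	// so only the database handle itself needs closing.
	path := filepath.Join(t.TempDir(), "test.db")
	db, err := bbolt.Open(path, 0600, nil)
	require.NoError(t, err)
	t.Cleanup(func() { _ = db.Close() })
	return db
}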

pkg/transport/nclprotocol/compute/health_tracker_test.go (2)

1-30: LGTM! Well-structured test suite setup.

The test suite follows Go testing best practices with proper build tags, clear imports, and good organization using testify.


99-123: LGTM! Well-structured state transition tests.

The table-driven approach effectively tests the connection lifecycle, making it easy to add new transition scenarios.

pkg/transport/nclprotocol/compute/config.go (1)

63-69: Add minimum value validations for timing configurations.

The validation should include minimum acceptable values for timing configurations to prevent unreasonably small values that could impact system stability.
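
A sketch of what such validation could look like, using Config fields that appear elsewhere in this PR; the floor values below are illustrative assumptions, not requirements from the source:

func (c *Config) Validate() error {
	var errs error
	if c.HeartbeatInterval < time.Second {
		errs = errors.Join(errs, fmt.Errorf(
			"HeartbeatInterval %s is below the 1s minimum", c.HeartbeatInterval))
	}
	if c.NodeInfoUpdateInterval < time.Second {
		errs = errors.Join(errs, fmt.Errorf(
			"NodeInfoUpdateInterval %s is below the 1s minimum", c.NodeInfoUpdateInterval))
	}
	if c.RequestTimeout < 100*time.Millisecond {
		errs = errors.Join(errs, fmt.Errorf(
			"RequestTimeout %s is below the 100ms minimum", c.RequestTimeout))
	}
	return errs
}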

pkg/transport/nclprotocol/compute/dataplane.go (2)

147-169: LGTM: Thread-safe implementation of lifecycle methods

The Stop and IsRunning methods are well-implemented with proper mutex usage and state management.


86-92: 🛠️ Refactor suggestion

Propagate cleanup errors instead of only logging them

The cleanup error is only logged but could affect the caller's error handling logic. Consider joining the cleanup error with the original error.

 defer func() {
 	if err != nil {
 		if cleanupErr := dp.cleanup(ctx); cleanupErr != nil {
-			log.Warn().Err(cleanupErr).Msg("failed to cleanup after start error")
+			err = errors.Join(err, fmt.Errorf("cleanup failed: %w", cleanupErr))
 		}
 	}
 }()

Likely invalid or redundant comment.

pkg/transport/nclprotocol/compute/manager_test.go (2)

1-26: LGTM: Build tags and imports are well-structured

The build tags and imports are properly organized, with appropriate test dependencies included.


267-269: LGTM: Suite runner is properly implemented

The test suite runner follows the standard testify pattern correctly.

Outdated review threads, now resolved:
  • pkg/models/node_info.go (2 threads)
  • pkg/test/utils/watcher.go (2 threads)
  • pkg/transport/nclprotocol/compute/dataplane_test.go
  • pkg/transport/nclprotocol/test/control_plane.go
  • pkg/transport/nclprotocol/test/utils.go
  • pkg/transport/nclprotocol/test/nodes.go
  • pkg/transport/nclprotocol/compute/dataplane.go
wdbaruni and others added 4 commits December 11, 2024 10:28
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (11)
pkg/transport/nclprotocol/test/utils.go (2)

11-12: Enhance documentation for exported type.

The comment should follow godoc format and specify which interface this mock implements.

Apply this diff to improve the documentation:

-// MockLogStreamServer implements a minimal logstream.Server for testing
+// MockLogStreamServer provides a minimal implementation of the logstream.Server interface for testing purposes.
+// It returns empty log streams for all requests.
 type MockLogStreamServer struct{}

14-19: Consider handling context cancellation.

While the implementation correctly avoids returning a nil channel, it doesn't respect context cancellation, which could lead to subtle issues in tests.

Consider this improved implementation:

 func (m *MockLogStreamServer) GetLogStream(ctx context.Context, request messages.ExecutionLogsRequest) (
 	<-chan *concurrency.AsyncResult[models.ExecutionLog], error) {
+	if ctx.Err() != nil {
+		return nil, ctx.Err()
+	}
 	ch := make(chan *concurrency.AsyncResult[models.ExecutionLog])
 	close(ch)
 	return ch, nil
 }
pkg/models/utils.go (1)

26-33: LGTM! Consider adding unit tests.

The copyOrZero generic utility function is well-implemented and serves its purpose effectively. It safely handles nil pointers by returning zero values, which helps prevent nil pointer dereferences.

Consider adding unit tests to verify behavior with:

  • nil pointer input
  • non-nil pointer input
  • different types (struct, primitive types, etc.); a test sketch follows below
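
A test sketch along those lines, assuming a signature of func copyOrZero[T any](v *T) T based on the description above:

func TestCopyOrZero(t *testing.T) {
	// nil pointer input yields the zero value
	var nilInt *int
	require.Equal(t, 0, copyOrZero(nilInt))

	// non-nil pointer input yields a copy of the pointed-to value
	x := 42
	require.Equal(t, 42, copyOrZero(&x))

	// struct types behave the same way
	type point struct{ X, Y int }
	require.Equal(t, point{}, copyOrZero[point](nil))
	require.Equal(t, point{X: 1, Y: 2}, copyOrZero(&point{X: 1, Y: 2}))
}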
pkg/models/node_info.go (1)

129-148: Consider enhancing documentation for ignored fields.

While the implementation correctly identifies and ignores dynamic operational fields, it would be helpful to document why each ignored field is considered dynamic and how frequently these fields are expected to change during normal operation.

Add detailed comments for the ignored fields:

 // Define which fields to ignore in the comparison
 opts := []cmp.Option{
     cmpopts.IgnoreFields(ComputeNodeInfo{},
+        // Queue capacity changes frequently based on job scheduling
         "QueueUsedCapacity",
+        // Available resources fluctuate with running jobs
         "AvailableCapacity",
+        // Execution counts change as jobs start/complete
         "RunningExecutions",
         "EnqueuedExecutions",
     ),
pkg/test/utils/watcher.go (4)

19-21: Add documentation for the TypeString constant.

Consider adding a comment explaining the purpose and usage of this constant in the context of event serialization.

 const (
+	// TypeString is the type identifier used for string event serialization
 	TypeString = "string"
 )

23-33: Add documentation for CreateComputeEventStore.

The function implementation looks good, but it's missing documentation.

+// CreateComputeEventStore creates a new event store for compute-related events using BoltDB.
+// It registers necessary types for execution upserts and execution events.
+// The returned EventStore must be closed by the caller when no longer needed.
 func CreateComputeEventStore(t *testing.T) watcher.EventStore {

35-45: Add documentation for CreateJobEventStore.

The function implementation looks good, but it's missing documentation.

+// CreateJobEventStore creates a new event store for job-related events using BoltDB.
+// It registers necessary types for execution upserts and evaluation events.
+// The returned EventStore must be closed by the caller when no longer needed.
 func CreateJobEventStore(t *testing.T) watcher.EventStore {

61-70: Improve helper function documentation and configuration.

The implementation looks good but could benefit from the following improvements:

  1. Add documentation
  2. Extract bucket names as constants
+const (
+	// DefaultEventsBucket is the default bucket name for storing events
+	DefaultEventsBucket = "events"
+	// DefaultCheckpointsBucket is the default bucket name for storing checkpoints
+	DefaultCheckpointsBucket = "checkpoints"
+)

+// createEventStore creates a new BoltDB-backed event store with the specified serializer.
+// It creates the necessary buckets for events and checkpoints.
+// The returned EventStore must be closed by the caller when no longer needed.
 func createEventStore(t *testing.T, serializer *watcher.JSONSerializer) watcher.EventStore {
 	database := watchertest.CreateBoltDB(t)
 	eventStore, err := boltdb_watcher.NewEventStore(database,
-		boltdb_watcher.WithEventsBucket("events"),
-		boltdb_watcher.WithCheckpointBucket("checkpoints"),
+		boltdb_watcher.WithEventsBucket(DefaultEventsBucket),
+		boltdb_watcher.WithCheckpointBucket(DefaultCheckpointsBucket),
 		boltdb_watcher.WithEventSerializer(serializer),
 	)
 	require.NoError(t, err)
 	return eventStore
 }
pkg/transport/nclprotocol/compute/manager_test.go (3)

46-47: Consider using a mock clock for precise control in tests

In the SetupTest method, a real clock is initialized with clock.New(). For precise control over time-dependent operations and to eliminate flakiness from real-clock delays, consider a mock clock such as clock.NewMock(), which lets tests advance time manually.
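
A sketch of that pattern with benbjohnson/clock, which this suite already uses for clock.New(). One caveat, consistent with the suite's own comment that tickers misbehaved with the mock clock: a mock ticker only fires on Add() if it was created before the clock is advanced, so the component must be started first:

mockClock := clock.NewMock()
s.config.Clock = mockClock

// Start the component so its tickers register against the mock clock,
// then advance time deterministically instead of sleeping.
s.Require().NoError(s.manager.Start(s.ctx))
mockClock.Add(s.config.HeartbeatInterval) // fires exactly one heartbeat tick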


121-124: Avoid using time.Sleep in tests to prevent flakiness

In the TestSuccessfulConnection method, using time.Sleep to wait for heartbeats can lead to flaky tests. Instead, consider using synchronization mechanisms like channels or mock clocks to control the timing more reliably.


227-228: Replace time.Sleep with synchronization primitives

In the TestHeartbeatFailure method, the use of time.Sleep with calculated durations can make the test brittle and slow. Utilize synchronization primitives or mock the heartbeat mechanism to simulate missed heartbeats without real time delays.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 8c56ee7 and 3b84def.

📒 Files selected for processing (9)
  • pkg/models/node_info.go (3 hunks)
  • pkg/models/utils.go (1 hunks)
  • pkg/test/utils/watcher.go (1 hunks)
  • pkg/transport/nclprotocol/compute/dataplane.go (1 hunks)
  • pkg/transport/nclprotocol/compute/dataplane_test.go (1 hunks)
  • pkg/transport/nclprotocol/compute/manager_test.go (1 hunks)
  • pkg/transport/nclprotocol/test/control_plane.go (1 hunks)
  • pkg/transport/nclprotocol/test/nodes.go (1 hunks)
  • pkg/transport/nclprotocol/test/utils.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • pkg/transport/nclprotocol/test/nodes.go
  • pkg/transport/nclprotocol/compute/dataplane_test.go
  • pkg/transport/nclprotocol/test/control_plane.go
🔇 Additional comments (9)
pkg/transport/nclprotocol/test/utils.go (1)

1-9: LGTM!

The package name and imports are appropriate for a test utility file.

pkg/models/node_info.go (2)

113-127: LGTM! Nil-safe deep copy implementation.

The implementation correctly handles deep copying of all fields and safely handles nil pointers using the new copyOrZero utility function. This addresses the previous concerns about potential nil pointer dereferences.


164-181: LGTM! Nil-safe deep copy implementation.

The implementation correctly handles deep copying of all fields and safely handles nil pointers using the new copyOrZero utility function. This addresses the previous concerns about potential nil pointer dereferences.

pkg/test/utils/watcher.go (1)

1-17: LGTM! Package structure and imports are well organized.

The package name follows Go conventions for test utilities, and all imports are properly utilized.

pkg/transport/nclprotocol/compute/manager_test.go (1)

45-110: Ensure proper cleanup in TearDownTest

While the TearDownTest method handles cleanup, consider adding checks to ensure all resources are closed properly, especially in cases where test initialization fails partway through. This can prevent resource leaks in your test suite.

pkg/transport/nclprotocol/compute/dataplane.go (4)

55-67: Good validation of constructor parameters

The NewDataPlane function properly checks for required parameters such as Client and NodeID. This helps prevent nil pointer dereferences and ensures the data plane is correctly initialized.
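
Roughly the guard pattern being praised, with names assumed from the discussion rather than copied from the source:

func NewDataPlane(cfg Config) (*DataPlane, error) {
	if cfg.Client == nil {
		return nil, errors.New("NATS client is required")
	}
	if cfg.NodeID == "" {
		return nil, errors.New("node ID is required")
	}
	// ... construct publisher, dispatcher, and remaining state
	return &DataPlane{config: cfg}, nil
}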


92-98: Consider propagating cleanup errors in deferred function

In the Start method, the deferred function logs errors from dp.cleanup(ctx) but does not return them. Propagating these errors can help detect and handle initialization failures more effectively.


183-197: Ensure errors are properly accumulated in cleanup method

In the cleanup method, you're using errors.Join to accumulate errors. Verify that the Go version used supports errors.Join, which was introduced in Go 1.20. If not, consider an alternative method for error aggregation.
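
For context, errors.Join has been in the standard library since Go 1.20, and errors.Is / errors.As continue to match each joined error individually. A minimal illustration:

errA := errors.New("stop dispatcher failed")
errB := errors.New("close publisher failed")
joined := errors.Join(errA, errB)

fmt.Println(errors.Is(joined, errA)) // true
fmt.Println(errors.Is(joined, errB)) // true
fmt.Println(joined)                  // both messages, newline-separated

On toolchains older than 1.20, a third-party aggregator such as hashicorp/go-multierror is a commonly used alternative.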


172-175: Thread-safe access in IsRunning method

The IsRunning method correctly uses a read lock to safely access the running state. This ensures thread safety when checking if the data plane is active.

Comment on lines +183 to +211
// Configure responder to reject handshake
s.mockResponder.Behaviour().HandshakeResponse.Response = messages.HandshakeResponse{
Accepted: false,
Reason: "node not allowed",
}

err := s.manager.Start(s.ctx)
s.Require().NoError(err)

// Verify disconnected state
s.Require().Eventually(func() bool {
health := s.manager.GetHealth()
return health.CurrentState == nclprotocol.Disconnected &&
health.LastError != nil &&
health.ConsecutiveFailures > 0
}, time.Second, 10*time.Millisecond)

// Allow handshake and verify reconnection
s.mockResponder.Behaviour().HandshakeResponse.Response = messages.HandshakeResponse{
Accepted: true,
}

// Retry handshake
time.Sleep(s.config.ReconnectInterval)
s.Require().Eventually(func() bool {
health := s.manager.GetHealth()
return health.CurrentState == nclprotocol.Connected
}, time.Second, 10*time.Millisecond, "manager should be connected")
}
Contributor

⚠️ Potential issue

Handle potential race conditions in concurrent tests

The TestRejectedHandshake method modifies shared behavior of s.mockResponder that could be accessed concurrently. Ensure thread safety when modifying shared test fixtures in parallel tests to prevent race conditions.

Apply this diff to add synchronization:

 func (s *ConnectionManagerTestSuite) TestRejectedHandshake() {
+   s.mockResponder.Lock()
    // Configure responder to reject handshake
    s.mockResponder.Behaviour().HandshakeResponse.Response = messages.HandshakeResponse{
        Accepted: false,
        Reason:   "node not allowed",
    }
+   s.mockResponder.Unlock()

    err := s.manager.Start(s.ctx)
    s.Require().NoError(err)

    // Verify disconnected state
    s.Require().Eventually(func() bool {
        health := s.manager.GetHealth()
        return health.CurrentState == nclprotocol.Disconnected &&
            health.LastError != nil &&
            health.ConsecutiveFailures > 0
    }, time.Second, 10*time.Millisecond)

+   s.mockResponder.Lock()
    // Allow handshake and verify reconnection
    s.mockResponder.Behaviour().HandshakeResponse.Response = messages.HandshakeResponse{
        Accepted: true,
    }
+   s.mockResponder.Unlock()

    // Retry handshake
    time.Sleep(s.config.ReconnectInterval)
    s.Require().Eventually(func() bool {
        health := s.manager.GetHealth()
        return health.CurrentState == nclprotocol.Connected
    }, time.Second, 10*time.Millisecond, "manager should be connected")
}

Committable suggestion skipped: line range outside the PR's diff.

@wdbaruni changed the title from "nclprotocol transport layer" to "NCL Protocol" on Dec 11, 2024
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (4)
pkg/transport/nclprotocol/README.md (4)

331-333: Clarify LogStreamServer's role and implementation

The LogStreamServer component is mentioned but its functionality in handling job output streaming could be better explained. Consider adding:

  • How job output is collected and buffered
  • Streaming protocol details
  • Error handling mechanisms

375-376: Enhance checkpoint documentation

Consider adding more details about the checkpoint mechanism:

  • How sequence numbers relate to checkpoints
  • What data is included in checkpoints
  • How checkpoints are used during recovery

57-62: Add language specifiers to code blocks

Add language specifiers to the following code blocks for proper syntax highlighting:

  • NATS Subject Structure (use plaintext)
  • Event Flow Architecture (use plaintext)
  • Component Dependencies (use plaintext)
-```
+```plaintext
bacalhau.global.compute.<nodeID>.in.msgs  - Messages to compute node
...

Also applies to: 77-86, 321-339, 343-359



319-319: Remove trailing colons from headings

For consistency with markdown style guidelines, remove trailing colons from the following headings:

  • "Compute Node Components:"
  • "Orchestrator Components:"
-### Compute Node Components:
+### Compute Node Components

-### Orchestrator Components:
+### Orchestrator Components

Also applies to: 341-341


📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 3b84def and 97500e2.

📒 Files selected for processing (1)
  • pkg/transport/nclprotocol/README.md (1 hunks)
🧰 Additional context used
🪛 Markdownlint (0.35.0)
pkg/transport/nclprotocol/README.md

319-319: Punctuation: ':'
Trailing punctuation in heading

(MD026, no-trailing-punctuation)


341-341: Punctuation: ':'
Trailing punctuation in heading

(MD026, no-trailing-punctuation)


57-57: null
Fenced code blocks should have a language specified

(MD040, fenced-code-language)


77-77: null
Fenced code blocks should have a language specified

(MD040, fenced-code-language)


321-321: null
Fenced code blocks should have a language specified

(MD040, fenced-code-language)


343-343: null
Fenced code blocks should have a language specified

(MD040, fenced-code-language)

🔇 Additional comments (2)
pkg/transport/nclprotocol/README.md (2)

5-14: Well-structured documentation with comprehensive coverage!

The document follows a logical progression from concepts to implementation details, making it easy to understand and navigate.


1-383: Excellent protocol documentation!

The documentation provides a comprehensive and well-structured overview of the NCL protocol. While there are some minor improvements suggested above, the overall quality is high with:

  • Clear architectural explanations
  • Detailed message contracts
  • Well-illustrated communication flows
  • Comprehensive configuration options

@wdbaruni wdbaruni merged commit 83bc0a4 into main Dec 11, 2024
14 checks passed
@wdbaruni wdbaruni deleted the ncl-transport branch December 11, 2024 10:03