Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NCL Protocol #4734

Merged
merged 17 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/compute/watchers/ncl_message_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/lib/watcher"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/models/messages"
"github.com/bacalhau-project/bacalhau/pkg/transport"
"github.com/bacalhau-project/bacalhau/pkg/transport/nclprotocol"
)

type NCLMessageCreator struct {
Expand Down Expand Up @@ -67,4 +67,4 @@ func (d *NCLMessageCreator) CreateMessage(event watcher.Event) (*envelope.Messag
}

// compile-time check that NCLMessageCreator implements dispatcher.MessageCreator
var _ transport.MessageCreator = &NCLMessageCreator{}
var _ nclprotocol.MessageCreator = &NCLMessageCreator{}
1 change: 1 addition & 0 deletions pkg/config/types/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Heartbeat struct {
// InfoUpdateInterval specifies the time between updates of non-resource information to the orchestrator.
InfoUpdateInterval Duration `yaml:"InfoUpdateInterval,omitempty" json:"InfoUpdateInterval,omitempty"`
// ResourceUpdateInterval specifies the time between updates of resource information to the orchestrator.
// Deprecated: only used by legacy transport, will be removed in the future.
ResourceUpdateInterval Duration `yaml:"ResourceUpdateInterval,omitempty" json:"ResourceUpdateInterval,omitempty"`
// Interval specifies the time between heartbeat signals sent to the orchestrator.
Interval Duration `yaml:"Interval,omitempty" json:"Interval,omitempty"`
Expand Down
8 changes: 8 additions & 0 deletions pkg/models/messages/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,12 @@ const (
BidResultMessageType = "BidResult"
RunResultMessageType = "RunResult"
ComputeErrorMessageType = "ComputeError"

HandshakeRequestMessageType = "transport.HandshakeRequest"
HeartbeatRequestMessageType = "transport.HeartbeatRequest"
NodeInfoUpdateRequestMessageType = "transport.UpdateNodeInfoRequest"

HandshakeResponseType = "transport.HandshakeResponse"
HeartbeatResponseType = "transport.HeartbeatResponse"
NodeInfoUpdateResponseType = "transport.UpdateNodeInfoResponse"
)
6 changes: 6 additions & 0 deletions pkg/models/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ func (n NodeInfo) IsComputeNode() bool {
return n.NodeType == NodeTypeCompute
}

// HasNodeInfoChanged returns true if the node info has changed compared to the previous call
// TODO: implement this function
func HasNodeInfoChanged(prev, current NodeInfo) bool {
return false
}
wdbaruni marked this conversation as resolved.
Show resolved Hide resolved

// ComputeNodeInfo contains metadata about the current state and abilities of a compute node. Compute Nodes share
// this state with Requester nodes by including it in the NodeInfo they share across the network.
type ComputeNodeInfo struct {
Expand Down
97 changes: 30 additions & 67 deletions pkg/node/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/compute/watchers"
"github.com/bacalhau-project/bacalhau/pkg/executor"
executor_util "github.com/bacalhau-project/bacalhau/pkg/executor/util"
"github.com/bacalhau-project/bacalhau/pkg/lib/ncl"
"github.com/bacalhau-project/bacalhau/pkg/lib/watcher"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/nats"
Expand All @@ -29,14 +28,14 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/publisher"
"github.com/bacalhau-project/bacalhau/pkg/storage"
bprotocolcompute "github.com/bacalhau-project/bacalhau/pkg/transport/bprotocol/compute"
"github.com/bacalhau-project/bacalhau/pkg/transport/dispatcher"
transportcompute "github.com/bacalhau-project/bacalhau/pkg/transport/nclprotocol/compute"
"github.com/bacalhau-project/bacalhau/pkg/transport/nclprotocol/dispatcher"
)

type Compute struct {
// Visible for testing
ID string
LocalEndpoint compute.Endpoint
LogstreamServer logstream.Server
Capacity capacity.Tracker
ExecutionStore store.ExecutionStore
Executors executor.ExecProvider
Expand Down Expand Up @@ -139,12 +138,6 @@ func NewComputeNode(
},
})

// logging server
logserver := logstream.NewServer(logstream.ServerParams{
ExecutionStore: executionStore,
Executors: executors,
})

bidder := NewBidder(cfg,
allocatedResources,
publishers,
Expand Down Expand Up @@ -204,54 +197,46 @@ func NewComputeNode(
err = nil
}

// compute -> orchestrator ncl publisher
natsConn, err := clientFactory.CreateClient(ctx)
if err != nil {
return nil, err
}
messageRegistry := MustCreateMessageRegistry()
nclPublisher, err := ncl.NewOrderedPublisher(natsConn, ncl.OrderedPublisherConfig{
Name: cfg.NodeID,
Destination: computeOutSubject(cfg.NodeID),
MessageRegistry: messageRegistry,
// connection manager
connectionManager, err := transportcompute.NewConnectionManager(transportcompute.Config{
NodeID: cfg.NodeID,
ClientFactory: clientFactory,
NodeInfoProvider: nodeInfoProvider,
HeartbeatInterval: cfg.BacalhauConfig.Compute.Heartbeat.Interval.AsTimeDuration(),
NodeInfoUpdateInterval: cfg.BacalhauConfig.Compute.Heartbeat.InfoUpdateInterval.AsTimeDuration(),
DataPlaneMessageHandler: compute.NewMessageHandler(executionStore),
DataPlaneMessageCreator: watchers.NewNCLMessageCreator(),
EventStore: executionStore.GetEventStore(),
Checkpointer: executionStore,
DispatcherConfig: dispatcher.DefaultConfig(),
LogStreamServer: logstream.NewServer(logstream.ServerParams{
ExecutionStore: executionStore,
Executors: executors,
}),
})
if err != nil {
return nil, err
}

// orchestrator -> compute ncl subscriber
nclSubscriber, err := ncl.NewSubscriber(natsConn, ncl.SubscriberConfig{
Name: cfg.NodeID,
MessageRegistry: messageRegistry,
MessageHandler: compute.NewMessageHandler(executionStore),
})
if err != nil {
return nil, err
}
if err = nclSubscriber.Subscribe(ctx, computeInSubscription(cfg.NodeID)); err != nil {
return nil, err
if err = connectionManager.Start(ctx); err != nil {
return nil, fmt.Errorf("failed to start connection manager: %w", err)
}

watcherRegistry, nclDispatcher, err := setupComputeWatchers(
ctx, executionStore, nclPublisher, bufferRunner, bidder)
watcherRegistry, err := setupComputeWatchers(
ctx, executionStore, bufferRunner, bidder)
if err != nil {
return nil, err
}

// A single Cleanup function to make sure the order of closing dependencies is correct
cleanupFunc := func(ctx context.Context) {
if err = nclSubscriber.Close(ctx); err != nil {
log.Error().Err(err).Msg("failed to close ncl subscriber")
}
if nclDispatcher != nil {
if err = nclDispatcher.Stop(ctx); err != nil {
log.Error().Err(err).Msg("failed to stop dispatcher")
}
}
if err = watcherRegistry.Stop(ctx); err != nil {
log.Error().Err(err).Msg("failed to stop watcher registry")
}
legacyConnectionManager.Stop(ctx)
if err = connectionManager.Close(ctx); err != nil {
log.Error().Err(err).Msg("failed to stop connection manager")
}
if err = executionStore.Close(ctx); err != nil {
log.Error().Err(err).Msg("failed to close execution store")
}
Expand All @@ -263,7 +248,6 @@ func NewComputeNode(
return &Compute{
ID: cfg.NodeID,
LocalEndpoint: baseEndpoint,
LogstreamServer: logserver,
Capacity: runningCapacityTracker,
ExecutionStore: executionStore,
Executors: executors,
Expand Down Expand Up @@ -352,19 +336,19 @@ func NewBidder(
func setupComputeWatchers(
ctx context.Context,
executionStore store.ExecutionStore,
nclPublisher ncl.OrderedPublisher,
bufferRunner *compute.ExecutorBuffer,
bidder compute.Bidder,
) (watcher.Manager, *dispatcher.Dispatcher, error) {
) (watcher.Manager, error) {
watcherRegistry := watcher.NewManager(executionStore.GetEventStore())

// Set up execution logger watcher
_, err := watcherRegistry.Create(ctx, computeExecutionLoggerWatcherID,
watcher.WithHandler(watchers.NewExecutionLogger(log.Logger)),
watcher.WithEphemeral(),
watcher.WithAutoStart(),
watcher.WithInitialEventIterator(watcher.LatestIterator()))
if err != nil {
return nil, nil, fmt.Errorf("failed to setup execution logger watcher: %w", err)
return nil, fmt.Errorf("failed to setup execution logger watcher: %w", err)
}

// Set up execution handler watcher
Expand All @@ -378,29 +362,8 @@ func setupComputeWatchers(
watcher.WithMaxRetries(3),
watcher.WithInitialEventIterator(watcher.LatestIterator()))
if err != nil {
return nil, nil, fmt.Errorf("failed to setup execution handler watcher: %w", err)
}

// setup ncl dispatcher
nclDispatcherWatcher, err := watcherRegistry.Create(ctx, computeNCLDispatcherWatcherID,
watcher.WithFilter(watcher.EventFilter{
ObjectTypes: []string{compute.EventObjectExecutionUpsert},
}),
watcher.WithRetryStrategy(watcher.RetryStrategyBlock),
watcher.WithInitialEventIterator(watcher.LatestIterator()))
if err != nil {
return nil, nil, fmt.Errorf("failed to setup ncl dispatcher watcher: %w", err)
}

nclDispatcher, err := dispatcher.New(
nclPublisher, nclDispatcherWatcher, watchers.NewNCLMessageCreator(), dispatcher.DefaultConfig())
if err != nil {
return nil, nil, fmt.Errorf("failed to create dispatcher: %w", err)
}

if err = nclDispatcher.Start(ctx); err != nil {
return nil, nil, fmt.Errorf("failed to start dispatcher: %w", err)
return nil, fmt.Errorf("failed to setup execution handler watcher: %w", err)
}

return watcherRegistry, nclDispatcher, nil
return watcherRegistry, nil
}
8 changes: 0 additions & 8 deletions pkg/node/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,10 @@ const (
// and handles them locally by triggering the executor or bidder for example.
computeExecutionHandlerWatcherID = "execution-handler"

// computeNCLDispatcherWatcherID is the ID of the watcher that listens for execution events
// and forwards them to the NCL dispatcher.
computeNCLDispatcherWatcherID = "compute-ncl-dispatcher"

// computeExecutionLoggerWatcherID is the ID of the watcher that listens for execution events
// and logs them.
computeExecutionLoggerWatcherID = "compute-logger"

// orchestratorNCLDispatcherWatcherID is the ID of the watcher that listens for execution events
// and forwards them to the NCL dispatcher.
orchestratorNCLDispatcherWatcherID = "orchestrator-ncl-dispatcher"

// orchestratorExecutionCancellerWatcherID is the ID of the watcher that listens for execution events
// and cancels them the execution's observed state
orchestratorExecutionCancellerWatcherID = "execution-canceller"
Expand Down
58 changes: 0 additions & 58 deletions pkg/node/ncl.go

This file was deleted.

Loading
Loading