Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NCL Protocol #4734

Merged
merged 17 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/compute/watchers/ncl_message_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/lib/watcher"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/models/messages"
"github.com/bacalhau-project/bacalhau/pkg/transport"
"github.com/bacalhau-project/bacalhau/pkg/transport/nclprotocol"
)

type NCLMessageCreator struct {
Expand Down Expand Up @@ -67,4 +67,4 @@ func (d *NCLMessageCreator) CreateMessage(event watcher.Event) (*envelope.Messag
}

// compile-time check that NCLMessageCreator implements dispatcher.MessageCreator
var _ transport.MessageCreator = &NCLMessageCreator{}
var _ nclprotocol.MessageCreator = &NCLMessageCreator{}
1 change: 1 addition & 0 deletions pkg/config/types/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Heartbeat struct {
// InfoUpdateInterval specifies the time between updates of non-resource information to the orchestrator.
InfoUpdateInterval Duration `yaml:"InfoUpdateInterval,omitempty" json:"InfoUpdateInterval,omitempty"`
// ResourceUpdateInterval specifies the time between updates of resource information to the orchestrator.
// Deprecated: only used by legacy transport, will be removed in the future.
ResourceUpdateInterval Duration `yaml:"ResourceUpdateInterval,omitempty" json:"ResourceUpdateInterval,omitempty"`
// Interval specifies the time between heartbeat signals sent to the orchestrator.
Interval Duration `yaml:"Interval,omitempty" json:"Interval,omitempty"`
Expand Down
8 changes: 6 additions & 2 deletions pkg/lib/validate/general.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@ func NotNil(value any, msg string, args ...any) error {

// Use reflection to handle cases where value is a nil pointer wrapped in an interface
val := reflect.ValueOf(value)
if val.Kind() == reflect.Ptr && val.IsNil() {
return createError(msg, args...)
switch val.Kind() {
case reflect.Ptr, reflect.Interface, reflect.Map, reflect.Slice, reflect.Func:
if val.IsNil() {
return createError(msg, args...)
}
default:
}
return nil
}
Expand Down
82 changes: 82 additions & 0 deletions pkg/lib/validate/general_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ package validate

import "testing"

type doer struct{}

func (d doer) Do() {}

type Doer interface {
Do()
}

// TestIsNotNil tests the NotNil function for various scenarios.
func TestIsNotNil(t *testing.T) {
t.Run("NilValue", func(t *testing.T) {
Expand Down Expand Up @@ -35,4 +43,78 @@ func TestIsNotNil(t *testing.T) {
t.Errorf("NotNil failed: unexpected error for non-nil pointer")
}
})

t.Run("NilFunc", func(t *testing.T) {
var nilFunc func()
err := NotNil(nilFunc, "value should not be nil")
if err == nil {
t.Errorf("NotNil failed: expected error for nil func")
}
})

t.Run("NonNilFunc", func(t *testing.T) {
nonNilFunc := func() {}
err := NotNil(nonNilFunc, "value should not be nil")
if err != nil {
t.Errorf("NotNil failed: unexpected error for non-nil func")
}
})

t.Run("NilSlice", func(t *testing.T) {
var nilSlice []int
err := NotNil(nilSlice, "value should not be nil")
if err == nil {
t.Errorf("NotNil failed: expected error for nil slice")
}
})

t.Run("NonNilSlice", func(t *testing.T) {
nonNilSlice := make([]int, 0)
err := NotNil(nonNilSlice, "value should not be nil")
if err != nil {
t.Errorf("NotNil failed: unexpected error for non-nil slice")
}
})

t.Run("NilMap", func(t *testing.T) {
var nilMap map[string]int
err := NotNil(nilMap, "value should not be nil")
if err == nil {
t.Errorf("NotNil failed: expected error for nil map")
}
})

t.Run("NonNilMap", func(t *testing.T) {
nonNilMap := make(map[string]int)
err := NotNil(nonNilMap, "value should not be nil")
if err != nil {
t.Errorf("NotNil failed: unexpected error for non-nil map")
}
})

t.Run("NilInterface", func(t *testing.T) {
var nilInterface Doer
err := NotNil(nilInterface, "value should not be nil")
if err == nil {
t.Errorf("NotNil failed: expected error for nil interface")
}
})

t.Run("NonNilInterface", func(t *testing.T) {
var nonNilInterface Doer = doer{}
err := NotNil(nonNilInterface, "value should not be nil")
if err != nil {
t.Errorf("NotNil failed: unexpected error for non-nil interface")
}
})

t.Run("FormattedMessage", func(t *testing.T) {
err := NotNil(nil, "value %s should not be nil", "test")
if err == nil {
t.Errorf("NotNil failed: expected error for nil value with formatted message")
}
if err.Error() != "value test should not be nil" {
t.Errorf("NotNil failed: unexpected error message, got %q", err.Error())
}
})
}
8 changes: 8 additions & 0 deletions pkg/models/messages/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,12 @@ const (
BidResultMessageType = "BidResult"
RunResultMessageType = "RunResult"
ComputeErrorMessageType = "ComputeError"

HandshakeRequestMessageType = "transport.HandshakeRequest"
HeartbeatRequestMessageType = "transport.HeartbeatRequest"
NodeInfoUpdateRequestMessageType = "transport.UpdateNodeInfoRequest"

HandshakeResponseType = "transport.HandshakeResponse"
HeartbeatResponseType = "transport.HeartbeatResponse"
NodeInfoUpdateResponseType = "transport.UpdateNodeInfoResponse"
)
6 changes: 6 additions & 0 deletions pkg/models/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ func (n NodeInfo) IsComputeNode() bool {
return n.NodeType == NodeTypeCompute
}

// HasNodeInfoChanged returns true if the node info has changed compared to the previous call
// TODO: implement this function
func HasNodeInfoChanged(prev, current NodeInfo) bool {
return false
}
wdbaruni marked this conversation as resolved.
Show resolved Hide resolved

// ComputeNodeInfo contains metadata about the current state and abilities of a compute node. Compute Nodes share
// this state with Requester nodes by including it in the NodeInfo they share across the network.
type ComputeNodeInfo struct {
Expand Down
25 changes: 1 addition & 24 deletions pkg/models/protocol.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package models

import (
"os"
)

type Protocol string

const (
Expand All @@ -14,22 +10,13 @@ const (
// ProtocolBProtocolV2 is nats based request/response protocol.
// Currently the default protocol while NCL is under development.
ProtocolBProtocolV2 Protocol = "bprotocol/v2"

// EnvPreferNCL is the environment variable to prefer NCL protocol usage.
// This can be used to test NCL protocol while it's still in development.
EnvPreferNCL = "BACALHAU_PREFER_NCL_PROTOCOL"
)

var (
// preferredProtocols is the order of protocols based on preference.
// NOTE: While NCL protocol (ProtocolNCLV1) is under active development,
// we maintain ProtocolBProtocolV2 as the default choice for stability.
// NCL can be enabled via BACALHAU_PREFER_NCL_PROTOCOL env var for testing
// and development purposes. Once NCL reaches stable status, it will become
// the default protocol.
preferredProtocols = []Protocol{
ProtocolBProtocolV2,
ProtocolNCLV1,
ProtocolBProtocolV2,
}
)

Expand All @@ -41,16 +28,6 @@ func (p Protocol) String() string {
// GetPreferredProtocol accepts a slice of available protocols and returns the
// preferred protocol based on the order of preference along with any error
func GetPreferredProtocol(availableProtocols []Protocol) Protocol {
// Check if NCL is preferred via environment variable
if os.Getenv(EnvPreferNCL) == "true" {
// If NCL is available when preferred, use it
for _, p := range availableProtocols {
if p == ProtocolNCLV1 {
return ProtocolNCLV1
}
}
}

for _, preferred := range preferredProtocols {
for _, available := range availableProtocols {
if preferred == available {
Expand Down
98 changes: 30 additions & 68 deletions pkg/node/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/compute/watchers"
"github.com/bacalhau-project/bacalhau/pkg/executor"
executor_util "github.com/bacalhau-project/bacalhau/pkg/executor/util"
"github.com/bacalhau-project/bacalhau/pkg/lib/ncl"
"github.com/bacalhau-project/bacalhau/pkg/lib/watcher"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/nats"
Expand All @@ -29,14 +28,14 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/publisher"
"github.com/bacalhau-project/bacalhau/pkg/storage"
bprotocolcompute "github.com/bacalhau-project/bacalhau/pkg/transport/bprotocol/compute"
"github.com/bacalhau-project/bacalhau/pkg/transport/dispatcher"
transportcompute "github.com/bacalhau-project/bacalhau/pkg/transport/nclprotocol/compute"
"github.com/bacalhau-project/bacalhau/pkg/transport/nclprotocol/dispatcher"
)

type Compute struct {
// Visible for testing
ID string
LocalEndpoint compute.Endpoint
LogstreamServer logstream.Server
Capacity capacity.Tracker
ExecutionStore store.ExecutionStore
Executors executor.ExecProvider
Expand Down Expand Up @@ -139,12 +138,6 @@ func NewComputeNode(
},
})

// logging server
logserver := logstream.NewServer(logstream.ServerParams{
ExecutionStore: executionStore,
Executors: executors,
})

bidder := NewBidder(cfg,
allocatedResources,
publishers,
Expand Down Expand Up @@ -201,57 +194,48 @@ func NewComputeNode(
}
if err = legacyConnectionManager.Start(ctx); err != nil {
log.Warn().Err(err).Msg("failed to start legacy connection manager. continuing without it")
err = nil
}

// compute -> orchestrator ncl publisher
natsConn, err := clientFactory.CreateClient(ctx)
if err != nil {
return nil, err
}
messageRegistry := MustCreateMessageRegistry()
nclPublisher, err := ncl.NewOrderedPublisher(natsConn, ncl.OrderedPublisherConfig{
Name: cfg.NodeID,
Destination: computeOutSubject(cfg.NodeID),
MessageRegistry: messageRegistry,
// connection manager
connectionManager, err := transportcompute.NewConnectionManager(transportcompute.Config{
NodeID: cfg.NodeID,
ClientFactory: clientFactory,
NodeInfoProvider: nodeInfoProvider,
HeartbeatInterval: cfg.BacalhauConfig.Compute.Heartbeat.Interval.AsTimeDuration(),
NodeInfoUpdateInterval: cfg.BacalhauConfig.Compute.Heartbeat.InfoUpdateInterval.AsTimeDuration(),
DataPlaneMessageHandler: compute.NewMessageHandler(executionStore),
DataPlaneMessageCreator: watchers.NewNCLMessageCreator(),
EventStore: executionStore.GetEventStore(),
Checkpointer: executionStore,
DispatcherConfig: dispatcher.DefaultConfig(),
LogStreamServer: logstream.NewServer(logstream.ServerParams{
ExecutionStore: executionStore,
Executors: executors,
}),
})
if err != nil {
return nil, err
}

// orchestrator -> compute ncl subscriber
nclSubscriber, err := ncl.NewSubscriber(natsConn, ncl.SubscriberConfig{
Name: cfg.NodeID,
MessageRegistry: messageRegistry,
MessageHandler: compute.NewMessageHandler(executionStore),
})
if err != nil {
return nil, err
}
if err = nclSubscriber.Subscribe(ctx, computeInSubscription(cfg.NodeID)); err != nil {
return nil, err
if err = connectionManager.Start(ctx); err != nil {
return nil, fmt.Errorf("failed to start connection manager: %w", err)
}

watcherRegistry, nclDispatcher, err := setupComputeWatchers(
ctx, executionStore, nclPublisher, bufferRunner, bidder)
watcherRegistry, err := setupComputeWatchers(
ctx, executionStore, bufferRunner, bidder)
if err != nil {
return nil, err
}

// A single Cleanup function to make sure the order of closing dependencies is correct
cleanupFunc := func(ctx context.Context) {
if err = nclSubscriber.Close(ctx); err != nil {
log.Error().Err(err).Msg("failed to close ncl subscriber")
}
if nclDispatcher != nil {
if err = nclDispatcher.Stop(ctx); err != nil {
log.Error().Err(err).Msg("failed to stop dispatcher")
}
}
if err = watcherRegistry.Stop(ctx); err != nil {
log.Error().Err(err).Msg("failed to stop watcher registry")
}
legacyConnectionManager.Stop(ctx)
if err = connectionManager.Close(ctx); err != nil {
log.Error().Err(err).Msg("failed to stop connection manager")
}
if err = executionStore.Close(ctx); err != nil {
log.Error().Err(err).Msg("failed to close execution store")
}
Expand All @@ -263,7 +247,6 @@ func NewComputeNode(
return &Compute{
ID: cfg.NodeID,
LocalEndpoint: baseEndpoint,
LogstreamServer: logserver,
Capacity: runningCapacityTracker,
ExecutionStore: executionStore,
Executors: executors,
Expand Down Expand Up @@ -352,19 +335,19 @@ func NewBidder(
func setupComputeWatchers(
ctx context.Context,
executionStore store.ExecutionStore,
nclPublisher ncl.OrderedPublisher,
bufferRunner *compute.ExecutorBuffer,
bidder compute.Bidder,
) (watcher.Manager, *dispatcher.Dispatcher, error) {
) (watcher.Manager, error) {
watcherRegistry := watcher.NewManager(executionStore.GetEventStore())

// Set up execution logger watcher
_, err := watcherRegistry.Create(ctx, computeExecutionLoggerWatcherID,
watcher.WithHandler(watchers.NewExecutionLogger(log.Logger)),
watcher.WithEphemeral(),
watcher.WithAutoStart(),
watcher.WithInitialEventIterator(watcher.LatestIterator()))
if err != nil {
return nil, nil, fmt.Errorf("failed to setup execution logger watcher: %w", err)
return nil, fmt.Errorf("failed to setup execution logger watcher: %w", err)
}

// Set up execution handler watcher
Expand All @@ -378,29 +361,8 @@ func setupComputeWatchers(
watcher.WithMaxRetries(3),
watcher.WithInitialEventIterator(watcher.LatestIterator()))
if err != nil {
return nil, nil, fmt.Errorf("failed to setup execution handler watcher: %w", err)
}

// setup ncl dispatcher
nclDispatcherWatcher, err := watcherRegistry.Create(ctx, computeNCLDispatcherWatcherID,
watcher.WithFilter(watcher.EventFilter{
ObjectTypes: []string{compute.EventObjectExecutionUpsert},
}),
watcher.WithRetryStrategy(watcher.RetryStrategyBlock),
watcher.WithInitialEventIterator(watcher.LatestIterator()))
if err != nil {
return nil, nil, fmt.Errorf("failed to setup ncl dispatcher watcher: %w", err)
}

nclDispatcher, err := dispatcher.New(
nclPublisher, nclDispatcherWatcher, watchers.NewNCLMessageCreator(), dispatcher.DefaultConfig())
if err != nil {
return nil, nil, fmt.Errorf("failed to create dispatcher: %w", err)
}

if err = nclDispatcher.Start(ctx); err != nil {
return nil, nil, fmt.Errorf("failed to start dispatcher: %w", err)
return nil, fmt.Errorf("failed to setup execution handler watcher: %w", err)
}

return watcherRegistry, nclDispatcher, nil
return watcherRegistry, nil
}
Loading
Loading