Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create MultiNode Adaptor #7

Open
wants to merge 31 commits into
base: main
Choose a base branch
from
Open

Conversation

DylanTinianov
Copy link
Contributor

@DylanTinianov DylanTinianov commented Jan 10, 2025

Description

Ticket: https://smartcontract-it.atlassian.net/browse/BCFR-1071

Most of the code in the Solana MultiNodeClient can be reused by all chain integrations. This PR extracts all of the chain-agnostic client code through making a generic MultiNodeAdaptor. This will further extract functionality into the MultiNode and reduce new integration cost.

Generic MultiNodeAdaptor:

type MultiNodeAdaptor[RPC any, HEAD Head] struct {
	cfg         *mnCfg.MultiNodeConfig
	rpc         *RPC
	stateMu     sync.RWMutex // protects state* fields
	subsSliceMu sync.RWMutex
	subs        map[Subscription]struct{}
    chStopInFlight chan struct{}
	chainInfoLock sync.RWMutex
	highestUserObservations ChainInfo
	latestChainInfo ChainInfo
	
	// chain-specific functions will be passed in
	latestBlock          func(ctx context.Context, rpc *RPC) (HEAD, error)
	latestFinalizedBlock func(ctx context.Context, rpc *RPC) (HEAD, error)
}

The the following methods can all be chain-agnostic and extracted:

RegisterSub(sub Subscription, stopInFLightCh chan struct{}) 
LatestBlock(ctx context.Context) (HEAD, error) 
LatestFinalizedBlock(ctx context.Context) (HEAD, error) 
SubscribeToHeads
SubscribeToFinalizedHeads
OnNewHead
OnNewFinalizedHead
MakeQueryCtx
AcquireQueryCtx
UnsubscribeAllExcept
CancelInflightRequests
Close
GetInterceptedChainInfo

All other client methods will still be implemented on the chain specific side including:

Dial(ctx context.Context) error 
IsSyncing()
Ping(ctx context.Context) 
SendTxResult

@DylanTinianov DylanTinianov self-assigned this Jan 10, 2025
@DylanTinianov DylanTinianov marked this pull request as ready for review January 10, 2025 21:06
@DylanTinianov DylanTinianov requested a review from a team as a code owner January 10, 2025 21:06
@DylanTinianov DylanTinianov force-pushed the BCFR-1071-multinode-adaptor branch from 988f9b2 to cfbc9d2 Compare January 10, 2025 22:31
}

// Adapter is used to integrate multinode into chain-specific clients
type Adapter[RPC any, HEAD Head] struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice also to provide a standardized method for measuring latency (including prom metrics)

// chStopInFlight can be closed to immediately cancel all in-flight requests on
// this RpcMultiNodeAdapter. Closing and replacing should be serialized through
// stateMu since it can happen on state transitions as well as RpcMultiNodeAdapter Close.
chStopInFlight chan struct{}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we rename this to a lifeCycleCh to signal that it's closed once RPC is declared unhealthy?


// GetChStopInflight provides a convenience helper that mutex wraps a
// read to the chStopInFlight
func (m *Adapter[RPC, HEAD]) GetChStopInflight() chan struct{} {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need all GetChStopInflight, MakeQueryCtx and AcquireQueryCtx methods?

return m.chStopInFlight
}

func (m *Adapter[RPC, HEAD]) ResetLatestChainInfo() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this public?

}

func (m *Adapter[RPC, HEAD]) GetInterceptedChainInfo() (latest, highestUserObservations ChainInfo) {
m.chainInfoLock.Lock()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be RLock?

c := newTestClient(t)
head, err := c.LatestBlock(tests.Context(t))
require.NoError(t, err)
require.True(t, head.IsValid())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also test that chain info is updated

return c
}

func TestMultiNodeClient_LatestBlock(t *testing.T) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It woulbe be nice to add more coverage to verify that OnNewHead and OnNewFinalizedHead properly treats HealthCheckRequests and respects closure of requestCh

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants