Skip to content

Commit

Permalink
new flag, no status check
Browse files Browse the repository at this point in the history
  • Loading branch information
sstanculeanu committed Dec 13, 2024
1 parent a01b39c commit f6a041e
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 3 deletions.
15 changes: 14 additions & 1 deletion cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ VERSION:
Name: "start-swagger-ui",
Usage: "If set to true, will start a Swagger UI on the root",
}
// noStatusCheck defines a flag that specifies if the status checks for the observers should be skipped
noStatusCheck = cli.BoolFlag{
Name: "no-status-check",
Usage: "If set to true, will skip the status check for observers, treating them as always synced. ⚠️ This relies on proper " +
"observers management on the provider side.",
}

testServer *testing.TestHttpServer
)
Expand All @@ -184,6 +190,7 @@ func main() {
workingDirectory,
memBallast,
startSwaggerUI,
noStatusCheck,
}
app.Authors = []cli.Author{
{
Expand Down Expand Up @@ -273,7 +280,8 @@ func startProxy(ctx *cli.Context) error {
statusMetricsProvider := metrics.NewStatusMetrics()

shouldStartSwaggerUI := ctx.GlobalBool(startSwaggerUI.Name)
versionsRegistry, err := createVersionsRegistryTestOrProduction(ctx, generalConfig, configurationFileName, statusMetricsProvider, closableComponents)
skipStatusCheck := ctx.GlobalBool(noStatusCheck.Name)
versionsRegistry, err := createVersionsRegistryTestOrProduction(ctx, generalConfig, configurationFileName, statusMetricsProvider, closableComponents, skipStatusCheck)
if err != nil {
return err
}
Expand Down Expand Up @@ -309,6 +317,7 @@ func createVersionsRegistryTestOrProduction(
configurationFilePath string,
statusMetricsHandler data.StatusMetricsProvider,
closableComponents *data.ClosableComponentsHandler,
skipStatusCheck bool,
) (data.VersionsRegistryHandler, error) {

var testHTTPServerEnabled bool
Expand Down Expand Up @@ -373,6 +382,7 @@ func createVersionsRegistryTestOrProduction(
ctx.GlobalString(walletKeyPemFile.Name),
ctx.GlobalString(apiConfigDirectory.Name),
closableComponents,
skipStatusCheck,
)
}

Expand All @@ -383,6 +393,7 @@ func createVersionsRegistryTestOrProduction(
ctx.GlobalString(walletKeyPemFile.Name),
ctx.GlobalString(apiConfigDirectory.Name),
closableComponents,
skipStatusCheck,
)
}

Expand All @@ -393,6 +404,7 @@ func createVersionsRegistry(
pemFileLocation string,
apiConfigDirectoryPath string,
closableComponents *data.ClosableComponentsHandler,
skipStatusCheck bool,
) (data.VersionsRegistryHandler, error) {
pubKeyConverter, err := pubkeyConverter.NewBech32PubkeyConverter(cfg.AddressPubkeyConverter.Length, addressHRP)
if err != nil {
Expand Down Expand Up @@ -441,6 +453,7 @@ func createVersionsRegistry(
observersProvider,
fullHistoryNodesProvider,
pubKeyConverter,
skipStatusCheck,
)
if err != nil {
return nil, err
Expand Down
9 changes: 9 additions & 0 deletions observer/baseNodeProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ func (bnp *baseNodeProvider) UpdateNodesBasedOnSyncState(nodesWithSyncStatus []*
bnp.snapshotlessNodes.UpdateNodes(snapshotlessNodes)
}

// PrintNodesInShards will only print the nodes in shards
func (bnp *baseNodeProvider) PrintNodesInShards() {
bnp.mutNodes.RLock()
defer bnp.mutNodes.RUnlock()

bnp.regularNodes.PrintNodesInShards()
bnp.snapshotlessNodes.PrintNodesInShards()
}

func splitNodesByDataAvailability(nodes []*data.NodeData) ([]*data.NodeData, []*data.NodeData) {
regularNodes := make([]*data.NodeData, 0)
snapshotlessNodes := make([]*data.NodeData, 0)
Expand Down
4 changes: 4 additions & 0 deletions observer/disabledNodesProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func (d *disabledNodesProvider) ReloadNodes(_ data.NodeType) data.NodesReloadRes
return data.NodesReloadResponse{Description: "disabled nodes provider", Error: d.returnMessage}
}

// PrintNodesInShards does nothing as it is disabled
func (d *disabledNodesProvider) PrintNodesInShards() {
}

// IsInterfaceNil returns true if there is no value under the interface
func (d *disabledNodesProvider) IsInterfaceNil() bool {
return d == nil
Expand Down
8 changes: 8 additions & 0 deletions observer/holder/nodesHolder.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ func (nh *nodesHolder) UpdateNodes(nodesWithSyncStatus []*data.NodeData) {
nh.printNodesInShardsUnprotected()
}

// PrintNodesInShards will only print the nodes in shards
func (nh *nodesHolder) PrintNodesInShards() {
nh.mut.RLock()
defer nh.mut.RUnlock()

nh.printNodesInShardsUnprotected()
}

// GetSyncedNodes returns all the synced nodes
func (nh *nodesHolder) GetSyncedNodes(shardID uint32) []*data.NodeData {
return nh.getObservers(syncedNodesCache, shardID)
Expand Down
2 changes: 2 additions & 0 deletions observer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ type NodesProviderHandler interface {
UpdateNodesBasedOnSyncState(nodesWithSyncStatus []*data.NodeData)
GetAllNodesWithSyncState() []*data.NodeData
ReloadNodes(nodesType data.NodeType) data.NodesReloadResponse
PrintNodesInShards()
IsInterfaceNil() bool
}

// NodesHolder defines the actions of a component that is able to hold nodes
type NodesHolder interface {
UpdateNodes(nodesWithSyncStatus []*data.NodeData)
PrintNodesInShards()
GetSyncedNodes(shardID uint32) []*data.NodeData
GetSyncedFallbackNodes(shardID uint32) []*data.NodeData
GetOutOfSyncNodes(shardID uint32) []*data.NodeData
Expand Down
23 changes: 21 additions & 2 deletions process/baseProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type BaseProcessor struct {
chanTriggerNodesState chan struct{}
delayForCheckingNodesSyncState time.Duration
cancelFunc func()
noStatusCheck bool

httpClient *http.Client
}
Expand All @@ -53,6 +54,7 @@ func NewBaseProcessor(
observersProvider observer.NodesProviderHandler,
fullHistoryNodesProvider observer.NodesProviderHandler,
pubKeyConverter core.PubkeyConverter,
noStatusCheck bool,
) (*BaseProcessor, error) {
if check.IfNil(shardCoord) {
return nil, ErrNilShardCoordinator
Expand Down Expand Up @@ -84,9 +86,14 @@ func NewBaseProcessor(
shardIDs: computeShardIDs(shardCoord),
delayForCheckingNodesSyncState: stepDelayForCheckingNodesSyncState,
chanTriggerNodesState: make(chan struct{}),
noStatusCheck: noStatusCheck,
}
bp.nodeStatusFetcher = bp.getNodeStatusResponseFromAPI

if noStatusCheck {
log.Info("Proxy started with no status check! The provided observers will always be considered synced!")
}

return bp, nil
}

Expand Down Expand Up @@ -347,7 +354,7 @@ func (bp *BaseProcessor) handleOutOfSyncNodes(ctx context.Context) {
timer := time.NewTimer(bp.delayForCheckingNodesSyncState)
defer timer.Stop()

bp.updateNodesWithSync()
bp.handleNodes()
for {
timer.Reset(bp.delayForCheckingNodesSyncState)

Expand All @@ -359,10 +366,22 @@ func (bp *BaseProcessor) handleOutOfSyncNodes(ctx context.Context) {
return
}

bp.updateNodesWithSync()
bp.handleNodes()
}
}

func (bp *BaseProcessor) handleNodes() {
// if proxy is started with no-status-check flag, only print the observers.
// they are already initialized by default as synced.
if bp.noStatusCheck {
bp.observersProvider.PrintNodesInShards()
bp.fullHistoryNodesProvider.PrintNodesInShards()
return
}

bp.updateNodesWithSync()
}

func (bp *BaseProcessor) updateNodesWithSync() {
observers := bp.observersProvider.GetAllNodesWithSyncState()
observersWithSyncStatus := bp.getNodesWithSyncStatus(observers)
Expand Down
Loading

0 comments on commit f6a041e

Please sign in to comment.