Skip to content

Commit

Permalink
Merge pull request #1126 from GeorgeTsagk/startup-fixes
Browse files Browse the repository at this point in the history
Startup related fixes
  • Loading branch information
Roasbeef authored Oct 1, 2024
2 parents c5a4b7a + 577cbad commit 65539b6
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 5 deletions.
9 changes: 9 additions & 0 deletions rpcperms/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,11 @@ func (r *InterceptorChain) rpcStateUnaryServerInterceptor() grpc.UnaryServerInte
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (interface{}, error) {

if info.Server == nil {
return nil, fmt.Errorf("cannot handle call, server " +
"not ready")
}

if err := r.checkRPCState(info.Server); err != nil {
return nil, err
}
Expand All @@ -427,6 +432,10 @@ func (r *InterceptorChain) rpcStateStreamServerInterceptor() grpc.StreamServerIn
return func(srv interface{}, ss grpc.ServerStream,
info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {

if srv == nil {
return fmt.Errorf("srv is nil, can't check RPC state")
}

if err := r.checkRPCState(srv); err != nil {
return err
}
Expand Down
44 changes: 39 additions & 5 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ func (s *Server) UpdateConfig(cfg *Config) {
//
// NOTE: the rpc server is not registered with any grpc server in this function.
func (s *Server) initialize(interceptorChain *rpcperms.InterceptorChain) error {
var ready bool

// If by the time this function exits we haven't yet given the ready
// signal, we detect it here and signal that the daemon should quit.
defer func() {
if !ready {
close(s.quit)
}
}()

// Show version at startup.
srvrLog.Infof("Version: %s, build=%s, logging=%s, "+
"debuglevel=%s, active_network=%v", Version(), build.Deployment,
Expand Down Expand Up @@ -239,6 +249,7 @@ func (s *Server) initialize(interceptorChain *rpcperms.InterceptorChain) error {
shutdownFuncs = nil

close(s.ready)
ready = true

return nil
}
Expand Down Expand Up @@ -701,12 +712,23 @@ func (s *Server) waitForReady() error {
// shutdown in case of a startup error). If we shut down after passing
// this part of the code, the called component will handle the quit
// signal.
select {
case <-s.ready:
return nil

// In order to give priority to the quit signal, we wrap the blocking
// select so that we give a chance to the quit signal to be read first.
// This is needed as there is currently no wait to un-set the ready
// signal, so we would have a race between the 2 channels.
select {
case <-s.quit:
return fmt.Errorf("tapd is shutting down")

default:
// We now wait for either signal to be provided.
select {
case <-s.ready:
return nil
case <-s.quit:
return fmt.Errorf("tapd is shutting down")
}
}
}

Expand Down Expand Up @@ -820,7 +842,13 @@ func (s *Server) Name() protofsm.EndpointName {
//
// NOTE: This method is part of the protofsm.MsgEndpoint interface.
func (s *Server) CanHandle(msg protofsm.PeerMsg) bool {
<-s.ready
err := s.waitForReady()
if err != nil {
srvrLog.Debugf("Can't handle PeerMsg, server not ready %v",
err)
return false
}

return s.cfg.AuxFundingController.CanHandle(msg)
}

Expand All @@ -829,7 +857,13 @@ func (s *Server) CanHandle(msg protofsm.PeerMsg) bool {
//
// NOTE: This method is part of the protofsm.MsgEndpoint interface.
func (s *Server) SendMessage(msg protofsm.PeerMsg) bool {
<-s.ready
err := s.waitForReady()
if err != nil {
srvrLog.Debugf("Failed to send PeerMsg, server not ready %v",
err)
return false
}

return s.cfg.AuxFundingController.SendMessage(msg)
}

Expand Down

0 comments on commit 65539b6

Please sign in to comment.