Use flow-go Components for composing #682

Open · wants to merge 12 commits into base: main
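For context, this PR wires the gateway's servers into flow-go's component lifecycle. The interfaces involved look roughly like the sketch below (paraphrased from memory, not part of this diff; see github.com/onflow/flow-go/module and module/component for the authoritative definitions):

```go
// Paraphrased flow-go interfaces (sketch only, not code from this PR):

// ReadyDoneAware exposes readiness and completion signals.
type ReadyDoneAware interface {
	Ready() <-chan struct{} // closed once startup has completed
	Done() <-chan struct{}  // closed once shutdown has completed
}

// Startable starts the component with a context that can receive
// irrecoverable errors via ctx.Throw.
type Startable interface {
	Start(irrecoverable.SignalerContext)
}

// component.Component combines both.
type Component interface {
	Startable
	ReadyDoneAware
}

// ComponentWorker is the function signature accepted by
// ComponentManagerBuilder.AddWorker.
type ComponentWorker func(ctx irrecoverable.SignalerContext, ready ReadyFunc)
```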
1 change: 0 additions & 1 deletion Makefile
@@ -103,7 +103,6 @@ generate:
mockery --dir=storage --name=TraceIndexer --output=storage/mocks
mockery --all --dir=services/traces --output=services/traces/mocks
mockery --all --dir=services/ingestion --output=services/ingestion/mocks
mockery --dir=models --name=Engine --output=models/mocks

.PHONY: ci
ci: check-tidy test e2e-test
87 changes: 64 additions & 23 deletions api/profiler.go
@@ -7,54 +7,95 @@ import (
"net/http"
_ "net/http/pprof"
"strconv"
"time"

"github.com/rs/zerolog/log"

"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"

"github.com/rs/zerolog"
)

type ProfileServer struct {
logger zerolog.Logger
component.Component

log zerolog.Logger
server *http.Server
endpoint string
}

var _ component.Component = (*ProfileServer)(nil)

func NewProfileServer(
logger zerolog.Logger,
host string,
port int,
) *ProfileServer {
endpoint := net.JoinHostPort(host, strconv.Itoa(port))
return &ProfileServer{
logger: logger,

s := &ProfileServer{
log: logger,
server: &http.Server{Addr: endpoint},
endpoint: endpoint,
}
}

func (s *ProfileServer) ListenAddr() string {
return s.endpoint
s.Component = component.NewComponentManagerBuilder().
AddWorker(s.serve).
AddWorker(s.shutdownOnContextDone).
Build()

return s
}

func (s *ProfileServer) Start() {
go func() {
err := s.server.ListenAndServe()
if err != nil {
if errors.Is(err, http.ErrServerClosed) {
s.logger.Warn().Msg("Profiler server shutdown")
return
}
s.logger.Err(err).Msg("failed to start Profiler server")
panic(err)
func (s *ProfileServer) serve(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
s.log.Info().Msg("starting profiler server on address")

l, err := net.Listen("tcp", s.endpoint)
if err != nil {
s.log.Err(err).Msg("failed to start the metrics server")
ctx.Throw(err)
return
}

ready()

// pass the signaler context to the server so that the signaler context
// can control the server's lifetime
s.server.BaseContext = func(_ net.Listener) context.Context {
return ctx
}

err = s.server.Serve(l) // blocking call
if err != nil {
if errors.Is(err, http.ErrServerClosed) {
return
}
}()

log.Err(err).Msg("fatal error in the metrics server")
ctx.Throw(err)
}
}

func (s *ProfileServer) Stop() error {
ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
func (s *ProfileServer) shutdownOnContextDone(ictx irrecoverable.SignalerContext, ready component.ReadyFunc) {
ready()
<-ictx.Done()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

return s.server.Shutdown(ctx)
}
err := s.server.Shutdown(ctx)
if err == nil {
s.log.Info().Msg("Profiler server graceful shutdown completed")
}

func (s *ProfileServer) Close() error {
return s.server.Close()
if errors.Is(err, ctx.Err()) {
s.log.Warn().Msg("Profiler server graceful shutdown timed out")
err := s.server.Close()
if err != nil {
s.log.Err(err).Msg("error closing profiler server")
}
} else {
s.log.Err(err).Msg("error shutting down profiler server")
}
}
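To illustrate how the reworked ProfileServer is driven, here is a minimal usage sketch. It is hypothetical and not part of this diff: the import path of the api package, the host/port values, and the surrounding main function are assumptions; the irrecoverable.WithSignaler helper and the Start/Ready/Done contract come from flow-go.

```go
package main

import (
	"context"
	"os"

	"github.com/onflow/flow-go/module/irrecoverable"
	"github.com/rs/zerolog"

	"github.com/onflow/flow-evm-gateway/api" // assumed import path
)

func main() {
	logger := zerolog.New(os.Stderr)

	parent, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Errors thrown inside the workers via ctx.Throw surface on errCh.
	signalerCtx, errCh := irrecoverable.WithSignaler(parent)

	profiler := api.NewProfileServer(logger, "localhost", 6060)
	profiler.Start(signalerCtx) // non-blocking: the ComponentManager runs both workers

	<-profiler.Ready() // closed once serve and shutdownOnContextDone have signalled ready()

	// ... run until shutdown is requested ...

	cancel()          // wakes shutdownOnContextDone, which drives graceful shutdown
	<-profiler.Done() // closed once all workers have returned

	select {
	case err := <-errCh:
		logger.Err(err).Msg("profiler server failed irrecoverably")
	default:
	}
}
```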
113 changes: 73 additions & 40 deletions api/server.go
@@ -19,6 +19,10 @@ import (
"time"

"github.com/onflow/go-ethereum/core"

"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"

gethVM "github.com/onflow/go-ethereum/core/vm"
gethLog "github.com/onflow/go-ethereum/log"
"github.com/onflow/go-ethereum/rpc"
@@ -57,8 +61,12 @@ type Server struct {

config config.Config
collector metrics.Collector

startupCompleted chan struct{}
}

var _ component.Component = (*Server)(nil)

const (
shutdownTimeout = 5 * time.Second
batchRequestLimit = 50
@@ -79,10 +87,11 @@ func NewServer(
gethLog.SetDefault(gethLog.NewLogger(zeroSlog))

return &Server{
logger: logger,
timeouts: rpc.DefaultHTTPTimeouts,
config: cfg,
collector: collector,
logger: logger,
timeouts: rpc.DefaultHTTPTimeouts,
config: cfg,
collector: collector,
startupCompleted: make(chan struct{}),
}
}

@@ -179,9 +188,10 @@ func (h *Server) disableWS() bool {
}

// Start starts the HTTP server if it is enabled and not already running.
func (h *Server) Start() error {
func (h *Server) Start(ctx irrecoverable.SignalerContext) {
defer close(h.startupCompleted)
if h.endpoint == "" || h.listener != nil {
return nil // already running or not configured
return // already running or not configured
}

// Initialize the server.
@@ -192,16 +202,21 @@ func (h *Server) Start() error {
h.server.ReadHeaderTimeout = h.timeouts.ReadHeaderTimeout
h.server.WriteTimeout = h.timeouts.WriteTimeout
h.server.IdleTimeout = h.timeouts.IdleTimeout
h.server.BaseContext = func(_ net.Listener) context.Context {
return ctx
}
}

listenConfig := net.ListenConfig{}
// Start the server.
listener, err := net.Listen("tcp", h.endpoint)
listener, err := listenConfig.Listen(ctx, "tcp", h.endpoint)
if err != nil {
// If the server fails to start, we need to clear out the RPC and WS
// configurations so they can be configured another time.
h.disableRPC()
h.disableWS()
return err
ctx.Throw(err)
return
}

h.listener = listener
@@ -213,7 +228,7 @@ func (h *Server) Start() error {
return
}
h.logger.Err(err).Msg("failed to start API server")
panic(err)
ctx.Throw(err)
}
}()

@@ -225,8 +240,17 @@ func (h *Server) Start() error {
url := fmt.Sprintf("ws://%v", listener.Addr())
h.logger.Info().Msgf("JSON-RPC over WebSocket enabled: %s", url)
}
}

return nil
func (h *Server) Ready() <-chan struct{} {
ready := make(chan struct{})

go func() {
<-h.startupCompleted
close(ready)
}()

return ready
}

// disableRPC stops the JSON-RPC over HTTP handler.
@@ -296,41 +320,50 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
}

// Stop shuts down the HTTP server.
func (h *Server) Stop() {
if h.listener == nil {
return // not running
}
// Done shuts down the HTTP server and returns a channel that is closed once shutdown has completed.
func (h *Server) Done() <-chan struct{} {
done := make(chan struct{})

// Shut down the server.
httpHandler := h.httpHandler
if httpHandler != nil {
httpHandler.server.Stop()
h.httpHandler = nil
}
go func() {
defer close(done)

wsHandler := h.wsHandler
if wsHandler != nil {
wsHandler.server.Stop()
h.wsHandler = nil
}
if h.listener == nil {
return // not running
}

ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
defer cancel()
err := h.server.Shutdown(ctx)
if err != nil && err == ctx.Err() {
h.logger.Warn().Msg("HTTP server graceful shutdown timed out")
h.server.Close()
}
// Shut down the server.
httpHandler := h.httpHandler
if httpHandler != nil {
httpHandler.server.Stop()
h.httpHandler = nil
}

wsHandler := h.wsHandler
if wsHandler != nil {
wsHandler.server.Stop()
h.wsHandler = nil
}

ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
defer cancel()
err := h.server.Shutdown(ctx)
if err != nil && err == ctx.Err() {
h.logger.Warn().Msg("HTTP server graceful shutdown timed out")
h.server.Close()
}

h.listener.Close()
h.logger.Info().Msgf(
"HTTP server stopped, endpoint: %s", h.listener.Addr(),
)
h.listener.Close()
h.logger.Info().Msgf(
"HTTP server stopped, endpoint: %s", h.listener.Addr(),
)

// Clear out everything to allow re-configuring it later.
h.host, h.port, h.endpoint = "", 0, ""
h.server, h.listener = nil, nil

}()

// Clear out everything to allow re-configuring it later.
h.host, h.port, h.endpoint = "", 0, ""
h.server, h.listener = nil, nil
return done
}
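The Start/Ready/Done trio above matches flow-go's component contract, so the API server can be composed like any other component. The sketch below shows one possible wiring from a parent component; the wrapper function is hypothetical and not part of this PR, and it assumes the component and irrecoverable imports already added in this file.

```go
// newAPIServerComponent wraps an already-configured *Server in a parent
// component so it participates in the flow-go lifecycle. Sketch only,
// not code from this PR.
func newAPIServerComponent(srv *Server) component.Component {
	return component.NewComponentManagerBuilder().
		AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
			srv.Start(ctx) // opens the listener; calls ctx.Throw on fatal errors
			<-srv.Ready()  // startupCompleted has been closed
			ready()

			<-ctx.Done() // parent requested shutdown
			<-srv.Done() // graceful shutdown (or timeout plus forced close) finished
		}).
		Build()
}
```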

// CheckTimeouts ensures that timeout values are meaningful
14 changes: 3 additions & 11 deletions api/stream.go
@@ -178,7 +178,7 @@ func newSubscription[T any](

rpcSub := notifier.CreateSubscription()

subs := models.NewSubscription(logger, callback(notifier, rpcSub))
subs := models.NewSubscription(callback(notifier, rpcSub))

l := logger.With().
Str("gateway-subscription-id", fmt.Sprintf("%p", subs)).
@@ -190,16 +190,8 @@ func newSubscription[T any](
go func() {
defer publisher.Unsubscribe(subs)

for {
select {
case err := <-subs.Error():
l.Debug().Err(err).Msg("subscription returned error")
return
case err := <-rpcSub.Err():
l.Debug().Err(err).Msg("client unsubscribed")
return
}
}
err := <-rpcSub.Err()
l.Debug().Err(err).Msg("client unsubscribed")
Review comment from a contributor on lines +193 to +194:

⚠️ Potential issue

Handle errors from the subscription to prevent unhandled errors

Currently, the code only listens for errors from rpcSub.Err() and ignores potential errors from subs.Err(). This may lead to unhandled errors and resource leaks. Consider adding error handling for subs.Err().

Apply this diff:

	go func() {
		defer publisher.Unsubscribe(subs)
-		err := <-rpcSub.Err()
-		l.Debug().Err(err).Msg("client unsubscribed")
+		select {
+		case err := <-rpcSub.Err():
+			l.Debug().Err(err).Msg("client unsubscribed")
+		case err := <-subs.Err():
+			l.Error().Err(err).Msg("subscription error")
+		}
	}()

}()

l.Info().Msg("new heads subscription created")