diff --git a/cmd/gatewayd_app.go b/cmd/gatewayd_app.go index fdd4c49d..2286a607 100644 --- a/cmd/gatewayd_app.go +++ b/cmd/gatewayd_app.go @@ -248,63 +248,95 @@ func (app *GatewayDApp) createPluginRegistry(runCtx context.Context, logger zero // startMetricsMerger starts the metrics merger if enabled. func (app *GatewayDApp) startMetricsMerger(runCtx context.Context, logger zerolog.Logger) { + _, span := otel.Tracer(config.TracerName).Start(runCtx, "Start metrics merger") + defer span.End() + // Start the metrics merger if enabled. - if app.conf.Plugin.EnableMetricsMerger { - app.metricsMerger = metrics.NewMerger(runCtx, metrics.Merger{ - MetricsMergerPeriod: app.conf.Plugin.MetricsMergerPeriod, - Logger: logger, - }) - app.pluginRegistry.ForEach(func(_ sdkPlugin.Identifier, plugin *plugin.Plugin) { - if metricsEnabled, err := strconv.ParseBool(plugin.Config["metricsEnabled"]); err == nil && metricsEnabled { + if !app.conf.Plugin.EnableMetricsMerger { + logger.Info().Msg("Metrics merger is disabled") + span.AddEvent("Metrics merger is disabled") + return + } + + // Create a new metrics merger. + app.metricsMerger = metrics.NewMerger(runCtx, metrics.Merger{ + MetricsMergerPeriod: app.conf.Plugin.MetricsMergerPeriod, + Logger: logger, + }) + + // Add the plugins to the metrics merger. + app.pluginRegistry.ForEach( + func(_ sdkPlugin.Identifier, plugin *plugin.Plugin) { + metricsEnabled, err := strconv.ParseBool(plugin.Config["metricsEnabled"]) + if err == nil && metricsEnabled { app.metricsMerger.Add(plugin.ID.Name, plugin.Config["metricsUnixDomainSocket"]) - logger.Debug().Str("plugin", plugin.ID.Name).Msg( - "Added plugin to metrics merger") + logger.Debug(). + Str("plugin", plugin.ID.Name). + Msg("Added plugin to metrics merger") + span.AddEvent("Added plugin to metrics merger") } - }) - app.metricsMerger.Start() //nolint:contextcheck - } + }, + ) + + // Start the metrics merger in the background if there are plugins to merge metrics from. + app.metricsMerger.Start() //nolint:contextcheck } // startHealthCheckScheduler starts the health check scheduler if enabled. func (app *GatewayDApp) startHealthCheckScheduler( runCtx, ctx context.Context, span trace.Span, logger zerolog.Logger, ) { + healthCheck := func() { + _, span := otel.Tracer(config.TracerName).Start(ctx, "Run plugin health check") + defer span.End() + + plugins := []string{} + app.pluginRegistry.ForEach( + func(pluginId sdkPlugin.Identifier, plugin *plugin.Plugin) { + err := plugin.Ping() + if err == nil { + logger.Trace().Str("name", pluginId.Name).Msg("Successfully pinged plugin") + plugins = append(plugins, pluginId.Name) + return + } + + span.RecordError(err) + logger.Error().Err(err).Msg("Failed to ping plugin") + + // Remove the plugin from the metrics merger to prevent errors. + if app.conf.Plugin.EnableMetricsMerger && app.metricsMerger != nil { + app.metricsMerger.Remove(pluginId.Name) + } + + // Remove the plugin from the registry. + app.pluginRegistry.Remove(pluginId) + + if !app.conf.Plugin.ReloadOnCrash { + return // Do not reload the plugins. + } + + // Reload the plugins and register their hooks upon crash. + logger.Info().Str("name", pluginId.Name).Msg("Reloading crashed plugin") + // + pluginConfig := app.conf.Plugin.GetPlugins(pluginId.Name) + if pluginConfig != nil { + // Load the plugins and register their hooks. + app.pluginRegistry.LoadPlugins( + runCtx, pluginConfig, app.conf.Plugin.StartTimeout) + } + }, + ) + span.SetAttributes(attribute.StringSlice("plugins", plugins)) + } + // Ping the plugins to check if they are alive, and remove them if they are not. startDelay := time.Now().Add(app.conf.Plugin.HealthCheckPeriod) - if _, err := app.healthCheckScheduler.Every( - app.conf.Plugin.HealthCheckPeriod).SingletonMode().StartAt(startDelay).Do( - func() { - _, span := otel.Tracer(config.TracerName).Start(ctx, "Run plugin health check") - defer span.End() - - var plugins []string - app.pluginRegistry.ForEach( - func(pluginId sdkPlugin.Identifier, plugin *plugin.Plugin) { - if err := plugin.Ping(); err != nil { - span.RecordError(err) - logger.Error().Err(err).Msg("Failed to ping plugin") - if app.conf.Plugin.EnableMetricsMerger && app.metricsMerger != nil { - app.metricsMerger.Remove(pluginId.Name) - } - app.pluginRegistry.Remove(pluginId) - - if !app.conf.Plugin.ReloadOnCrash { - return // Do not reload the plugins. - } - - // Reload the plugins and register their hooks upon crash. - logger.Info().Str("name", pluginId.Name).Msg("Reloading crashed plugin") - pluginConfig := app.conf.Plugin.GetPlugins(pluginId.Name) - if pluginConfig != nil { - app.pluginRegistry.LoadPlugins(runCtx, pluginConfig, app.conf.Plugin.StartTimeout) - } - } else { - logger.Trace().Str("name", pluginId.Name).Msg("Successfully pinged plugin") - plugins = append(plugins, pluginId.Name) - } - }) - span.SetAttributes(attribute.StringSlice("plugins", plugins)) - }); err != nil { + _, err := app.healthCheckScheduler. + Every(app.conf.Plugin.HealthCheckPeriod). + SingletonMode(). + StartAt(startDelay). + Do(healthCheck) + if err != nil { logger.Error().Err(err).Msg("Failed to start plugin health check scheduler") span.RecordError(err) } @@ -315,6 +347,7 @@ func (app *GatewayDApp) startHealthCheckScheduler( "healthCheckPeriod", app.conf.Plugin.HealthCheckPeriod.String(), ).Msg("Starting plugin health check scheduler") app.healthCheckScheduler.StartAsync() + span.AddEvent("Started plugin health check scheduler") } } diff --git a/metrics/merger.go b/metrics/merger.go index b07bc58c..ed2f873c 100644 --- a/metrics/merger.go +++ b/metrics/merger.go @@ -222,6 +222,12 @@ func (m *Merger) Start() { ctx, span := otel.Tracer(config.TracerName).Start(m.ctx, "Metrics merger") defer span.End() + if len(m.Addresses) == 0 { + m.Logger.Info().Msg("No plugins to merge metrics from") + span.AddEvent("No plugins to merge metrics from") + return + } + startDelay := time.Now().Add(m.MetricsMergerPeriod) // Merge metrics from plugins by reading from their unix domain sockets periodically. if _, err := m.scheduler. @@ -253,15 +259,13 @@ func (m *Merger) Start() { sentry.CaptureException(err) } - if len(m.Addresses) > 0 { - m.scheduler.StartAsync() - m.Logger.Info().Fields( - map[string]any{ - "startDelay": startDelay.Format(time.RFC3339), - "metricsMergerPeriod": m.MetricsMergerPeriod.String(), - }, - ).Msg("Started the metrics merger scheduler") - } + m.scheduler.StartAsync() + m.Logger.Info().Fields( + map[string]any{ + "startDelay": startDelay.Format(time.RFC3339), + "metricsMergerPeriod": m.MetricsMergerPeriod.String(), + }, + ).Msg("Started the metrics merger scheduler") } // Stop stops the metrics merger. diff --git a/network/proxy.go b/network/proxy.go index f67d3b45..4261f654 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -76,52 +76,66 @@ func NewProxy( HealthCheckPeriod: pxy.HealthCheckPeriod, } - startDelay := time.Now().Add(proxy.HealthCheckPeriod) - // Schedule the client health check. - if _, err := proxy.scheduler.Every(proxy.HealthCheckPeriod).SingletonMode().StartAt(startDelay).Do( - func() { - now := time.Now() - proxy.Logger.Trace().Msg("Running the client health check to recycle connection(s).") - proxy.AvailableConnections.ForEach(func(_, value any) bool { - if client, ok := value.(*Client); ok { - // Connection is probably dead by now. - proxy.AvailableConnections.Remove(client.ID) - client.Close() - // Create a new client. - client = NewClient( - proxyCtx, proxy.ClientConfig, proxy.Logger, - NewRetry( - Retry{ - Retries: proxy.ClientConfig.Retries, - Backoff: config.If( - proxy.ClientConfig.Backoff > 0, - proxy.ClientConfig.Backoff, - config.DefaultBackoff, - ), - BackoffMultiplier: proxy.ClientConfig.BackoffMultiplier, - DisableBackoffCaps: proxy.ClientConfig.DisableBackoffCaps, - Logger: proxy.Logger, - }, + connHealthCheck := func() { + now := time.Now() + proxy.Logger.Trace().Msg("Running the client health check to recycle connection(s).") + span.AddEvent("Running the client health check to recycle connection(s).") + proxy.AvailableConnections.ForEach(func(_, value any) bool { + client, ok := value.(*Client) + if !ok { + proxy.Logger.Error().Msg("Failed to cast the client to the Client type") + return true + } + + // Connection is probably dead by now. + proxy.AvailableConnections.Remove(client.ID) + client.Close() + + // Create a new client. + client = NewClient( + proxyCtx, proxy.ClientConfig, proxy.Logger, + NewRetry( + Retry{ + Retries: proxy.ClientConfig.Retries, + Backoff: config.If( + proxy.ClientConfig.Backoff > 0, + proxy.ClientConfig.Backoff, + config.DefaultBackoff, ), - ) - if client != nil && client.ID != "" { - if err := proxy.AvailableConnections.Put(client.ID, client); err != nil { - proxy.Logger.Err(err).Msg("Failed to update the client connection") - // Close the client, because we don't want to have orphaned connections. - client.Close() - } - } else { - proxy.Logger.Error().Msg("Failed to create a new client connection") - } + BackoffMultiplier: proxy.ClientConfig.BackoffMultiplier, + DisableBackoffCaps: proxy.ClientConfig.DisableBackoffCaps, + Logger: proxy.Logger, + }, + ), + ) + if client != nil && client.ID != "" { + if err := proxy.AvailableConnections.Put(client.ID, client); err != nil { + proxy.Logger.Err(err).Msg("Failed to update the client connection") + // Close the client, because we don't want to have orphaned connections. + client.Close() } - return true - }) - proxy.Logger.Trace().Str("duration", time.Since(now).String()).Msg( - "Finished the client health check") - metrics.ProxyHealthChecks.WithLabelValues( - proxy.GetGroupName(), proxy.GetBlockName()).Inc() - }, - ); err != nil { + } else { + proxy.Logger.Error().Msg("Failed to create a new client connection") + span.RecordError(gerr.ErrClientNotConnected) + } + return true + }) + proxy.Logger.Trace(). + Str("duration", time.Since(now).String()). + Msg("Finished the client health check") + span.AddEvent("Finished the client health check") + metrics.ProxyHealthChecks.WithLabelValues( + proxy.GetGroupName(), proxy.GetBlockName()).Inc() + } + + // Schedule the client health check. + startDelay := time.Now().Add(proxy.HealthCheckPeriod) + _, err := proxy.scheduler. + Every(proxy.HealthCheckPeriod). + SingletonMode(). + StartAt(startDelay). + Do(connHealthCheck) + if err != nil { proxy.Logger.Error().Err(err).Msg("Failed to schedule the client health check") sentry.CaptureException(err) span.RecordError(err) @@ -135,6 +149,7 @@ func NewProxy( "healthCheckPeriod": proxy.HealthCheckPeriod.String(), }, ).Msg("Started the client health check scheduler") + span.AddEvent("Started the client health check scheduler") return &proxy }