Skip to content

Commit

Permalink
Refactor scheduler functions (#649)
Browse files (browse the repository at this point in the history)
* Refactor health check scheduler
* Do not start metrics merger if no address is registered
* Refactor metrics merger function
* Refactor connection health check function
  • Loading branch information
mostafa authored Dec 27, 2024
1 parent ce578fc commit 94397ea
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 99 deletions.
125 changes: 79 additions & 46 deletions cmd/gatewayd_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,63 +248,95 @@ func (app *GatewayDApp) createPluginRegistry(runCtx context.Context, logger zero

// startMetricsMerger starts the metrics merger if enabled.
func (app *GatewayDApp) startMetricsMerger(runCtx context.Context, logger zerolog.Logger) {
_, span := otel.Tracer(config.TracerName).Start(runCtx, "Start metrics merger")
defer span.End()

// Start the metrics merger if enabled.
if app.conf.Plugin.EnableMetricsMerger {
app.metricsMerger = metrics.NewMerger(runCtx, metrics.Merger{
MetricsMergerPeriod: app.conf.Plugin.MetricsMergerPeriod,
Logger: logger,
})
app.pluginRegistry.ForEach(func(_ sdkPlugin.Identifier, plugin *plugin.Plugin) {
if metricsEnabled, err := strconv.ParseBool(plugin.Config["metricsEnabled"]); err == nil && metricsEnabled {
if !app.conf.Plugin.EnableMetricsMerger {
logger.Info().Msg("Metrics merger is disabled")
span.AddEvent("Metrics merger is disabled")
return
}

// Create a new metrics merger.
app.metricsMerger = metrics.NewMerger(runCtx, metrics.Merger{
MetricsMergerPeriod: app.conf.Plugin.MetricsMergerPeriod,
Logger: logger,
})

// Add the plugins to the metrics merger.
app.pluginRegistry.ForEach(
func(_ sdkPlugin.Identifier, plugin *plugin.Plugin) {
metricsEnabled, err := strconv.ParseBool(plugin.Config["metricsEnabled"])
if err == nil && metricsEnabled {
app.metricsMerger.Add(plugin.ID.Name, plugin.Config["metricsUnixDomainSocket"])
logger.Debug().Str("plugin", plugin.ID.Name).Msg(
"Added plugin to metrics merger")
logger.Debug().
Str("plugin", plugin.ID.Name).
Msg("Added plugin to metrics merger")
span.AddEvent("Added plugin to metrics merger")
}
})
app.metricsMerger.Start() //nolint:contextcheck
}
},
)

// Start the metrics merger in the background if there are plugins to merge metrics from.
app.metricsMerger.Start() //nolint:contextcheck
}

// startHealthCheckScheduler starts the health check scheduler if enabled.
func (app *GatewayDApp) startHealthCheckScheduler(
runCtx, ctx context.Context, span trace.Span, logger zerolog.Logger,
) {
healthCheck := func() {
_, span := otel.Tracer(config.TracerName).Start(ctx, "Run plugin health check")
defer span.End()

plugins := []string{}
app.pluginRegistry.ForEach(
func(pluginId sdkPlugin.Identifier, plugin *plugin.Plugin) {
err := plugin.Ping()
if err == nil {
logger.Trace().Str("name", pluginId.Name).Msg("Successfully pinged plugin")
plugins = append(plugins, pluginId.Name)
return
}

span.RecordError(err)
logger.Error().Err(err).Msg("Failed to ping plugin")

// Remove the plugin from the metrics merger to prevent errors.
if app.conf.Plugin.EnableMetricsMerger && app.metricsMerger != nil {
app.metricsMerger.Remove(pluginId.Name)
}

// Remove the plugin from the registry.
app.pluginRegistry.Remove(pluginId)

if !app.conf.Plugin.ReloadOnCrash {
return // Do not reload the plugins.
}

// Reload the plugins and register their hooks upon crash.
logger.Info().Str("name", pluginId.Name).Msg("Reloading crashed plugin")
//
pluginConfig := app.conf.Plugin.GetPlugins(pluginId.Name)
if pluginConfig != nil {
// Load the plugins and register their hooks.
app.pluginRegistry.LoadPlugins(
runCtx, pluginConfig, app.conf.Plugin.StartTimeout)
}
},
)
span.SetAttributes(attribute.StringSlice("plugins", plugins))
}

// Ping the plugins to check if they are alive, and remove them if they are not.
startDelay := time.Now().Add(app.conf.Plugin.HealthCheckPeriod)
if _, err := app.healthCheckScheduler.Every(
app.conf.Plugin.HealthCheckPeriod).SingletonMode().StartAt(startDelay).Do(
func() {
_, span := otel.Tracer(config.TracerName).Start(ctx, "Run plugin health check")
defer span.End()

var plugins []string
app.pluginRegistry.ForEach(
func(pluginId sdkPlugin.Identifier, plugin *plugin.Plugin) {
if err := plugin.Ping(); err != nil {
span.RecordError(err)
logger.Error().Err(err).Msg("Failed to ping plugin")
if app.conf.Plugin.EnableMetricsMerger && app.metricsMerger != nil {
app.metricsMerger.Remove(pluginId.Name)
}
app.pluginRegistry.Remove(pluginId)

if !app.conf.Plugin.ReloadOnCrash {
return // Do not reload the plugins.
}

// Reload the plugins and register their hooks upon crash.
logger.Info().Str("name", pluginId.Name).Msg("Reloading crashed plugin")
pluginConfig := app.conf.Plugin.GetPlugins(pluginId.Name)
if pluginConfig != nil {
app.pluginRegistry.LoadPlugins(runCtx, pluginConfig, app.conf.Plugin.StartTimeout)
}
} else {
logger.Trace().Str("name", pluginId.Name).Msg("Successfully pinged plugin")
plugins = append(plugins, pluginId.Name)
}
})
span.SetAttributes(attribute.StringSlice("plugins", plugins))
}); err != nil {
_, err := app.healthCheckScheduler.
Every(app.conf.Plugin.HealthCheckPeriod).
SingletonMode().
StartAt(startDelay).
Do(healthCheck)
if err != nil {
logger.Error().Err(err).Msg("Failed to start plugin health check scheduler")
span.RecordError(err)
}
Expand All @@ -315,6 +347,7 @@ func (app *GatewayDApp) startHealthCheckScheduler(
"healthCheckPeriod", app.conf.Plugin.HealthCheckPeriod.String(),
).Msg("Starting plugin health check scheduler")
app.healthCheckScheduler.StartAsync()
span.AddEvent("Started plugin health check scheduler")
}
}

Expand Down
22 changes: 13 additions & 9 deletions metrics/merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,12 @@ func (m *Merger) Start() {
ctx, span := otel.Tracer(config.TracerName).Start(m.ctx, "Metrics merger")
defer span.End()

if len(m.Addresses) == 0 {
m.Logger.Info().Msg("No plugins to merge metrics from")
span.AddEvent("No plugins to merge metrics from")
return
}

startDelay := time.Now().Add(m.MetricsMergerPeriod)
// Merge metrics from plugins by reading from their unix domain sockets periodically.
if _, err := m.scheduler.
Expand Down Expand Up @@ -253,15 +259,13 @@ func (m *Merger) Start() {
sentry.CaptureException(err)
}

if len(m.Addresses) > 0 {
m.scheduler.StartAsync()
m.Logger.Info().Fields(
map[string]any{
"startDelay": startDelay.Format(time.RFC3339),
"metricsMergerPeriod": m.MetricsMergerPeriod.String(),
},
).Msg("Started the metrics merger scheduler")
}
m.scheduler.StartAsync()
m.Logger.Info().Fields(
map[string]any{
"startDelay": startDelay.Format(time.RFC3339),
"metricsMergerPeriod": m.MetricsMergerPeriod.String(),
},
).Msg("Started the metrics merger scheduler")
}

// Stop stops the metrics merger.
Expand Down
103 changes: 59 additions & 44 deletions network/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,52 +76,66 @@ func NewProxy(
HealthCheckPeriod: pxy.HealthCheckPeriod,
}

startDelay := time.Now().Add(proxy.HealthCheckPeriod)
// Schedule the client health check.
if _, err := proxy.scheduler.Every(proxy.HealthCheckPeriod).SingletonMode().StartAt(startDelay).Do(
func() {
now := time.Now()
proxy.Logger.Trace().Msg("Running the client health check to recycle connection(s).")
proxy.AvailableConnections.ForEach(func(_, value any) bool {
if client, ok := value.(*Client); ok {
// Connection is probably dead by now.
proxy.AvailableConnections.Remove(client.ID)
client.Close()
// Create a new client.
client = NewClient(
proxyCtx, proxy.ClientConfig, proxy.Logger,
NewRetry(
Retry{
Retries: proxy.ClientConfig.Retries,
Backoff: config.If(
proxy.ClientConfig.Backoff > 0,
proxy.ClientConfig.Backoff,
config.DefaultBackoff,
),
BackoffMultiplier: proxy.ClientConfig.BackoffMultiplier,
DisableBackoffCaps: proxy.ClientConfig.DisableBackoffCaps,
Logger: proxy.Logger,
},
connHealthCheck := func() {
now := time.Now()
proxy.Logger.Trace().Msg("Running the client health check to recycle connection(s).")
span.AddEvent("Running the client health check to recycle connection(s).")
proxy.AvailableConnections.ForEach(func(_, value any) bool {
client, ok := value.(*Client)
if !ok {
proxy.Logger.Error().Msg("Failed to cast the client to the Client type")
return true
}

// Connection is probably dead by now.
proxy.AvailableConnections.Remove(client.ID)
client.Close()

// Create a new client.
client = NewClient(
proxyCtx, proxy.ClientConfig, proxy.Logger,
NewRetry(
Retry{
Retries: proxy.ClientConfig.Retries,
Backoff: config.If(
proxy.ClientConfig.Backoff > 0,
proxy.ClientConfig.Backoff,
config.DefaultBackoff,
),
)
if client != nil && client.ID != "" {
if err := proxy.AvailableConnections.Put(client.ID, client); err != nil {
proxy.Logger.Err(err).Msg("Failed to update the client connection")
// Close the client, because we don't want to have orphaned connections.
client.Close()
}
} else {
proxy.Logger.Error().Msg("Failed to create a new client connection")
}
BackoffMultiplier: proxy.ClientConfig.BackoffMultiplier,
DisableBackoffCaps: proxy.ClientConfig.DisableBackoffCaps,
Logger: proxy.Logger,
},
),
)
if client != nil && client.ID != "" {
if err := proxy.AvailableConnections.Put(client.ID, client); err != nil {
proxy.Logger.Err(err).Msg("Failed to update the client connection")
// Close the client, because we don't want to have orphaned connections.
client.Close()
}
return true
})
proxy.Logger.Trace().Str("duration", time.Since(now).String()).Msg(
"Finished the client health check")
metrics.ProxyHealthChecks.WithLabelValues(
proxy.GetGroupName(), proxy.GetBlockName()).Inc()
},
); err != nil {
} else {
proxy.Logger.Error().Msg("Failed to create a new client connection")
span.RecordError(gerr.ErrClientNotConnected)
}
return true
})
proxy.Logger.Trace().
Str("duration", time.Since(now).String()).
Msg("Finished the client health check")
span.AddEvent("Finished the client health check")
metrics.ProxyHealthChecks.WithLabelValues(
proxy.GetGroupName(), proxy.GetBlockName()).Inc()
}

// Schedule the client health check.
startDelay := time.Now().Add(proxy.HealthCheckPeriod)
_, err := proxy.scheduler.
Every(proxy.HealthCheckPeriod).
SingletonMode().
StartAt(startDelay).
Do(connHealthCheck)
if err != nil {
proxy.Logger.Error().Err(err).Msg("Failed to schedule the client health check")
sentry.CaptureException(err)
span.RecordError(err)
Expand All @@ -135,6 +149,7 @@ func NewProxy(
"healthCheckPeriod": proxy.HealthCheckPeriod.String(),
},
).Msg("Started the client health check scheduler")
span.AddEvent("Started the client health check scheduler")

return &proxy
}
Expand Down

0 comments on commit 94397ea

Please sign in to comment.