Skip to content

Commit

Permalink
refactor: deduplicate filter upstreams code
Browse files Browse the repository at this point in the history
  • Loading branch information
polsar88 committed Oct 5, 2024
1 parent fe9de04 commit 84b73da
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 48 deletions.
52 changes: 6 additions & 46 deletions internal/route/always_route_filtering_strategy.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package route

import (
"github.com/satsuma-data/node-gateway/internal/config"
"github.com/satsuma-data/node-gateway/internal/metadata"
"github.com/satsuma-data/node-gateway/internal/types"
"go.uber.org/zap"
Expand All @@ -25,7 +24,12 @@ func (s *AlwaysRouteFilteringStrategy) RouteNextRequest(
filters := s.NodeFilters // WARNING: Slice assignment, not a copy!
for len(filters) > 0 {
// Get all healthy upstreams for the current set of filters.
upstreams := s.FilterUpstreams(upstreamsByPriority, requestMetadata, filters)
upstreams := filterUpstreams(
upstreamsByPriority,
requestMetadata,
filters,
s.Logger,
)

// If there is at least one healthy upstream, route using the backing strategy.
if len(upstreams) > 0 {
Expand All @@ -52,50 +56,6 @@ func (s *AlwaysRouteFilteringStrategy) RouteNextRequest(
return s.BackingStrategy.RouteNextRequest(upstreamsByPriority, requestMetadata)
}

// FilterUpstreams filters upstreams based on the provided NodeFilter.
// WARNING: If a given priority does not have any healthy upstreams,
// it will not be included in the returned map.
func (s *AlwaysRouteFilteringStrategy) FilterUpstreams(
upstreamsByPriority types.PriorityToUpstreamsMap,
requestMetadata metadata.RequestMetadata,
nodeFilters []NodeFilter,
) types.PriorityToUpstreamsMap {
priorityToHealthyUpstreams := make(types.PriorityToUpstreamsMap)

// Iterate over each priority and filter the associated upstreams.
for priority, upstreamConfigs := range upstreamsByPriority {
s.Logger.Debug(
"Determining healthy upstreams at priority.",
zap.Int("priority", priority),
zap.Any("upstreams", upstreamConfigs),
)

filteredUpstreams := make([]*config.UpstreamConfig, 0)

// Iterate over each upstream and apply the filters.
for _, upstreamConfig := range upstreamConfigs {
pass := true
for _, nodeFilter := range nodeFilters {
pass = nodeFilter.Apply(requestMetadata, upstreamConfig, len(upstreamConfigs))
if !pass {
break
}
}

if pass {
filteredUpstreams = append(filteredUpstreams, upstreamConfig)
}
}

// Only add the priority to the map if there is at least one healthy upstream.
if len(filteredUpstreams) > 0 {
priorityToHealthyUpstreams[priority] = filteredUpstreams
}
}

return priorityToHealthyUpstreams
}

// removeFilters returns a new slice of filters with the given filter type removed.
func removeFilters(filters []NodeFilter, filterToRemove NodeFilterType) []NodeFilter {
retFilters := make([]NodeFilter, 0)
Expand Down
22 changes: 20 additions & 2 deletions internal/route/filtering_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,34 @@ func (s *FilteringRoutingStrategy) RouteNextRequest(
func (s *FilteringRoutingStrategy) filter(
upstreamsByPriority types.PriorityToUpstreamsMap,
requestMetadata metadata.RequestMetadata,
) types.PriorityToUpstreamsMap {
return filterUpstreams(
upstreamsByPriority,
requestMetadata,
[]NodeFilter{s.NodeFilter},
s.Logger,
)
}

func filterUpstreams(
upstreamsByPriority types.PriorityToUpstreamsMap,
requestMetadata metadata.RequestMetadata,
nodeFilters []NodeFilter,
logger *zap.Logger,
) types.PriorityToUpstreamsMap {
priorityToHealthyUpstreams := make(types.PriorityToUpstreamsMap)
nodeFilter := AndFilter{
filters: nodeFilters,
logger: logger,
}

for priority, upstreamConfigs := range upstreamsByPriority {
s.Logger.Debug("Determining healthy upstreams at priority.", zap.Int("priority", priority), zap.Any("upstreams", upstreamConfigs))
logger.Debug("Determining healthy upstreams at priority.", zap.Int("priority", priority), zap.Any("upstreams", upstreamConfigs))

filteredUpstreams := make([]*config.UpstreamConfig, 0)

for _, upstreamConfig := range upstreamConfigs {
ok := s.NodeFilter.Apply(requestMetadata, upstreamConfig, len(upstreamConfigs))
ok := nodeFilter.Apply(requestMetadata, upstreamConfig, len(upstreamConfigs))
if ok {
filteredUpstreams = append(filteredUpstreams, upstreamConfig)
}
Expand Down

0 comments on commit 84b73da

Please sign in to comment.