Skip to content

Commit

Permalink
Randomize test ports
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanos committed Nov 30, 2024
1 parent 7c9e7c1 commit 0dc1437
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 108 deletions.
3 changes: 1 addition & 2 deletions common/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,8 @@ func (d *RPCFactory) GetGRPCListener() net.Listener {

func (d *RPCFactory) createGRPCListener() net.Listener {
hostAddress := net.JoinHostPort(getListenIP(d.config, d.logger).String(), convert.IntToString(d.config.GRPCPort))
var err error
grpcListener, err := net.Listen("tcp", hostAddress)

grpcListener, err := net.Listen("tcp", hostAddress)
if err != nil || grpcListener == nil || grpcListener.Addr() == nil {
d.logger.Fatal("Failed to start gRPC listener", tag.Error(err), tag.Service(d.serviceName), tag.Address(hostAddress))
}
Expand Down
108 changes: 32 additions & 76 deletions tests/testcore/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,6 @@ import (
"google.golang.org/grpc"
)

const (
frontendPort = 7134
frontendHTTPPort = 7144
historyPort = 7132
matchingPort = 7136
workerPort = 7138 // not really listening
)

type (
TemporalImpl struct {
fxApps []*fx.App
Expand Down Expand Up @@ -118,7 +110,6 @@ type (
namespaceReplicationQueue persistence.NamespaceReplicationQueue
abstractDataStoreFactory persistenceClient.AbstractDataStoreFactory
visibilityStoreFactory visibility.VisibilityStoreFactory
clusterNo int // cluster number
archiverMetadata carchiver.ArchivalMetadata
archiverProvider provider.ArchiverProvider
frontendConfig FrontendConfig
Expand All @@ -132,7 +123,7 @@ type (
spanExporters []otelsdktrace.SpanExporter
tlsConfigProvider *encryption.FixedTLSConfigProvider
captureMetricsHandler *metricstest.CaptureHandler
hostsByService map[primitives.ServiceName]static.Hosts
hostsByProtocolByService map[transferProtocol]map[primitives.ServiceName]static.Hosts

onGetClaims func(*authorization.AuthInfo) (*authorization.Claims, error)
onAuthorize func(context.Context, *authorization.Claims, *authorization.CallTarget) (authorization.Result, error)
Expand Down Expand Up @@ -186,7 +177,6 @@ type (
AbstractDataStoreFactory persistenceClient.AbstractDataStoreFactory
VisibilityStoreFactory visibility.VisibilityStoreFactory
Logger log.Logger
ClusterNo int
ArchiverMetadata carchiver.ArchivalMetadata
ArchiverProvider provider.ArchiverProvider
EnableReadHistoryFromArchival bool
Expand All @@ -203,8 +193,9 @@ type (
TLSConfigProvider *encryption.FixedTLSConfigProvider
CaptureMetricsHandler *metricstest.CaptureHandler
// ServiceFxOptions is populated by WithFxOptionsForService.
ServiceFxOptions map[primitives.ServiceName][]fx.Option
TaskCategoryRegistry tasks.TaskCategoryRegistry
ServiceFxOptions map[primitives.ServiceName][]fx.Option
TaskCategoryRegistry tasks.TaskCategoryRegistry
HostsByProtocolByService map[transferProtocol]map[primitives.ServiceName]static.Hosts
}

listenHostPort string
Expand Down Expand Up @@ -253,7 +244,6 @@ func newTemporal(t *testing.T, params *TemporalParams) *TemporalImpl {
namespaceReplicationQueue: params.NamespaceReplicationQueue,
abstractDataStoreFactory: params.AbstractDataStoreFactory,
visibilityStoreFactory: params.VisibilityStoreFactory,
clusterNo: params.ClusterNo,
esConfig: params.ESConfig,
esClient: params.ESClient,
archiverMetadata: params.ArchiverMetadata,
Expand All @@ -270,23 +260,7 @@ func newTemporal(t *testing.T, params *TemporalParams) *TemporalImpl {
dcClient: dynamicconfig.NewMemoryClient(),
serviceFxOptions: params.ServiceFxOptions,
taskCategoryRegistry: params.TaskCategoryRegistry,
}

// set defaults
const minNodes = 1
impl.frontendConfig.NumFrontendHosts = max(minNodes, impl.frontendConfig.NumFrontendHosts)
impl.historyConfig.NumHistoryHosts = max(minNodes, impl.historyConfig.NumHistoryHosts)
impl.matchingConfig.NumMatchingHosts = max(minNodes, impl.matchingConfig.NumMatchingHosts)
impl.workerConfig.NumWorkers = max(minNodes, impl.workerConfig.NumWorkers)
if impl.workerConfig.DisableWorker {
impl.workerConfig.NumWorkers = 0
}

impl.hostsByService = map[primitives.ServiceName]static.Hosts{
primitives.FrontendService: static.Hosts{All: impl.FrontendGRPCAddresses()},
primitives.MatchingService: static.Hosts{All: impl.MatchingServiceAddresses()},
primitives.HistoryService: static.Hosts{All: impl.HistoryServiceAddresses()},
primitives.WorkerService: static.Hosts{All: impl.WorkerServiceAddresses()},
hostsByProtocolByService: params.HostsByProtocolByService,
}

for k, v := range staticOverrides {
Expand Down Expand Up @@ -329,56 +303,26 @@ func (c *TemporalImpl) Stop() error {
}

func (c *TemporalImpl) makeHostMap(serviceName primitives.ServiceName, self string) map[primitives.ServiceName]static.Hosts {
hostMap := maps.Clone(c.hostsByService)
hostMap := maps.Clone(c.hostsByProtocolByService[grpcProtocol])
hosts := hostMap[serviceName]
hosts.Self = self
hostMap[serviceName] = hosts
return hostMap
}

func (c *TemporalImpl) makeGRPCAddresses(num, port int) []string {
hosts := make([]string, num)
for i := range hosts {
hosts[i] = fmt.Sprintf("127.0.%d.%d:%d", c.clusterNo, i+1, port)
}
return hosts
}

func (c *TemporalImpl) FrontendGRPCAddresses() []string {
return c.makeGRPCAddresses(c.frontendConfig.NumFrontendHosts, frontendPort)
}

// Use this to get an address for the Go SDK to connect to.
func (c *TemporalImpl) FrontendGRPCAddress() string {
return c.frontendMembershipAddress
}

// Use this to get an address for a remote cluster to connect to.
func (c *TemporalImpl) RemoteFrontendGRPCAddress() string {
return c.FrontendGRPCAddresses()[0]
return c.hostsByProtocolByService[grpcProtocol][primitives.FrontendService].All[0]
}

func (c *TemporalImpl) FrontendHTTPAddress() string {
// randomize like a load balancer would
addrs := c.FrontendGRPCAddresses()
addr := addrs[rand.Intn(len(addrs))]
host, _, err := net.SplitHostPort(addr)
if err != nil {
panic(fmt.Errorf("Invalid gRPC frontend address: %w", err))
}
return net.JoinHostPort(host, strconv.Itoa(frontendHTTPPort))
}

func (c *TemporalImpl) HistoryServiceAddresses() []string {
return c.makeGRPCAddresses(c.historyConfig.NumHistoryHosts, historyPort)
addrs := c.hostsByProtocolByService[httpProtocol][primitives.FrontendService].All
return addrs[rand.Intn(len(addrs))]
}

func (c *TemporalImpl) MatchingServiceAddresses() []string {
return c.makeGRPCAddresses(c.matchingConfig.NumMatchingHosts, matchingPort)
}

func (c *TemporalImpl) WorkerServiceAddresses() []string {
return c.makeGRPCAddresses(c.workerConfig.NumWorkers, workerPort)
func (c *TemporalImpl) FrontendGRPCAddress() string {
return c.hostsByProtocolByService[grpcProtocol][primitives.FrontendService].All[0]
}

func (c *TemporalImpl) AdminClient() adminservice.AdminServiceClient {
Expand Down Expand Up @@ -430,7 +374,7 @@ func (c *TemporalImpl) startFrontend() {
var matchingRawClient resource.MatchingRawClient
var grpcResolver *membership.GRPCResolver

for _, host := range c.hostsByService[serviceName].All {
for _, host := range c.hostsByProtocolByService[grpcProtocol][serviceName].All {
logger := log.With(c.logger, tag.Host(host))
var namespaceRegistry namespace.Registry
app := fx.New(
Expand All @@ -441,7 +385,7 @@ func (c *TemporalImpl) startFrontend() {
),
fx.Provide(c.frontendConfigProvider),
fx.Provide(func() listenHostPort { return listenHostPort(host) }),
fx.Provide(func() httpPort { return httpPort(frontendHTTPPort) }),
fx.Provide(func() httpPort { return portFromAddress(c.FrontendHTTPAddress()) }),
fx.Provide(func() config.DCRedirectionPolicy { return config.DCRedirectionPolicy{} }),
fx.Provide(func() log.Logger { return logger }),
fx.Provide(func() log.ThrottledLogger { return logger }),
Expand Down Expand Up @@ -508,7 +452,7 @@ func (c *TemporalImpl) startFrontend() {
func (c *TemporalImpl) startHistory() {
serviceName := primitives.HistoryService

for _, host := range c.hostsByService[serviceName].All {
for _, host := range c.hostsByProtocolByService[grpcProtocol][serviceName].All {
logger := log.With(c.logger, tag.Host(host))
app := fx.New(
fx.Supply(
Expand All @@ -518,7 +462,7 @@ func (c *TemporalImpl) startHistory() {
),
fx.Provide(c.GetMetricsHandler),
fx.Provide(func() listenHostPort { return listenHostPort(host) }),
fx.Provide(func() httpPort { return httpPort(frontendHTTPPort) }),
fx.Provide(func() httpPort { return portFromAddress(c.FrontendHTTPAddress()) }),
fx.Provide(func() config.DCRedirectionPolicy { return config.DCRedirectionPolicy{} }),
fx.Provide(func() log.Logger { return logger }),
fx.Provide(func() log.ThrottledLogger { return logger }),
Expand Down Expand Up @@ -565,7 +509,7 @@ func (c *TemporalImpl) startHistory() {
func (c *TemporalImpl) startMatching() {
serviceName := primitives.MatchingService

for _, host := range c.hostsByService[serviceName].All {
for _, host := range c.hostsByProtocolByService[grpcProtocol][serviceName].All {
logger := log.With(c.logger, tag.Host(host))
app := fx.New(
fx.Supply(
Expand All @@ -575,7 +519,7 @@ func (c *TemporalImpl) startMatching() {
),
fx.Provide(c.GetMetricsHandler),
fx.Provide(func() listenHostPort { return listenHostPort(host) }),
fx.Provide(func() httpPort { return httpPort(frontendHTTPPort) }),
fx.Provide(func() httpPort { return portFromAddress(c.FrontendHTTPAddress()) }),
fx.Provide(func() log.Logger { return logger }),
fx.Provide(func() log.ThrottledLogger { return logger }),
fx.Provide(c.newRPCFactory),
Expand Down Expand Up @@ -626,7 +570,7 @@ func (c *TemporalImpl) startWorker() {
clusterConfigCopy.EnableGlobalNamespace = true
}

for _, host := range c.hostsByService[serviceName].All {
for _, host := range c.hostsByProtocolByService[grpcProtocol][serviceName].All {
logger := log.With(c.logger, tag.Host(host))
app := fx.New(
fx.Supply(
Expand All @@ -636,7 +580,7 @@ func (c *TemporalImpl) startWorker() {
),
fx.Provide(c.GetMetricsHandler),
fx.Provide(func() listenHostPort { return listenHostPort(host) }),
fx.Provide(func() httpPort { return httpPort(frontendHTTPPort) }),
fx.Provide(func() httpPort { return portFromAddress(c.FrontendHTTPAddress()) }),
fx.Provide(func() config.DCRedirectionPolicy { return config.DCRedirectionPolicy{} }),
fx.Provide(func() log.Logger { return logger }),
fx.Provide(func() log.ThrottledLogger { return logger }),
Expand Down Expand Up @@ -726,7 +670,7 @@ func (c *TemporalImpl) frontendConfigProvider() *config.Config {
Services: map[string]config.Service{
string(primitives.FrontendService): {
RPC: config.RPC{
HTTPPort: frontendHTTPPort,
HTTPPort: int(portFromAddress(c.FrontendHTTPAddress())),
HTTPAdditionalForwardedHeaders: []string{
"this-header-forwarded",
"this-header-prefix-forwarded-*",
Expand Down Expand Up @@ -949,3 +893,15 @@ func (c *TemporalImpl) overrideDynamicConfig(t *testing.T, name dynamicconfig.Ke
t.Cleanup(cleanup)
return cleanup
}

func portFromAddress(addr string) httpPort {
_, port, err := net.SplitHostPort(addr)
if err != nil {
panic(fmt.Errorf("Invalid address: %w", err))
}
portInt, err := strconv.Atoi(port)
if err != nil {
panic(fmt.Errorf("Cannot parse port: %w", err))
}
return httpPort(portInt)
}
Loading

0 comments on commit 0dc1437

Please sign in to comment.