Skip to content

Commit

Permalink
frontend: Use net.JoinHostPort to support IPv6 addresses (#10650)
Browse files Browse the repository at this point in the history
Signed-off-by: Matthew Penner <[email protected]>
Co-authored-by: Periklis Tsirakidis <[email protected]>
Co-authored-by: J Stickler <[email protected]>
  • Loading branch information
3 people authored Dec 14, 2023
1 parent cd74ddf commit a91f3f1
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
* [10814](https://github.com/grafana/loki/pull/10814) **shantanualsi,kaviraj** Upgrade prometheus to v0.47.1 and dskit
* [10959](https://github.com/grafana/loki/pull/10959) **slim-bean** introduce a backoff wait on subquery retries.
* [11121](https://github.com/grafana/loki/pull/11121) **periklis** Ensure all lifecycler cfgs ref a valid IPv6 addr and port combination
* [10650](https://github.com/grafana/loki/pull/10650) **matthewpi** Ensure the frontend uses a valid IPv6 addr and port combination

#### Promtail

Expand Down
7 changes: 3 additions & 4 deletions clients/pkg/promtail/discovery/consulagent/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package consulagent
import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"strconv"
Expand Down Expand Up @@ -527,9 +526,9 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Gr
// since the service may be registered remotely through a different node.
var addr string
if srvCheck.Service.Address != "" {
addr = net.JoinHostPort(srvCheck.Service.Address, fmt.Sprintf("%d", srvCheck.Service.Port))
addr = net.JoinHostPort(srvCheck.Service.Address, strconv.Itoa(srvCheck.Service.Port))
} else {
addr = net.JoinHostPort(member.Addr, fmt.Sprintf("%d", srvCheck.Service.Port))
addr = net.JoinHostPort(member.Addr, strconv.Itoa(srvCheck.Service.Port))
}

labels := model.LabelSet{
Expand Down Expand Up @@ -560,7 +559,7 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Gr
// Add all key/value pairs from the service's tagged addresses as their own labels.
for k, v := range srvCheck.Service.TaggedAddresses {
name := strutil.SanitizeLabelName(k)
address := fmt.Sprintf("%s:%d", v.Address, v.Port)
address := net.JoinHostPort(v.Address, strconv.Itoa(v.Port))
labels[taggedAddressesLabel+model.LabelName(name)] = model.LabelValue(address)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"hash/fnv"
"math"
"net"
"net/http"
"net/http/httputil"
"net/url"
Expand Down Expand Up @@ -867,7 +868,7 @@ func (t *Loki) compactorAddress() (string, bool, error) {
legacyReadMode := t.Cfg.LegacyReadTarget && t.Cfg.isModuleEnabled(Read)
if t.Cfg.isModuleEnabled(All) || legacyReadMode || t.Cfg.isModuleEnabled(Backend) {
// In single binary or read modes, this module depends on Server
return fmt.Sprintf("%s:%d", t.Cfg.Server.GRPCListenAddress, t.Cfg.Server.GRPCListenPort), true, nil
return net.JoinHostPort(t.Cfg.Server.GRPCListenAddress, strconv.Itoa(t.Cfg.Server.GRPCListenPort)), true, nil
}

if t.Cfg.Common.CompactorAddress == "" && t.Cfg.Common.CompactorGRPCAddress == "" {
Expand Down
4 changes: 3 additions & 1 deletion pkg/lokifrontend/frontend/v2/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"flag"
"fmt"
"math/rand"
"net"
"net/http"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -136,7 +138,7 @@ type enqueueResult struct {
func NewFrontend(cfg Config, ring ring.ReadRing, log log.Logger, reg prometheus.Registerer, codec transport.Codec, metricsNamespace string) (*Frontend, error) {
requestsCh := make(chan *frontendRequest)

schedulerWorkers, err := newFrontendSchedulerWorkers(cfg, fmt.Sprintf("%s:%d", cfg.Addr, cfg.Port), ring, requestsCh, log)
schedulerWorkers, err := newFrontendSchedulerWorkers(cfg, net.JoinHostPort(cfg.Addr, strconv.Itoa(cfg.Port)), ring, requestsCh, log)
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/querier/worker_service.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package querier

import (
"fmt"
"net"
"strconv"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -91,7 +92,7 @@ func InitWorkerService(
if cfg.GrpcListenAddress != "" {
listenAddress = cfg.GrpcListenAddress
}
address := fmt.Sprintf("%s:%d", listenAddress, cfg.GrpcListenPort)
address := net.JoinHostPort(listenAddress, strconv.Itoa(cfg.GrpcListenPort))
level.Warn(util_log.Logger).Log(
"msg", "Worker address is empty, attempting automatic worker configuration. If queries are unresponsive consider configuring the worker explicitly.",
"address", address)
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/chunk/cache/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package cache
import (
"context"
"flag"
"fmt"
"net"
"sort"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -254,7 +254,7 @@ func (c *memcachedClient) updateMemcacheServers() error {
return err
}
for _, srv := range addrs {
servers = append(servers, fmt.Sprintf("%s:%d", srv.Target, srv.Port))
servers = append(servers, net.JoinHostPort(srv.Target, strconv.Itoa(int(srv.Port))))
}
}

Expand Down

0 comments on commit a91f3f1

Please sign in to comment.