diff --git a/go.mod b/go.mod index 72e924bbbf6ba..77f5c2a29d943 100644 --- a/go.mod +++ b/go.mod @@ -49,7 +49,7 @@ require ( github.com/gorilla/mux v1.8.1 github.com/gorilla/websocket v1.5.0 github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 - github.com/grafana/dskit v0.0.0-20240819131358-463219e80ea0 + github.com/grafana/dskit v0.0.0-20240827065912-d1bb7ce64fae github.com/grafana/go-gelf/v2 v2.0.1 github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc diff --git a/go.sum b/go.sum index 04a93c414253c..63a96bd5429c7 100644 --- a/go.sum +++ b/go.sum @@ -1048,8 +1048,8 @@ github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWm github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 h1:qhugDMdQ4Vp68H0tp/0iN17DM2ehRo1rLEdOFe/gB8I= github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2/go.mod h1:w/aiO1POVIeXUQyl0VQSZjl5OAGDTL5aX+4v0RA1tcw= -github.com/grafana/dskit v0.0.0-20240819131358-463219e80ea0 h1:iMShjkEYATnBMbEa2wV4QiK5PU2trw24FOCON3v7+K4= -github.com/grafana/dskit v0.0.0-20240819131358-463219e80ea0/go.mod h1:c4ASJAo1QFmXGydDzNed2o0+Fncx+x4YmQ1r9HfYU3c= +github.com/grafana/dskit v0.0.0-20240827065912-d1bb7ce64fae h1:0ahWet0QsN8H3JWF4zoO/7Ebe0geM2itJRlonIQB1oI= +github.com/grafana/dskit v0.0.0-20240827065912-d1bb7ce64fae/go.mod h1:wJbJeQ2ygiGuBKsur7BPPNe+3pSyHEDPtKa7IU3I8ZA= github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak= github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90= github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY= diff --git a/tools/lambda-promtail/go.mod b/tools/lambda-promtail/go.mod index fb35609574b72..c56bb7cea68ea 100644 --- a/tools/lambda-promtail/go.mod +++ b/tools/lambda-promtail/go.mod @@ -10,7 +10,7 @@ require ( github.com/go-kit/log v0.2.1 github.com/gogo/protobuf v1.3.2 github.com/golang/snappy v0.0.4 - github.com/grafana/dskit v0.0.0-20240814201308-442170dfed1b + github.com/grafana/dskit v0.0.0-20240827065912-d1bb7ce64fae github.com/grafana/loki/v3 v3.0.0-20240809103847-9315b3d03d79 github.com/prometheus/common v0.55.0 github.com/stretchr/testify v1.9.0 @@ -67,7 +67,7 @@ require ( github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 // indirect github.com/grafana/jsonparser v0.0.0-20240425183733-ea80629e1a32 // indirect github.com/grafana/loki/pkg/push v0.0.0-20231124142027-e52380921608 // indirect - github.com/grafana/pyroscope-go/godeltaprof v0.1.7 // indirect + github.com/grafana/pyroscope-go/godeltaprof v0.1.8 // indirect github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect github.com/hashicorp/consul/api v1.29.2 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect diff --git a/tools/lambda-promtail/go.sum b/tools/lambda-promtail/go.sum index 2627682cc9454..872f04b9ae43f 100644 --- a/tools/lambda-promtail/go.sum +++ b/tools/lambda-promtail/go.sum @@ -216,8 +216,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= -github.com/grafana/dskit v0.0.0-20240814201308-442170dfed1b h1:w3iQfdftNWfmU86f3Y4Cjzjx/+3AnKfpXzzq2cV8H/Y= -github.com/grafana/dskit v0.0.0-20240814201308-442170dfed1b/go.mod h1:c4ASJAo1QFmXGydDzNed2o0+Fncx+x4YmQ1r9HfYU3c= +github.com/grafana/dskit v0.0.0-20240827065912-d1bb7ce64fae h1:0ahWet0QsN8H3JWF4zoO/7Ebe0geM2itJRlonIQB1oI= +github.com/grafana/dskit v0.0.0-20240827065912-d1bb7ce64fae/go.mod h1:wJbJeQ2ygiGuBKsur7BPPNe+3pSyHEDPtKa7IU3I8ZA= github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 h1:X8IKQ0wu40wpvYcKfBcc5T4QnhdQjUhtUtB/1CY89lE= github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU= github.com/grafana/jsonparser v0.0.0-20240425183733-ea80629e1a32 h1:NznuPwItog+rwdVg8hAuGKP29ndRSzJAwhxKldkP8oQ= @@ -226,8 +226,8 @@ github.com/grafana/loki/pkg/push v0.0.0-20231124142027-e52380921608 h1:ZYk42718k github.com/grafana/loki/pkg/push v0.0.0-20231124142027-e52380921608/go.mod h1:f3JSoxBTPXX5ec4FxxeC19nTBSxoTz+cBgS3cYLMcr0= github.com/grafana/loki/v3 v3.0.0-20240809103847-9315b3d03d79 h1:5/FOzaJLAKXnQzN0MTi41s9irM7iCeKTGJ3d9kYKpu4= github.com/grafana/loki/v3 v3.0.0-20240809103847-9315b3d03d79/go.mod h1:QgSsIqWyevcORssKdnuWnq/eg6vmYj2M8TCenSPfgQk= -github.com/grafana/pyroscope-go/godeltaprof v0.1.7 h1:C11j63y7gymiW8VugJ9ZW0pWfxTZugdSJyC48olk5KY= -github.com/grafana/pyroscope-go/godeltaprof v0.1.7/go.mod h1:Tk376Nbldo4Cha9RgiU7ik8WKFkNpfds98aUzS8omLE= +github.com/grafana/pyroscope-go/godeltaprof v0.1.8 h1:iwOtYXeeVSAeYefJNaxDytgjKtUuKQbJqgAIjlnicKg= +github.com/grafana/pyroscope-go/godeltaprof v0.1.8/go.mod h1:2+l7K7twW49Ct4wFluZD3tZ6e0SjanjcUUBPVD/UuGU= github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc h1:GN2Lv3MGO7AS6PrRoT6yV5+wkrOpcszoIsO4+4ds248= github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc/go.mod h1:+JKpmjMGhpgPL+rXZ5nsZieVzvarn86asRlBg4uNGnk= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= @@ -300,7 +300,6 @@ github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= diff --git a/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go b/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go index 02e6e493736b4..616023899b7ec 100644 --- a/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go +++ b/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go @@ -106,16 +106,23 @@ func FromHeader(hs http.Header) []*Header { return result } -// Errorf returns a HTTP gRPC error than is correctly forwarded over +// Error returns a HTTP gRPC error that is correctly forwarded over // gRPC, and can eventually be converted back to a HTTP response with // HTTPResponseFromError. -func Errorf(code int, tmpl string, args ...interface{}) error { +func Error(code int, msg string) error { return ErrorFromHTTPResponse(&HTTPResponse{ Code: int32(code), - Body: []byte(fmt.Sprintf(tmpl, args...)), + Body: []byte(msg), }) } +// Errorf returns a HTTP gRPC error that is correctly forwarded over +// gRPC, and can eventually be converted back to a HTTP response with +// HTTPResponseFromError. +func Errorf(code int, tmpl string, args ...interface{}) error { + return Error(code, fmt.Sprintf(tmpl, args...)) +} + // ErrorFromHTTPResponse converts an HTTP response into a grpc error, and uses HTTP response body as an error message. // Note that if HTTP response body contains non-utf8 string, then returned error cannot be marshalled by protobuf. func ErrorFromHTTPResponse(resp *HTTPResponse) error { diff --git a/vendor/github.com/grafana/dskit/ring/lifecycler.go b/vendor/github.com/grafana/dskit/ring/lifecycler.go index 1ff80d99ac807..083f112bdf137 100644 --- a/vendor/github.com/grafana/dskit/ring/lifecycler.go +++ b/vendor/github.com/grafana/dskit/ring/lifecycler.go @@ -415,6 +415,11 @@ func (i *Lifecycler) setReadOnlyState(readOnly bool, readOnlyLastUpdated time.Ti defer i.stateMtx.Unlock() i.readOnly = readOnly i.readOnlyLastUpdated = readOnlyLastUpdated + if readOnly { + i.lifecyclerMetrics.readonly.Set(1) + } else { + i.lifecyclerMetrics.readonly.Set(0) + } } // ClaimTokensFor takes all the tokens for the supplied ingester and assigns them to this ingester. @@ -678,8 +683,8 @@ func (i *Lifecycler) initRing(ctx context.Context) error { now := time.Now() // The instance doesn't exist in the ring, so it's safe to set the registered timestamp as of now. i.setRegisteredAt(now) - // Clear read-only state, and set last update time to "now". - i.setReadOnlyState(false, now) + // Clear read-only state, and set last update time to "zero". + i.setReadOnlyState(false, time.Time{}) // We use the tokens from the file only if it does not exist in the ring yet. if len(tokensFromFile) > 0 { @@ -719,8 +724,8 @@ func (i *Lifecycler) initRing(ctx context.Context) error { } tokens := Tokens(instanceDesc.Tokens) - level.Info(i.logger).Log("msg", "existing instance found in ring", "state", instanceDesc.State, "tokens", - len(tokens), "ring", i.RingName) + ro, rots := instanceDesc.GetReadOnlyState() + level.Info(i.logger).Log("msg", "existing instance found in ring", "state", instanceDesc.State, "tokens", len(tokens), "ring", i.RingName, "readOnly", ro, "readOnlyStateUpdate", rots) // If the ingester fails to clean its ring entry up or unregister_on_shutdown=false, it can leave behind its // ring state as LEAVING. Make sure to switch to the ACTIVE state. diff --git a/vendor/github.com/grafana/dskit/ring/lifecycler_metrics.go b/vendor/github.com/grafana/dskit/ring/lifecycler_metrics.go index fe29cdfd5fc80..e5f85e4e42387 100644 --- a/vendor/github.com/grafana/dskit/ring/lifecycler_metrics.go +++ b/vendor/github.com/grafana/dskit/ring/lifecycler_metrics.go @@ -8,6 +8,7 @@ import ( type LifecyclerMetrics struct { consulHeartbeats prometheus.Counter shutdownDuration *prometheus.HistogramVec + readonly prometheus.Gauge } func NewLifecyclerMetrics(ringName string, reg prometheus.Registerer) *LifecyclerMetrics { @@ -23,6 +24,11 @@ func NewLifecyclerMetrics(ringName string, reg prometheus.Registerer) *Lifecycle Buckets: prometheus.ExponentialBuckets(10, 2, 8), // Biggest bucket is 10*2^(9-1) = 2560, or 42 mins. ConstLabels: prometheus.Labels{"name": ringName}, }, []string{"op", "status"}), + readonly: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "lifecycler_read_only", + Help: "Set to 1 if this lifecycler's instance entry is in read-only state.", + ConstLabels: prometheus.Labels{"name": ringName}, + }), } } diff --git a/vendor/github.com/grafana/dskit/ring/model.go b/vendor/github.com/grafana/dskit/ring/model.go index fb3095172b55b..c4ba6446693b9 100644 --- a/vendor/github.com/grafana/dskit/ring/model.go +++ b/vendor/github.com/grafana/dskit/ring/model.go @@ -594,6 +594,29 @@ func (d *Desc) writableInstancesWithTokensCountPerZone() map[string]int { return instancesCountPerZone } +func (d *Desc) readOnlyInstancesAndOldestReadOnlyUpdatedTimestamp() (int, int64) { + readOnlyInstances := 0 + oldestReadOnlyUpdatedTimestamp := int64(0) + first := true + + if d != nil { + for _, ingester := range d.Ingesters { + if !ingester.ReadOnly { + continue + } + + readOnlyInstances++ + if first { + oldestReadOnlyUpdatedTimestamp = ingester.ReadOnlyUpdatedTimestamp + } else { + oldestReadOnlyUpdatedTimestamp = min(oldestReadOnlyUpdatedTimestamp, ingester.ReadOnlyUpdatedTimestamp) + } + first = false + } + } + return readOnlyInstances, oldestReadOnlyUpdatedTimestamp +} + type CompareResult int // CompareResult responses diff --git a/vendor/github.com/grafana/dskit/ring/replication_set.go b/vendor/github.com/grafana/dskit/ring/replication_set.go index ffdcf80ab5268..ae37820202561 100644 --- a/vendor/github.com/grafana/dskit/ring/replication_set.go +++ b/vendor/github.com/grafana/dskit/ring/replication_set.go @@ -316,7 +316,7 @@ func DoUntilQuorumWithoutSuccessfulContextCancellation[T any](ctx context.Contex ext.Error.Set(cfg.Logger.Span, true) } - contextTracker.cancelAllContexts(cancellation.NewErrorf(cause)) + contextTracker.cancelAllContexts(cancellation.NewError(errors.New(cause))) cleanupResultsAlreadyReceived() return nil, err } diff --git a/vendor/github.com/grafana/dskit/ring/ring.go b/vendor/github.com/grafana/dskit/ring/ring.go index bb7e29c28a410..c8db7da50c61b 100644 --- a/vendor/github.com/grafana/dskit/ring/ring.go +++ b/vendor/github.com/grafana/dskit/ring/ring.go @@ -191,6 +191,12 @@ type Ring struct { // then this value will be 0. oldestRegisteredTimestamp int64 + readOnlyInstances *int // Number of instances with ReadOnly flag set. Only valid if not nil. + // Oldest value of ReadOnlyUpdatedTimestamp for read-only instances. If there are no read-only instances, + // or if any read-only instance has ReadOnlyUpdatedTimestamp == 0 (which should not happen), then this value will be 0. + // Only valid if not nil. + oldestReadOnlyUpdatedTimestamp *int64 + // Maps a token with the information of the instance holding it. This map is immutable and // cannot be changed in place because it's shared "as is" between subrings (the only way to // change it is to create a new one and replace it). @@ -315,7 +321,7 @@ func (r *Ring) starting(ctx context.Context) error { func (r *Ring) loop(ctx context.Context) error { // Update the ring metrics at start of the main loop. r.mtx.Lock() - r.updateRingMetrics(Different) + r.updateRingMetrics() r.mtx.Unlock() r.KVClient.WatchKey(ctx, r.key, func(value interface{}) bool { @@ -356,11 +362,17 @@ func (r *Ring) updateRingState(ringDesc *Desc) { // when watching the ring for updates). r.mtx.Lock() r.ringDesc = ringDesc - r.updateRingMetrics(rc) + if rc != Equal { + r.updateRingMetrics() + } r.mtx.Unlock() return } + r.setRingStateFromDesc(ringDesc, true, true, true) +} + +func (r *Ring) setRingStateFromDesc(ringDesc *Desc, updateMetrics, updateRegisteredTimestampCache, updateReadOnlyInstances bool) { now := time.Now() ringTokens := ringDesc.GetTokens() ringTokensByZone := ringDesc.getTokensByZone() @@ -372,6 +384,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) { instancesWithTokensCountPerZone := ringDesc.instancesWithTokensCountPerZone() writableInstancesWithTokensCount := ringDesc.writableInstancesWithTokensCount() writableInstancesWithTokensCountPerZone := ringDesc.writableInstancesWithTokensCountPerZone() + readOnlyInstances, oldestReadOnlyUpdatedTimestamp := ringDesc.readOnlyInstancesAndOldestReadOnlyUpdatedTimestamp() r.mtx.Lock() defer r.mtx.Unlock() @@ -385,8 +398,14 @@ func (r *Ring) updateRingState(ringDesc *Desc) { r.instancesWithTokensCountPerZone = instancesWithTokensCountPerZone r.writableInstancesWithTokensCount = writableInstancesWithTokensCount r.writableInstancesWithTokensCountPerZone = writableInstancesWithTokensCountPerZone - r.oldestRegisteredTimestamp = oldestRegisteredTimestamp + if updateRegisteredTimestampCache { + r.oldestRegisteredTimestamp = oldestRegisteredTimestamp + } r.lastTopologyChange = now + if updateReadOnlyInstances { + r.readOnlyInstances = &readOnlyInstances + r.oldestReadOnlyUpdatedTimestamp = &oldestReadOnlyUpdatedTimestamp + } // Invalidate all cached subrings. if r.shuffledSubringCache != nil { @@ -396,7 +415,9 @@ func (r *Ring) updateRingState(ringDesc *Desc) { r.shuffledSubringWithLookbackCache = make(map[subringCacheKey]cachedSubringWithLookback[*Ring]) } - r.updateRingMetrics(rc) + if updateMetrics { + r.updateRingMetrics() + } } // Get returns n (or more) instances which form the replicas for the given key. @@ -636,11 +657,7 @@ func (r *Desc) CountTokens() map[string]int64 { } // updateRingMetrics updates ring metrics. Caller must be holding the Write lock! -func (r *Ring) updateRingMetrics(compareResult CompareResult) { - if compareResult == Equal { - return - } - +func (r *Ring) updateRingMetrics() { numByState := map[string]int{} oldestTimestampByState := map[string]int64{} @@ -668,10 +685,6 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) { r.oldestTimestampGaugeVec.WithLabelValues(state).Set(float64(timestamp)) } - if compareResult == EqualButStatesAndTimestamps { - return - } - r.totalTokensGauge.Set(float64(len(r.ringTokens))) } @@ -697,18 +710,16 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) { // // Subring returned by this method does not contain instances that have read-only field set. func (r *Ring) ShuffleShard(identifier string, size int) ReadRing { - // Use all possible instances if shuffle sharding is disabled. We don't set size to r.InstancesCount(), because - // that could lead to not all instances being returned when ring zones are unbalanced. - // Reason for not returning entire ring directly is that we need to filter out read-only instances. - if size <= 0 { - size = math.MaxInt - } - if cached := r.getCachedShuffledSubring(identifier, size); cached != nil { return cached } - result := r.shuffleShard(identifier, size, 0, time.Now()) + var result *Ring + if size <= 0 { + result = r.filterOutReadOnlyInstances(0, time.Now()) + } else { + result = r.shuffleShard(identifier, size, 0, time.Now()) + } // Only cache subring if it is different from this ring, to avoid deadlocks in getCachedShuffledSubring, // when we update the cached ring. if result != r { @@ -725,17 +736,20 @@ func (r *Ring) ShuffleShard(identifier string, size int) ReadRing { // // This function supports caching, but the cache will only be effective if successive calls for the // same identifier are with the same lookbackPeriod and increasing values of now. +// +// Subring returned by this method does not contain read-only instances that have changed their state +// before the lookback period. func (r *Ring) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing { - // Nothing to do if the shard size is not smaller than the actual ring. - if size <= 0 || r.InstancesCount() <= size { - return r - } - if cached := r.getCachedShuffledSubringWithLookback(identifier, size, lookbackPeriod, now); cached != nil { return cached } - result := r.shuffleShard(identifier, size, lookbackPeriod, now) + var result *Ring + if size <= 0 { + result = r.filterOutReadOnlyInstances(lookbackPeriod, now) + } else { + result = r.shuffleShard(identifier, size, lookbackPeriod, now) + } if result != r { r.setCachedShuffledSubringWithLookback(identifier, size, lookbackPeriod, now, result) @@ -756,6 +770,9 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur // // If any instance had RegisteredTimestamp equal to 0 (it would not cause additional lookup of next instance), // then r.oldestRegisteredTimestamp is zero too, and we skip this optimization. + // + // Even if some instances are read-only, they must have changed their read-only status within lookback window + // (because they were all registered within lookback window), so they would be included in the result. if lookbackPeriod > 0 && r.oldestRegisteredTimestamp > 0 && r.oldestRegisteredTimestamp >= lookbackUntil { return r } @@ -778,6 +795,19 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur var tokens []uint32 if r.cfg.ZoneAwarenessEnabled { + // If we're going to include all instances from this zone, we can simply filter out + // unwanted instances, and avoid iterating through tokens. + if numInstancesPerZone >= r.instancesCountPerZone[zone] { + for id, inst := range r.ringDesc.Ingesters { + if inst.Zone == zone && shouldIncludeReadonlyInstanceInTheShard(inst, lookbackPeriod, lookbackUntil) { + shard[id] = inst + } + } + + // We can go to the next zone, no need to iterate tokens. + continue + } + tokens = r.ringTokensByZone[zone] } else { // When zone-awareness is disabled, we just iterate over 1 single fake zone @@ -819,11 +849,9 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur instanceID := info.InstanceID instance := r.ringDesc.Ingesters[instanceID] - // The lookbackPeriod is 0 when this function is called by ShuffleShard(). In this case, we want read only instances excluded. - if lookbackPeriod == 0 && instance.ReadOnly { + if !shouldIncludeReadonlyInstanceInTheShard(instance, lookbackPeriod, lookbackUntil) { continue } - // Include instance in the subring. shard[instanceID] = instance @@ -855,7 +883,56 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur } } - // Build a read-only ring for the shard. + return r.buildRingForTheShard(shard) +} + +// shouldIncludeReadonlyInstanceInTheShard returns true if instance is not read-only, or when it is read-only and should be included in the shuffle shard. +func shouldIncludeReadonlyInstanceInTheShard(instance InstanceDesc, lookbackPeriod time.Duration, lookbackUntil int64) bool { + if !instance.ReadOnly { + return true + } + // The lookbackPeriod is 0 when this function is called by ShuffleShard(). In this case, we want read only instances excluded. + if lookbackPeriod == 0 { + return false + } + // With lookback period >0, read only instances are only included if they have not changed read-only status in the lookback window. + // If ReadOnlyUpdatedTimestamp is not set, we include the instance, and extend the shard later. + if lookbackPeriod > 0 && instance.ReadOnlyUpdatedTimestamp > 0 && instance.ReadOnlyUpdatedTimestamp < lookbackUntil { + return false + } + return true +} + +// filterOutReadOnlyInstances removes all read-only instances from the ring, and returns the resulting ring. +func (r *Ring) filterOutReadOnlyInstances(lookbackPeriod time.Duration, now time.Time) *Ring { + lookbackUntil := now.Add(-lookbackPeriod).Unix() + + r.mtx.RLock() + defer r.mtx.RUnlock() + + // If there are no read-only instances, there's no need to do any filtering. + if r.readOnlyInstances != nil && *r.readOnlyInstances == 0 { + return r + } + + // If all readOnlyUpdatedTimestamp values are within lookback window, we can return the ring without any filtering. + if lookbackPeriod > 0 && r.oldestReadOnlyUpdatedTimestamp != nil && *r.oldestReadOnlyUpdatedTimestamp >= lookbackUntil { + return r + } + + shard := make(map[string]InstanceDesc, len(r.ringDesc.Ingesters)) + + for id, inst := range r.ringDesc.Ingesters { + if shouldIncludeReadonlyInstanceInTheShard(inst, lookbackPeriod, lookbackUntil) { + shard[id] = inst + } + } + + return r.buildRingForTheShard(shard) +} + +// buildRingForTheShard builds read-only ring for the shard (this ring won't be updated in the future). +func (r *Ring) buildRingForTheShard(shard map[string]InstanceDesc) *Ring { shardDesc := &Desc{Ingesters: shard} shardTokensByZone := shardDesc.getTokensByZone() shardTokens := mergeTokenGroups(shardTokensByZone) diff --git a/vendor/github.com/grafana/dskit/runtimeconfig/manager.go b/vendor/github.com/grafana/dskit/runtimeconfig/manager.go index 84b69de766d00..e74d011d99501 100644 --- a/vendor/github.com/grafana/dskit/runtimeconfig/manager.go +++ b/vendor/github.com/grafana/dskit/runtimeconfig/manager.go @@ -2,12 +2,14 @@ package runtimeconfig import ( "bytes" + "compress/gzip" "context" "crypto/sha256" "flag" "fmt" "io" "os" + "strings" "sync" "time" @@ -183,8 +185,8 @@ func (om *Manager) loadConfig() error { mergedConfig := map[string]interface{}{} for _, f := range om.cfg.LoadPath { - yamlFile := map[string]interface{}{} - err := yaml.Unmarshal(rawData[f], &yamlFile) + data := rawData[f] + yamlFile, err := om.unmarshalMaybeGzipped(f, data) if err != nil { om.configLoadSuccess.Set(0) return errors.Wrapf(err, "unmarshal file %q", f) @@ -218,6 +220,32 @@ func (om *Manager) loadConfig() error { return nil } +func (om *Manager) unmarshalMaybeGzipped(filename string, data []byte) (map[string]any, error) { + yamlFile := map[string]any{} + if strings.HasSuffix(filename, ".gz") { + r, err := gzip.NewReader(bytes.NewReader(data)) + if err != nil { + return nil, errors.Wrap(err, "read gzipped file") + } + defer r.Close() + err = yaml.NewDecoder(r).Decode(&yamlFile) + return yamlFile, errors.Wrap(err, "uncompress/unmarshal gzipped file") + } + + if err := yaml.Unmarshal(data, &yamlFile); err != nil { + // Give a hint if we think that file is gzipped. + if isGzip(data) { + return nil, errors.Wrap(err, "file looks gzipped but doesn't have a .gz extension") + } + return nil, err + } + return yamlFile, nil +} + +func isGzip(data []byte) bool { + return len(data) > 2 && data[0] == 0x1f && data[1] == 0x8b +} + func mergeConfigMaps(a, b map[string]interface{}) map[string]interface{} { out := make(map[string]interface{}, len(a)) for k, v := range a { diff --git a/vendor/modules.txt b/vendor/modules.txt index 476db0caa1d30..d27464184e2c9 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -985,7 +985,7 @@ github.com/gorilla/websocket # github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 ## explicit; go 1.17 github.com/grafana/cloudflare-go -# github.com/grafana/dskit v0.0.0-20240819131358-463219e80ea0 +# github.com/grafana/dskit v0.0.0-20240827065912-d1bb7ce64fae ## explicit; go 1.21 github.com/grafana/dskit/aws github.com/grafana/dskit/backoff