Zy/test 3 #2 (Draft)

wants to merge 21 commits into base: master
1 change: 1 addition & 0 deletions .gitignore
@@ -31,3 +31,4 @@ npm_licenses.tar.bz2

# Ignore parser debug
y.output
/.idea
27 changes: 27 additions & 0 deletions discovery/kubernetes/endpointslice.go
@@ -181,59 +181,73 @@ func (e *EndpointSlice) enqueue(obj interface{}) {

// Run implements the Discoverer interface.
func (e *EndpointSlice) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
level.Error(e.logger).Log("msg", "zytestingg eps run 1")
defer e.queue.ShutDown()
level.Error(e.logger).Log("msg", "zytestingg eps run 2")

cacheSyncs := []cache.InformerSynced{e.endpointSliceInf.HasSynced, e.serviceInf.HasSynced, e.podInf.HasSynced}
level.Error(e.logger).Log("msg", "zytestingg eps run 3")
if e.withNodeMetadata {
cacheSyncs = append(cacheSyncs, e.nodeInf.HasSynced)
}
level.Error(e.logger).Log("msg", "zytestingg eps run 4")
if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
if !errors.Is(ctx.Err(), context.Canceled) {
level.Error(e.logger).Log("msg", "endpointslice informer unable to sync cache")
}
return
}
level.Error(e.logger).Log("msg", "zytestingg eps run 5")

go func() {
for e.process(ctx, ch) {
}
}()
level.Error(e.logger).Log("msg", "zytestingg eps run 6")

// Block until the target provider is explicitly canceled.
<-ctx.Done()
}

func (e *EndpointSlice) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool {
keyObj, quit := e.queue.Get()
level.Error(e.logger).Log("msg", "zytestingg keyObj from queue1")
if quit {
return false
}
defer e.queue.Done(keyObj)
key := keyObj.(string)
level.Error(e.logger).Log("msg", "zytestingg keyObj from queue2", "key", key)

namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
level.Error(e.logger).Log("msg", "splitting key failed", "key", key)
return true
}
level.Error(e.logger).Log("msg", "zytestingg keyObj from queue3")

o, exists, err := e.endpointSliceStore.GetByKey(key)
if err != nil {
level.Error(e.logger).Log("msg", "getting object from store failed", "key", key)
return true
}
level.Error(e.logger).Log("msg", "zytestingg keyObj from queue4")
if !exists {
send(ctx, ch, &targetgroup.Group{Source: endpointSliceSourceFromNamespaceAndName(namespace, name)})
return true
}
level.Error(e.logger).Log("msg", "zytestingg keyObj from queue5")

esa, err := e.getEndpointSliceAdaptor(o)
level.Error(e.logger).Log("msg", "zytestingg keyObj from queue6")
if err != nil {
level.Error(e.logger).Log("msg", "converting to EndpointSlice object failed", "err", err)
return true
}
level.Error(e.logger).Log("msg", "zytestingg keyObj from queue7")

send(ctx, ch, e.buildEndpointSlice(esa))
level.Error(e.logger).Log("msg", "zytestingg keyObj from queue8")
return true
}

@@ -273,6 +287,7 @@ const (
)

func (e *EndpointSlice) buildEndpointSlice(eps endpointSliceAdaptor) *targetgroup.Group {
level.Error(e.logger).Log("msg", "zytestingg buildEndpointSlice 1", "epsname", eps.name())
tg := &targetgroup.Group{
Source: endpointSliceSource(eps),
}
@@ -281,17 +296,22 @@ func (e *EndpointSlice) buildEndpointSlice(eps endpointSliceAdaptor) *targetgrou
endpointSliceAddressTypeLabel: lv(eps.addressType()),
}

level.Error(e.logger).Log("msg", "zytestingg buildEndpointSlice 2", "epsname", eps.name())
addObjectMetaLabels(tg.Labels, eps.getObjectMeta(), RoleEndpointSlice)
level.Error(e.logger).Log("msg", "zytestingg buildEndpointSlice 3", "epsname", eps.name())

e.addServiceLabels(eps, tg)
level.Error(e.logger).Log("msg", "zytestingg buildEndpointSlice 4", "epsname", eps.name())

type podEntry struct {
pod *apiv1.Pod
servicePorts []endpointSlicePortAdaptor
}
seenPods := map[string]*podEntry{}
level.Error(e.logger).Log("msg", "zytestingg buildEndpointSlice 5", "epsname", eps.name())

add := func(addr string, ep endpointSliceEndpointAdaptor, port endpointSlicePortAdaptor) {
level.Error(e.logger).Log("msg", "zytestingg ADDbuildEndpointSliceADD 1", "epsname", eps.name())
a := addr
if port.port() != nil {
a = net.JoinHostPort(addr, strconv.FormatUint(uint64(*port.port()), 10))
@@ -333,16 +353,19 @@ func (e *EndpointSlice) buildEndpointSlice(eps endpointSliceAdaptor) *targetgrou
target[endpointSliceEndpointHostnameLabel] = lv(*ep.hostname())
}

level.Error(e.logger).Log("msg", "zytestingg ADDbuildEndpointSliceADD 2", "epsname", eps.name())
if ep.targetRef() != nil {
target[model.LabelName(endpointSliceAddressTargetKindLabel)] = lv(ep.targetRef().Kind)
target[model.LabelName(endpointSliceAddressTargetNameLabel)] = lv(ep.targetRef().Name)
}

level.Error(e.logger).Log("msg", "zytestingg ADDbuildEndpointSliceADD 3", "epsname", eps.name())
for k, v := range ep.topology() {
ln := strutil.SanitizeLabelName(k)
target[model.LabelName(endpointSliceEndpointTopologyLabelPrefix+ln)] = lv(v)
target[model.LabelName(endpointSliceEndpointTopologyLabelPresentPrefix+ln)] = presentValue
}
level.Error(e.logger).Log("msg", "zytestingg ADDbuildEndpointSliceADD 4", "epsname", eps.name())

if e.withNodeMetadata {
if ep.targetRef() != nil && ep.targetRef().Kind == "Node" {
@@ -360,6 +383,7 @@ func (e *EndpointSlice) buildEndpointSlice(eps endpointSliceAdaptor) *targetgrou
}
s := namespacedName(pod.Namespace, pod.Name)

level.Error(e.logger).Log("msg", "zytestingg ADDbuildEndpointSliceADD 5", "epsname", eps.name())
sp, ok := seenPods[s]
if !ok {
sp = &podEntry{pod: pod}
@@ -369,6 +393,7 @@ func (e *EndpointSlice) buildEndpointSlice(eps endpointSliceAdaptor) *targetgrou
// Attach standard pod labels.
target = target.Merge(podLabels(pod))

level.Error(e.logger).Log("msg", "zytestingg ADDbuildEndpointSliceADD 6", "epsname", eps.name())
// Attach potential container port labels matching the endpoint port.
for _, c := range pod.Spec.Containers {
for _, cport := range c.Ports {
@@ -388,6 +413,7 @@ func (e *EndpointSlice) buildEndpointSlice(eps endpointSliceAdaptor) *targetgrou
}
}

level.Error(e.logger).Log("msg", "zytestingg ADDbuildEndpointSliceADD 7", "epsname", eps.name())
// Add service port so we know that we have already generated a target
// for it.
sp.servicePorts = append(sp.servicePorts, port)
@@ -442,6 +468,7 @@ func (e *EndpointSlice) buildEndpointSlice(eps endpointSliceAdaptor) *targetgrou
}
}
}
level.Error(e.logger).Log("msg", "zytestingg buildEndpointSlice 6", "epsname", eps.name())

return tg
}
14 changes: 13 additions & 1 deletion discovery/kubernetes/kubernetes.go
@@ -395,12 +395,14 @@ const resyncDisabled = 0

// Run implements the discoverer interface.
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
level.Error(d.logger).Log("msg", "zytestingg", "role", d.role)
d.Lock()

namespaces := d.getNamespaces()

switch d.role {
case RoleEndpointSlice:
level.Error(d.logger).Log("msg", "zytestingg eps", "role", d.role)
// Check "networking.k8s.io/v1" availability with retries.
// If "v1" is not available, use "networking.k8s.io/v1beta1" for backward compatibility
var v1Supported bool
@@ -416,6 +418,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
d.Unlock()
return
}
level.Error(d.logger).Log("msg", "zytestingg eps2", "role", d.role, "namespace", namespaces)

for _, namespace := range namespaces {
var informer cache.SharedIndexInformer
@@ -425,7 +428,15 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = d.selectors.endpointslice.field
options.LabelSelector = d.selectors.endpointslice.label
return e.List(ctx, options)
ss, err := e.List(ctx, options)
//esl := *ss // ss is already a *disv1.EndpointSliceList from the typed client, so no type assertion is needed

level.Error(d.logger).Log("msg", "zytestingg eps3", "role", d.role, "len", len(ss.Items))
if err != nil {
level.Error(d.logger).Log("msg", "zytestingg eps3 err", "role", d.role, "len", len(ss.Items), "err", err.Error())
}
level.Error(d.logger).Log("msg", "zytestingg eps3 label", "selector", options.LabelSelector)
return ss, err
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = d.selectors.endpointslice.field
@@ -435,6 +446,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
}
informer = d.newEndpointSlicesByNodeInformer(elw, &disv1.EndpointSlice{})
} else {
level.Error(d.logger).Log("msg", "zytestingg eps4 wrongg", "role", d.role)
e := d.client.DiscoveryV1beta1().EndpointSlices(namespace)
elw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
28 changes: 28 additions & 0 deletions discovery/manager.go
@@ -17,6 +17,7 @@ import (
"context"
"fmt"
"reflect"
"strings"
"sync"
"time"

@@ -300,6 +301,25 @@ func (m *Manager) cleaner(p *Provider) {
}
}

// printgroup reports whether any target in the given group carries a label value
// ending in ":11999". It is an ad-hoc debug helper for this test branch.
func (m *Manager) printgroup(update *targetgroup.Group) bool {
//level.Warn(m.logger).Log("msg", "zytestingg printGroup")
found := false
//for _, update := range updates {
//level.Warn(m.logger).Log("msg", "zytestingg printGroup 2")
targets := update.Targets
for _, target := range targets {
//level.Warn(m.logger).Log("msg", "zytestingg printGroup 3")
for _, v := range target {
//level.Warn(m.logger).Log("msg", "zytestingg printGroup 4", "key", k, "value", v)
if strings.HasSuffix(string(v), ":11999") {
found = true
}
}
}
//}
return found
}

func (m *Manager) updater(ctx context.Context, p *Provider, updates chan []*targetgroup.Group) {
// Ensure targets from this provider are cleaned up.
defer m.cleaner(p)
@@ -308,6 +328,7 @@ func (m *Manager) updater(ctx context.Context, p *Provider, updates chan []*targ
case <-ctx.Done():
return
case tgs, ok := <-updates:
//found := m.printgroup(tgs)
m.metrics.ReceivedUpdates.Inc()
if !ok {
level.Debug(m.logger).Log("msg", "Discoverer channel closed", "provider", p.name)
@@ -317,6 +338,9 @@ func (m *Manager) updater(ctx context.Context, p *Provider, updates chan []*targ
}

p.mu.RLock()
//if found {
// level.Warn(m.logger).Log("msg", "zytestingg printGroupfound", "len", len(p.subs))
//}
for s := range p.subs {
m.updateGroup(poolKey{setName: s, provider: p.name}, tgs)
}
@@ -377,6 +401,10 @@ func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) {
}
for _, tg := range tgs {
if tg != nil { // Some Discoverers send nil target group so need to check for it to avoid panics.
found := m.printgroup(tg)
if found {
level.Warn(m.logger).Log("msg", "zytestingg printGroup 2FOUNDD", "poolkey", poolKey)
}
m.targets[poolKey][tg.Source] = tg
}
}
44 changes: 44 additions & 0 deletions scrape/manager.go
@@ -18,6 +18,7 @@ import (
"fmt"
"hash/fnv"
"reflect"
"strings"
"sync"
"time"

@@ -109,6 +110,26 @@ type Manager struct {
metrics *scrapeMetrics
}

// printgroup logs every target label in the given group and reports whether any
// label value ends in ":11999". It is an ad-hoc debug helper for this test branch.
func (m *Manager) printgroup(update *targetgroup.Group) bool {
//level.Warn(m.logger).Log("msg", "zytestingg printGroup")
found := false
//for _, update := range updates {
level.Warn(m.logger).Log("msg", "zytestingg scrape printGroup 2")
targets := update.Targets
for _, target := range targets {
level.Warn(m.logger).Log("msg", "zytestingg scrape printGroup 3")
for k, v := range target {
level.Warn(m.logger).Log("msg", "zytestingg scrape printGroup 4", "key", k, "value", v)
if strings.HasSuffix(string(v), ":11999") {
level.Warn(m.logger).Log("msg", "zytestingg scrape printGroup FOUNDDD", "key", k, "value", v)
found = true
}
}
}
//}
return found
}

// Run receives and saves target set updates and triggers the scraping loops reloading.
// Reloading happens in the background so that it doesn't block receiving targets updates.
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
@@ -117,6 +138,13 @@ func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
select {
case ts := <-tsets:
m.updateTsets(ts)
//for _, v := range ts {
// for _, tg := range v {
// if m.printgroup(tg) {
// level.Warn(m.logger).Log("msg", "zytestingg scrape printGroup FOUNDDD")
// }
// }
//}

select {
case m.triggerReload <- struct{}{}:
@@ -158,6 +186,11 @@ func (m *Manager) reload() {
m.mtxScrape.Lock()
var wg sync.WaitGroup
for setName, groups := range m.targetSets {
for _, group := range groups {
if m.printgroup(group) {
level.Warn(m.logger).Log("msg", "zytestingg scrape printGroup FOUNDDD")
}
}
if _, ok := m.scrapePools[setName]; !ok {
scrapeConfig, ok := m.scrapeConfigs[setName]
if !ok {
@@ -317,3 +350,14 @@ func (m *Manager) TargetsDroppedCounts() map[string]int {
}
return counts
}

// DisableEndOfRunStalenessMarkers disables end-of-run staleness markers for the provided targets in the given
// targetSet. When end-of-run staleness is disabled for a target, no staleness markers are written for its series
// once the target goes away.
func (m *Manager) DisableEndOfRunStalenessMarkers(targetSet string, targets []*Target) {
m.mtxScrape.Lock()
defer m.mtxScrape.Unlock()
if pool, ok := m.scrapePools[targetSet]; ok {
pool.disableEndOfRunStalenessMarkers(targets)
}
}
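
A minimal usage sketch of the new method (illustrative only; the helper name, target-set name, and targets variable below are hypothetical and not part of this change):

// disableStaleness is a hypothetical same-package helper showing how the new
// method might be called for targets whose series should not receive
// staleness markers when they disappear.
func disableStaleness(m *Manager, targets []*Target) {
	// "kubernetes-pods" stands in for an existing target-set (scrape job) name.
	m.DisableEndOfRunStalenessMarkers("kubernetes-pods", targets)
	// From this point on, when any of these targets goes away at the end of a
	// run, no staleness markers are written for its series.
}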