From 7d064246fb7e68a11ef4a1529b605901cc407f50 Mon Sep 17 00:00:00 2001 From: Jack Ding Date: Fri, 25 Oct 2024 15:41:50 -0400 Subject: [PATCH] fix sub deletion Signed-off-by: Jack Ding --- cmd/main.go | 22 ++++++++++++----- go.mod | 8 +++++-- go.sum | 4 ---- .../github.com/redhat-cne/rest-api/routes.go | 4 +++- .../redhat-cne/sdk-go/pkg/channel/types.go | 2 ++ .../sdk-go/pkg/protocol/http/http.go | 24 +++++++++++++++---- .../redhat-cne/sdk-go/v1/pubsub/pubsub.go | 5 ++++ vendor/modules.txt | 4 ++-- 8 files changed, 53 insertions(+), 20 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 53da86ef..d70fa107 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -349,12 +349,16 @@ func ProcessOutChannel(wg *sync.WaitGroup, scConfig *common.SCConfiguration) { if scConfig.StorageType != storageClient.ConfigMap { continue } - if d.Status == channel.SUCCESS && d.Data != nil { - var obj subscriber.Subscriber - if err := json.Unmarshal(d.Data.Data(), &obj); err != nil { - log.Infof("data is not subscriber object ignoring processing") - continue - } + if d.Data == nil { + log.Info("DZK data nil") + continue + } + var obj subscriber.Subscriber + if err := json.Unmarshal(d.Data.Data(), &obj); err != nil { + log.Infof("data is not subscriber object ignoring processing") + continue + } + if d.Status == channel.SUCCESS || d.Status == channel.UPDATE { log.Infof("subscriber processed for %s", d.Address) if err := scConfig.K8sClient.UpdateConfigMap(context.Background(), []subscriber.Subscriber{obj}, nodeName, namespace); err != nil { log.Errorf("failed to update subscription in configmap %s", err.Error()) @@ -362,6 +366,12 @@ func ProcessOutChannel(wg *sync.WaitGroup, scConfig *common.SCConfiguration) { log.Infof("subscriber saved in configmap %s", obj.String()) } } else if d.Status == channel.DELETE { + if err := scConfig.K8sClient.UpdateConfigMap(context.Background(), []subscriber.Subscriber{obj}, nodeName, namespace); err != nil { + log.Errorf("failed to update subscription in configmap %s", err.Error()) + } else { + log.Infof("subscriber saved in configmap %s", obj.String()) + } + cleanupConfigMap(d.ClientID) } } diff --git a/go.mod b/go.mod index caaa52b7..852a9607 100644 --- a/go.mod +++ b/go.mod @@ -12,8 +12,8 @@ require ( github.com/onsi/ginkgo v1.16.5 github.com/onsi/gomega v1.23.0 github.com/prometheus/client_golang v1.14.0 - github.com/redhat-cne/rest-api v1.22.0 - github.com/redhat-cne/sdk-go v1.22.0 + github.com/redhat-cne/rest-api v1.22.0-new + github.com/redhat-cne/sdk-go v1.22.0-new github.com/sirupsen/logrus v1.9.0 github.com/stretchr/testify v1.8.1 golang.org/x/net v0.23.0 @@ -25,6 +25,10 @@ require ( sigs.k8s.io/controller-runtime v0.12.3 ) +replace github.com/redhat-cne/sdk-go v1.22.0-new => ../sdk-go + +replace github.com/redhat-cne/rest-api v1.22.0-new => ../rest-api + require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect diff --git a/go.sum b/go.sum index c09722d1..5503f61e 100644 --- a/go.sum +++ b/go.sum @@ -419,10 +419,6 @@ github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo= github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= -github.com/redhat-cne/rest-api v1.22.0 h1:A0IbrLi07eYteFGimhmrOHVD6q1wk3kPKpG4oVAenrk= -github.com/redhat-cne/rest-api v1.22.0/go.mod h1:9Bb+edIo4zJ3/QUwgTWlwP/ZBbtGExxhnAzzLkQLtEA= -github.com/redhat-cne/sdk-go v1.22.0 h1:oAfCu9jjXLn3/aXASuccZCX1Vlz9iIV9pmuO4vy4dSk= -github.com/redhat-cne/sdk-go v1.22.0/go.mod h1:qeir05dwTscLvqGCIoQPCUM6HUoVmhR7521nXn28utA= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= diff --git a/vendor/github.com/redhat-cne/rest-api/routes.go b/vendor/github.com/redhat-cne/rest-api/routes.go index 380bf7b0..94cb4625 100644 --- a/vendor/github.com/redhat-cne/rest-api/routes.go +++ b/vendor/github.com/redhat-cne/rest-api/routes.go @@ -32,6 +32,7 @@ import ( "github.com/redhat-cne/sdk-go/pkg/pubsub" "github.com/redhat-cne/sdk-go/v1/event" + v1pubsub "github.com/redhat-cne/sdk-go/v1/pubsub" "github.com/google/uuid" "github.com/gorilla/mux" @@ -256,6 +257,7 @@ func (s *Server) deleteSubscription(w http.ResponseWriter, r *http.Request) { } localmetrics.UpdateSubscriptionCount(localmetrics.ACTIVE, -1) + s.sendOutToDelete(channel.SUBSCRIBER, &pubsub.PubSub{ID: subscriptionID, Resource: v1pubsub.DeleteSub}) respondWithMessage(w, http.StatusOK, "OK") } @@ -270,7 +272,7 @@ func (s *Server) deleteAllSubscriptions(w http.ResponseWriter, _ *http.Request) localmetrics.UpdateSubscriptionCount(localmetrics.ACTIVE, -(size)) } // go ahead and create QDR to this address - s.sendOutToDelete(channel.SUBSCRIBER, &pubsub.PubSub{ID: "", Resource: "delete-all-subscriptions"}) + s.sendOutToDelete(channel.SUBSCRIBER, &pubsub.PubSub{ID: "", Resource: v1pubsub.DeleteAllSubs}) respondWithMessage(w, http.StatusOK, "deleted all subscriptions") } diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/channel/types.go b/vendor/github.com/redhat-cne/sdk-go/pkg/channel/types.go index f15eb6d6..b11012f7 100644 --- a/vendor/github.com/redhat-cne/sdk-go/pkg/channel/types.go +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/channel/types.go @@ -24,6 +24,8 @@ const ( SUCCESS //DELETE if the event is to delete DELETE + //UPDATE if the event is to update (i.e. partial delete) + UPDATE //FAILED if the event failed to post FAILED ) diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/protocol/http/http.go b/vendor/github.com/redhat-cne/sdk-go/pkg/protocol/http/http.go index 7c9fcd83..df59fa01 100644 --- a/vendor/github.com/redhat-cne/sdk-go/pkg/protocol/http/http.go +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/protocol/http/http.go @@ -29,6 +29,7 @@ import ( "github.com/redhat-cne/sdk-go/pkg/localmetrics" "github.com/redhat-cne/sdk-go/pkg/protocol" "github.com/redhat-cne/sdk-go/pkg/subscriber" + v1pubsub "github.com/redhat-cne/sdk-go/v1/pubsub" subscriberApi "github.com/redhat-cne/sdk-go/v1/subscriber" log "github.com/sirupsen/logrus" ) @@ -154,11 +155,24 @@ func (h *Server) Start(wg *sync.WaitGroup) error { } } else if obj.Action == channel.DELETE { if _, ok := h.Sender[obj.ClientID]; ok { - log.Infof("deleting subscribers") - _ = h.subscriberAPI.DeleteClient(obj.ClientID) - h.DeleteSender(obj.ClientID) - out.Status = channel.DELETE - out.ClientID = obj.ClientID + for subID, sub := range obj.SubStore.Store { + if sub.Resource == v1pubsub.DeleteAllSubs { + log.Infof("deleting all subscribers") + _ = h.subscriberAPI.DeleteClient(obj.ClientID) + h.DeleteSender(obj.ClientID) + out.Status = channel.DELETE + out.ClientID = obj.ClientID + break + } else if sub.Resource == v1pubsub.DeleteSub { + log.Infof("deleting sub with id %s", subID) + _ = h.subscriberAPI.DeleteSubscription(obj.ClientID, subID) + out.Status = channel.UPDATE + out.ClientID = obj.ClientID + } else { + log.Warnf("subscription deleting request with wrong resource %s", sub.Resource) + return + } + } } } } diff --git a/vendor/github.com/redhat-cne/sdk-go/v1/pubsub/pubsub.go b/vendor/github.com/redhat-cne/sdk-go/v1/pubsub/pubsub.go index 041e12d4..371492b4 100644 --- a/vendor/github.com/redhat-cne/sdk-go/v1/pubsub/pubsub.go +++ b/vendor/github.com/redhat-cne/sdk-go/v1/pubsub/pubsub.go @@ -39,6 +39,11 @@ type API struct { transportEnabled bool } +const ( + DeleteAllSubs string = "delete-all-subscriptions" + DeleteSub string = "delete-subscription" +) + var instance *API var once sync.Once var mu sync.Mutex diff --git a/vendor/modules.txt b/vendor/modules.txt index 90fe032f..d0ecf47c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -184,13 +184,13 @@ github.com/prometheus/common/model github.com/prometheus/procfs github.com/prometheus/procfs/internal/fs github.com/prometheus/procfs/internal/util -# github.com/redhat-cne/rest-api v1.22.0 +# github.com/redhat-cne/rest-api v1.22.0-new => ../rest-api ## explicit; go 1.22 github.com/redhat-cne/rest-api github.com/redhat-cne/rest-api/pkg/localmetrics github.com/redhat-cne/rest-api/pkg/restclient github.com/redhat-cne/rest-api/v2 -# github.com/redhat-cne/sdk-go v1.22.0 +# github.com/redhat-cne/sdk-go v1.22.0-new => ../sdk-go ## explicit; go 1.22 github.com/redhat-cne/sdk-go/pkg/channel github.com/redhat-cne/sdk-go/pkg/common