From d259f6494bef11bb1d8ccbf9632a1d05a48d4ef6 Mon Sep 17 00:00:00 2001 From: Andrey Kolkov Date: Thu, 4 Mar 2021 15:52:31 +0400 Subject: [PATCH 1/7] Added disable subs logic --- database/redis/subscription.go | 12 ++++++++---- interfaces.go | 1 + notifier/notifier.go | 32 ++++++++++++++++++++++++++++++++ 3 files changed, 41 insertions(+), 4 deletions(-) diff --git a/database/redis/subscription.go b/database/redis/subscription.go index 181d454f4..0330a3171 100644 --- a/database/redis/subscription.go +++ b/database/redis/subscription.go @@ -79,7 +79,7 @@ func (connector *DbConnector) updateSubscription(newSubscription *moira.Subscrip c := connector.pool.Get() defer c.Close() - c.Send("MULTI") //nolint + c.Send("MULTI") //nolint addSendSubscriptionRequest(c, *newSubscription, oldSubscription) //nolint _, err := c.Do("EXEC") if err != nil { @@ -156,13 +156,13 @@ func (connector *DbConnector) removeSubscription(subscription *moira.Subscriptio c := connector.pool.Get() defer c.Close() - c.Send("MULTI") //nolint + c.Send("MULTI") //nolint c.Send("SREM", userSubscriptionsKey(subscription.User), subscription.ID) //nolint for _, tag := range subscription.Tags { c.Send("SREM", tagSubscriptionKey(tag), subscription.ID) //nolint } c.Send("SREM", anyTagsSubscriptionsKey, subscription.ID) //nolint - c.Send("DEL", subscriptionKey(subscription.ID)) //nolint + c.Send("DEL", subscriptionKey(subscription.ID)) //nolint if _, err := c.Do("EXEC"); err != nil { return fmt.Errorf("failed to EXEC: %s", err.Error()) } @@ -196,6 +196,10 @@ func (connector *DbConnector) GetTagsSubscriptions(tags []string) ([]*moira.Subs return connector.GetSubscriptions(subscriptionsIDs) } +func (connector *DbConnector) GetSubscriptionsByContact(contactID string) ([]*moira.SubscriptionData, error) { + return nil, fmt.Errorf("not implemented yet") //TODO +} + func (connector *DbConnector) getSubscriptionsIDsByTags(tags []string) ([]string, error) { c := connector.pool.Get() defer c.Close() @@ -243,7 +247,7 @@ func addSendSubscriptionRequest(c redis.Conn, subscription moira.SubscriptionDat } c.Send("SADD", userSubscriptionsKey(subscription.User), subscription.ID) //nolint - c.Send("SET", subscriptionKey(subscription.ID), bytes) //nolint + c.Send("SET", subscriptionKey(subscription.ID), bytes) //nolint return nil } diff --git a/interfaces.go b/interfaces.go index d5a7ea234..3670da12c 100644 --- a/interfaces.go +++ b/interfaces.go @@ -75,6 +75,7 @@ type Database interface { RemoveSubscription(subscriptionID string) error GetUserSubscriptionIDs(userLogin string) ([]string, error) GetTagsSubscriptions(tags []string) ([]*SubscriptionData, error) + GetSubscriptionsByContact(contactID string) ([]*SubscriptionData, error) // ScheduledNotification storing GetNotifications(start, end int64) ([]*ScheduledNotification, int64, error) diff --git a/notifier/notifier.go b/notifier/notifier.go index 6daa4b2e2..8ec0381c8 100644 --- a/notifier/notifier.go +++ b/notifier/notifier.go @@ -196,6 +196,11 @@ func (notifier *StandardNotifier) runSender(sender moira.Sender, ch chan Notific case moira.SenderBrokenContactError: log.Errorf("Cannot send to broken contact: %s", e.Error()) + brokenContact := pkg.Contact.ID + if err := disableBrokenContactSubscriptions(brokenContact, notifier.database, log); err != nil { + log.Errorf("Failed to disable subscriptions for broken contact: ", err) + } + default: log.Errorf("Cannot send notification: %s", err.Error()) notifier.resend(&pkg, err.Error()) @@ -203,3 +208,30 @@ func (notifier *StandardNotifier) runSender(sender moira.Sender, ch chan Notific } } } + +func disableBrokenContactSubscriptions(brokenContact string, database moira.Database, log moira.Logger) error { + subs, err := database.GetSubscriptionsByContact(brokenContact) + if err != nil { + return err + } + + disableSubs := []*moira.SubscriptionData{} + for _, s := range subs { + if len(s.Contacts) == 1 { + if s.Contacts[0] == brokenContact { + s.Enabled = false + disableSubs = append(disableSubs, s) + } + } else { + log.Warningf("Skipped disable subscription with broken contact, "+ + "contacts more than 1, contacts = %v", s.Contacts) + } + } + if len(disableSubs) > 0 { + if err := database.SaveSubscriptions(disableSubs); err != nil { + return err + } + // todo add metric + } + return nil +} From b452589d0a0c895ad4bb6c7a76667b7ace1805f7 Mon Sep 17 00:00:00 2001 From: Andrey Kolkov Date: Fri, 5 Mar 2021 11:31:53 +0400 Subject: [PATCH 2/7] Add disabled subs metric --- metrics/notifier.go | 2 ++ notifier/notifier.go | 13 +++++++++---- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/metrics/notifier.go b/metrics/notifier.go index dbb1fe9f3..1c32302d7 100644 --- a/metrics/notifier.go +++ b/metrics/notifier.go @@ -3,6 +3,7 @@ package metrics // NotifierMetrics is a collection of metrics used in notifier type NotifierMetrics struct { SubsMalformed Meter + SubsBrokenDisabled Meter EventsReceived Meter EventsMalformed Meter EventsProcessingFailed Meter @@ -16,6 +17,7 @@ type NotifierMetrics struct { func ConfigureNotifierMetrics(registry Registry, prefix string) *NotifierMetrics { return &NotifierMetrics{ SubsMalformed: registry.NewMeter("subs", "malformed"), + SubsBrokenDisabled: registry.NewMeter("subs", "broken_disabled"), EventsReceived: registry.NewMeter("events", "received"), EventsMalformed: registry.NewMeter("events", "malformed"), EventsProcessingFailed: registry.NewMeter("events", "failed"), diff --git a/notifier/notifier.go b/notifier/notifier.go index 8ec0381c8..a168eb0b6 100644 --- a/notifier/notifier.go +++ b/notifier/notifier.go @@ -197,7 +197,8 @@ func (notifier *StandardNotifier) runSender(sender moira.Sender, ch chan Notific log.Errorf("Cannot send to broken contact: %s", e.Error()) brokenContact := pkg.Contact.ID - if err := disableBrokenContactSubscriptions(brokenContact, notifier.database, log); err != nil { + if err := disableBrokenContactSubscriptions(brokenContact, notifier.database, log, + ¬ifier.metrics.SubsBrokenDisabled); err != nil { log.Errorf("Failed to disable subscriptions for broken contact: ", err) } @@ -209,7 +210,8 @@ func (notifier *StandardNotifier) runSender(sender moira.Sender, ch chan Notific } } -func disableBrokenContactSubscriptions(brokenContact string, database moira.Database, log moira.Logger) error { +func disableBrokenContactSubscriptions(brokenContact string, database moira.Database, log moira.Logger, + metric *metrics.Meter) error { subs, err := database.GetSubscriptionsByContact(brokenContact) if err != nil { return err @@ -227,11 +229,14 @@ func disableBrokenContactSubscriptions(brokenContact string, database moira.Data "contacts more than 1, contacts = %v", s.Contacts) } } - if len(disableSubs) > 0 { + disabledCount := len(disableSubs) + if disabledCount > 0 { if err := database.SaveSubscriptions(disableSubs); err != nil { return err } - // todo add metric + if metric != nil { + (*metric).Mark(int64(disabledCount)) + } } return nil } From 8f2183eb88447d3bbddcca6af1bf3650ff009a8a Mon Sep 17 00:00:00 2001 From: Andrey Kolkov Date: Fri, 5 Mar 2021 11:55:33 +0400 Subject: [PATCH 3/7] mockgen & lint fix --- mock/moira-alert/database.go | 17 ++++++++++++++++- notifier/notifier.go | 6 +++--- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/mock/moira-alert/database.go b/mock/moira-alert/database.go index f03c4deb2..34a631788 100644 --- a/mock/moira-alert/database.go +++ b/mock/moira-alert/database.go @@ -595,7 +595,22 @@ func (mr *MockDatabaseMockRecorder) GetSubscriptions(arg0 interface{}) *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSubscriptions", reflect.TypeOf((*MockDatabase)(nil).GetSubscriptions), arg0) } -// GetTagNames mocks base method +// GetSubscriptionsByContact mocks base method. +func (m *MockDatabase) GetSubscriptionsByContact(arg0 string) ([]*moira.SubscriptionData, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetSubscriptionsByContact", arg0) + ret0, _ := ret[0].([]*moira.SubscriptionData) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetSubscriptionsByContact indicates an expected call of GetSubscriptionsByContact. +func (mr *MockDatabaseMockRecorder) GetSubscriptionsByContact(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSubscriptionsByContact", reflect.TypeOf((*MockDatabase)(nil).GetSubscriptionsByContact), arg0) +} + +// GetTagNames mocks base method. func (m *MockDatabase) GetTagNames() ([]string, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetTagNames") diff --git a/notifier/notifier.go b/notifier/notifier.go index a168eb0b6..3c9195564 100644 --- a/notifier/notifier.go +++ b/notifier/notifier.go @@ -197,9 +197,9 @@ func (notifier *StandardNotifier) runSender(sender moira.Sender, ch chan Notific log.Errorf("Cannot send to broken contact: %s", e.Error()) brokenContact := pkg.Contact.ID - if err := disableBrokenContactSubscriptions(brokenContact, notifier.database, log, - ¬ifier.metrics.SubsBrokenDisabled); err != nil { - log.Errorf("Failed to disable subscriptions for broken contact: ", err) + if errorDisabling := disableBrokenContactSubscriptions(brokenContact, notifier.database, log, + ¬ifier.metrics.SubsBrokenDisabled); errorDisabling != nil { + log.Errorf("Failed to disable subscriptions for broken contact: ", errorDisabling) } default: From a7922d90b6fcac41c4ee05e5a23a0bd3de846a40 Mon Sep 17 00:00:00 2001 From: Andrey Kolkov Date: Fri, 5 Mar 2021 12:05:08 +0400 Subject: [PATCH 4/7] Little bit more logs --- notifier/notifier.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/notifier/notifier.go b/notifier/notifier.go index 3c9195564..a5573ba0b 100644 --- a/notifier/notifier.go +++ b/notifier/notifier.go @@ -197,9 +197,11 @@ func (notifier *StandardNotifier) runSender(sender moira.Sender, ch chan Notific log.Errorf("Cannot send to broken contact: %s", e.Error()) brokenContact := pkg.Contact.ID - if errorDisabling := disableBrokenContactSubscriptions(brokenContact, notifier.database, log, - ¬ifier.metrics.SubsBrokenDisabled); errorDisabling != nil { - log.Errorf("Failed to disable subscriptions for broken contact: ", errorDisabling) + if count, errorDisabling := disableBrokenContactSubscriptions(brokenContact, notifier.database, log); errorDisabling != nil { + log.Errorf("Failed to disable broken subscriptions: ", errorDisabling) + } else { + notifier.metrics.SubsBrokenDisabled.Mark(int64(count)) + log.Infof("Disabled %d broken subscriptions", count) } default: @@ -210,11 +212,11 @@ func (notifier *StandardNotifier) runSender(sender moira.Sender, ch chan Notific } } -func disableBrokenContactSubscriptions(brokenContact string, database moira.Database, log moira.Logger, - metric *metrics.Meter) error { +func disableBrokenContactSubscriptions(brokenContact string, database moira.Database, log moira.Logger) ( + disabledCount int, e error) { subs, err := database.GetSubscriptionsByContact(brokenContact) if err != nil { - return err + return 0, err } disableSubs := []*moira.SubscriptionData{} @@ -229,14 +231,12 @@ func disableBrokenContactSubscriptions(brokenContact string, database moira.Data "contacts more than 1, contacts = %v", s.Contacts) } } - disabledCount := len(disableSubs) - if disabledCount > 0 { + disableCount := len(disableSubs) + if disableCount > 0 { if err := database.SaveSubscriptions(disableSubs); err != nil { - return err - } - if metric != nil { - (*metric).Mark(int64(disabledCount)) + return 0, err } + return disableCount, nil } - return nil + return 0, nil } From 072cc9cf936c380e8a3a7d0818a1bd1d88a4e71f Mon Sep 17 00:00:00 2001 From: Andrey Kolkov Date: Fri, 5 Mar 2021 15:26:22 +0400 Subject: [PATCH 5/7] Fix get subs & revert database interface --- database/redis/subscription.go | 4 ---- interfaces.go | 1 - mock/moira-alert/database.go | 17 +---------------- notifier/notifier.go | 14 ++++++++------ 4 files changed, 9 insertions(+), 27 deletions(-) diff --git a/database/redis/subscription.go b/database/redis/subscription.go index 0330a3171..28284e945 100644 --- a/database/redis/subscription.go +++ b/database/redis/subscription.go @@ -196,10 +196,6 @@ func (connector *DbConnector) GetTagsSubscriptions(tags []string) ([]*moira.Subs return connector.GetSubscriptions(subscriptionsIDs) } -func (connector *DbConnector) GetSubscriptionsByContact(contactID string) ([]*moira.SubscriptionData, error) { - return nil, fmt.Errorf("not implemented yet") //TODO -} - func (connector *DbConnector) getSubscriptionsIDsByTags(tags []string) ([]string, error) { c := connector.pool.Get() defer c.Close() diff --git a/interfaces.go b/interfaces.go index 3670da12c..d5a7ea234 100644 --- a/interfaces.go +++ b/interfaces.go @@ -75,7 +75,6 @@ type Database interface { RemoveSubscription(subscriptionID string) error GetUserSubscriptionIDs(userLogin string) ([]string, error) GetTagsSubscriptions(tags []string) ([]*SubscriptionData, error) - GetSubscriptionsByContact(contactID string) ([]*SubscriptionData, error) // ScheduledNotification storing GetNotifications(start, end int64) ([]*ScheduledNotification, int64, error) diff --git a/mock/moira-alert/database.go b/mock/moira-alert/database.go index 34a631788..f03c4deb2 100644 --- a/mock/moira-alert/database.go +++ b/mock/moira-alert/database.go @@ -595,22 +595,7 @@ func (mr *MockDatabaseMockRecorder) GetSubscriptions(arg0 interface{}) *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSubscriptions", reflect.TypeOf((*MockDatabase)(nil).GetSubscriptions), arg0) } -// GetSubscriptionsByContact mocks base method. -func (m *MockDatabase) GetSubscriptionsByContact(arg0 string) ([]*moira.SubscriptionData, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetSubscriptionsByContact", arg0) - ret0, _ := ret[0].([]*moira.SubscriptionData) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetSubscriptionsByContact indicates an expected call of GetSubscriptionsByContact. -func (mr *MockDatabaseMockRecorder) GetSubscriptionsByContact(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSubscriptionsByContact", reflect.TypeOf((*MockDatabase)(nil).GetSubscriptionsByContact), arg0) -} - -// GetTagNames mocks base method. +// GetTagNames mocks base method func (m *MockDatabase) GetTagNames() ([]string, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetTagNames") diff --git a/notifier/notifier.go b/notifier/notifier.go index a5573ba0b..241381216 100644 --- a/notifier/notifier.go +++ b/notifier/notifier.go @@ -195,9 +195,7 @@ func (notifier *StandardNotifier) runSender(sender moira.Sender, ch chan Notific switch e := err.(type) { case moira.SenderBrokenContactError: log.Errorf("Cannot send to broken contact: %s", e.Error()) - - brokenContact := pkg.Contact.ID - if count, errorDisabling := disableBrokenContactSubscriptions(brokenContact, notifier.database, log); errorDisabling != nil { + if count, errorDisabling := disableBrokenContactSubscriptions(&pkg.Contact, notifier.database, log); errorDisabling != nil { log.Errorf("Failed to disable broken subscriptions: ", errorDisabling) } else { notifier.metrics.SubsBrokenDisabled.Mark(int64(count)) @@ -212,17 +210,21 @@ func (notifier *StandardNotifier) runSender(sender moira.Sender, ch chan Notific } } -func disableBrokenContactSubscriptions(brokenContact string, database moira.Database, log moira.Logger) ( +func disableBrokenContactSubscriptions(brokenContact *moira.ContactData, database moira.Database, log moira.Logger) ( disabledCount int, e error) { - subs, err := database.GetSubscriptionsByContact(brokenContact) + subsIDs, err := database.GetUserSubscriptionIDs(brokenContact.User) if err != nil { return 0, err } + subs, e := database.GetSubscriptions(subsIDs) + if e != nil { + return 0, e + } disableSubs := []*moira.SubscriptionData{} for _, s := range subs { if len(s.Contacts) == 1 { - if s.Contacts[0] == brokenContact { + if s.Contacts[0] == brokenContact.ID { s.Enabled = false disableSubs = append(disableSubs, s) } From 0598b16d3c72397e4faaf621ecd4dbfe87bcf831 Mon Sep 17 00:00:00 2001 From: Andrey Kolkov Date: Fri, 5 Mar 2021 15:33:10 +0400 Subject: [PATCH 6/7] Revert redis/subscriptions.go --- database/redis/subscription.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/database/redis/subscription.go b/database/redis/subscription.go index 28284e945..181d454f4 100644 --- a/database/redis/subscription.go +++ b/database/redis/subscription.go @@ -79,7 +79,7 @@ func (connector *DbConnector) updateSubscription(newSubscription *moira.Subscrip c := connector.pool.Get() defer c.Close() - c.Send("MULTI") //nolint + c.Send("MULTI") //nolint addSendSubscriptionRequest(c, *newSubscription, oldSubscription) //nolint _, err := c.Do("EXEC") if err != nil { @@ -156,13 +156,13 @@ func (connector *DbConnector) removeSubscription(subscription *moira.Subscriptio c := connector.pool.Get() defer c.Close() - c.Send("MULTI") //nolint + c.Send("MULTI") //nolint c.Send("SREM", userSubscriptionsKey(subscription.User), subscription.ID) //nolint for _, tag := range subscription.Tags { c.Send("SREM", tagSubscriptionKey(tag), subscription.ID) //nolint } c.Send("SREM", anyTagsSubscriptionsKey, subscription.ID) //nolint - c.Send("DEL", subscriptionKey(subscription.ID)) //nolint + c.Send("DEL", subscriptionKey(subscription.ID)) //nolint if _, err := c.Do("EXEC"); err != nil { return fmt.Errorf("failed to EXEC: %s", err.Error()) } @@ -243,7 +243,7 @@ func addSendSubscriptionRequest(c redis.Conn, subscription moira.SubscriptionDat } c.Send("SADD", userSubscriptionsKey(subscription.User), subscription.ID) //nolint - c.Send("SET", subscriptionKey(subscription.ID), bytes) //nolint + c.Send("SET", subscriptionKey(subscription.ID), bytes) //nolint return nil } From b0ec737973d2aafa909ec5e608c5eb52b2d2c50e Mon Sep 17 00:00:00 2001 From: Andrey Kolkov Date: Fri, 5 Mar 2021 16:35:43 +0400 Subject: [PATCH 7/7] fix tests --- notifier/notifier.go | 4 ++++ notifier/notifier_test.go | 5 ++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/notifier/notifier.go b/notifier/notifier.go index 241381216..0f0b1c666 100644 --- a/notifier/notifier.go +++ b/notifier/notifier.go @@ -216,6 +216,10 @@ func disableBrokenContactSubscriptions(brokenContact *moira.ContactData, databas if err != nil { return 0, err } + if len(subsIDs) == 0 { + return 0, nil + } + subs, e := database.GetSubscriptions(subsIDs) if e != nil { return 0, e diff --git a/notifier/notifier_test.go b/notifier/notifier_test.go index d2009a655..6b1f0f71c 100644 --- a/notifier/notifier_test.go +++ b/notifier/notifier_test.go @@ -131,10 +131,12 @@ func TestNoResendForSendToBrokenContact(t *testing.T) { Events: eventsData, Contact: moira.ContactData{ Type: "test", + User: "someuser", }, } sender.EXPECT().SendEvents(eventsData, pkg.Contact, pkg.Trigger, plots, pkg.Throttled). Return(moira.NewSenderBrokenContactError(fmt.Errorf("some sender reason"))) + dataBase.EXPECT().GetUserSubscriptionIDs(pkg.Contact.User).Return([]string{}, nil) var wg sync.WaitGroup notif.Send(&pkg, &wg) @@ -236,7 +238,8 @@ func afterTest() { var subID = "SubscriptionID-000000000000001" var event = moira.NotificationEvent{ - Metric: "generate.event.1", + Metric: "generate.event.1", + State: moira.StateOK, OldState: moira.StateWARN, TriggerID: "triggerID-0000000000001",