From 5ef371585c21b5fa23535ac6878d68762c14f5cf Mon Sep 17 00:00:00 2001 From: totegamma Date: Mon, 25 Nov 2024 21:29:26 +0900 Subject: [PATCH 1/6] impl --- cmd/api/config.go | 21 +++--- cmd/api/main.go | 20 +++++- core/dbschema.go | 10 +++ core/interfaces.go | 7 ++ core/mock/services.go | 53 +++++++++++++++ go.mod | 1 + go.sum | 3 + wire.go | 12 ++++ wire_gen.go | 10 +++ x/notification/handler.go | 78 ++++++++++++++++++++++ x/notification/reactor.go | 137 ++++++++++++++++++++++++++++++++++++++ x/notification/repo.go | 73 ++++++++++++++++++++ x/notification/service.go | 65 ++++++++++++++++++ 13 files changed, 480 insertions(+), 10 deletions(-) create mode 100644 x/notification/handler.go create mode 100644 x/notification/reactor.go create mode 100644 x/notification/repo.go create mode 100644 x/notification/service.go diff --git a/cmd/api/config.go b/cmd/api/config.go index 6764e074..30aeb641 100644 --- a/cmd/api/config.go +++ b/cmd/api/config.go @@ -14,15 +14,17 @@ type Config struct { } type Server struct { - Dsn string `yaml:"dsn"` - RedisAddr string `yaml:"redisAddr"` - RedisDB int `yaml:"redisDB"` - MemcachedAddr string `yaml:"memcachedAddr"` - EnableTrace bool `yaml:"enableTrace"` - TraceEndpoint string `yaml:"traceEndpoint"` - RepositoryPath string `yaml:"repositoryPath"` - CaptchaSitekey string `yaml:"captchaSitekey"` - CaptchaSecret string `yaml:"captchaSecret"` + Dsn string `yaml:"dsn"` + RedisAddr string `yaml:"redisAddr"` + RedisDB int `yaml:"redisDB"` + MemcachedAddr string `yaml:"memcachedAddr"` + EnableTrace bool `yaml:"enableTrace"` + TraceEndpoint string `yaml:"traceEndpoint"` + RepositoryPath string `yaml:"repositoryPath"` + CaptchaSitekey string `yaml:"captchaSitekey"` + CaptchaSecret string `yaml:"captchaSecret"` + VapidPublicKey string `yaml:"vapidPublicKey"` + VapidPrivateKey string `yaml:"vapidPrivateKey"` } type BuildInfo struct { @@ -45,6 +47,7 @@ type Profile struct { Version string `yaml:"version" json:"version"` BuildInfo BuildInfo `yaml:"buildInfo" json:"buildInfo"` SiteKey string `yaml:"captchaSiteKey" json:"captchaSiteKey"` + VapidKey string `yaml:"vapidKey" json:"vapidKey"` } // Load loads config from given path diff --git a/cmd/api/main.go b/cmd/api/main.go index 389c2ac1..d46f3744 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -30,14 +30,15 @@ import ( "github.com/totegamma/concurrent/x/job" "github.com/totegamma/concurrent/x/key" "github.com/totegamma/concurrent/x/message" + "github.com/totegamma/concurrent/x/notification" "github.com/totegamma/concurrent/x/profile" "github.com/totegamma/concurrent/x/store" "github.com/totegamma/concurrent/x/subscription" "github.com/totegamma/concurrent/x/timeline" "github.com/totegamma/concurrent/x/userkv" + "github.com/SherClockHolmes/webpush-go" "github.com/bradfitz/gomemcache/memcache" - "github.com/redis/go-redis/extra/redisotel/v9" "go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho" "go.opentelemetry.io/otel" @@ -186,6 +187,7 @@ func main() { &core.Job{}, &core.CommitLog{}, &core.CommitOwner{}, + &core.NotificationSubscription{}, ) if err != nil { @@ -264,6 +266,15 @@ func main() { jobHandler := job.NewHandler(jobService) jobReactor := job.NewReactor(storeService, jobService) + webpushOpts := webpush.Options{ + VAPIDPublicKey: config.Server.VapidPublicKey, + VAPIDPrivateKey: config.Server.VapidPrivateKey, + } + + notificationService := concurrent.SetupNotificationService(db) + notificationHandler := notification.NewHandler(notificationService) + notificationReactor := notification.NewReactor(notificationService, timelineService, webpushOpts) + // migration from 1.3.2 to 1.3.3 var remotes []core.Domain db.Find(&remotes) @@ -295,6 +306,7 @@ func main() { GoVersion: goVersion, } meta.SiteKey = config.Server.CaptchaSitekey + meta.VapidKey = config.Server.VapidPublicKey return c.JSON(http.StatusOK, echo.Map{"status": "ok", "content": core.Domain{ ID: conconf.FQDN, @@ -372,6 +384,11 @@ func main() { apiV1.POST("/jobs", jobHandler.Create, auth.Restrict(auth.ISREGISTERED)) apiV1.DELETE("/job/:id", jobHandler.Cancel, auth.Restrict(auth.ISREGISTERED)) + // notification + apiV1.POST("/notification", notificationHandler.Subscribe, auth.Restrict(auth.ISREGISTERED)) + apiV1.DELETE("/notification/:owner/:vendor_id", notificationHandler.Delete, auth.Restrict(auth.ISREGISTERED)) + apiV1.GET("/notification/:owner/:vendor_id", notificationHandler.Get, auth.Restrict(auth.ISREGISTERED)) + // misc e.GET("/health", func(c echo.Context) (err error) { ctx := c.Request().Context() @@ -464,6 +481,7 @@ func main() { timelineKeeper.Start(context.Background()) jobReactor.Start(context.Background()) + notificationReactor.Start(context.Background()) port := ":8000" envport := os.Getenv("CC_API_PORT") diff --git a/core/dbschema.go b/core/dbschema.go index 8c0680b5..6a642f35 100644 --- a/core/dbschema.go +++ b/core/dbschema.go @@ -237,3 +237,13 @@ type CommitLog struct { Owners []string `json:"owners" gorm:"-"` CDate time.Time `json:"cdate" gorm:"type:timestamp with time zone;not null;default:clock_timestamp()"` } + +type NotificationSubscription struct { + VendorID string `json:"vendorID" gorm:"primaryKey;type:text"` + Owner string `json:"owner" gorm:"primaryKey;type:text"` + Schemas pq.StringArray `json:"schemas" gorm:"type:text[]"` + Timelines pq.StringArray `json:"timelines" gorm:"type:text[]"` + Subscription string `json:"subscription" gorm:"type:text"` + CDate time.Time `json:"cdate" gorm:"type:timestamp with time zone;not null;default:clock_timestamp()"` + MDate time.Time `json:"mdate" gorm:"autoUpdateTime"` +} diff --git a/core/interfaces.go b/core/interfaces.go index a27b848b..7161086f 100644 --- a/core/interfaces.go +++ b/core/interfaces.go @@ -200,3 +200,10 @@ type JobService interface { Complete(ctx context.Context, id, status, result string) (Job, error) Cancel(ctx context.Context, id string) (Job, error) } + +type NotificationService interface { + Subscribe(ctx context.Context, notification NotificationSubscription) (NotificationSubscription, error) + GetAllSubscriptions(ctx context.Context) ([]NotificationSubscription, error) + Delete(ctx context.Context, vendorID, owner string) error + Get(ctx context.Context, vendorID, owner string) (NotificationSubscription, error) +} diff --git a/core/mock/services.go b/core/mock/services.go index e938c9ed..ae91b594 100644 --- a/core/mock/services.go +++ b/core/mock/services.go @@ -2251,3 +2251,56 @@ func (mr *MockJobServiceMockRecorder) List(ctx, requester any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "List", reflect.TypeOf((*MockJobService)(nil).List), ctx, requester) } + +// MockNotificationService is a mock of NotificationService interface. +type MockNotificationService struct { + ctrl *gomock.Controller + recorder *MockNotificationServiceMockRecorder +} + +// MockNotificationServiceMockRecorder is the mock recorder for MockNotificationService. +type MockNotificationServiceMockRecorder struct { + mock *MockNotificationService +} + +// NewMockNotificationService creates a new mock instance. +func NewMockNotificationService(ctrl *gomock.Controller) *MockNotificationService { + mock := &MockNotificationService{ctrl: ctrl} + mock.recorder = &MockNotificationServiceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockNotificationService) EXPECT() *MockNotificationServiceMockRecorder { + return m.recorder +} + +// GetAllSubscriptions mocks base method. +func (m *MockNotificationService) GetAllSubscriptions(ctx context.Context) ([]core.NotificationSubscription, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAllSubscriptions", ctx) + ret0, _ := ret[0].([]core.NotificationSubscription) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetAllSubscriptions indicates an expected call of GetAllSubscriptions. +func (mr *MockNotificationServiceMockRecorder) GetAllSubscriptions(ctx any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllSubscriptions", reflect.TypeOf((*MockNotificationService)(nil).GetAllSubscriptions), ctx) +} + +// Subscribe mocks base method. +func (m *MockNotificationService) Subscribe(ctx context.Context, notification core.NotificationSubscription) (core.NotificationSubscription, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Subscribe", ctx, notification) + ret0, _ := ret[0].(core.NotificationSubscription) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Subscribe indicates an expected call of Subscribe. +func (mr *MockNotificationServiceMockRecorder) Subscribe(ctx, notification any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MockNotificationService)(nil).Subscribe), ctx, notification) +} diff --git a/go.mod b/go.mod index fe284ff9..7502f8b6 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/totegamma/concurrent go 1.22.4 require ( + github.com/SherClockHolmes/webpush-go v1.3.0 github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874 github.com/cosmos/cosmos-sdk v0.50.7 github.com/ethereum/go-ethereum v1.14.5 diff --git a/go.sum b/go.sum index ceeeecee..f63f2b89 100644 --- a/go.sum +++ b/go.sum @@ -35,6 +35,8 @@ github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEV github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/SherClockHolmes/webpush-go v1.3.0 h1:CAu3FvEE9QS4drc3iKNgpBWFfGqNthKlZhp5QpYnu6k= +github.com/SherClockHolmes/webpush-go v1.3.0/go.mod h1:AxRHmJuYwKGG1PVgYzToik1lphQvDnqFYDqimHvwhIw= github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE= github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -574,6 +576,7 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0= golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= diff --git a/wire.go b/wire.go index 8a018c92..75efa7e3 100644 --- a/wire.go +++ b/wire.go @@ -20,6 +20,7 @@ import ( "github.com/totegamma/concurrent/x/jwt" "github.com/totegamma/concurrent/x/key" "github.com/totegamma/concurrent/x/message" + "github.com/totegamma/concurrent/x/notification" "github.com/totegamma/concurrent/x/policy" "github.com/totegamma/concurrent/x/profile" "github.com/totegamma/concurrent/x/schema" @@ -73,6 +74,12 @@ var storeServiceProvider = wire.NewSet( SetupSemanticidService, ) +// other +var notificationServiceProvider = wire.NewSet( + notification.NewService, + notification.NewRepository, +) + // ----------- func SetupPolicyService(rdb *redis.Client, globalPolicy core.Policy, config core.Config) core.PolicyService { @@ -159,3 +166,8 @@ func SetupSemanticidService(db *gorm.DB) core.SemanticIDService { wire.Build(semanticidServiceProvider) return nil } + +func SetupNotificationService(db *gorm.DB) core.NotificationService { + wire.Build(notificationServiceProvider) + return nil +} diff --git a/wire_gen.go b/wire_gen.go index 80c9e212..c5a7e421 100644 --- a/wire_gen.go +++ b/wire_gen.go @@ -21,6 +21,7 @@ import ( "github.com/totegamma/concurrent/x/jwt" "github.com/totegamma/concurrent/x/key" "github.com/totegamma/concurrent/x/message" + "github.com/totegamma/concurrent/x/notification" "github.com/totegamma/concurrent/x/policy" "github.com/totegamma/concurrent/x/profile" "github.com/totegamma/concurrent/x/schema" @@ -175,6 +176,12 @@ func SetupSemanticidService(db *gorm.DB) core.SemanticIDService { return semanticIDService } +func SetupNotificationService(db *gorm.DB) core.NotificationService { + repo := notification.NewRepository(db) + notificationService := notification.NewService(repo) + return notificationService +} + // wire.go: // Lv0 @@ -226,3 +233,6 @@ var storeServiceProvider = wire.NewSet(store.NewService, store.NewRepository, Se SetupSubscriptionService, SetupSemanticidService, ) + +// other +var notificationServiceProvider = wire.NewSet(notification.NewService, notification.NewRepository) diff --git a/x/notification/handler.go b/x/notification/handler.go new file mode 100644 index 00000000..31701f5f --- /dev/null +++ b/x/notification/handler.go @@ -0,0 +1,78 @@ +package notification + +import ( + "net/http" + + "github.com/labstack/echo/v4" + "go.opentelemetry.io/otel" + + "github.com/totegamma/concurrent/core" +) + +var tracer = otel.Tracer("notification") + +type Handler interface { + Subscribe(c echo.Context) error + Delete(c echo.Context) error + Get(c echo.Context) error +} + +type handler struct { + service core.NotificationService +} + +func NewHandler(service core.NotificationService) Handler { + return &handler{service: service} +} + +func (h *handler) Subscribe(c echo.Context) error { + ctx, span := tracer.Start(c.Request().Context(), "Notification.Handler.Subscribe") + defer span.End() + + var subscription core.NotificationSubscription + err := c.Bind(&subscription) + if err != nil { + span.RecordError(err) + return c.JSON(http.StatusBadRequest, echo.Map{"error": err.Error()}) + } + + subscription, err = h.service.Subscribe(ctx, subscription) + if err != nil { + span.RecordError(err) + return c.JSON(http.StatusInternalServerError, echo.Map{"error": err.Error()}) + } + + return c.JSON(http.StatusCreated, echo.Map{"content": subscription}) +} + +func (h *handler) Delete(c echo.Context) error { + ctx, span := tracer.Start(c.Request().Context(), "Notification.Handler.Delete") + defer span.End() + + owner := c.Param("owner") + vendorID := c.Param("vendor_id") + + err := h.service.Delete(ctx, vendorID, owner) + if err != nil { + span.RecordError(err) + return c.JSON(http.StatusInternalServerError, echo.Map{"error": err.Error()}) + } + + return c.NoContent(http.StatusNoContent) +} + +func (h *handler) Get(c echo.Context) error { + ctx, span := tracer.Start(c.Request().Context(), "Notification.Handler.Get") + defer span.End() + + owner := c.Param("owner") + vendorID := c.Param("vendor_id") + + subscription, err := h.service.Get(ctx, vendorID, owner) + if err != nil { + span.RecordError(err) + return c.JSON(http.StatusInternalServerError, echo.Map{"error": err.Error()}) + } + + return c.JSON(http.StatusOK, echo.Map{"content": subscription}) +} diff --git a/x/notification/reactor.go b/x/notification/reactor.go new file mode 100644 index 00000000..1ea90020 --- /dev/null +++ b/x/notification/reactor.go @@ -0,0 +1,137 @@ +package notification + +import ( + "context" + "encoding/json" + "log/slog" + "slices" + "time" + + "github.com/SherClockHolmes/webpush-go" + + "github.com/totegamma/concurrent/core" +) + +type reactor struct { + service core.NotificationService + timeline core.TimelineService + opts webpush.Options +} + +func NewReactor(service core.NotificationService, timeline core.TimelineService, opts webpush.Options) Reactor { + return &reactor{ + service: service, + timeline: timeline, + opts: opts, + } +} + +type Reactor interface { + Start(ctx context.Context) +} + +type Worker struct { + MDate time.Time + Routine context.CancelFunc +} + +func (r *reactor) Start(ctx context.Context) { + slog.Info("starting reactor") + + ticker10 := time.NewTicker(10 * time.Second) + workers := make(map[string]Worker) + + go func() { + for ; true; <-ticker10.C { + slog.Info("checking subscriptions") + + subscriptions, err := r.service.GetAllSubscriptions(ctx) + if err != nil { + slog.Error("error getting subscriptions", slog.String("error", err.Error())) + continue + } + + for _, sub := range subscriptions { + + subID := sub.VendorID + sub.Owner + existingWorker, ok := workers[subID] + if ok { + if existingWorker.MDate == sub.MDate { + slog.Info("worker already running", slog.String("vendorID", sub.VendorID), slog.String("owner", sub.Owner)) + continue + } else { + existingWorker.Routine() + delete(workers, subID) + } + } + + slog.Info("starting worker", slog.String("vendorID", sub.VendorID), slog.String("owner", sub.Owner)) + + workerctx, cancel := context.WithCancel(ctx) + workers[subID] = Worker{ + MDate: sub.MDate, + Routine: cancel, + } + + go func(ctx context.Context, sub core.NotificationSubscription) { + + slog.Info("worker started", slog.String("vendorID", sub.VendorID), slog.String("owner", sub.Owner)) + + var subscription webpush.Subscription + json.Unmarshal([]byte(sub.Subscription), &subscription) + + request := make(chan []string) + realtime := make(chan core.Event) + + go r.timeline.Realtime(ctx, request, realtime) + + request <- sub.Timelines + + for { + select { + case <-ctx.Done(): + close(request) + close(realtime) + return + case event := <-realtime: + + var doc core.DocumentBase[any] + err := json.Unmarshal([]byte(event.Document), &doc) + if err != nil { + slog.Error("error unmarshalling document", slog.String("error", err.Error())) + continue + } + + if !slices.Contains(sub.Schemas, doc.Schema) { + slog.Info("schema not in subscription", slog.String("schema", doc.Schema)) + continue + } + + // Send Notification + resp, err := webpush.SendNotification([]byte(event.Document), &subscription, &r.opts) + if err != nil { + slog.Error("error sending notification", slog.String("error", err.Error())) + continue + } + defer resp.Body.Close() + } + } + }(workerctx, sub) + + } + + var validSubs []string + for _, sub := range subscriptions { + validSubs = append(validSubs, sub.VendorID+sub.Owner) + } + + for id, worker := range workers { + if !slices.Contains(validSubs, id) { + slog.Info("stopping worker", slog.String("id", id)) + worker.Routine() + delete(workers, id) + } + } + } + }() +} diff --git a/x/notification/repo.go b/x/notification/repo.go new file mode 100644 index 00000000..3014abc0 --- /dev/null +++ b/x/notification/repo.go @@ -0,0 +1,73 @@ +package notification + +import ( + "context" + + "gorm.io/gorm" + + "github.com/totegamma/concurrent/core" +) + +type Repo interface { + Get(ctx context.Context, vendorID, owner string) (core.NotificationSubscription, error) + Subscribe(ctx context.Context, notification core.NotificationSubscription) (core.NotificationSubscription, error) + GetAllSubscriptions(ctx context.Context) ([]core.NotificationSubscription, error) + Delete(ctx context.Context, vendorID, owner string) error +} + +type repository struct { + db *gorm.DB +} + +func NewRepository(db *gorm.DB) Repo { + return &repository{db} +} + +func (r *repository) Subscribe(ctx context.Context, notification core.NotificationSubscription) (core.NotificationSubscription, error) { + ctx, span := tracer.Start(ctx, "Notification.Repository.Subscribe") + defer span.End() + + if err := r.db.WithContext(ctx).Save(¬ification).Error; err != nil { + return core.NotificationSubscription{}, err + } + + return notification, nil +} + +func (r *repository) GetAllSubscriptions(ctx context.Context) ([]core.NotificationSubscription, error) { + ctx, span := tracer.Start(ctx, "Notification.Repository.GetAllSubscriptions") + defer span.End() + + var notifications []core.NotificationSubscription + err := r.db.Find(¬ifications).Error + if err != nil { + return nil, err + } + + return notifications, nil +} + +func (r *repository) Delete(ctx context.Context, vendorID, owner string) error { + ctx, span := tracer.Start(ctx, "Notification.Repository.Delete") + defer span.End() + + err := r.db.WithContext(ctx).Where("vendor_id = ? AND owner = ?", vendorID, owner).Delete(&core.NotificationSubscription{}).Error + if err != nil { + return err + } + + return nil +} + +func (r *repository) Get(ctx context.Context, vendorID, owner string) (core.NotificationSubscription, error) { + ctx, span := tracer.Start(ctx, "Notification.Repository.Get") + defer span.End() + + var notification core.NotificationSubscription + err := r.db.WithContext(ctx).Where("vendor_id = ? AND owner = ?", vendorID, owner).First(¬ification).Error + if err != nil { + return core.NotificationSubscription{}, err + } + + return notification, nil +} diff --git a/x/notification/service.go b/x/notification/service.go new file mode 100644 index 00000000..79104777 --- /dev/null +++ b/x/notification/service.go @@ -0,0 +1,65 @@ +package notification + +import ( + "context" + + "github.com/totegamma/concurrent/core" +) + +type service struct { + repo Repo +} + +func NewService(repo Repo) core.NotificationService { + return &service{ + repo, + } +} + +func (s *service) Subscribe(ctx context.Context, subscription core.NotificationSubscription) (core.NotificationSubscription, error) { + ctx, span := tracer.Start(ctx, "Notification.Service.Subscribe") + defer span.End() + + subscription, err := s.repo.Subscribe(ctx, subscription) + if err != nil { + return core.NotificationSubscription{}, err + } + + return subscription, nil +} + +func (s *service) GetAllSubscriptions(ctx context.Context) ([]core.NotificationSubscription, error) { + ctx, span := tracer.Start(ctx, "Notification.Service.GetAllSubscriptions") + defer span.End() + + subscriptions, err := s.repo.GetAllSubscriptions(ctx) + if err != nil { + return nil, err + } + + return subscriptions, nil +} + +func (s *service) Delete(ctx context.Context, vendorID, owner string) error { + ctx, span := tracer.Start(ctx, "Notification.Service.Delete") + defer span.End() + + err := s.repo.Delete(ctx, vendorID, owner) + if err != nil { + return err + } + + return nil +} + +func (s *service) Get(ctx context.Context, vendorID, owner string) (core.NotificationSubscription, error) { + ctx, span := tracer.Start(ctx, "Notification.Service.Get") + defer span.End() + + subscription, err := s.repo.Get(ctx, vendorID, owner) + if err != nil { + return core.NotificationSubscription{}, err + } + + return subscription, nil +} From 9913392720007a6b97c379e0285193803e8fddf7 Mon Sep 17 00:00:00 2001 From: totegamma Date: Mon, 25 Nov 2024 22:21:21 +0900 Subject: [PATCH 2/6] add more debug code --- x/notification/reactor.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/x/notification/reactor.go b/x/notification/reactor.go index 1ea90020..e3f55557 100644 --- a/x/notification/reactor.go +++ b/x/notification/reactor.go @@ -3,6 +3,7 @@ package notification import ( "context" "encoding/json" + "io" "log/slog" "slices" "time" @@ -114,6 +115,20 @@ func (r *reactor) Start(ctx context.Context) { continue } defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + slog.Error("error reading response body", slog.String("error", err.Error())) + continue + } + + slog.Info("notification sent", + slog.String("vendorID", sub.VendorID), + slog.String("owner", sub.Owner), + slog.String("schema", doc.Schema), + slog.String("status", resp.Status), + slog.String("body", string(body)), + ) } } }(workerctx, sub) From ac0512639e7df3d24aced2c02fa85a71b59e9996 Mon Sep 17 00:00:00 2001 From: totegamma Date: Mon, 25 Nov 2024 22:50:57 +0900 Subject: [PATCH 3/6] update vapid settings --- cmd/api/main.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/api/main.go b/cmd/api/main.go index d46f3744..2d80043d 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -267,8 +267,10 @@ func main() { jobReactor := job.NewReactor(storeService, jobService) webpushOpts := webpush.Options{ + Subscriber: config.Concrnt.FQDN, VAPIDPublicKey: config.Server.VapidPublicKey, VAPIDPrivateKey: config.Server.VapidPrivateKey, + TTL: 60, } notificationService := concurrent.SetupNotificationService(db) From 5bb03bb024db0814bce1a6463237c19654cba6e8 Mon Sep 17 00:00:00 2001 From: totegamma Date: Mon, 25 Nov 2024 23:10:01 +0900 Subject: [PATCH 4/6] update vapid subscriber --- cmd/api/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/api/main.go b/cmd/api/main.go index 2d80043d..ce1e2e89 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -267,7 +267,7 @@ func main() { jobReactor := job.NewReactor(storeService, jobService) webpushOpts := webpush.Options{ - Subscriber: config.Concrnt.FQDN, + Subscriber: "mailto:webmaster@" + config.Concrnt.FQDN, VAPIDPublicKey: config.Server.VapidPublicKey, VAPIDPrivateKey: config.Server.VapidPrivateKey, TTL: 60, From 215d7bc00e431e70aa8854f614cd789042adc0ed Mon Sep 17 00:00:00 2001 From: totegamma Date: Mon, 25 Nov 2024 23:19:46 +0900 Subject: [PATCH 5/6] fix vapid sub --- cmd/api/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/api/main.go b/cmd/api/main.go index ce1e2e89..c6f92689 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -267,7 +267,7 @@ func main() { jobReactor := job.NewReactor(storeService, jobService) webpushOpts := webpush.Options{ - Subscriber: "mailto:webmaster@" + config.Concrnt.FQDN, + Subscriber: "webmaster@" + config.Concrnt.FQDN, VAPIDPublicKey: config.Server.VapidPublicKey, VAPIDPrivateKey: config.Server.VapidPrivateKey, TTL: 60, From 592d6da11145c85ab85d22dbdeed144468e16834 Mon Sep 17 00:00:00 2001 From: totegamma Date: Mon, 25 Nov 2024 23:59:41 +0900 Subject: [PATCH 6/6] remove debug log --- x/notification/reactor.go | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/x/notification/reactor.go b/x/notification/reactor.go index e3f55557..5fc5ea22 100644 --- a/x/notification/reactor.go +++ b/x/notification/reactor.go @@ -37,14 +37,12 @@ type Worker struct { } func (r *reactor) Start(ctx context.Context) { - slog.Info("starting reactor") ticker10 := time.NewTicker(10 * time.Second) workers := make(map[string]Worker) go func() { for ; true; <-ticker10.C { - slog.Info("checking subscriptions") subscriptions, err := r.service.GetAllSubscriptions(ctx) if err != nil { @@ -58,7 +56,6 @@ func (r *reactor) Start(ctx context.Context) { existingWorker, ok := workers[subID] if ok { if existingWorker.MDate == sub.MDate { - slog.Info("worker already running", slog.String("vendorID", sub.VendorID), slog.String("owner", sub.Owner)) continue } else { existingWorker.Routine() @@ -66,8 +63,6 @@ func (r *reactor) Start(ctx context.Context) { } } - slog.Info("starting worker", slog.String("vendorID", sub.VendorID), slog.String("owner", sub.Owner)) - workerctx, cancel := context.WithCancel(ctx) workers[subID] = Worker{ MDate: sub.MDate, @@ -104,7 +99,6 @@ func (r *reactor) Start(ctx context.Context) { } if !slices.Contains(sub.Schemas, doc.Schema) { - slog.Info("schema not in subscription", slog.String("schema", doc.Schema)) continue } @@ -116,19 +110,21 @@ func (r *reactor) Start(ctx context.Context) { } defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) - if err != nil { - slog.Error("error reading response body", slog.String("error", err.Error())) - continue + if resp.StatusCode != 201 { + body, err := io.ReadAll(resp.Body) + if err != nil { + slog.Error("error reading response body", slog.String("error", err.Error())) + continue + } + + slog.Error("notification failed", + slog.String("vendorID", sub.VendorID), + slog.String("owner", sub.Owner), + slog.String("schema", doc.Schema), + slog.String("status", resp.Status), + slog.String("body", string(body)), + ) } - - slog.Info("notification sent", - slog.String("vendorID", sub.VendorID), - slog.String("owner", sub.Owner), - slog.String("schema", doc.Schema), - slog.String("status", resp.Status), - slog.String("body", string(body)), - ) } } }(workerctx, sub)