From 530bdbbe25c29dab26a7c127fb0190db783a8c23 Mon Sep 17 00:00:00 2001 From: totegamma Date: Fri, 2 Feb 2024 18:59:46 +0900 Subject: [PATCH] update improve obervability --- cmd/api/main.go | 82 +++++++++++++++++++++++++++++++------ cmd/api/wire.go | 40 +++++++----------- cmd/gateway/main.go | 27 +++++++----- x/association/handler.go | 23 ++--------- x/association/repository.go | 45 +++++++++++++++++++- x/association/service.go | 35 ++++++++++++---- x/character/repository.go | 60 +++++++++++++++++++++++++-- x/character/service.go | 9 ++++ x/entity/repository.go | 60 +++++++++++++++++++++------ x/entity/service.go | 10 ++--- x/message/repository.go | 48 ++++++++++++++++++---- x/message/service.go | 10 ++--- x/stream/mock/service.go | 15 +++++++ x/stream/repository.go | 38 +++++++++++++++++ x/stream/service.go | 9 ++++ 15 files changed, 401 insertions(+), 110 deletions(-) diff --git a/cmd/api/main.go b/cmd/api/main.go index 6a8cac5e..71e22a2c 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -1,4 +1,3 @@ -//go:generate go run github.com/google/wire/cmd/wire gen . package main import ( @@ -20,8 +19,13 @@ import ( "github.com/labstack/echo/v4/middleware" "github.com/prometheus/client_golang/prometheus" + "github.com/totegamma/concurrent/x/association" "github.com/totegamma/concurrent/x/auth" + "github.com/totegamma/concurrent/x/character" "github.com/totegamma/concurrent/x/core" + "github.com/totegamma/concurrent/x/entity" + "github.com/totegamma/concurrent/x/message" + "github.com/totegamma/concurrent/x/stream" "github.com/totegamma/concurrent/x/util" "github.com/bradfitz/gomemcache/memcache" @@ -177,21 +181,31 @@ func main() { } defer mc.Close() - agent := SetupAgent(db, rdb, config) + agent := SetupAgent(db, rdb, mc, config) socketManager := SetupSocketManager(mc, db, rdb, config) socketHandler := SetupSocketHandler(rdb, socketManager, config) - messageHandler := SetupMessageHandler(db, rdb, mc, socketManager, config) - characterHandler := SetupCharacterHandler(db, config) - associationHandler := SetupAssociationHandler(db, rdb, mc, socketManager, config) - streamHandler := SetupStreamHandler(db, rdb, mc, socketManager, config) domainHandler := SetupDomainHandler(db, config) - entityHandler := SetupEntityHandler(db, rdb, config) - authHandler := SetupAuthHandler(db, rdb, config) - userkvHandler := SetupUserkvHandler(db, rdb, config) + userkvHandler := SetupUserkvHandler(db, rdb, mc, config) collectionHandler := SetupCollectionHandler(db, rdb, config) - authService := SetupAuthService(db, rdb, config) + messageService := SetupMessageService(db, rdb, mc, socketManager, config) + messageHandler := message.NewHandler(messageService) + + associationService := SetupAssociationService(db, rdb, mc, socketManager, config) + associationHandler := association.NewHandler(associationService) + + characterService := SetupCharacterService(db, mc, config) + characterHandler := character.NewHandler(characterService) + + streamService := SetupStreamService(db, rdb, mc, socketManager, config) + streamHandler := stream.NewHandler(streamService) + + entityService := SetupEntityService(db, rdb, mc, config) + entityHandler := entity.NewHandler(entityService, config) + + authService := SetupAuthService(db, rdb, mc, config) + authHandler := auth.NewHandler(authService) apiV1 := e.Group("", auth.ParseJWT) // domain @@ -313,12 +327,21 @@ func main() { ) prometheus.MustRegister(streamSubscriptionMetrics) - var streamService = SetupStreamService(db, rdb, mc, socketManager, config) + var resourceCountMetrics = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "cc_resources_count", + Help: "resources count", + }, + []string{"type"}, + ) + prometheus.MustRegister(resourceCountMetrics) go func() { for { time.Sleep(15 * time.Second) - subscriptions, err := streamService.ListStreamSubscriptions(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + subscriptions, err := streamService.ListStreamSubscriptions(ctx) if err != nil { slog.Error(fmt.Sprintf("failed to list stream subscriptions: %v", err)) continue @@ -326,6 +349,41 @@ func main() { for stream, count := range subscriptions { streamSubscriptionMetrics.WithLabelValues(stream).Set(float64(count)) } + + count, err := messageService.Count(ctx) + if err != nil { + slog.Error(fmt.Sprintf("failed to count messages: %v", err)) + continue + } + resourceCountMetrics.WithLabelValues("message").Set(float64(count)) + + count, err = entityService.Count(ctx) + if err != nil { + slog.Error(fmt.Sprintf("failed to count entities: %v", err)) + continue + } + resourceCountMetrics.WithLabelValues("entity").Set(float64(count)) + + count, err = characterService.Count(ctx) + if err != nil { + slog.Error(fmt.Sprintf("failed to count characters: %v", err)) + continue + } + resourceCountMetrics.WithLabelValues("character").Set(float64(count)) + + count, err = associationService.Count(ctx) + if err != nil { + slog.Error(fmt.Sprintf("failed to count associations: %v", err)) + continue + } + resourceCountMetrics.WithLabelValues("association").Set(float64(count)) + + count, err = streamService.Count(ctx) + if err != nil { + slog.Error(fmt.Sprintf("failed to count streams: %v", err)) + continue + } + resourceCountMetrics.WithLabelValues("stream").Set(float64(count)) } }() diff --git a/cmd/api/wire.go b/cmd/api/wire.go index d6de8978..051d6df5 100644 --- a/cmd/api/wire.go +++ b/cmd/api/wire.go @@ -24,35 +24,28 @@ import ( ) var domainHandlerProvider = wire.NewSet(domain.NewHandler, domain.NewService, domain.NewRepository) -var entityHandlerProvider = wire.NewSet(entity.NewHandler, entity.NewService, entity.NewRepository) -var streamHandlerProvider = wire.NewSet(stream.NewHandler, stream.NewService, stream.NewRepository, entity.NewService, entity.NewRepository) -var messageHandlerProvider = wire.NewSet(message.NewHandler, message.NewService, message.NewRepository) -var characterHandlerProvider = wire.NewSet(character.NewHandler, character.NewService, character.NewRepository) -var associationHandlerProvider = wire.NewSet(association.NewHandler, association.NewService, association.NewRepository, message.NewService, message.NewRepository) var userkvHandlerProvider = wire.NewSet(userkv.NewHandler, userkv.NewService, userkv.NewRepository) var collectionHandlerProvider = wire.NewSet(collection.NewHandler, collection.NewService, collection.NewRepository) var jwtServiceProvider = wire.NewSet(jwt.NewService, jwt.NewRepository) var entityServiceProvider = wire.NewSet(entity.NewService, entity.NewRepository, jwtServiceProvider) var streamServiceProvider = wire.NewSet(stream.NewService, stream.NewRepository, entityServiceProvider) +var messageServiceProvider = wire.NewSet(message.NewService, message.NewRepository, streamServiceProvider) +var associationServiceProvider = wire.NewSet(association.NewService, association.NewRepository, messageServiceProvider) +var characterServiceProvider = wire.NewSet(character.NewService, character.NewRepository) -func SetupMessageHandler(db *gorm.DB, rdb *redis.Client, mc *memcache.Client, manager socket.Manager, config util.Config) message.Handler { - wire.Build(messageHandlerProvider, jwtServiceProvider, stream.NewService, stream.NewRepository, entity.NewService, entity.NewRepository) +func SetupMessageService(db *gorm.DB, rdb *redis.Client, mc *memcache.Client, manager socket.Manager, config util.Config) message.Service { + wire.Build(messageServiceProvider) return nil } -func SetupCharacterHandler(db *gorm.DB, config util.Config) character.Handler { - wire.Build(characterHandlerProvider) +func SetupCharacterService(db *gorm.DB, mc *memcache.Client, config util.Config) character.Service { + wire.Build(characterServiceProvider) return nil } -func SetupAssociationHandler(db *gorm.DB, rdb *redis.Client, mc *memcache.Client, manager socket.Manager, config util.Config) association.Handler { - wire.Build(associationHandlerProvider, jwtServiceProvider, stream.NewService, stream.NewRepository, entity.NewService, entity.NewRepository) - return nil -} - -func SetupStreamHandler(db *gorm.DB, rdb *redis.Client, mc *memcache.Client, manager socket.Manager, config util.Config) stream.Handler { - wire.Build(streamHandlerProvider, jwtServiceProvider) +func SetupAssociationService(db *gorm.DB, rdb *redis.Client, mc *memcache.Client, manager socket.Manager, config util.Config) association.Service { + wire.Build(associationServiceProvider) return nil } @@ -66,8 +59,8 @@ func SetupDomainHandler(db *gorm.DB, config util.Config) domain.Handler { return nil } -func SetupEntityHandler(db *gorm.DB, rdb *redis.Client, config util.Config) entity.Handler { - wire.Build(entityHandlerProvider, jwtServiceProvider) +func SetupEntityService(db *gorm.DB, rdb *redis.Client, mc *memcache.Client, config util.Config) entity.Service { + wire.Build(entityServiceProvider) return nil } @@ -76,22 +69,17 @@ func SetupSocketHandler(rdb *redis.Client, manager socket.Manager, config util.C return nil } -func SetupAgent(db *gorm.DB, rdb *redis.Client, config util.Config) agent.Agent { +func SetupAgent(db *gorm.DB, rdb *redis.Client, mc *memcache.Client, config util.Config) agent.Agent { wire.Build(agent.NewAgent, jwtServiceProvider, domain.NewService, domain.NewRepository, entity.NewService, entity.NewRepository) return nil } -func SetupAuthHandler(db *gorm.DB, rdb *redis.Client, config util.Config) auth.Handler { - wire.Build(jwtServiceProvider, auth.NewHandler, auth.NewService, auth.NewRepository, entity.NewService, entity.NewRepository, domain.NewService, domain.NewRepository) - return nil -} - -func SetupAuthService(db *gorm.DB, rdb *redis.Client, config util.Config) auth.Service { +func SetupAuthService(db *gorm.DB, rdb *redis.Client, mc *memcache.Client, config util.Config) auth.Service { wire.Build(jwtServiceProvider, auth.NewService, auth.NewRepository, entity.NewService, entity.NewRepository, domain.NewService, domain.NewRepository) return nil } -func SetupUserkvHandler(db *gorm.DB, rdb *redis.Client, config util.Config) userkv.Handler { +func SetupUserkvHandler(db *gorm.DB, rdb *redis.Client, mc *memcache.Client, config util.Config) userkv.Handler { wire.Build(userkvHandlerProvider, jwtServiceProvider, entity.NewService, entity.NewRepository) return nil } diff --git a/cmd/gateway/main.go b/cmd/gateway/main.go index befe367f..85ff547a 100644 --- a/cmd/gateway/main.go +++ b/cmd/gateway/main.go @@ -9,7 +9,6 @@ import ( "net/http/httputil" "net/url" "os" - "path/filepath" "strconv" "strings" @@ -75,14 +74,6 @@ func main() { log.Print("Config loaded! I am: ", config.Concurrent.CCID) // Echoの設定 - logfile, err := os.OpenFile(filepath.Join(config.Server.LogPath, "gateway-access.log"), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) - if err != nil { - log.Fatal(err) - } - defer logfile.Close() - - e.Logger.SetOutput(logfile) - e.HidePort = true e.HideBanner = true @@ -127,9 +118,24 @@ func main() { }, })) - e.Use(echoprometheus.NewMiddleware("ccgateway")) e.Use(auth.ParseJWT) + e.Use(echoprometheus.NewMiddlewareWithConfig(echoprometheus.MiddlewareConfig{ + Namespace: "ccgateway", + LabelFuncs: map[string]echoprometheus.LabelValueFunc{ + "service": func(c echo.Context, err error) string { + service := c.Response().Header().Get("cc-service") + if service == "" { + service = "unknown" + } + return service + }, + }, + Skipper: func(c echo.Context) bool { + return c.Path() == "/metrics" || c.Path() == "/health" + }, + })) + // Postrgresqlとの接続 db, err := gorm.Open(postgres.Open(config.Server.Dsn), &gorm.Config{}) if err != nil { @@ -201,6 +207,7 @@ func main() { } handler := func(c echo.Context) error { + c.Response().Header().Set("cc-service", service.Name) claims, ok := c.Get("jwtclaims").(jwt.Claims) if ok { c.Request().Header.Set("cc-issuer", claims.Issuer) diff --git a/x/association/handler.go b/x/association/handler.go index aefc4c73..4520cb3a 100644 --- a/x/association/handler.go +++ b/x/association/handler.go @@ -6,7 +6,6 @@ import ( "net/http" "github.com/labstack/echo/v4" - "github.com/totegamma/concurrent/x/message" "go.opentelemetry.io/otel" "gorm.io/gorm" ) @@ -25,12 +24,11 @@ type Handler interface { type handler struct { service Service - message message.Service } // NewHandler creates a new handler -func NewHandler(service Service, message message.Service) Handler { - return &handler{service: service, message: message} +func NewHandler(service Service) Handler { + return &handler{service: service} } // Get returns an association by ID @@ -139,22 +137,9 @@ func (h handler) Delete(c echo.Context) error { defer span.End() associationID := c.Param("id") + requester := c.Get("requester").(string) - association, err := h.service.Get(ctx, associationID) - if err != nil { - return c.JSON(http.StatusNotFound, echo.Map{"error": "target association not found"}) - } - - message, err := h.message.Get(ctx, association.TargetID) - if err == nil { // if target message exists - - requester := c.Get("requester").(string) - if (association.Author != requester) && (message.Author != requester) { - return c.JSON(http.StatusForbidden, echo.Map{"error": "you are not authorized to perform this action"}) - } - } - - deleted, err := h.service.Delete(ctx, associationID) + deleted, err := h.service.Delete(ctx, associationID, requester) if err != nil { return err } diff --git a/x/association/repository.go b/x/association/repository.go index 1f3ca441..8dd9605c 100644 --- a/x/association/repository.go +++ b/x/association/repository.go @@ -3,8 +3,11 @@ package association import ( "context" "fmt" + "github.com/bradfitz/gomemcache/memcache" "github.com/totegamma/concurrent/x/core" "gorm.io/gorm" + "log/slog" + "strconv" ) // Repository is the interface for association repository @@ -19,15 +22,48 @@ type Repository interface { GetCountsBySchemaAndVariant(ctx context.Context, messageID string, schema string) (map[string]int64, error) GetBySchemaAndVariant(ctx context.Context, messageID string, schema string, variant string) ([]core.Association, error) GetOwnByTarget(ctx context.Context, targetID, author string) ([]core.Association, error) + Count(ctx context.Context) (int64, error) } type repository struct { db *gorm.DB + mc *memcache.Client } // NewRepository creates a new association repository -func NewRepository(db *gorm.DB) Repository { - return &repository{db: db} +func NewRepository(db *gorm.DB, mc *memcache.Client) Repository { + + var count int64 + err := db.Model(&core.Association{}).Count(&count).Error + if err != nil { + slog.Error( + "failed to count associations", + slog.String("error", err.Error()), + ) + } + + mc.Set(&memcache.Item{Key: "association_count", Value: []byte(strconv.FormatInt(count, 10))}) + + return &repository{db, mc} +} + +// Total returns the total number of associations +func (r *repository) Count(ctx context.Context) (int64, error) { + ctx, span := tracer.Start(ctx, "RepositoryTotal") + defer span.End() + + item, err := r.mc.Get("association_count") + if err != nil { + span.RecordError(err) + return 0, err + } + + count, err := strconv.ParseInt(string(item.Value), 10, 64) + if err != nil { + span.RecordError(err) + return 0, err + } + return count, nil } // Create creates new association @@ -37,6 +73,8 @@ func (r *repository) Create(ctx context.Context, association core.Association) ( err := r.db.WithContext(ctx).Create(&association).Error + r.mc.Increment("association_count", 1) + return association, err } @@ -75,6 +113,9 @@ func (r *repository) Delete(ctx context.Context, id string) (core.Association, e fmt.Printf("Error deleting association: %v\n", err) return core.Association{}, err } + + r.mc.Decrement("association_count", 1) + return deleted, nil } diff --git a/x/association/service.go b/x/association/service.go index 2b8c2d0e..65e1fc40 100644 --- a/x/association/service.go +++ b/x/association/service.go @@ -5,6 +5,7 @@ import ( "crypto/sha256" "encoding/hex" "encoding/json" + "fmt" "log/slog" "github.com/totegamma/concurrent/x/core" @@ -18,7 +19,7 @@ type Service interface { PostAssociation(ctx context.Context, objectStr string, signature string, streams []string, targetType string) (core.Association, error) Get(ctx context.Context, id string) (core.Association, error) GetOwn(ctx context.Context, author string) ([]core.Association, error) - Delete(ctx context.Context, id string) (core.Association, error) + Delete(ctx context.Context, id, requester string) (core.Association, error) GetByTarget(ctx context.Context, targetID string) ([]core.Association, error) GetCountsBySchema(ctx context.Context, messageID string) (map[string]int64, error) @@ -26,6 +27,7 @@ type Service interface { GetCountsBySchemaAndVariant(ctx context.Context, messageID string, schema string) (map[string]int64, error) GetBySchemaAndVariant(ctx context.Context, messageID string, schema string, variant string) ([]core.Association, error) GetOwnByTarget(ctx context.Context, targetID, author string) ([]core.Association, error) + Count(ctx context.Context) (int64, error) } type service struct { @@ -39,6 +41,14 @@ func NewService(repo Repository, stream stream.Service, message message.Service) return &service{repo, stream, message} } +// Count returns the count number of messages +func (s *service) Count(ctx context.Context) (int64, error) { + ctx, span := tracer.Start(ctx, "ServiceCount") + defer span.End() + + return s.repo.Count(ctx) +} + // PostAssociation creates a new association // If targetType is messages, it also posts the association to the target message's streams // returns the created association @@ -153,25 +163,36 @@ func (s *service) GetOwn(ctx context.Context, author string) ([]core.Association } // Delete deletes an association by ID -func (s *service) Delete(ctx context.Context, id string) (core.Association, error) { +func (s *service) Delete(ctx context.Context, id, requester string) (core.Association, error) { ctx, span := tracer.Start(ctx, "ServiceDelete") defer span.End() - deleted, err := s.repo.Delete(ctx, id) + targetAssociation, err := s.repo.Get(ctx, id) if err != nil { span.RecordError(err) return core.Association{}, err } - if deleted.TargetType != "messages" { // distribute is needed only when targetType is messages - return deleted, nil + targetMessage, err := s.message.Get(ctx, targetAssociation.TargetID) + if err != nil { + span.RecordError(err) + return core.Association{}, err } - targetMessage, err := s.message.Get(ctx, deleted.TargetID) + if (targetAssociation.Author != requester) && (targetMessage.Author != requester) { + return core.Association{}, fmt.Errorf("you are not authorized to perform this action") + } + + deleted, err := s.repo.Delete(ctx, id) if err != nil { span.RecordError(err) - return deleted, err + return core.Association{}, err } + + if deleted.TargetType != "messages" { // distribute is needed only when targetType is messages + return deleted, nil + } + for _, posted := range targetMessage.Streams { event := core.Event{ Stream: posted, diff --git a/x/character/repository.go b/x/character/repository.go index 60ce2529..5fd4ca37 100644 --- a/x/character/repository.go +++ b/x/character/repository.go @@ -2,30 +2,84 @@ package character import ( "context" + "github.com/bradfitz/gomemcache/memcache" "github.com/totegamma/concurrent/x/core" "gorm.io/gorm" + "log/slog" + "strconv" ) // Repository is the interface for character repository type Repository interface { Upsert(ctx context.Context, character core.Character) error Get(ctx context.Context, owner string, schema string) ([]core.Character, error) + Count(ctx context.Context) (int64, error) } type repository struct { db *gorm.DB + mc *memcache.Client } // NewRepository creates a new character repository -func NewRepository(db *gorm.DB) Repository { - return &repository{db: db} +func NewRepository(db *gorm.DB, mc *memcache.Client) Repository { + + var count int64 + err := db.Model(&core.Character{}).Count(&count).Error + if err != nil { + slog.Error( + "failed to count characters", + slog.String("error", err.Error()), + ) + } + + mc.Set(&memcache.Item{Key: "character_count", Value: []byte(strconv.FormatInt(count, 10))}) + + return &repository{db, mc} +} + +// Total returns the total number of characters +func (r *repository) Count(ctx context.Context) (int64, error) { + ctx, span := tracer.Start(ctx, "RepositoryTotal") + defer span.End() + + item, err := r.mc.Get("character_count") + if err != nil { + span.RecordError(err) + return 0, err + } + + count, err := strconv.ParseInt(string(item.Value), 10, 64) + if err != nil { + span.RecordError(err) + return 0, err + } + return count, nil } // Upsert creates and updates character func (r *repository) Upsert(ctx context.Context, character core.Character) error { ctx, span := tracer.Start(ctx, "RepositoryUpsert") defer span.End() - return r.db.WithContext(ctx).Save(&character).Error + + err := r.db.WithContext(ctx).Save(&character).Error + if err != nil { + span.RecordError(err) + return err + } + + var count int64 + err = r.db.Model(&core.Character{}).Count(&count).Error + if err != nil { + slog.Error( + "failed to count associations", + slog.String("error", err.Error()), + ) + } + + r.mc.Set(&memcache.Item{Key: "character_count", Value: []byte(strconv.FormatInt(count, 10))}) + + return nil } // Get returns a character by owner and schema diff --git a/x/character/service.go b/x/character/service.go index ddddcc72..a31073fd 100644 --- a/x/character/service.go +++ b/x/character/service.go @@ -11,6 +11,7 @@ import ( type Service interface { GetCharacters(ctx context.Context, owner string, schema string) ([]core.Character, error) PutCharacter(ctx context.Context, objectStr string, signature string, id string) (core.Character, error) + Count(ctx context.Context) (int64, error) } type service struct { @@ -22,6 +23,14 @@ func NewService(repo Repository) Service { return &service{repo: repo} } +// Count returns the count number of messages +func (s *service) Count(ctx context.Context) (int64, error) { + ctx, span := tracer.Start(ctx, "ServiceCount") + defer span.End() + + return s.repo.Count(ctx) +} + // GetCharacters returns characters by owner and schema func (s *service) GetCharacters(ctx context.Context, owner string, schema string) ([]core.Character, error) { ctx, span := tracer.Start(ctx, "ServiceGetCharacters") diff --git a/x/entity/repository.go b/x/entity/repository.go index fe83f252..d51f0db3 100644 --- a/x/entity/repository.go +++ b/x/entity/repository.go @@ -2,9 +2,13 @@ package entity import ( "context" - "github.com/totegamma/concurrent/x/core" "gorm.io/gorm" + "log/slog" + "strconv" "time" + + "github.com/bradfitz/gomemcache/memcache" + "github.com/totegamma/concurrent/x/core" ) // Repository is the interface for host repository @@ -17,7 +21,7 @@ type Repository interface { Delete(ctx context.Context, key string) error Ack(ctx context.Context, ack *core.Ack) error Unack(ctx context.Context, ack *core.Ack) error - Total(ctx context.Context) (int64, error) + Count(ctx context.Context) (int64, error) GetAcker(ctx context.Context, key string) ([]core.Ack, error) GetAcking(ctx context.Context, key string) ([]core.Ack, error) GetAddress(ctx context.Context, ccid string) (core.Address, error) @@ -27,21 +31,43 @@ type Repository interface { type repository struct { db *gorm.DB + mc *memcache.Client } -// Total returns the total number of entities -func (r *repository) Total(ctx context.Context) (int64, error) { - ctx, span := tracer.Start(ctx, "RepositoryTotal") - defer span.End() +// NewRepository creates a new host repository +func NewRepository(db *gorm.DB, mc *memcache.Client) Repository { var count int64 - err := r.db.WithContext(ctx).Model(&core.Entity{}).Count(&count).Error - return count, err + err := db.Model(&core.Entity{}).Count(&count).Error + if err != nil { + slog.Error( + "failed to count entities", + slog.String("error", err.Error()), + ) + } + + mc.Set(&memcache.Item{Key: "entity_count", Value: []byte(strconv.FormatInt(count, 10))}) + + return &repository{db, mc} } -// NewRepository creates a new host repository -func NewRepository(db *gorm.DB) Repository { - return &repository{db: db} +// Count returns the total number of entities +func (r *repository) Count(ctx context.Context) (int64, error) { + ctx, span := tracer.Start(ctx, "RepositoryCount") + defer span.End() + + item, err := r.mc.Get("entity_count") + if err != nil { + span.RecordError(err) + return 0, err + } + + count, err := strconv.ParseInt(string(item.Value), 10, 64) + if err != nil { + span.RecordError(err) + return 0, err + } + return count, nil } // GetAddress returns the address of a entity @@ -109,6 +135,10 @@ func (r *repository) CreateEntity(ctx context.Context, entity *core.Entity, meta return nil }) + if err == nil { + r.mc.Increment("entity_count", 1) + } + return err } @@ -137,7 +167,13 @@ func (r *repository) Delete(ctx context.Context, id string) error { ctx, span := tracer.Start(ctx, "RepositoryDelete") defer span.End() - return r.db.WithContext(ctx).Delete(&core.Entity{}, "id = ?", id).Error + err := r.db.WithContext(ctx).Delete(&core.Entity{}, "id = ?", id).Error + + if err == nil { + r.mc.Decrement("entity_count", 1) + } + + return err } // Update updates a entity diff --git a/x/entity/service.go b/x/entity/service.go index 509d29cd..f4ac4ed1 100644 --- a/x/entity/service.go +++ b/x/entity/service.go @@ -33,12 +33,12 @@ type Service interface { IsUserExists(ctx context.Context, user string) bool Delete(ctx context.Context, id string) error Ack(ctx context.Context, from, to string) error - Total(ctx context.Context) (int64, error) GetAcker(ctx context.Context, key string) ([]core.Ack, error) GetAcking(ctx context.Context, key string) ([]core.Ack, error) GetAddress(ctx context.Context, ccid string) (core.Address, error) UpdateAddress(ctx context.Context, ccid string, domain string, signedAt time.Time) error UpdateRegistration(ctx context.Context, id string, payload string, signature string) error // NOTE: for migration. Remove later + Count(ctx context.Context) (int64, error) } type service struct { @@ -56,12 +56,12 @@ func NewService(repository Repository, config util.Config, jwtService jwt.Servic } } -// Total returns the total number of entities -func (s *service) Total(ctx context.Context) (int64, error) { - ctx, span := tracer.Start(ctx, "ServiceTotal") +// Total returns the count number of entities +func (s *service) Count(ctx context.Context) (int64, error) { + ctx, span := tracer.Start(ctx, "ServiceCount") defer span.End() - return s.repository.Total(ctx) + return s.repository.Count(ctx) } // Create creates new entity diff --git a/x/message/repository.go b/x/message/repository.go index 321bf4b0..adc93058 100644 --- a/x/message/repository.go +++ b/x/message/repository.go @@ -2,8 +2,11 @@ package message import ( "context" + "github.com/bradfitz/gomemcache/memcache" "github.com/totegamma/concurrent/x/core" "gorm.io/gorm" + "log/slog" + "strconv" ) // Repository is the interface for message repository @@ -13,26 +16,48 @@ type Repository interface { GetWithAssociations(ctx context.Context, key string) (core.Message, error) GetWithOwnAssociations(ctx context.Context, key string, ccid string) (core.Message, error) Delete(ctx context.Context, key string) (core.Message, error) - Total(ctx context.Context) (int64, error) + Count(ctx context.Context) (int64, error) } type repository struct { db *gorm.DB + mc *memcache.Client +} + +// NewRepository creates a new message repository +func NewRepository(db *gorm.DB, mc *memcache.Client) Repository { + + var count int64 + err := db.Model(&core.Message{}).Count(&count).Error + if err != nil { + slog.Error( + "failed to count messages", + slog.String("error", err.Error()), + ) + } + + mc.Set(&memcache.Item{Key: "message_count", Value: []byte(strconv.FormatInt(count, 10))}) + + return &repository{db, mc} } // Total returns the total number of messages -func (r *repository) Total(ctx context.Context) (int64, error) { +func (r *repository) Count(ctx context.Context) (int64, error) { ctx, span := tracer.Start(ctx, "RepositoryTotal") defer span.End() - var count int64 - err := r.db.WithContext(ctx).Model(&core.Message{}).Count(&count).Error - return count, err -} + item, err := r.mc.Get("message_count") + if err != nil { + span.RecordError(err) + return 0, err + } -// NewRepository creates a new message repository -func NewRepository(db *gorm.DB) Repository { - return &repository{db: db} + count, err := strconv.ParseInt(string(item.Value), 10, 64) + if err != nil { + span.RecordError(err) + return 0, err + } + return count, nil } // Create creates new message @@ -41,6 +66,9 @@ func (r *repository) Create(ctx context.Context, message core.Message) (core.Mes defer span.End() err := r.db.WithContext(ctx).Create(&message).Error + + r.mc.Increment("message_count", 1) + return message, err } @@ -94,5 +122,7 @@ func (r *repository) Delete(ctx context.Context, id string) (core.Message, error return deleted, err } + r.mc.Decrement("message_count", 1) + return deleted, nil } diff --git a/x/message/service.go b/x/message/service.go index 12cdad78..a4f613b8 100644 --- a/x/message/service.go +++ b/x/message/service.go @@ -19,7 +19,7 @@ type Service interface { GetWithOwnAssociations(ctx context.Context, id string, requester string) (core.Message, error) PostMessage(ctx context.Context, objectStr string, signature string, streams []string) (core.Message, error) Delete(ctx context.Context, id string) (core.Message, error) - Total(ctx context.Context) (int64, error) + Count(ctx context.Context) (int64, error) } type service struct { @@ -33,12 +33,12 @@ func NewService(rdb *redis.Client, repo Repository, stream stream.Service) Servi return &service{rdb, repo, stream} } -// Total returns the total number of messages -func (s *service) Total(ctx context.Context) (int64, error) { - ctx, span := tracer.Start(ctx, "ServiceTotal") +// Count returns the count number of messages +func (s *service) Count(ctx context.Context) (int64, error) { + ctx, span := tracer.Start(ctx, "ServiceCount") defer span.End() - return s.repo.Total(ctx) + return s.repo.Count(ctx) } // Get returns a message by ID diff --git a/x/stream/mock/service.go b/x/stream/mock/service.go index 0d892550..bbfa7e5e 100644 --- a/x/stream/mock/service.go +++ b/x/stream/mock/service.go @@ -37,6 +37,21 @@ func (m *MockService) EXPECT() *MockServiceMockRecorder { return m.recorder } +// Count mocks base method. +func (m *MockService) Count(ctx context.Context) (int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Count", ctx) + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Count indicates an expected call of Count. +func (mr *MockServiceMockRecorder) Count(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Count", reflect.TypeOf((*MockService)(nil).Count), ctx) +} + // CreateStream mocks base method. func (m *MockService) CreateStream(ctx context.Context, stream core.Stream) (core.Stream, error) { m.ctrl.T.Helper() diff --git a/x/stream/repository.go b/x/stream/repository.go index 74cff1a4..1596dc66 100644 --- a/x/stream/repository.go +++ b/x/stream/repository.go @@ -7,6 +7,7 @@ import ( "io" "log/slog" "net/http" + "strconv" "strings" "time" @@ -49,6 +50,7 @@ type Repository interface { PublishEvent(ctx context.Context, event core.Event) error ListStreamSubscriptions(ctx context.Context) (map[string]int64, error) + Count(ctx context.Context) (int64, error) } type repository struct { @@ -61,9 +63,40 @@ type repository struct { // NewRepository creates a new stream repository func NewRepository(db *gorm.DB, rdb *redis.Client, mc *memcache.Client, manager socket.Manager, config util.Config) Repository { + + var count int64 + err := db.Model(&core.Stream{}).Count(&count).Error + if err != nil { + slog.Error( + "failed to count streams", + slog.String("error", err.Error()), + ) + } + + mc.Set(&memcache.Item{Key: "stream_count", Value: []byte(strconv.FormatInt(count, 10))}) + return &repository{db, rdb, mc, manager, config} } +// Total returns the total number of messages +func (r *repository) Count(ctx context.Context) (int64, error) { + ctx, span := tracer.Start(ctx, "RepositoryTotal") + defer span.End() + + item, err := r.mc.Get("stream_count") + if err != nil { + span.RecordError(err) + return 0, err + } + + count, err := strconv.ParseInt(string(item.Value), 10, 64) + if err != nil { + span.RecordError(err) + return 0, err + } + return count, nil +} + func (r *repository) PublishEvent(ctx context.Context, event core.Event) error { ctx, span := tracer.Start(ctx, "ServiceDistributeEvents") defer span.End() @@ -418,6 +451,9 @@ func (r *repository) CreateStream(ctx context.Context, stream core.Stream) (core defer span.End() err := r.db.WithContext(ctx).Create(&stream).Error + + r.mc.Increment("stream_count", 1) + return stream, err } @@ -460,6 +496,8 @@ func (r *repository) DeleteStream(ctx context.Context, streamID string) error { ctx, span := tracer.Start(ctx, "RepositoryDeleteStream") defer span.End() + r.mc.Decrement("stream_count", 1) + return r.db.WithContext(ctx).Delete(&core.Stream{}, "id = ?", streamID).Error } diff --git a/x/stream/service.go b/x/stream/service.go index b2613479..49e82d80 100644 --- a/x/stream/service.go +++ b/x/stream/service.go @@ -49,6 +49,7 @@ type Service interface { GetChunksFromRemote(ctx context.Context, host string, streams []string, pivot time.Time) (map[string]Chunk, error) ListStreamSubscriptions(ctx context.Context) (map[string]int64, error) + Count(ctx context.Context) (int64, error) } type service struct { @@ -62,6 +63,14 @@ func NewService(repository Repository, entity entity.Service, config util.Config return &service{repository, entity, config} } +// Count returns the count number of messages +func (s *service) Count(ctx context.Context) (int64, error) { + ctx, span := tracer.Start(ctx, "ServiceCount") + defer span.End() + + return s.repository.Count(ctx) +} + func min(a, b int) int { if a < b { return a