Skip to content

Commit

Permalink
update improve obervability
Browse files Browse the repository at this point in the history
  • Loading branch information
totegamma committed Feb 2, 2024
1 parent 1e8d594 commit 530bdbb
Show file tree
Hide file tree
Showing 15 changed files with 401 additions and 110 deletions.
82 changes: 70 additions & 12 deletions cmd/api/main.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
//go:generate go run github.com/google/wire/cmd/wire gen .
package main

import (
Expand All @@ -20,8 +19,13 @@ import (
"github.com/labstack/echo/v4/middleware"
"github.com/prometheus/client_golang/prometheus"

"github.com/totegamma/concurrent/x/association"
"github.com/totegamma/concurrent/x/auth"
"github.com/totegamma/concurrent/x/character"
"github.com/totegamma/concurrent/x/core"
"github.com/totegamma/concurrent/x/entity"
"github.com/totegamma/concurrent/x/message"
"github.com/totegamma/concurrent/x/stream"
"github.com/totegamma/concurrent/x/util"

"github.com/bradfitz/gomemcache/memcache"
Expand Down Expand Up @@ -177,21 +181,31 @@ func main() {
}
defer mc.Close()

agent := SetupAgent(db, rdb, config)
agent := SetupAgent(db, rdb, mc, config)

Check failure on line 184 in cmd/api/main.go

View workflow job for this annotation

GitHub Actions / test

undefined: SetupAgent

socketManager := SetupSocketManager(mc, db, rdb, config)

Check failure on line 186 in cmd/api/main.go

View workflow job for this annotation

GitHub Actions / test

undefined: SetupSocketManager
socketHandler := SetupSocketHandler(rdb, socketManager, config)

Check failure on line 187 in cmd/api/main.go

View workflow job for this annotation

GitHub Actions / test

undefined: SetupSocketHandler
messageHandler := SetupMessageHandler(db, rdb, mc, socketManager, config)
characterHandler := SetupCharacterHandler(db, config)
associationHandler := SetupAssociationHandler(db, rdb, mc, socketManager, config)
streamHandler := SetupStreamHandler(db, rdb, mc, socketManager, config)
domainHandler := SetupDomainHandler(db, config)

Check failure on line 188 in cmd/api/main.go

View workflow job for this annotation

GitHub Actions / test

undefined: SetupDomainHandler
entityHandler := SetupEntityHandler(db, rdb, config)
authHandler := SetupAuthHandler(db, rdb, config)
userkvHandler := SetupUserkvHandler(db, rdb, config)
userkvHandler := SetupUserkvHandler(db, rdb, mc, config)

Check failure on line 189 in cmd/api/main.go

View workflow job for this annotation

GitHub Actions / test

undefined: SetupUserkvHandler
collectionHandler := SetupCollectionHandler(db, rdb, config)

Check failure on line 190 in cmd/api/main.go

View workflow job for this annotation

GitHub Actions / test

undefined: SetupCollectionHandler

authService := SetupAuthService(db, rdb, config)
messageService := SetupMessageService(db, rdb, mc, socketManager, config)

Check failure on line 192 in cmd/api/main.go

View workflow job for this annotation

GitHub Actions / test

undefined: SetupMessageService
messageHandler := message.NewHandler(messageService)

associationService := SetupAssociationService(db, rdb, mc, socketManager, config)

Check failure on line 195 in cmd/api/main.go

View workflow job for this annotation

GitHub Actions / test

undefined: SetupAssociationService
associationHandler := association.NewHandler(associationService)

characterService := SetupCharacterService(db, mc, config)

Check failure on line 198 in cmd/api/main.go

View workflow job for this annotation

GitHub Actions / test

undefined: SetupCharacterService
characterHandler := character.NewHandler(characterService)

streamService := SetupStreamService(db, rdb, mc, socketManager, config)

Check failure on line 201 in cmd/api/main.go

View workflow job for this annotation

GitHub Actions / test

undefined: SetupStreamService
streamHandler := stream.NewHandler(streamService)

entityService := SetupEntityService(db, rdb, mc, config)
entityHandler := entity.NewHandler(entityService, config)

authService := SetupAuthService(db, rdb, mc, config)
authHandler := auth.NewHandler(authService)

apiV1 := e.Group("", auth.ParseJWT)
// domain
Expand Down Expand Up @@ -313,19 +327,63 @@ func main() {
)
prometheus.MustRegister(streamSubscriptionMetrics)

var streamService = SetupStreamService(db, rdb, mc, socketManager, config)
var resourceCountMetrics = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "cc_resources_count",
Help: "resources count",
},
[]string{"type"},
)
prometheus.MustRegister(resourceCountMetrics)

go func() {
for {
time.Sleep(15 * time.Second)
subscriptions, err := streamService.ListStreamSubscriptions(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
subscriptions, err := streamService.ListStreamSubscriptions(ctx)
if err != nil {
slog.Error(fmt.Sprintf("failed to list stream subscriptions: %v", err))
continue
}
for stream, count := range subscriptions {
streamSubscriptionMetrics.WithLabelValues(stream).Set(float64(count))
}

count, err := messageService.Count(ctx)
if err != nil {
slog.Error(fmt.Sprintf("failed to count messages: %v", err))
continue
}
resourceCountMetrics.WithLabelValues("message").Set(float64(count))

count, err = entityService.Count(ctx)
if err != nil {
slog.Error(fmt.Sprintf("failed to count entities: %v", err))
continue
}
resourceCountMetrics.WithLabelValues("entity").Set(float64(count))

count, err = characterService.Count(ctx)
if err != nil {
slog.Error(fmt.Sprintf("failed to count characters: %v", err))
continue
}
resourceCountMetrics.WithLabelValues("character").Set(float64(count))

count, err = associationService.Count(ctx)
if err != nil {
slog.Error(fmt.Sprintf("failed to count associations: %v", err))
continue
}
resourceCountMetrics.WithLabelValues("association").Set(float64(count))

count, err = streamService.Count(ctx)
if err != nil {
slog.Error(fmt.Sprintf("failed to count streams: %v", err))
continue
}
resourceCountMetrics.WithLabelValues("stream").Set(float64(count))
}
}()

Expand Down
40 changes: 14 additions & 26 deletions cmd/api/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,35 +24,28 @@ import (
)

var domainHandlerProvider = wire.NewSet(domain.NewHandler, domain.NewService, domain.NewRepository)
var entityHandlerProvider = wire.NewSet(entity.NewHandler, entity.NewService, entity.NewRepository)
var streamHandlerProvider = wire.NewSet(stream.NewHandler, stream.NewService, stream.NewRepository, entity.NewService, entity.NewRepository)
var messageHandlerProvider = wire.NewSet(message.NewHandler, message.NewService, message.NewRepository)
var characterHandlerProvider = wire.NewSet(character.NewHandler, character.NewService, character.NewRepository)
var associationHandlerProvider = wire.NewSet(association.NewHandler, association.NewService, association.NewRepository, message.NewService, message.NewRepository)
var userkvHandlerProvider = wire.NewSet(userkv.NewHandler, userkv.NewService, userkv.NewRepository)
var collectionHandlerProvider = wire.NewSet(collection.NewHandler, collection.NewService, collection.NewRepository)

var jwtServiceProvider = wire.NewSet(jwt.NewService, jwt.NewRepository)
var entityServiceProvider = wire.NewSet(entity.NewService, entity.NewRepository, jwtServiceProvider)
var streamServiceProvider = wire.NewSet(stream.NewService, stream.NewRepository, entityServiceProvider)
var messageServiceProvider = wire.NewSet(message.NewService, message.NewRepository, streamServiceProvider)
var associationServiceProvider = wire.NewSet(association.NewService, association.NewRepository, messageServiceProvider)
var characterServiceProvider = wire.NewSet(character.NewService, character.NewRepository)

func SetupMessageHandler(db *gorm.DB, rdb *redis.Client, mc *memcache.Client, manager socket.Manager, config util.Config) message.Handler {
wire.Build(messageHandlerProvider, jwtServiceProvider, stream.NewService, stream.NewRepository, entity.NewService, entity.NewRepository)
func SetupMessageService(db *gorm.DB, rdb *redis.Client, mc *memcache.Client, manager socket.Manager, config util.Config) message.Service {
wire.Build(messageServiceProvider)
return nil
}

func SetupCharacterHandler(db *gorm.DB, config util.Config) character.Handler {
wire.Build(characterHandlerProvider)
func SetupCharacterService(db *gorm.DB, mc *memcache.Client, config util.Config) character.Service {
wire.Build(characterServiceProvider)
return nil
}

func SetupAssociationHandler(db *gorm.DB, rdb *redis.Client, mc *memcache.Client, manager socket.Manager, config util.Config) association.Handler {
wire.Build(associationHandlerProvider, jwtServiceProvider, stream.NewService, stream.NewRepository, entity.NewService, entity.NewRepository)
return nil
}

func SetupStreamHandler(db *gorm.DB, rdb *redis.Client, mc *memcache.Client, manager socket.Manager, config util.Config) stream.Handler {
wire.Build(streamHandlerProvider, jwtServiceProvider)
func SetupAssociationService(db *gorm.DB, rdb *redis.Client, mc *memcache.Client, manager socket.Manager, config util.Config) association.Service {
wire.Build(associationServiceProvider)
return nil
}

Expand All @@ -66,8 +59,8 @@ func SetupDomainHandler(db *gorm.DB, config util.Config) domain.Handler {
return nil
}

func SetupEntityHandler(db *gorm.DB, rdb *redis.Client, config util.Config) entity.Handler {
wire.Build(entityHandlerProvider, jwtServiceProvider)
func SetupEntityService(db *gorm.DB, rdb *redis.Client, mc *memcache.Client, config util.Config) entity.Service {
wire.Build(entityServiceProvider)
return nil
}

Expand All @@ -76,22 +69,17 @@ func SetupSocketHandler(rdb *redis.Client, manager socket.Manager, config util.C
return nil
}

func SetupAgent(db *gorm.DB, rdb *redis.Client, config util.Config) agent.Agent {
func SetupAgent(db *gorm.DB, rdb *redis.Client, mc *memcache.Client, config util.Config) agent.Agent {
wire.Build(agent.NewAgent, jwtServiceProvider, domain.NewService, domain.NewRepository, entity.NewService, entity.NewRepository)
return nil
}

func SetupAuthHandler(db *gorm.DB, rdb *redis.Client, config util.Config) auth.Handler {
wire.Build(jwtServiceProvider, auth.NewHandler, auth.NewService, auth.NewRepository, entity.NewService, entity.NewRepository, domain.NewService, domain.NewRepository)
return nil
}

func SetupAuthService(db *gorm.DB, rdb *redis.Client, config util.Config) auth.Service {
func SetupAuthService(db *gorm.DB, rdb *redis.Client, mc *memcache.Client, config util.Config) auth.Service {
wire.Build(jwtServiceProvider, auth.NewService, auth.NewRepository, entity.NewService, entity.NewRepository, domain.NewService, domain.NewRepository)
return nil
}

func SetupUserkvHandler(db *gorm.DB, rdb *redis.Client, config util.Config) userkv.Handler {
func SetupUserkvHandler(db *gorm.DB, rdb *redis.Client, mc *memcache.Client, config util.Config) userkv.Handler {
wire.Build(userkvHandlerProvider, jwtServiceProvider, entity.NewService, entity.NewRepository)
return nil
}
Expand Down
27 changes: 17 additions & 10 deletions cmd/gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"net/http/httputil"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"

Expand Down Expand Up @@ -75,14 +74,6 @@ func main() {
log.Print("Config loaded! I am: ", config.Concurrent.CCID)

// Echoの設定
logfile, err := os.OpenFile(filepath.Join(config.Server.LogPath, "gateway-access.log"), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
log.Fatal(err)
}
defer logfile.Close()

e.Logger.SetOutput(logfile)

e.HidePort = true
e.HideBanner = true

Expand Down Expand Up @@ -127,9 +118,24 @@ func main() {
},
}))

e.Use(echoprometheus.NewMiddleware("ccgateway"))
e.Use(auth.ParseJWT)

e.Use(echoprometheus.NewMiddlewareWithConfig(echoprometheus.MiddlewareConfig{
Namespace: "ccgateway",
LabelFuncs: map[string]echoprometheus.LabelValueFunc{
"service": func(c echo.Context, err error) string {
service := c.Response().Header().Get("cc-service")
if service == "" {
service = "unknown"
}
return service
},
},
Skipper: func(c echo.Context) bool {
return c.Path() == "/metrics" || c.Path() == "/health"
},
}))

// Postrgresqlとの接続
db, err := gorm.Open(postgres.Open(config.Server.Dsn), &gorm.Config{})
if err != nil {
Expand Down Expand Up @@ -201,6 +207,7 @@ func main() {
}

handler := func(c echo.Context) error {
c.Response().Header().Set("cc-service", service.Name)
claims, ok := c.Get("jwtclaims").(jwt.Claims)
if ok {
c.Request().Header.Set("cc-issuer", claims.Issuer)
Expand Down
23 changes: 4 additions & 19 deletions x/association/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"net/http"

"github.com/labstack/echo/v4"
"github.com/totegamma/concurrent/x/message"
"go.opentelemetry.io/otel"
"gorm.io/gorm"
)
Expand All @@ -25,12 +24,11 @@ type Handler interface {

type handler struct {
service Service
message message.Service
}

// NewHandler creates a new handler
func NewHandler(service Service, message message.Service) Handler {
return &handler{service: service, message: message}
func NewHandler(service Service) Handler {
return &handler{service: service}
}

// Get returns an association by ID
Expand Down Expand Up @@ -139,22 +137,9 @@ func (h handler) Delete(c echo.Context) error {
defer span.End()

associationID := c.Param("id")
requester := c.Get("requester").(string)

association, err := h.service.Get(ctx, associationID)
if err != nil {
return c.JSON(http.StatusNotFound, echo.Map{"error": "target association not found"})
}

message, err := h.message.Get(ctx, association.TargetID)
if err == nil { // if target message exists

requester := c.Get("requester").(string)
if (association.Author != requester) && (message.Author != requester) {
return c.JSON(http.StatusForbidden, echo.Map{"error": "you are not authorized to perform this action"})
}
}

deleted, err := h.service.Delete(ctx, associationID)
deleted, err := h.service.Delete(ctx, associationID, requester)
if err != nil {
return err
}
Expand Down
45 changes: 43 additions & 2 deletions x/association/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package association
import (
"context"
"fmt"
"github.com/bradfitz/gomemcache/memcache"
"github.com/totegamma/concurrent/x/core"
"gorm.io/gorm"
"log/slog"
"strconv"
)

// Repository is the interface for association repository
Expand All @@ -19,15 +22,48 @@ type Repository interface {
GetCountsBySchemaAndVariant(ctx context.Context, messageID string, schema string) (map[string]int64, error)
GetBySchemaAndVariant(ctx context.Context, messageID string, schema string, variant string) ([]core.Association, error)
GetOwnByTarget(ctx context.Context, targetID, author string) ([]core.Association, error)
Count(ctx context.Context) (int64, error)
}

type repository struct {
db *gorm.DB
mc *memcache.Client
}

// NewRepository creates a new association repository
func NewRepository(db *gorm.DB) Repository {
return &repository{db: db}
func NewRepository(db *gorm.DB, mc *memcache.Client) Repository {

var count int64
err := db.Model(&core.Association{}).Count(&count).Error
if err != nil {
slog.Error(
"failed to count associations",
slog.String("error", err.Error()),
)
}

mc.Set(&memcache.Item{Key: "association_count", Value: []byte(strconv.FormatInt(count, 10))})

return &repository{db, mc}
}

// Total returns the total number of associations
func (r *repository) Count(ctx context.Context) (int64, error) {
ctx, span := tracer.Start(ctx, "RepositoryTotal")
defer span.End()

item, err := r.mc.Get("association_count")
if err != nil {
span.RecordError(err)
return 0, err
}

count, err := strconv.ParseInt(string(item.Value), 10, 64)
if err != nil {
span.RecordError(err)
return 0, err
}
return count, nil
}

// Create creates new association
Expand All @@ -37,6 +73,8 @@ func (r *repository) Create(ctx context.Context, association core.Association) (

err := r.db.WithContext(ctx).Create(&association).Error

r.mc.Increment("association_count", 1)

return association, err
}

Expand Down Expand Up @@ -75,6 +113,9 @@ func (r *repository) Delete(ctx context.Context, id string) (core.Association, e
fmt.Printf("Error deleting association: %v\n", err)
return core.Association{}, err
}

r.mc.Decrement("association_count", 1)

return deleted, nil
}

Expand Down
Loading

0 comments on commit 530bdbb

Please sign in to comment.