From 94086a31a3fa7e8587d7125e126a01f93c033275 Mon Sep 17 00:00:00 2001 From: totegamma Date: Fri, 2 Feb 2024 02:04:32 +0900 Subject: [PATCH] use slog --- cmd/api/main.go | 7 +- web/package.json | 2 +- web/pnpm-lock.yaml | 8 +-- web/src/context/apiContext.tsx | 2 +- web/src/pages/Home.tsx | 2 +- x/agent/main.go | 27 ++++++-- x/association/service.go | 8 +-- x/message/service.go | 6 +- x/socket/handler.go | 53 ++++++++++++--- x/socket/manager.go | 118 +++++++++++++++++++++++++++------ x/stream/repository.go | 14 +++- x/stream/service.go | 35 ++++++---- 12 files changed, 215 insertions(+), 67 deletions(-) diff --git a/cmd/api/main.go b/cmd/api/main.go index e687a724..6a8cac5e 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -137,7 +137,7 @@ func main() { } // Migrate the schema - log.Println("start migrate") + slog.Info("start migrate") db.AutoMigrate( &core.Message{}, &core.Character{}, @@ -172,7 +172,6 @@ func main() { } mc := memcache.New(config.Server.MemcachedAddr) - log.Println("config.Server.MemcachedAddr", config.Server.MemcachedAddr) if err != nil { panic("failed to connect memcached") } @@ -321,7 +320,7 @@ func main() { time.Sleep(15 * time.Second) subscriptions, err := streamService.ListStreamSubscriptions(context.Background()) if err != nil { - log.Println("failed to list stream subscriptions", err) + slog.Error(fmt.Sprintf("failed to list stream subscriptions: %v", err)) continue } for stream, count := range subscriptions { @@ -371,7 +370,7 @@ func setupTraceProvider(endpoint string, serviceName string, serviceVersion stri ctx, cancel := context.WithCancel(context.Background()) defer cancel() if err := tracerProvider.Shutdown(ctx); err != nil { - log.Printf("Failed to shutdown tracer provider: %v", err) + slog.Error(fmt.Sprintf("Failed to shutdown tracer provider: %v", err)) } } return cleanup, nil diff --git a/web/package.json b/web/package.json index 18e2e705..055f157d 100644 --- a/web/package.json +++ b/web/package.json @@ -10,7 +10,7 @@ "preview": "vite preview" }, "dependencies": { - "@concurrent-world/client": "^4.1.2", + "@concurrent-world/client": "^4.1.4", "@emotion/react": "^11.11.0", "@emotion/styled": "^11.11.0", "@mui/icons-material": "^5.11.16", diff --git a/web/pnpm-lock.yaml b/web/pnpm-lock.yaml index 32a0f5dd..d5cbc53d 100644 --- a/web/pnpm-lock.yaml +++ b/web/pnpm-lock.yaml @@ -6,8 +6,8 @@ settings: dependencies: '@concurrent-world/client': - specifier: ^4.1.2 - version: 4.1.2 + specifier: ^4.1.4 + version: 4.1.4 '@emotion/react': specifier: ^11.11.0 version: 11.11.0(@types/react@18.0.28)(react@18.2.0) @@ -159,8 +159,8 @@ packages: to-fast-properties: 2.0.0 dev: false - /@concurrent-world/client@4.1.2: - resolution: {integrity: sha512-9DWUosKDVP6CiYfpVEdsS1FxWB9y8nUEcpWFhucXZMKJSuTOPFTfwURa+0rlp1lYwNDE43rz3tg5yYkWjGoI/A==} + /@concurrent-world/client@4.1.4: + resolution: {integrity: sha512-cV9ieKZ1Uz38tMXxAA6wgyGrJMxvEB6ph1vRE6y1KuQvS7KgsAq74jWJaDrMHsGUdmyKGMqfYrvvFYjxa7E4Vw==} dependencies: elliptic: 6.5.4 ethers: 6.7.0 diff --git a/web/src/context/apiContext.tsx b/web/src/context/apiContext.tsx index 50ef6cfe..7b7422a9 100644 --- a/web/src/context/apiContext.tsx +++ b/web/src/context/apiContext.tsx @@ -25,7 +25,7 @@ export default function ApiProvider(props: ApiProviderProps): JSX.Element { const setJWT = useMemo(() => (jwt: string) => { setToken(jwt) - setApi(new Api({host: '', token: jwt})) + setApi(new Api({host: location.host, token: jwt})) }, [setApi]) const apiContextState = useMemo(() => ({ diff --git a/web/src/pages/Home.tsx b/web/src/pages/Home.tsx index 9741ae68..0a225a07 100644 --- a/web/src/pages/Home.tsx +++ b/web/src/pages/Home.tsx @@ -15,7 +15,7 @@ export const Home = (): JSX.Element => { useEffect(() => { if (!ccid) return - api.readEntity(ccid).then((entity) => { + api.getEntity(ccid).then((entity) => { setTag(entity?.tag.split(',') ?? []) }) }, [api]) diff --git a/x/agent/main.go b/x/agent/main.go index 7589198d..557406a4 100644 --- a/x/agent/main.go +++ b/x/agent/main.go @@ -4,6 +4,7 @@ package agent import ( "context" "encoding/json" + "fmt" "github.com/gorilla/websocket" "github.com/redis/go-redis/v9" "github.com/totegamma/concurrent/x/core" @@ -13,7 +14,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/propagation" "io" - "log" + "log/slog" "math/rand" "net/http" "strconv" @@ -62,7 +63,7 @@ func (a *agent) collectUsers(ctx context.Context) { // Boot starts agent func (a *agent) Boot() { - log.Printf("agent start!") + slog.Info("agent start!") ticker60 := time.NewTicker(60 * time.Second) go func() { for { @@ -118,7 +119,11 @@ func (a *agent) pullRemoteEntities(ctx context.Context, remote core.Domain) erro err := json.Unmarshal([]byte(entity.Payload), &signedObj) if err != nil { span.RecordError(err) - log.Println(err) + slog.Error( + "pullRemoteEntities", + slog.String("error", err.Error()), + slog.String("module", "agent"), + ) continue } @@ -134,16 +139,26 @@ func (a *agent) pullRemoteEntities(ctx context.Context, remote core.Domain) erro if err != nil { span.RecordError(err) + slog.Error( + "pullRemoteEntities", + slog.String("error", err.Error()), + slog.String("module", "agent"), + ) errored = true - log.Println(err) } } if !errored { - log.Printf("[agent] pulled %d entities from %s", len(remoteEntities.Content), remote.ID) + slog.Info( + fmt.Sprintf("[agent] pulled %d entities from %s", len(remoteEntities.Content), remote.ID), + slog.String("module", "agent"), + ) a.domain.UpdateScrapeTime(ctx, remote.ID, requestTime) } else { - log.Printf("[agent] failed to pull entities from %s", remote.ID) + slog.Error( + fmt.Sprintf("[agent] failed to pull entities from %s", remote.ID), + slog.String("module", "agent"), + ) } return nil diff --git a/x/association/service.go b/x/association/service.go index 1e81395d..2b8c2d0e 100644 --- a/x/association/service.go +++ b/x/association/service.go @@ -5,7 +5,7 @@ import ( "crypto/sha256" "encoding/hex" "encoding/json" - "log" + "log/slog" "github.com/totegamma/concurrent/x/core" "github.com/totegamma/concurrent/x/message" @@ -112,8 +112,8 @@ func (s *service) PostAssociation(ctx context.Context, objectStr string, signatu for _, stream := range association.Streams { err = s.stream.PostItem(ctx, stream, item, created) if err != nil { + slog.ErrorContext(ctx, "failed to post stream", slog.String("error", err.Error()), slog.String("module", "association")) span.RecordError(err) - log.Printf("fail to post stream: %v", err) } } @@ -128,8 +128,8 @@ func (s *service) PostAssociation(ctx context.Context, objectStr string, signatu err = s.stream.DistributeEvent(ctx, postto, event) if err != nil { + slog.ErrorContext(ctx, "failed to publish message to Redis", slog.String("error", err.Error()), slog.String("module", "association")) span.RecordError(err) - log.Printf("fail to publish message to Redis: %v", err) } } @@ -181,7 +181,7 @@ func (s *service) Delete(ctx context.Context, id string) (core.Association, erro } err := s.stream.DistributeEvent(ctx, posted, event) if err != nil { - log.Printf("fail to publish message to Redis: %v", err) + slog.ErrorContext(ctx, "failed to publish message to Redis", slog.String("error", err.Error()), slog.String("module", "association")) span.RecordError(err) return deleted, err } diff --git a/x/message/service.go b/x/message/service.go index 4c04be8b..12cdad78 100644 --- a/x/message/service.go +++ b/x/message/service.go @@ -3,7 +3,8 @@ package message import ( "context" "encoding/json" - "log" + "fmt" + "log/slog" "github.com/redis/go-redis/v9" "github.com/totegamma/concurrent/x/core" @@ -111,7 +112,8 @@ func (s *service) Delete(ctx context.Context, id string) (core.Message, error) { defer span.End() deleted, err := s.repo.Delete(ctx, id) - log.Printf("deleted: %v", deleted) + slog.DebugContext(ctx, fmt.Sprintf("deleted: %v", deleted), slog.String("module", "message")) + if err != nil { span.RecordError(err) return core.Message{}, err diff --git a/x/socket/handler.go b/x/socket/handler.go index edeaff84..c3243caa 100644 --- a/x/socket/handler.go +++ b/x/socket/handler.go @@ -2,10 +2,11 @@ package socket import ( + "fmt" "github.com/gorilla/websocket" "github.com/labstack/echo/v4" "github.com/redis/go-redis/v9" - "log" + "log/slog" "net/http" "sync" ) @@ -50,8 +51,11 @@ func (h handler) send(ws *websocket.Conn, message string) error { func (h handler) Connect(c echo.Context) error { ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil) if err != nil { - log.Println("Failed to upgrade WebSocket:", err) - c.Logger().Error(err) + slog.Error( + "Failed to upgrade WebSocket", + slog.String("error", err.Error()), + slog.String("module", "socket"), + ) } defer func() { h.manager.Unsubscribe(ws) @@ -70,13 +74,33 @@ func (h handler) Connect(c echo.Context) error { for { select { case <-quit: - log.Println("[socket] closed") + slog.InfoContext( + ctx, "Socket closed", + slog.String("module", "socket"), + ) return case msg := <-psch: - log.Printf("[socket] -> %s\n", msg.Payload[:64]) + + if msg == nil { + slog.WarnContext( + ctx, "received nil message", + slog.String("module", "socket"), + ) + return + } + + slog.DebugContext( + ctx, fmt.Sprintf("Socket message: %s", msg.Payload[:64]), + slog.String("module", "socket"), + ) + err = h.send(ws, msg.Payload) if err != nil { - log.Println("Error writing message: ", err) + slog.ErrorContext( + ctx, "Error writing message", + slog.String("error", err.Error()), + slog.String("module", "socket"), + ) return } } @@ -87,20 +111,31 @@ func (h handler) Connect(c echo.Context) error { var req Request err := ws.ReadJSON(&req) if err != nil { - log.Println("Error reading JSON: ", err) + slog.ErrorContext( + ctx, "Error reading JSON", + slog.String("error", err.Error()), + slog.String("module", "socket"), + ) break } if req.Type == "ping" { err = h.send(ws, "{\"type\":\"pong\"}") if err != nil { - log.Println("Error writing message: ", err) + slog.ErrorContext( + ctx, "Error writing message", + slog.String("error", err.Error()), + slog.String("module", "socket"), + ) break } continue } - log.Printf("[socket] subscribe: %s\n", req.Channels) + slog.DebugContext( + ctx, fmt.Sprintf("Socket subscribe: %s", req.Channels), + slog.String("module", "socket"), + ) pubsub.Unsubscribe(ctx) pubsub.Subscribe(ctx, req.Channels...) h.manager.Subscribe(ws, req.Channels) diff --git a/x/socket/manager.go b/x/socket/manager.go index c541a57d..bf812800 100644 --- a/x/socket/manager.go +++ b/x/socket/manager.go @@ -7,7 +7,6 @@ import ( "crypto/tls" "encoding/json" "fmt" - "log" "log/slog" "net/url" "slices" @@ -190,7 +189,11 @@ func (m *manager) deleteExcessiveSubs() { delete(m.remoteConns, domain) } - log.Printf("[remote] subscription cleaned up: %v", closeList) + slog.Info( + fmt.Sprintf("subscription cleaned up: %v", closeList), + slog.String("module", "socket"), + slog.String("group", "remote"), + ) } // RemoteSubRoutine subscribes to a remote server @@ -206,7 +209,12 @@ func (m *manager) RemoteSubRoutine(domain string, streams []string) { c, _, err := dialer.Dial(u.String(), nil) if err != nil { - log.Printf("[remote websocket] fail to dial: %v", err) + slog.Error( + fmt.Sprintf("fail to dial: %v", err), + slog.String("module", "socket"), + slog.String("group", "remote"), + ) + delete(m.remoteConns, domain) return } @@ -221,17 +229,29 @@ func (m *manager) RemoteSubRoutine(domain string, streams []string) { c.Close() } delete(m.remoteConns, domain) - log.Printf("[remote ws.reader] remote connection closed: %s", domain) + slog.Info( + fmt.Sprintf("remote connection closed: %s", domain), + slog.String("module", "socket"), + slog.String("group", "remote"), + ) }() for { // check if the connection is still alive if c == nil { - log.Printf("connection is nil (domain: %s)", domain) + slog.Info( + fmt.Sprintf("connection is nil (domain: %s)", domain), + slog.String("module", "socket"), + slog.String("group", "remote"), + ) break } _, message, err := c.ReadMessage() if err != nil { - log.Printf("fail to read message: %v", err) + slog.Error( + fmt.Sprintf("fail to read message: %v", err), + slog.String("module", "socket"), + slog.String("group", "remote"), + ) break } messageChan <- message @@ -247,7 +267,11 @@ func (m *manager) RemoteSubRoutine(domain string, streams []string) { } pingTicker.Stop() delete(m.remoteConns, domain) - log.Printf("[remote ws.publisher] remote connection closed: %s", domain) + slog.Info( + fmt.Sprintf("remote connection closed: %s", domain), + slog.String("module", "socket"), + slog.String("group", "remote ws.publisher"), + ) }() var lastPong time.Time = time.Now() @@ -260,26 +284,45 @@ func (m *manager) RemoteSubRoutine(domain string, streams []string) { select { case message := <-messageChan: - log.Printf("[remote] <- %s\n", message[:64]) + slog.Debug( + fmt.Sprintf("remote message received: %s", message[:64]), + slog.String("module", "socket"), + slog.String("group", "remote"), + ) var event core.Event err = json.Unmarshal(message, &event) if err != nil { - log.Printf("fail to Unmarshall redis message: %v", err) + slog.Error( + fmt.Sprintf("fail to Unmarshall redis message"), + slog.String("error", err.Error()), + slog.String("module", "socket"), + slog.String("group", "remote"), + ) continue } // publish message to Redis err = m.rdb.Publish(ctx, event.Stream, string(message)).Err() if err != nil { - log.Printf("fail to publish message to Redis: %v", err) + slog.Error( + fmt.Sprintf("fail to publish message to Redis"), + slog.String("error", err.Error()), + slog.String("module", "socket"), + slog.String("group", "remote"), + ) continue } // update cache json, err := json.Marshal(event.Item) if err != nil { - log.Printf("fail to Marshall item: %v", err) + slog.Error( + "fail to Marshall item", + slog.String("error", err.Error()), + slog.String("module", "socket"), + slog.String("group", "remote"), + ) continue } json = append(json, ',') @@ -302,22 +345,39 @@ func (m *manager) RemoteSubRoutine(domain string, streams []string) { // 例えば、今までのキャッシュを(現時点では取得不能)最新のitrが指すようにして // 今までのキャッシュを更新し続けるとか... (TODO) // cacheKey := "stream:body:all:" + event.Item.StreamID + ":" + core.Time2Chunk(event.Item.CDate) - log.Printf("[remote] no need to update cache: %s", itr) + slog.Info( + fmt.Sprintf("no need to update cache: %s", itr), + slog.String("module", "socket"), + slog.String("group", "remote"), + ) continue } err = m.mc.Append(&memcache.Item{Key: cacheKey, Value: json}) if err != nil { - log.Printf("fail to update cache: %v", err) + slog.Error( + fmt.Sprintf("fail to update cache: %s", itr), + slog.String("error", err.Error()), + slog.String("module", "socket"), + slog.String("group", "remote"), + ) } case <-pingTicker.C: if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil { - log.Printf("fail to send ping message: %v", err) + slog.Error( + fmt.Sprintf("fail to send ping message: %v", err), + slog.String("module", "socket"), + slog.String("group", "remote"), + ) return } if lastPong.Before(time.Now().Add(-disconnectTimeout)) { - log.Printf("pong timeout: %s", domain) + slog.Warn( + fmt.Sprintf("pong timeout: %s", domain), + slog.String("module", "socket"), + slog.String("group", "remote"), + ) return } } @@ -329,11 +389,21 @@ func (m *manager) RemoteSubRoutine(domain string, streams []string) { } err := m.remoteConns[domain].WriteJSON(request) if err != nil { - log.Printf("[remote] fail to send subscribe request to remote server %v: %v", domain, err) + slog.Error( + fmt.Sprintf("fail to send subscribe request to remote server %v", domain), + slog.String("error", err.Error()), + slog.String("module", "socket"), + slog.String("group", "remote"), + ) + delete(m.remoteConns, domain) return } - log.Printf("[remote] connection updated: %s > %s", domain, streams) + slog.Info( + fmt.Sprintf("remote connection updated: %s > %s", domain, streams), + slog.String("module", "socket"), + slog.String("group", "remote"), + ) } // ConnectionKeeperRoutine @@ -348,13 +418,17 @@ func (m *manager) connectionKeeperRoutine() { case <-ticker.C: slog.InfoContext( ctx, - fmt.Sprintf("connection keeper: %d/%d\n", len(m.remoteSubs), len(m.remoteConns)), + fmt.Sprintf("connection keeper: %d/%d", len(m.remoteSubs), len(m.remoteConns)), slog.String("module", "socket"), slog.String("group", "remote"), ) for domain := range m.remoteSubs { if _, ok := m.remoteConns[domain]; !ok { - log.Printf("[remote] broken connection found: %s\n", domain) + slog.Info( + fmt.Sprintf("broken connection found: %s", domain), + slog.String("module", "socket"), + slog.String("group", "remote"), + ) m.RemoteSubRoutine(domain, m.remoteSubs[domain]) } } @@ -383,7 +457,11 @@ func (m *manager) chunkUpdaterRoutine() { continue } - log.Printf("[manager] update chunks: %s -> %s", currentChunk, newChunk) + slog.Info( + fmt.Sprintf("update chunks: %s -> %s", currentChunk, newChunk), + slog.String("module", "socket"), + slog.String("group", "remote"), + ) m.deleteExcessiveSubs() //m.updateChunks(newChunk) diff --git a/x/stream/repository.go b/x/stream/repository.go index de8e0b3a..74cff1a4 100644 --- a/x/stream/repository.go +++ b/x/stream/repository.go @@ -5,7 +5,7 @@ import ( "encoding/json" "fmt" "io" - "log" + "log/slog" "net/http" "strings" "time" @@ -73,7 +73,11 @@ func (r *repository) PublishEvent(ctx context.Context, event core.Event) error { err := r.rdb.Publish(context.Background(), event.Stream, jsonstr).Err() if err != nil { span.RecordError(err) - log.Printf("fail to publish message to Redis: %v", err) + slog.ErrorContext( + ctx, "fail to publish message to Redis", + slog.String("error", err.Error()), + slog.String("module", "stream"), + ) } return nil @@ -123,7 +127,11 @@ func (r *repository) GetChunksFromRemote(ctx context.Context, host string, strea err = r.SaveToCache(ctx, cacheChunks, queryTime) if err != nil { - log.Printf("Error: %v", err) + slog.ErrorContext( + ctx, "fail to save cache", + slog.String("error", err.Error()), + slog.String("module", "stream"), + ) span.RecordError(err) return nil, err } diff --git a/x/stream/service.go b/x/stream/service.go index c4a72ae4..b2613479 100644 --- a/x/stream/service.go +++ b/x/stream/service.go @@ -7,7 +7,7 @@ import ( "context" "encoding/json" "fmt" - "log" + "log/slog" "net/http" "sort" "strconv" @@ -97,7 +97,7 @@ func (s *service) GetChunks(ctx context.Context, streams []string, until time.Ti untilChunk := core.Time2Chunk(until) items, err := s.repository.GetChunksFromCache(ctx, streams, untilChunk) if err != nil { - log.Printf("Error: %v", err) + slog.ErrorContext(ctx, "failed to get chunks from cache", slog.String("error", err.Error()), slog.String("module", "stream")) span.RecordError(err) return nil, err } @@ -114,7 +114,7 @@ func (s *service) GetChunks(ctx context.Context, streams []string, until time.Ti // get from db dbItems, err := s.repository.GetChunksFromDB(ctx, missingStreams, untilChunk) if err != nil { - log.Printf("Error: %v", err) + slog.ErrorContext(ctx, "failed to get chunks from db", slog.String("error", err.Error()), slog.String("module", "stream")) span.RecordError(err) return nil, err } @@ -148,7 +148,7 @@ func (s *service) GetRecentItems(ctx context.Context, streams []string, until ti untilChunk := core.Time2Chunk(until) items, err := s.repository.GetChunksFromCache(ctx, streams, untilChunk) if err != nil { - log.Printf("Error: %v", err) + slog.ErrorContext(ctx, "failed to get chunks from cache", slog.String("error", err.Error()), slog.String("module", "stream")) span.RecordError(err) return nil, err } @@ -169,7 +169,7 @@ func (s *service) GetRecentItems(ctx context.Context, streams []string, until ti if host == s.config.Concurrent.FQDN { chunks, err := s.repository.GetChunksFromDB(ctx, streams, untilChunk) if err != nil { - log.Printf("Error: %v", err) + slog.ErrorContext(ctx, "failed to get chunks from db", slog.String("error", err.Error()), slog.String("module", "stream")) span.RecordError(err) return nil, err } @@ -179,7 +179,7 @@ func (s *service) GetRecentItems(ctx context.Context, streams []string, until ti } else { chunks, err := s.repository.GetChunksFromRemote(ctx, host, streams, until) if err != nil { - log.Printf("Error: %v", err) + slog.ErrorContext(ctx, "failed to get chunks from remote", slog.String("error", err.Error()), slog.String("module", "stream")) span.RecordError(err) continue } @@ -295,18 +295,26 @@ func (s *service) PostItem(ctx context.Context, stream string, item core.StreamI author = item.Owner } if !s.repository.HasWriteAccess(ctx, streamID, author) { - span.RecordError(fmt.Errorf("You don't have write access to %v", streamID)) - log.Printf("You don't have write access to %v", streamID) + slog.InfoContext( + ctx, "failed to post to stream", + slog.String("type", "audit"), + slog.String("principal", author), + slog.String("stream", streamID), + slog.String("module", "stream"), + ) return fmt.Errorf("You don't have write access to %v", streamID) } - log.Printf("[socket] post to local stream: %v to %v", item.ObjectID, streamID) + slog.DebugContext( + ctx, fmt.Sprintf("post to local stream: %v to %v", item.ObjectID, streamID), + slog.String("module", "stream"), + ) // add to stream created, err := s.repository.CreateItem(ctx, item) if err != nil { + slog.ErrorContext(ctx, "failed to create item", slog.String("error", err.Error()), slog.String("module", "stream")) span.RecordError(err) - log.Printf("fail to create item: %v", err) return err } @@ -321,13 +329,16 @@ func (s *service) PostItem(ctx context.Context, stream string, item core.StreamI err = s.repository.PublishEvent(ctx, event) if err != nil { + slog.ErrorContext(ctx, "failed to publish event", slog.String("error", err.Error()), slog.String("module", "stream")) span.RecordError(err) - log.Printf("fail to publish message to Redis: %v", err) return err } } else { - log.Printf("[socket] post to remote stream: %v to %v@%v", item.ObjectID, streamID, streamHost) + slog.DebugContext( + ctx, fmt.Sprintf("post to remote stream: %v to %v@%v", item.ObjectID, streamID, streamHost), + slog.String("module", "stream"), + ) packet := checkpointPacket{ Stream: stream,