From a43e931a58ff81fbccfd47dedd0704761bb80a83 Mon Sep 17 00:00:00 2001 From: Antoine Grondin Date: Mon, 28 Oct 2024 18:27:45 +0900 Subject: [PATCH] chore(localstorage): extract storage interface, make registerable --- cmd/humanlog/localhost.go | 13 ++++- cmd/humanlog/main.go | 6 +-- cmd/humanlog/query.go | 8 +-- internal/localstorage/queryable.go | 27 ---------- internal/localsvc/svc.go | 14 ++++-- internal/localsvc/svc_test.go | 4 +- .../{localstorage => memstorage}/memory.go | 31 ++++++++---- internal/memstorage/memory_test.go | 15 ++++++ internal/pkg/config/config.go | 12 +++-- pkg/localstorage/queryable.go | 49 +++++++++++++++++++ .../localstorage/test_suite.go | 11 +++-- 11 files changed, 131 insertions(+), 59 deletions(-) delete mode 100644 internal/localstorage/queryable.go rename internal/{localstorage => memstorage}/memory.go (91%) create mode 100644 internal/memstorage/memory_test.go create mode 100644 pkg/localstorage/queryable.go rename internal/localstorage/memory_test.go => pkg/localstorage/test_suite.go (97%) diff --git a/cmd/humanlog/localhost.go b/cmd/humanlog/localhost.go index 0ea2c3ca..23d4a396 100644 --- a/cmd/humanlog/localhost.go +++ b/cmd/humanlog/localhost.go @@ -20,10 +20,10 @@ import ( "github.com/humanlogio/api/go/svc/localhost/v1/localhostv1connect" "github.com/humanlogio/api/go/svc/query/v1/queryv1connect" typesv1 "github.com/humanlogio/api/go/types/v1" - "github.com/humanlogio/humanlog/internal/localstorage" "github.com/humanlogio/humanlog/internal/localsvc" "github.com/humanlogio/humanlog/internal/pkg/config" "github.com/humanlogio/humanlog/internal/pkg/state" + "github.com/humanlogio/humanlog/pkg/localstorage" "github.com/humanlogio/humanlog/pkg/sink" "github.com/humanlogio/humanlog/pkg/sink/logsvcsink" "github.com/rs/cors" @@ -89,7 +89,16 @@ func startLocalhostServer( return localhostSink.Close(ctx) }, nil } - storage := localstorage.NewMemStorage(ll.WithGroup("memstorage")) + + storage, err := localstorage.Open( + ctx, + cfg.ExperimentalFeatures.ServeLocalhost.Engine, + ll.WithGroup("storage"), + cfg.ExperimentalFeatures.ServeLocalhost.Cfg, + ) + if err != nil { + return nil, nil, fmt.Errorf("opening localstorage %q: %v", cfg.ExperimentalFeatures.ServeLocalhost.Engine, err) + } ownSink, _, err := storage.SinkFor(ctx, int64(machineID), time.Now().UnixNano()) if err != nil { return nil, nil, fmt.Errorf("can't create own sink: %v", err) diff --git a/cmd/humanlog/main.go b/cmd/humanlog/main.go index b8ed3093..c8843b46 100644 --- a/cmd/humanlog/main.go +++ b/cmd/humanlog/main.go @@ -517,8 +517,8 @@ func newApp() *cli.App { sink = teesink.NewTeeSink(sink, remotesink) } - if cfg.ExperimentalFeatures.ServeLocalhostOnPort != nil { - port := *cfg.ExperimentalFeatures.ServeLocalhostOnPort + if cfg.ExperimentalFeatures.ServeLocalhost != nil { + localhostCfg := *cfg.ExperimentalFeatures.ServeLocalhost state := getState(cctx) // TODO(antoine): all logs to a single location, right now there's code logging // randomly everywhere @@ -534,7 +534,7 @@ func newApp() *cli.App { } } machineID = uint64(*state.MachineID) - localhostSink, done, err := startLocalhostServer(ctx, ll, cfg, state, machineID, port, getLocalhostHTTPClient(cctx), version) + localhostSink, done, err := startLocalhostServer(ctx, ll, cfg, state, machineID, localhostCfg.Port, getLocalhostHTTPClient(cctx), version) if err != nil { loginfo("starting experimental localhost service: %v", err) } else { diff --git a/cmd/humanlog/query.go b/cmd/humanlog/query.go index 98b43979..aa76c90c 100644 --- a/cmd/humanlog/query.go +++ b/cmd/humanlog/query.go @@ -147,10 +147,10 @@ func queryApiSummarizeCmd( queryClient = queryv1connect.NewQueryServiceClient(httpClient, apiURL, clOpts) } else { cfg := getCfg(cctx) - if cfg.ExperimentalFeatures == nil || cfg.ExperimentalFeatures.ServeLocalhostOnPort == nil { + if cfg.ExperimentalFeatures == nil || cfg.ExperimentalFeatures.ServeLocalhost == nil { return fmt.Errorf("localhost feature is not enabled or not configured, can't dial localhost") } - apiURL := fmt.Sprintf("http://localhost:%d", *cfg.ExperimentalFeatures.ServeLocalhostOnPort) + apiURL := fmt.Sprintf("http://localhost:%d", cfg.ExperimentalFeatures.ServeLocalhost.Port) httpClient := getHTTPClient(cctx, apiURL) queryClient = queryv1connect.NewQueryServiceClient(httpClient, apiURL) } @@ -296,10 +296,10 @@ func queryApiWatchCmd( queryClient = queryv1connect.NewQueryServiceClient(httpClient, apiURL, clOpts) } else { cfg := getCfg(cctx) - if cfg.ExperimentalFeatures == nil || cfg.ExperimentalFeatures.ServeLocalhostOnPort == nil { + if cfg.ExperimentalFeatures == nil || cfg.ExperimentalFeatures.ServeLocalhost == nil { return fmt.Errorf("localhost feature is not enabled or not configured, can't dial localhost") } - apiURL := fmt.Sprintf("http://localhost:%d", *cfg.ExperimentalFeatures.ServeLocalhostOnPort) + apiURL := fmt.Sprintf("http://localhost:%d", cfg.ExperimentalFeatures.ServeLocalhost.Port) httpClient := getHTTPClient(cctx, apiURL) queryClient = queryv1connect.NewQueryServiceClient(httpClient, apiURL) } diff --git a/internal/localstorage/queryable.go b/internal/localstorage/queryable.go deleted file mode 100644 index adbe8209..00000000 --- a/internal/localstorage/queryable.go +++ /dev/null @@ -1,27 +0,0 @@ -package localstorage - -import ( - "context" - "time" - - typesv1 "github.com/humanlogio/api/go/types/v1" - "github.com/humanlogio/humanlog/pkg/sink" -) - -type Storage interface { - Queryable - SinkFor(ctx context.Context, machineID, sessionID int64) (_ sink.Sink, heartbeatIn time.Duration, _ error) - Heartbeat(ctx context.Context, machineID, sessionID int64) (time.Duration, error) -} - -type Queryable interface { - Query(context.Context, *typesv1.LogQuery) (<-chan Cursor, error) -} - -type Cursor interface { - IDs() (machineID, sessionID int64) - Next(context.Context) bool - Event() *typesv1.LogEvent - Err() error - Close() error -} diff --git a/internal/localsvc/svc.go b/internal/localsvc/svc.go index 9eb300e7..7564c932 100644 --- a/internal/localsvc/svc.go +++ b/internal/localsvc/svc.go @@ -15,8 +15,8 @@ import ( qrv1 "github.com/humanlogio/api/go/svc/query/v1" qrsvcpb "github.com/humanlogio/api/go/svc/query/v1/queryv1connect" typesv1 "github.com/humanlogio/api/go/types/v1" - "github.com/humanlogio/humanlog/internal/localstorage" "github.com/humanlogio/humanlog/internal/pkg/state" + "github.com/humanlogio/humanlog/pkg/localstorage" "github.com/humanlogio/humanlog/pkg/sink" "golang.org/x/sync/errgroup" "google.golang.org/protobuf/types/known/durationpb" @@ -208,7 +208,11 @@ func (svc *Service) SummarizeEvents(ctx context.Context, req *connect.Request[qr for cursor := range cursors { for cursor.Next(ctx) { - ts := cursor.Event().ParsedAt.AsTime().Truncate(width) + ev := new(typesv1.LogEvent) + if err := cursor.Event(ev); err != nil { + return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("scanning events: %v", err)) + } + ts := ev.ParsedAt.AsTime().Truncate(width) loc, _ := slices.BinarySearchFunc(buckets, ts, func(a bucket, t time.Time) int { return a.ts.Compare(t) }) @@ -297,7 +301,11 @@ func (svc *Service) WatchQuery(ctx context.Context, req *connect.Request[qrv1.Wa slog.Int64("session_id", sessionID), ) for cursor.Next(ctx) { - evs = append(evs, cursor.Event()) + ev := new(typesv1.LogEvent) + if err := cursor.Event(ev); err != nil { + return err + } + evs = append(evs, ev) now := time.Now() if now.Sub(lastSend) > 100*time.Millisecond { ll.DebugContext(ctx, "cursor batch sent", slog.Int("batch_len", len(evs))) diff --git a/internal/localsvc/svc_test.go b/internal/localsvc/svc_test.go index a7ee1e68..df31dee7 100644 --- a/internal/localsvc/svc_test.go +++ b/internal/localsvc/svc_test.go @@ -10,7 +10,7 @@ import ( "connectrpc.com/connect" qrv1 "github.com/humanlogio/api/go/svc/query/v1" typesv1 "github.com/humanlogio/api/go/types/v1" - "github.com/humanlogio/humanlog/internal/localstorage" + "github.com/humanlogio/humanlog/internal/memstorage" "github.com/stretchr/testify/require" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/types/known/durationpb" @@ -120,7 +120,7 @@ func TestSummarize(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ll := slog.New(slog.NewTextHandler(os.Stderr, nil)) - mem := localstorage.NewMemStorage(ll) + mem := memstorage.NewMemStorage(ll) for _, leg := range tt.input { snk, _, err := mem.SinkFor(ctx, leg.MachineId, leg.SessionId) diff --git a/internal/localstorage/memory.go b/internal/memstorage/memory.go similarity index 91% rename from internal/localstorage/memory.go rename to internal/memstorage/memory.go index 3629419e..fc46c638 100644 --- a/internal/localstorage/memory.go +++ b/internal/memstorage/memory.go @@ -1,4 +1,4 @@ -package localstorage +package memstorage import ( "context" @@ -9,18 +9,25 @@ import ( "time" typesv1 "github.com/humanlogio/api/go/types/v1" + "github.com/humanlogio/humanlog/pkg/localstorage" "github.com/humanlogio/humanlog/pkg/sink" "github.com/teivah/broadcast" "google.golang.org/protobuf/proto" ) var ( - _ Queryable = (*MemStorage)(nil) - _ Storage = (*MemStorage)(nil) - _ sink.Sink = (*MemStorageSink)(nil) - _ sink.BatchSink = (*MemStorageSink)(nil) + _ localstorage.Queryable = (*MemStorage)(nil) + _ localstorage.Storage = (*MemStorage)(nil) + _ sink.Sink = (*MemStorageSink)(nil) + _ sink.BatchSink = (*MemStorageSink)(nil) ) +func init() { + localstorage.RegisterStorage("basic", func(ctx context.Context, ll *slog.Logger, cfg map[string]interface{}) (localstorage.Storage, error) { + return NewMemStorage(ll), nil + }) +} + type MemStorage struct { ll *slog.Logger heartbeat time.Duration @@ -46,7 +53,7 @@ type SummarizedEvents struct { } } -func (str *MemStorage) Query(ctx context.Context, q *typesv1.LogQuery) (<-chan Cursor, error) { +func (str *MemStorage) Query(ctx context.Context, q *typesv1.LogQuery) (<-chan localstorage.Cursor, error) { if q.To != nil && q.From.AsTime().After(q.To.AsTime()) { return nil, fmt.Errorf("invalid query, `to` is before `from`") } @@ -54,7 +61,7 @@ func (str *MemStorage) Query(ctx context.Context, q *typesv1.LogQuery) (<-chan C str.sinksMu.Lock() defer str.sinksMu.Unlock() - var cursors []Cursor + var cursors []localstorage.Cursor for _, snk := range str.sinks { if idx, ok, err := snk.firstMatch(ctx, q); err != nil { return nil, err @@ -64,7 +71,7 @@ func (str *MemStorage) Query(ctx context.Context, q *typesv1.LogQuery) (<-chan C cursors = append(cursors, newMemSinkCursor(ll, q, idx, idx, true, snk)) } } - newCursors := make(chan Cursor, len(cursors)) + newCursors := make(chan localstorage.Cursor, len(cursors)) for _, cursor := range cursors { newCursors <- cursor } @@ -165,8 +172,12 @@ func (crs *MemSinkCursor) Next(ctx context.Context) bool { return hasCurrent && crs.err == nil } -func (crs *MemSinkCursor) Event() *typesv1.LogEvent { - return crs.sink.evs[crs.cur] +func (crs *MemSinkCursor) Event(ev *typesv1.LogEvent) error { + orig := crs.sink.evs[crs.cur] + ev.ParsedAt = orig.ParsedAt + ev.Raw = orig.Raw + ev.Structured = orig.Structured + return nil } func (crs *MemSinkCursor) Err() error { diff --git a/internal/memstorage/memory_test.go b/internal/memstorage/memory_test.go new file mode 100644 index 00000000..3eb0294f --- /dev/null +++ b/internal/memstorage/memory_test.go @@ -0,0 +1,15 @@ +package memstorage + +import ( + "log/slog" + "os" + "testing" + + "github.com/humanlogio/humanlog/pkg/localstorage" +) + +func TestMemoryStorage(t *testing.T) { + localstorage.RunTest(t, func(t *testing.T) localstorage.Storage { + return NewMemStorage(slog.New(slog.NewTextHandler(os.Stderr, nil))) + }) +} diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go index d85d45b4..d4d18c86 100644 --- a/internal/pkg/config/config.go +++ b/internal/pkg/config/config.go @@ -98,9 +98,15 @@ type Config struct { } type Features struct { - ReleaseChannel *string `json:"release_channel"` - SendLogsToCloud *bool `json:"send_logs_to_cloud"` - ServeLocalhostOnPort *int `json:"serve_localhost_on_port"` + ReleaseChannel *string `json:"release_channel"` + SendLogsToCloud *bool `json:"send_logs_to_cloud"` + ServeLocalhost *ServeLocalhost `json:"serve_localhost"` +} + +type ServeLocalhost struct { + Port int `json:"port"` + Engine string `json:"engine"` + Cfg map[string]interface{} `json:"engine_config"` } func (cfg Config) populateEmpty(other *Config) *Config { diff --git a/pkg/localstorage/queryable.go b/pkg/localstorage/queryable.go new file mode 100644 index 00000000..e4f44cd5 --- /dev/null +++ b/pkg/localstorage/queryable.go @@ -0,0 +1,49 @@ +package localstorage + +import ( + "context" + "fmt" + "log/slog" + "time" + + typesv1 "github.com/humanlogio/api/go/types/v1" + "github.com/humanlogio/humanlog/pkg/sink" +) + +type StorageBuilder func(ctx context.Context, ll *slog.Logger, cfg map[string]interface{}) (Storage, error) + +var registry = make(map[string]StorageBuilder) + +func RegisterStorage(name string, builder StorageBuilder) { + _, ok := registry[name] + if ok { + panic(fmt.Sprintf("already used: %q", name)) + } + registry[name] = builder +} + +func Open(ctx context.Context, name string, ll *slog.Logger, cfg map[string]interface{}) (Storage, error) { + builder, ok := registry[name] + if !ok { + return nil, fmt.Errorf("no storage engine with name %q", name) + } + return builder(ctx, ll, cfg) +} + +type Storage interface { + Queryable + SinkFor(ctx context.Context, machineID, sessionID int64) (_ sink.Sink, heartbeatIn time.Duration, _ error) + Heartbeat(ctx context.Context, machineID, sessionID int64) (time.Duration, error) +} + +type Queryable interface { + Query(context.Context, *typesv1.LogQuery) (<-chan Cursor, error) +} + +type Cursor interface { + IDs() (machineID, sessionID int64) + Next(context.Context) bool + Event(*typesv1.LogEvent) error + Err() error + Close() error +} diff --git a/internal/localstorage/memory_test.go b/pkg/localstorage/test_suite.go similarity index 97% rename from internal/localstorage/memory_test.go rename to pkg/localstorage/test_suite.go index 6d03afc0..dacf1cde 100644 --- a/internal/localstorage/memory_test.go +++ b/pkg/localstorage/test_suite.go @@ -2,8 +2,6 @@ package localstorage import ( "context" - "log/slog" - "os" "testing" "time" @@ -13,7 +11,7 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) -func TestMemoryStorage(t *testing.T) { +func RunTest(t *testing.T, constructor func(t *testing.T) Storage) { tests := []struct { name string q *typesv1.LogQuery @@ -183,7 +181,7 @@ func TestMemoryStorage(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - mem := NewMemStorage(slog.New(slog.NewTextHandler(os.Stderr, nil))) + mem := constructor(t) for _, leg := range tt.input { snk, _, err := mem.SinkFor(ctx, leg.MachineId, leg.SessionId) @@ -228,7 +226,10 @@ func drainCursors(t *testing.T, ctx context.Context, cursors <-chan Cursor) []*t MachineId: mid, SessionId: sid, } for cursor.Next(ctx) { - leg.Logs = append(leg.Logs, cursor.Event()) + ev := new(typesv1.LogEvent) + err := cursor.Event(ev) + require.NoError(t, err) + leg.Logs = append(leg.Logs, ev) } require.NoError(t, cursor.Err()) out = append(out, leg)