Skip to content

Commit

Permalink
chore(localstorage): extract storage interface, make registerable
Browse files Browse the repository at this point in the history
  • Loading branch information
aybabtme committed Oct 28, 2024
1 parent 01663db commit a43e931
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 59 deletions.
13 changes: 11 additions & 2 deletions cmd/humanlog/localhost.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import (
"github.com/humanlogio/api/go/svc/localhost/v1/localhostv1connect"
"github.com/humanlogio/api/go/svc/query/v1/queryv1connect"
typesv1 "github.com/humanlogio/api/go/types/v1"
"github.com/humanlogio/humanlog/internal/localstorage"
"github.com/humanlogio/humanlog/internal/localsvc"
"github.com/humanlogio/humanlog/internal/pkg/config"
"github.com/humanlogio/humanlog/internal/pkg/state"
"github.com/humanlogio/humanlog/pkg/localstorage"
"github.com/humanlogio/humanlog/pkg/sink"
"github.com/humanlogio/humanlog/pkg/sink/logsvcsink"
"github.com/rs/cors"
Expand Down Expand Up @@ -89,7 +89,16 @@ func startLocalhostServer(
return localhostSink.Close(ctx)
}, nil
}
storage := localstorage.NewMemStorage(ll.WithGroup("memstorage"))

storage, err := localstorage.Open(
ctx,
cfg.ExperimentalFeatures.ServeLocalhost.Engine,
ll.WithGroup("storage"),
cfg.ExperimentalFeatures.ServeLocalhost.Cfg,
)
if err != nil {
return nil, nil, fmt.Errorf("opening localstorage %q: %v", cfg.ExperimentalFeatures.ServeLocalhost.Engine, err)
}
ownSink, _, err := storage.SinkFor(ctx, int64(machineID), time.Now().UnixNano())
if err != nil {
return nil, nil, fmt.Errorf("can't create own sink: %v", err)
Expand Down
6 changes: 3 additions & 3 deletions cmd/humanlog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,8 +517,8 @@ func newApp() *cli.App {
sink = teesink.NewTeeSink(sink, remotesink)
}

if cfg.ExperimentalFeatures.ServeLocalhostOnPort != nil {
port := *cfg.ExperimentalFeatures.ServeLocalhostOnPort
if cfg.ExperimentalFeatures.ServeLocalhost != nil {
localhostCfg := *cfg.ExperimentalFeatures.ServeLocalhost
state := getState(cctx)
// TODO(antoine): all logs to a single location, right now there's code logging
// randomly everywhere
Expand All @@ -534,7 +534,7 @@ func newApp() *cli.App {
}
}
machineID = uint64(*state.MachineID)
localhostSink, done, err := startLocalhostServer(ctx, ll, cfg, state, machineID, port, getLocalhostHTTPClient(cctx), version)
localhostSink, done, err := startLocalhostServer(ctx, ll, cfg, state, machineID, localhostCfg.Port, getLocalhostHTTPClient(cctx), version)
if err != nil {
loginfo("starting experimental localhost service: %v", err)
} else {
Expand Down
8 changes: 4 additions & 4 deletions cmd/humanlog/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,10 @@ func queryApiSummarizeCmd(
queryClient = queryv1connect.NewQueryServiceClient(httpClient, apiURL, clOpts)
} else {
cfg := getCfg(cctx)
if cfg.ExperimentalFeatures == nil || cfg.ExperimentalFeatures.ServeLocalhostOnPort == nil {
if cfg.ExperimentalFeatures == nil || cfg.ExperimentalFeatures.ServeLocalhost == nil {
return fmt.Errorf("localhost feature is not enabled or not configured, can't dial localhost")
}
apiURL := fmt.Sprintf("http://localhost:%d", *cfg.ExperimentalFeatures.ServeLocalhostOnPort)
apiURL := fmt.Sprintf("http://localhost:%d", cfg.ExperimentalFeatures.ServeLocalhost.Port)
httpClient := getHTTPClient(cctx, apiURL)
queryClient = queryv1connect.NewQueryServiceClient(httpClient, apiURL)
}
Expand Down Expand Up @@ -296,10 +296,10 @@ func queryApiWatchCmd(
queryClient = queryv1connect.NewQueryServiceClient(httpClient, apiURL, clOpts)
} else {
cfg := getCfg(cctx)
if cfg.ExperimentalFeatures == nil || cfg.ExperimentalFeatures.ServeLocalhostOnPort == nil {
if cfg.ExperimentalFeatures == nil || cfg.ExperimentalFeatures.ServeLocalhost == nil {
return fmt.Errorf("localhost feature is not enabled or not configured, can't dial localhost")
}
apiURL := fmt.Sprintf("http://localhost:%d", *cfg.ExperimentalFeatures.ServeLocalhostOnPort)
apiURL := fmt.Sprintf("http://localhost:%d", cfg.ExperimentalFeatures.ServeLocalhost.Port)
httpClient := getHTTPClient(cctx, apiURL)
queryClient = queryv1connect.NewQueryServiceClient(httpClient, apiURL)
}
Expand Down
27 changes: 0 additions & 27 deletions internal/localstorage/queryable.go

This file was deleted.

14 changes: 11 additions & 3 deletions internal/localsvc/svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
qrv1 "github.com/humanlogio/api/go/svc/query/v1"
qrsvcpb "github.com/humanlogio/api/go/svc/query/v1/queryv1connect"
typesv1 "github.com/humanlogio/api/go/types/v1"
"github.com/humanlogio/humanlog/internal/localstorage"
"github.com/humanlogio/humanlog/internal/pkg/state"
"github.com/humanlogio/humanlog/pkg/localstorage"
"github.com/humanlogio/humanlog/pkg/sink"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/durationpb"
Expand Down Expand Up @@ -208,7 +208,11 @@ func (svc *Service) SummarizeEvents(ctx context.Context, req *connect.Request[qr

for cursor := range cursors {
for cursor.Next(ctx) {
ts := cursor.Event().ParsedAt.AsTime().Truncate(width)
ev := new(typesv1.LogEvent)
if err := cursor.Event(ev); err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("scanning events: %v", err))
}
ts := ev.ParsedAt.AsTime().Truncate(width)
loc, _ := slices.BinarySearchFunc(buckets, ts, func(a bucket, t time.Time) int {
return a.ts.Compare(t)
})
Expand Down Expand Up @@ -297,7 +301,11 @@ func (svc *Service) WatchQuery(ctx context.Context, req *connect.Request[qrv1.Wa
slog.Int64("session_id", sessionID),
)
for cursor.Next(ctx) {
evs = append(evs, cursor.Event())
ev := new(typesv1.LogEvent)
if err := cursor.Event(ev); err != nil {
return err
}
evs = append(evs, ev)
now := time.Now()
if now.Sub(lastSend) > 100*time.Millisecond {
ll.DebugContext(ctx, "cursor batch sent", slog.Int("batch_len", len(evs)))
Expand Down
4 changes: 2 additions & 2 deletions internal/localsvc/svc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"connectrpc.com/connect"
qrv1 "github.com/humanlogio/api/go/svc/query/v1"
typesv1 "github.com/humanlogio/api/go/types/v1"
"github.com/humanlogio/humanlog/internal/localstorage"
"github.com/humanlogio/humanlog/internal/memstorage"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/durationpb"
Expand Down Expand Up @@ -120,7 +120,7 @@ func TestSummarize(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {

ll := slog.New(slog.NewTextHandler(os.Stderr, nil))
mem := localstorage.NewMemStorage(ll)
mem := memstorage.NewMemStorage(ll)

for _, leg := range tt.input {
snk, _, err := mem.SinkFor(ctx, leg.MachineId, leg.SessionId)
Expand Down
31 changes: 21 additions & 10 deletions internal/localstorage/memory.go → internal/memstorage/memory.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package localstorage
package memstorage

import (
"context"
Expand All @@ -9,18 +9,25 @@ import (
"time"

typesv1 "github.com/humanlogio/api/go/types/v1"
"github.com/humanlogio/humanlog/pkg/localstorage"
"github.com/humanlogio/humanlog/pkg/sink"
"github.com/teivah/broadcast"
"google.golang.org/protobuf/proto"
)

var (
_ Queryable = (*MemStorage)(nil)
_ Storage = (*MemStorage)(nil)
_ sink.Sink = (*MemStorageSink)(nil)
_ sink.BatchSink = (*MemStorageSink)(nil)
_ localstorage.Queryable = (*MemStorage)(nil)
_ localstorage.Storage = (*MemStorage)(nil)
_ sink.Sink = (*MemStorageSink)(nil)
_ sink.BatchSink = (*MemStorageSink)(nil)
)

func init() {
localstorage.RegisterStorage("basic", func(ctx context.Context, ll *slog.Logger, cfg map[string]interface{}) (localstorage.Storage, error) {
return NewMemStorage(ll), nil
})
}

type MemStorage struct {
ll *slog.Logger
heartbeat time.Duration
Expand All @@ -46,15 +53,15 @@ type SummarizedEvents struct {
}
}

func (str *MemStorage) Query(ctx context.Context, q *typesv1.LogQuery) (<-chan Cursor, error) {
func (str *MemStorage) Query(ctx context.Context, q *typesv1.LogQuery) (<-chan localstorage.Cursor, error) {
if q.To != nil && q.From.AsTime().After(q.To.AsTime()) {
return nil, fmt.Errorf("invalid query, `to` is before `from`")
}

str.sinksMu.Lock()
defer str.sinksMu.Unlock()

var cursors []Cursor
var cursors []localstorage.Cursor
for _, snk := range str.sinks {
if idx, ok, err := snk.firstMatch(ctx, q); err != nil {
return nil, err
Expand All @@ -64,7 +71,7 @@ func (str *MemStorage) Query(ctx context.Context, q *typesv1.LogQuery) (<-chan C
cursors = append(cursors, newMemSinkCursor(ll, q, idx, idx, true, snk))
}
}
newCursors := make(chan Cursor, len(cursors))
newCursors := make(chan localstorage.Cursor, len(cursors))
for _, cursor := range cursors {
newCursors <- cursor
}
Expand Down Expand Up @@ -165,8 +172,12 @@ func (crs *MemSinkCursor) Next(ctx context.Context) bool {
return hasCurrent && crs.err == nil
}

func (crs *MemSinkCursor) Event() *typesv1.LogEvent {
return crs.sink.evs[crs.cur]
func (crs *MemSinkCursor) Event(ev *typesv1.LogEvent) error {
orig := crs.sink.evs[crs.cur]
ev.ParsedAt = orig.ParsedAt
ev.Raw = orig.Raw
ev.Structured = orig.Structured
return nil
}

func (crs *MemSinkCursor) Err() error {
Expand Down
15 changes: 15 additions & 0 deletions internal/memstorage/memory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package memstorage

import (
"log/slog"
"os"
"testing"

"github.com/humanlogio/humanlog/pkg/localstorage"
)

func TestMemoryStorage(t *testing.T) {
localstorage.RunTest(t, func(t *testing.T) localstorage.Storage {
return NewMemStorage(slog.New(slog.NewTextHandler(os.Stderr, nil)))
})
}
12 changes: 9 additions & 3 deletions internal/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,15 @@ type Config struct {
}

type Features struct {
ReleaseChannel *string `json:"release_channel"`
SendLogsToCloud *bool `json:"send_logs_to_cloud"`
ServeLocalhostOnPort *int `json:"serve_localhost_on_port"`
ReleaseChannel *string `json:"release_channel"`
SendLogsToCloud *bool `json:"send_logs_to_cloud"`
ServeLocalhost *ServeLocalhost `json:"serve_localhost"`
}

type ServeLocalhost struct {
Port int `json:"port"`
Engine string `json:"engine"`
Cfg map[string]interface{} `json:"engine_config"`
}

func (cfg Config) populateEmpty(other *Config) *Config {
Expand Down
49 changes: 49 additions & 0 deletions pkg/localstorage/queryable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package localstorage

import (
"context"
"fmt"
"log/slog"
"time"

typesv1 "github.com/humanlogio/api/go/types/v1"
"github.com/humanlogio/humanlog/pkg/sink"
)

type StorageBuilder func(ctx context.Context, ll *slog.Logger, cfg map[string]interface{}) (Storage, error)

var registry = make(map[string]StorageBuilder)

func RegisterStorage(name string, builder StorageBuilder) {
_, ok := registry[name]
if ok {
panic(fmt.Sprintf("already used: %q", name))
}
registry[name] = builder
}

func Open(ctx context.Context, name string, ll *slog.Logger, cfg map[string]interface{}) (Storage, error) {
builder, ok := registry[name]
if !ok {
return nil, fmt.Errorf("no storage engine with name %q", name)
}
return builder(ctx, ll, cfg)
}

type Storage interface {
Queryable
SinkFor(ctx context.Context, machineID, sessionID int64) (_ sink.Sink, heartbeatIn time.Duration, _ error)
Heartbeat(ctx context.Context, machineID, sessionID int64) (time.Duration, error)
}

type Queryable interface {
Query(context.Context, *typesv1.LogQuery) (<-chan Cursor, error)
}

type Cursor interface {
IDs() (machineID, sessionID int64)
Next(context.Context) bool
Event(*typesv1.LogEvent) error
Err() error
Close() error
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package localstorage

import (
"context"
"log/slog"
"os"
"testing"
"time"

Expand All @@ -13,7 +11,7 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)

func TestMemoryStorage(t *testing.T) {
func RunTest(t *testing.T, constructor func(t *testing.T) Storage) {
tests := []struct {
name string
q *typesv1.LogQuery
Expand Down Expand Up @@ -183,7 +181,7 @@ func TestMemoryStorage(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

mem := NewMemStorage(slog.New(slog.NewTextHandler(os.Stderr, nil)))
mem := constructor(t)

for _, leg := range tt.input {
snk, _, err := mem.SinkFor(ctx, leg.MachineId, leg.SessionId)
Expand Down Expand Up @@ -228,7 +226,10 @@ func drainCursors(t *testing.T, ctx context.Context, cursors <-chan Cursor) []*t
MachineId: mid, SessionId: sid,
}
for cursor.Next(ctx) {
leg.Logs = append(leg.Logs, cursor.Event())
ev := new(typesv1.LogEvent)
err := cursor.Event(ev)
require.NoError(t, err)
leg.Logs = append(leg.Logs, ev)
}
require.NoError(t, cursor.Err())
out = append(out, leg)
Expand Down

0 comments on commit a43e931

Please sign in to comment.