diff --git a/auditing/auditing.go b/auditing/auditing.go index b7238ec..3898bf2 100644 --- a/auditing/auditing.go +++ b/auditing/auditing.go @@ -50,31 +50,31 @@ const ( const EntryFilterDefaultLimit int64 = 100 type Entry struct { - Id string // filled by the auditing driver - Component string - RequestId string `json:"rqid"` - Type EntryType - Timestamp time.Time + Id string `json:"-"` // filled by the auditing driver + Component string `json:"component"` + RequestId string `json:"rqid"` + Type EntryType `json:"type"` + Timestamp time.Time `json:"timestamp"` - User string - Tenant string + User string `json:"user"` + Tenant string `json:"tenant"` // For `EntryDetailHTTP` the HTTP method get, post, put, delete, ... // For `EntryDetailGRPC` unary, stream - Detail EntryDetail + Detail EntryDetail `json:"detail"` // e.g. Request, Response, Error, Opened, Close - Phase EntryPhase + Phase EntryPhase `json:"phase"` // For `EntryDetailHTTP` /api/v1/... // For `EntryDetailGRPC` /api.v1/... (the method name) - Path string - ForwardedFor string - RemoteAddr string + Path string `json:"path"` + ForwardedFor string `json:"forwardedfor"` + RemoteAddr string `json:"remoteaddr"` - Body any // JSON, string or numbers - StatusCode int // for `EntryDetailHTTP` the HTTP status code, for EntryDetailGRPC` the grpc status code + Body any `json:"body"` // JSON, string or numbers + StatusCode int `json:"statuscode"` // for `EntryDetailHTTP` the HTTP status code, for EntryDetailGRPC` the grpc status code // Internal errors - Error error + Error error `json:"error"` } func (e *Entry) prepareForNextPhase() { diff --git a/auditing/timescaledb.go b/auditing/timescaledb.go index 93660b4..58fba8a 100644 --- a/auditing/timescaledb.go +++ b/auditing/timescaledb.go @@ -4,11 +4,9 @@ import ( "context" "database/sql" "encoding/json" - "errors" "fmt" "log/slog" "reflect" - "slices" "strings" "time" @@ -44,28 +42,11 @@ type ( log *slog.Logger config *TimescaleDbConfig + } - cols []string - vals []any - } - - // to keep the public interface free from field tags like "db" and "json" (as these might differ for different dbs) - // we introduce an internal type. unfortunately, this requires a conversion, which takes effort to maintain - timescaleEntry struct { - Component string `db:"component"` - RequestId string `db:"rqid" json:"rqid"` - Type EntryType `db:"type"` - Timestamp time.Time `db:"timestamp"` - User string `db:"userid"` - Tenant string `db:"tenant"` - Detail EntryDetail `db:"detail"` - Phase EntryPhase `db:"phase"` - Path string `db:"path"` - ForwardedFor string `db:"forwardedfor"` - RemoteAddr string `db:"remoteaddr"` - Body any `db:"body"` - StatusCode int `db:"statuscode"` - Error string `db:"error" json:"-"` + timescaledbRow struct { + Timestamp time.Time `db:"timestamp"` + Entry []byte `db:"entry"` } sqlCompOp string @@ -148,19 +129,7 @@ func (a *timescaleAuditing) initialize() error { { query: `CREATE TABLE IF NOT EXISTS traces ( timestamp timestamp NOT NULL, - rqid text NOT NULL, - component text NOT NULL, - type text NOT NULL, - body text NOT NULL, - error text NOT NULL, - statuscode int NOT NULL, - remoteaddr text NOT NULL, - forwardedfor text NOT NULL, - path text NOT NULL, - phase text NOT NULL, - detail text NOT NULL, - tenant text NOT NULL, - userid text NOT NULL + entry jsonb NOT NULL )`, }, { @@ -170,7 +139,6 @@ func (a *timescaleAuditing) initialize() error { { query: `ALTER TABLE traces SET ( timescaledb.compress, - timescaledb.compress_segmentby = 'tenant', timescaledb.compress_orderby = 'timestamp' )`, }, @@ -178,10 +146,9 @@ func (a *timescaleAuditing) initialize() error { query: `SELECT add_compression_policy('traces', $1::interval)`, args: []any{a.config.CompressionInterval}, }, - // TODO: evaluate what is needed - // { - // query: `CREATE INDEX IF NOT EXISTS traces_idx ON traces()`, - // }, + { + query: `CREATE INDEX IF NOT EXISTS traces_gin_idx ON traces USING GIN (entry)`, + }, { query: `SELECT add_retention_policy('traces', $1::interval)`, args: []any{a.config.Retention}, @@ -216,36 +183,6 @@ func (a *timescaleAuditing) initialize() error { return err } - q, _, err := sq. - Select("column_name"). - From("information_schema.columns"). - Where("table_name='traces'"). - ToSql() - if err != nil { - return err - } - - rows, err := a.db.Query(q) - if err != nil { - return err - } - defer rows.Close() - if rows.Err() != nil { - return rows.Err() - } - - for rows.Next() { - var col string - - err = rows.Scan(&col) - if err != nil { - return err - } - - a.cols = append(a.cols, col) - a.vals = append(a.vals, sq.Expr(":"+col)) - } - return nil } @@ -256,22 +193,27 @@ func (a *timescaleAuditing) Flush() error { func (a *timescaleAuditing) Index(entry Entry) error { q, _, err := sq. Insert("traces"). - Columns(a.cols...). - Values(a.vals...). + Columns("timestamp", "entry"). + Values(sq.Expr(":timestamp"), sq.Expr(":entry")). ToSql() if err != nil { return err } - internalEntry, err := a.toInternal(entry) + e, err := json.Marshal(entry) if err != nil { - return fmt.Errorf("unable to convert audit trace to database entry: %w", err) + return fmt.Errorf("error marshaling entry: %w", err) + } + + row := timescaledbRow{ + Timestamp: entry.Timestamp, + Entry: e, } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - _, err = a.db.NamedExecContext(ctx, q, internalEntry) + _, err = a.db.NamedExecContext(ctx, q, row) if err != nil { return fmt.Errorf("unable to index audit trace: %w", err) } @@ -288,17 +230,13 @@ func (a *timescaleAuditing) Search(ctx context.Context, filter EntryFilter) ([]E return nil } - if !slices.Contains(a.cols, field) { - return fmt.Errorf("unable to filter for %q, no such table column", field) - } - values[field] = value switch op { case equals: - where = append(where, fmt.Sprintf("%s=:%s", field, field)) + where = append(where, fmt.Sprintf("entry ->> '%s'=:%s", field, field)) case like: - where = append(where, fmt.Sprintf("%s like '%%' || %s || '%%'", field, field)) + where = append(where, fmt.Sprintf("entry ->> '%s' like '%%' || :%s || '%%'", field, field)) default: return fmt.Errorf("comp op not known") } @@ -347,13 +285,6 @@ func (a *timescaleAuditing) Search(ctx context.Context, filter EntryFilter) ([]E return nil, err } - query := sq. - Select(a.cols...). - From("traces"). - Columns(a.cols...). - Where(strings.Join(where, " AND ")). - OrderBy("timestamp ASC") - // to make queries more efficient for timescaledb, we always provide from if filter.From.IsZero() { filter.From = time.Now().Add(-24 * time.Hour) @@ -366,6 +297,13 @@ func (a *timescaleAuditing) Search(ctx context.Context, filter EntryFilter) ([]E values["to"] = filter.To where = append(where, "timestamp <= :to") } + + query := sq. + Select("timestamp", "entry"). + From("traces"). + Where(strings.Join(where, " AND ")). + OrderBy("timestamp ASC") + if filter.Limit != 0 { query.Limit(uint64(filter.Limit)) } @@ -384,16 +322,17 @@ func (a *timescaleAuditing) Search(ctx context.Context, filter EntryFilter) ([]E var entries []Entry for rows.Next() { - var e timescaleEntry + var e timescaledbRow err = rows.StructScan(&e) if err != nil { return nil, err } - entry, err := a.toExternal(e) + var entry Entry + err = json.Unmarshal(e.Entry, &entry) if err != nil { - return nil, fmt.Errorf("unable to convert entry: %w", err) + return nil, fmt.Errorf("error unmarshaling entry: %w", err) } entries = append(entries, entry) @@ -401,41 +340,3 @@ func (a *timescaleAuditing) Search(ctx context.Context, filter EntryFilter) ([]E return entries, nil } - -func (_ *timescaleAuditing) toInternal(e Entry) (*timescaleEntry, error) { - intermediate, err := json.Marshal(e) // nolint - if err != nil { - return nil, err - } - var internalEntry timescaleEntry - err = json.Unmarshal(intermediate, &internalEntry) // nolint - if err != nil { - return nil, err - } - - internalEntry.RequestId = e.RequestId - if e.Error != nil { - internalEntry.Error = e.Error.Error() - } - - return &internalEntry, nil -} - -func (_ *timescaleAuditing) toExternal(e timescaleEntry) (Entry, error) { - intermediate, err := json.Marshal(e) // nolint - if err != nil { - return Entry{}, err - } - var externalEntry Entry - err = json.Unmarshal(intermediate, &externalEntry) // nolint - if err != nil { - return Entry{}, err - } - - externalEntry.Id = e.RequestId - if e.Error != "" { - externalEntry.Error = errors.New(e.Error) - } - - return externalEntry, nil -} diff --git a/auditing/timescaledb_integration_test.go b/auditing/timescaledb_integration_test.go index c6a188d..bdf6c39 100644 --- a/auditing/timescaledb_integration_test.go +++ b/auditing/timescaledb_integration_test.go @@ -104,7 +104,8 @@ func TestAuditing_TimescaleDB(t *testing.T) { name: "insert one entry", t: func(t *testing.T, a Auditing) { err := a.Index(Entry{ - Body: "test", + Timestamp: now, + Body: "test", }) require.NoError(t, err) err = a.Flush() @@ -142,6 +143,7 @@ func TestAuditing_TimescaleDB(t *testing.T) { entries, err = a.Search(ctx, EntryFilter{ Body: "This", }) + require.NoError(t, err) assert.Len(t, entries, len(es)) },