Skip to content

Commit

Permalink
Use JSONB.
Browse files Browse the repository at this point in the history
  • Loading branch information
Gerrit91 committed Aug 26, 2024
1 parent 3e1110e commit 14976a0
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 146 deletions.
30 changes: 15 additions & 15 deletions auditing/auditing.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,31 +50,31 @@ const (
const EntryFilterDefaultLimit int64 = 100

type Entry struct {
Id string // filled by the auditing driver
Component string
RequestId string `json:"rqid"`
Type EntryType
Timestamp time.Time
Id string `json:"-"` // filled by the auditing driver
Component string `json:"component"`
RequestId string `json:"rqid"`
Type EntryType `json:"type"`
Timestamp time.Time `json:"timestamp"`

User string
Tenant string
User string `json:"user"`
Tenant string `json:"tenant"`

// For `EntryDetailHTTP` the HTTP method get, post, put, delete, ...
// For `EntryDetailGRPC` unary, stream
Detail EntryDetail
Detail EntryDetail `json:"detail"`
// e.g. Request, Response, Error, Opened, Close
Phase EntryPhase
Phase EntryPhase `json:"phase"`
// For `EntryDetailHTTP` /api/v1/...
// For `EntryDetailGRPC` /api.v1/... (the method name)
Path string
ForwardedFor string
RemoteAddr string
Path string `json:"path"`
ForwardedFor string `json:"forwardedfor"`
RemoteAddr string `json:"remoteaddr"`

Body any // JSON, string or numbers
StatusCode int // for `EntryDetailHTTP` the HTTP status code, for EntryDetailGRPC` the grpc status code
Body any `json:"body"` // JSON, string or numbers
StatusCode int `json:"statuscode"` // for `EntryDetailHTTP` the HTTP status code, for EntryDetailGRPC` the grpc status code

// Internal errors
Error error
Error error `json:"error"`
}

func (e *Entry) prepareForNextPhase() {
Expand Down
161 changes: 31 additions & 130 deletions auditing/timescaledb.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"log/slog"
"reflect"
"slices"
"strings"
"time"

Expand Down Expand Up @@ -44,28 +42,11 @@ type (
log *slog.Logger

config *TimescaleDbConfig
}

cols []string
vals []any
}

// to keep the public interface free from field tags like "db" and "json" (as these might differ for different dbs)
// we introduce an internal type. unfortunately, this requires a conversion, which takes effort to maintain
timescaleEntry struct {
Component string `db:"component"`
RequestId string `db:"rqid" json:"rqid"`
Type EntryType `db:"type"`
Timestamp time.Time `db:"timestamp"`
User string `db:"userid"`
Tenant string `db:"tenant"`
Detail EntryDetail `db:"detail"`
Phase EntryPhase `db:"phase"`
Path string `db:"path"`
ForwardedFor string `db:"forwardedfor"`
RemoteAddr string `db:"remoteaddr"`
Body any `db:"body"`
StatusCode int `db:"statuscode"`
Error string `db:"error" json:"-"`
timescaledbRow struct {
Timestamp time.Time `db:"timestamp"`
Entry []byte `db:"entry"`
}

sqlCompOp string
Expand Down Expand Up @@ -148,19 +129,7 @@ func (a *timescaleAuditing) initialize() error {
{
query: `CREATE TABLE IF NOT EXISTS traces (
timestamp timestamp NOT NULL,
rqid text NOT NULL,
component text NOT NULL,
type text NOT NULL,
body text NOT NULL,
error text NOT NULL,
statuscode int NOT NULL,
remoteaddr text NOT NULL,
forwardedfor text NOT NULL,
path text NOT NULL,
phase text NOT NULL,
detail text NOT NULL,
tenant text NOT NULL,
userid text NOT NULL
entry jsonb NOT NULL
)`,
},
{
Expand All @@ -170,18 +139,16 @@ func (a *timescaleAuditing) initialize() error {
{
query: `ALTER TABLE traces SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'tenant',
timescaledb.compress_orderby = 'timestamp'
)`,
},
{
query: `SELECT add_compression_policy('traces', $1::interval)`,
args: []any{a.config.CompressionInterval},
},
// TODO: evaluate what is needed
// {
// query: `CREATE INDEX IF NOT EXISTS traces_idx ON traces()`,
// },
{
query: `CREATE INDEX IF NOT EXISTS traces_gin_idx ON traces USING GIN (entry)`,
},
{
query: `SELECT add_retention_policy('traces', $1::interval)`,
args: []any{a.config.Retention},
Expand Down Expand Up @@ -216,36 +183,6 @@ func (a *timescaleAuditing) initialize() error {
return err
}

q, _, err := sq.
Select("column_name").
From("information_schema.columns").
Where("table_name='traces'").
ToSql()
if err != nil {
return err
}

rows, err := a.db.Query(q)
if err != nil {
return err
}
defer rows.Close()
if rows.Err() != nil {
return rows.Err()
}

for rows.Next() {
var col string

err = rows.Scan(&col)
if err != nil {
return err
}

a.cols = append(a.cols, col)
a.vals = append(a.vals, sq.Expr(":"+col))
}

return nil
}

Expand All @@ -256,22 +193,27 @@ func (a *timescaleAuditing) Flush() error {
func (a *timescaleAuditing) Index(entry Entry) error {
q, _, err := sq.
Insert("traces").
Columns(a.cols...).
Values(a.vals...).
Columns("timestamp", "entry").
Values(sq.Expr(":timestamp"), sq.Expr(":entry")).
ToSql()
if err != nil {
return err
}

internalEntry, err := a.toInternal(entry)
e, err := json.Marshal(entry)
if err != nil {
return fmt.Errorf("unable to convert audit trace to database entry: %w", err)
return fmt.Errorf("error marshaling entry: %w", err)
}

row := timescaledbRow{
Timestamp: entry.Timestamp,
Entry: e,
}

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

_, err = a.db.NamedExecContext(ctx, q, internalEntry)
_, err = a.db.NamedExecContext(ctx, q, row)
if err != nil {
return fmt.Errorf("unable to index audit trace: %w", err)
}
Expand All @@ -288,17 +230,13 @@ func (a *timescaleAuditing) Search(ctx context.Context, filter EntryFilter) ([]E
return nil
}

if !slices.Contains(a.cols, field) {
return fmt.Errorf("unable to filter for %q, no such table column", field)
}

values[field] = value

switch op {
case equals:
where = append(where, fmt.Sprintf("%s=:%s", field, field))
where = append(where, fmt.Sprintf("entry ->> '%s'=:%s", field, field))
case like:
where = append(where, fmt.Sprintf("%s like '%%' || %s || '%%'", field, field))
where = append(where, fmt.Sprintf("entry ->> '%s' like '%%' || :%s || '%%'", field, field))
default:
return fmt.Errorf("comp op not known")
}
Expand Down Expand Up @@ -347,13 +285,6 @@ func (a *timescaleAuditing) Search(ctx context.Context, filter EntryFilter) ([]E
return nil, err
}

query := sq.
Select(a.cols...).
From("traces").
Columns(a.cols...).
Where(strings.Join(where, " AND ")).
OrderBy("timestamp ASC")

// to make queries more efficient for timescaledb, we always provide from
if filter.From.IsZero() {
filter.From = time.Now().Add(-24 * time.Hour)
Expand All @@ -366,6 +297,13 @@ func (a *timescaleAuditing) Search(ctx context.Context, filter EntryFilter) ([]E
values["to"] = filter.To
where = append(where, "timestamp <= :to")
}

query := sq.
Select("timestamp", "entry").
From("traces").
Where(strings.Join(where, " AND ")).
OrderBy("timestamp ASC")

if filter.Limit != 0 {
query.Limit(uint64(filter.Limit))
}
Expand All @@ -384,58 +322,21 @@ func (a *timescaleAuditing) Search(ctx context.Context, filter EntryFilter) ([]E
var entries []Entry

for rows.Next() {
var e timescaleEntry
var e timescaledbRow

err = rows.StructScan(&e)
if err != nil {
return nil, err
}

entry, err := a.toExternal(e)
var entry Entry
err = json.Unmarshal(e.Entry, &entry)
if err != nil {
return nil, fmt.Errorf("unable to convert entry: %w", err)
return nil, fmt.Errorf("error unmarshaling entry: %w", err)
}

entries = append(entries, entry)
}

return entries, nil
}

func (_ *timescaleAuditing) toInternal(e Entry) (*timescaleEntry, error) {
intermediate, err := json.Marshal(e) // nolint
if err != nil {
return nil, err
}
var internalEntry timescaleEntry
err = json.Unmarshal(intermediate, &internalEntry) // nolint
if err != nil {
return nil, err
}

internalEntry.RequestId = e.RequestId
if e.Error != nil {
internalEntry.Error = e.Error.Error()
}

return &internalEntry, nil
}

func (_ *timescaleAuditing) toExternal(e timescaleEntry) (Entry, error) {
intermediate, err := json.Marshal(e) // nolint
if err != nil {
return Entry{}, err
}
var externalEntry Entry
err = json.Unmarshal(intermediate, &externalEntry) // nolint
if err != nil {
return Entry{}, err
}

externalEntry.Id = e.RequestId
if e.Error != "" {
externalEntry.Error = errors.New(e.Error)
}

return externalEntry, nil
}
4 changes: 3 additions & 1 deletion auditing/timescaledb_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ func TestAuditing_TimescaleDB(t *testing.T) {
name: "insert one entry",
t: func(t *testing.T, a Auditing) {
err := a.Index(Entry{
Body: "test",
Timestamp: now,
Body: "test",
})
require.NoError(t, err)
err = a.Flush()
Expand Down Expand Up @@ -142,6 +143,7 @@ func TestAuditing_TimescaleDB(t *testing.T) {
entries, err = a.Search(ctx, EntryFilter{
Body: "This",
})

require.NoError(t, err)
assert.Len(t, entries, len(es))
},
Expand Down

0 comments on commit 14976a0

Please sign in to comment.