diff --git a/README.md b/README.md index 5fb3028..58a4fcb 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,11 @@ +![build status](https://github.com/acrlabs/prom2parquet/actions/workflows/verify.yml/badge.svg) + # prom2parquet Remote write target for Prometheus that saves metrics to parquet files -**This should be considered an alpha project** +**⚠️ This should be considered an alpha project. ⚠️** +In particular, the schema for the saved Parquet files is likely to change in the future. ## Overview @@ -25,11 +28,12 @@ Usage: prom2parquet [flags] Flags: - --clean-local-storage delete pod-local parquet files upon flush + --backend backend supported remote backends for saving parquet files + (valid options: none, s3/aws) (default local) + --backend-root string root path/location for the specified backend (e.g. bucket name for AWS S3) + (default "/data") -h, --help help for prom2parquet --prefix string directory prefix for saving parquet files - --remote remote supported remote endpoints for saving parquet files - (valid options: none, s3/aws) (default none) -p, --server-port int port for the remote write endpoint to listen on (default 1234) -v, --verbosity verbosity log level (valid options: debug, error, fatal, info, panic, trace, warning/warn) (default info) @@ -37,18 +41,18 @@ Flags: Here is a brief overview of the options: -### clean-local-storage +### backend -To reduce pod-local storage, you can configure prom2parquet to remove all parquet files after they've been written -(currently once per hour). This is generally not very useful unless you've also configured a remote storage option. +Where to store the Parquet files;; currently supports pod-local storage and AWS S3. -### prefix +### backend-root -This option provides a prefix that can be used to differentiate between metrics collections. +"Root" location for the backend storage. For pod-local storage this is the base directory, for AWS S3 this is the +bucket name. -### remote +### prefix -Whether to save the parquet files to some remote storage; currently the only supported remote storage option is AWS S3. +This option provides a prefix that can be used to differentiate between metrics collections. ### server-port @@ -89,6 +93,17 @@ the executable, create and push the Docker images, and deploy to the configured All build artifacts are placed in the `.build/` subdirectory. You can remove this directory or run `make clean` to clean up. +### Testing + +Run `make test` to run all the unit/integration tests. If you want to test using pod-local storage, and you want to +flush the Parquet files to disk without terminating the pod (e.g., so you can copy them elsewhere), you can send the +process a SIGUSR1: + +``` +> kubectl exec prom2parquet-pod -- kill -s SIGUSR1 +> kubectl cp prom2parquet-pod:/path/to/files ./ +``` + ### Code of Conduct Applied Computing Research Labs has a strict code of conduct we expect all contributors to adhere to. Please read the diff --git a/cmd/args.go b/cmd/args.go index 5058a0d..422b446 100644 --- a/cmd/args.go +++ b/cmd/args.go @@ -19,7 +19,7 @@ const ( //nolint:gochecknoglobals var supportedBackendIDs = map[backends.StorageBackend][]string{ - backends.Local: {"none"}, + backends.Local: {"local"}, backends.S3: {"s3", "aws"}, } diff --git a/cmd/server.go b/cmd/server.go index 292c26c..b49daf8 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -26,7 +26,9 @@ type promserver struct { opts *options channels map[string]chan prompb.TimeSeries - m sync.RWMutex + m sync.RWMutex + flushChannel chan os.Signal + killChannel chan os.Signal } func newServer(opts *options) *promserver { @@ -37,6 +39,9 @@ func newServer(opts *options) *promserver { httpserv: &http.Server{Addr: fulladdr, Handler: mux, ReadHeaderTimeout: 10 * time.Second}, opts: opts, channels: map[string]chan prompb.TimeSeries{}, + + flushChannel: make(chan os.Signal, 1), + killChannel: make(chan os.Signal, 1), } mux.HandleFunc("/receive", s.metricsReceive) @@ -44,11 +49,8 @@ func newServer(opts *options) *promserver { } func (self *promserver) run() { - flushChannel := make(chan os.Signal, 1) - signal.Notify(flushChannel, syscall.SIGUSR1) - - killChannel := make(chan os.Signal, 1) - signal.Notify(killChannel, syscall.SIGTERM) + signal.Notify(self.flushChannel, syscall.SIGUSR1) + signal.Notify(self.killChannel, syscall.SIGTERM) endChannel := make(chan struct{}, 1) @@ -59,15 +61,15 @@ func (self *promserver) run() { }() go func() { - <-killChannel + <-self.killChannel self.handleShutdown() close(endChannel) }() go func() { - <-flushChannel - log.Infof("SIGUSR1 received") - self.stopServer(true) + <-self.flushChannel + log.Infof("SIGUSR1 received; sleeping indefinitely") + self.stopServer() }() log.Infof("server listening on %s", self.httpserv.Addr) @@ -82,7 +84,7 @@ func (self *promserver) handleShutdown() { } }() - self.stopServer(false) + self.stopServer() timer := time.AfterFunc(shutdownTime, func() { os.Exit(0) }) @@ -90,7 +92,7 @@ func (self *promserver) handleShutdown() { <-timer.C } -func (self *promserver) stopServer(stayAlive bool) { +func (self *promserver) stopServer() { log.Infof("flushing all data files") for _, ch := range self.channels { close(ch) @@ -101,11 +103,6 @@ func (self *promserver) stopServer(stayAlive bool) { if err := self.httpserv.Shutdown(ctxTimeout); err != nil { log.Errorf("failed shutting server down: %v", err) } - - if stayAlive { - log.Infof("sleeping indefinitely") - select {} - } } func (self *promserver) metricsReceive(w http.ResponseWriter, req *http.Request) { @@ -159,7 +156,7 @@ func (self *promserver) spawnWriter(ctx context.Context, metricName string) (cha ch := make(chan prompb.TimeSeries) self.channels[metricName] = ch - go writer.Listen(ch) + go writer.Listen(ch) //nolint:contextcheck // the req context and the backend creation context should be separate return ch, nil } diff --git a/cmd/server_test.go b/cmd/server_test.go new file mode 100644 index 0000000..2fd09d5 --- /dev/null +++ b/cmd/server_test.go @@ -0,0 +1,89 @@ +package main + +import ( + "strings" + "testing" + "time" + + "github.com/prometheus/prometheus/prompb" + "github.com/samber/lo" + log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/assert" +) + +// These tests are actually super-hacky and brittle, but I'm not too sure how to do these tests a different way. It +// would be nice to not just be comparing log outputs to see which code paths were taken, for one thing... +// +// Also, these tests are sortof inherently race-y, as evidenced by this delightful constant: +const sleepTime = 100 * time.Millisecond + +func TestServerRun(t *testing.T) { + cases := map[string]struct { + operations func(*promserver) + expectedLogEntries []string + forbiddenLogEntries []string + }{ + "kill": { + operations: func(s *promserver) { close(s.killChannel) }, + expectedLogEntries: []string{ + "server failed", + "flushing all data files", + "shutting down", + }, + forbiddenLogEntries: []string{ + "SIGUSR1 received", + "recovered from panic", + }, + }, + "flush": { + operations: func(s *promserver) { close(s.flushChannel) }, + expectedLogEntries: []string{ + "server failed", + "flushing all data files", + "SIGUSR1 received", + }, + forbiddenLogEntries: []string{ + "shutting down", + "recovered from panic", + }, + }, + "flush then kill": { + operations: func(s *promserver) { close(s.flushChannel); time.Sleep(sleepTime); close(s.killChannel) }, + expectedLogEntries: []string{ + "server failed", + "flushing all data files", + "SIGUSR1 received", + "shutting down", + "recovered from panic", + }, + forbiddenLogEntries: []string{}, + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + logs := test.NewGlobal() + srv := newServer(&options{}) + srv.channels["foo"] = make(chan prompb.TimeSeries) + go srv.run() + + tc.operations(srv) + time.Sleep(sleepTime) + + entries := logs.AllEntries() + + for _, expected := range tc.expectedLogEntries { + assert.GreaterOrEqual(t, len(lo.Filter(entries, func(e *log.Entry, _ int) bool { + return strings.Contains(e.Message, expected) + })), 1) + } + + for _, forbidden := range tc.forbiddenLogEntries { + assert.Len(t, lo.Filter(entries, func(e *log.Entry, _ int) bool { + return strings.Contains(e.Message, forbidden) + }), 0) + } + }) + } +} diff --git a/k8s/main.py b/k8s/main.py index 100647f..6f82815 100755 --- a/k8s/main.py +++ b/k8s/main.py @@ -29,8 +29,6 @@ def __init__(self): args=[ "/prom2parquet", "--prefix", "testing", - "--backend", "s3", - "--backend-root", "simkube", ], ).with_env(env).with_ports(SERVER_PORT).with_security_context(Capability.DEBUG) diff --git a/pkg/backends/backend.go b/pkg/backends/backend.go index a939907..e8d6e45 100644 --- a/pkg/backends/backend.go +++ b/pkg/backends/backend.go @@ -49,8 +49,12 @@ func ConstructBackendForFile( //nolint:ireturn // this is fine case Memory: log.Warnf("using in-memory backend, this is intended for testing only!") + fullPath, err := filepath.Abs(fmt.Sprintf("%s/%s", root, file)) + if err != nil { + return nil, fmt.Errorf("can't construct local path %s/%s: %w", root, file, err) + } - fw, err := mem.NewMemFileWriter(file, nil) + fw, err := mem.NewMemFileWriter(fullPath, nil) if err != nil { return nil, fmt.Errorf("can't create in-memory writer: %w", err) } diff --git a/pkg/parquet/labels.go b/pkg/parquet/labels.go index 1e3d554..feaffc3 100644 --- a/pkg/parquet/labels.go +++ b/pkg/parquet/labels.go @@ -2,6 +2,7 @@ package parquet import ( "fmt" + "sort" "strings" "github.com/prometheus/common/model" @@ -15,7 +16,7 @@ const ( nodeKey = "node" ) -func createDatapointForLabels(labels []prompb.Label) DataPoint { +func createDataPointForLabels(labels []prompb.Label) DataPoint { dp := DataPoint{} label_strs := []string{} @@ -30,11 +31,12 @@ func createDatapointForLabels(labels []prompb.Label) DataPoint { case containerKey: dp.Container = l.Value case nodeKey: - dp.Container = l.Value + dp.Node = l.Value default: label_strs = append(label_strs, fmt.Sprintf("%s=%s", l.Name, l.Value)) } } + sort.Strings(label_strs) dp.Labels = strings.Join(label_strs, ",") return dp diff --git a/pkg/parquet/labels_test.go b/pkg/parquet/labels_test.go new file mode 100644 index 0000000..9a9418d --- /dev/null +++ b/pkg/parquet/labels_test.go @@ -0,0 +1,56 @@ +package parquet + +import ( + "testing" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/prompb" + "github.com/stretchr/testify/assert" +) + +const ( + podLabel = "testing-12345-asdf" + namespaceLabel = "the-namespace" + nodeLabel = "the-node" + containerLabel = "the-container" +) + +func TestCreateDataPointForLabels(t *testing.T) { + labels := []prompb.Label{ + { + Name: model.MetricNameLabel, + Value: "kube_node_stuff", + }, + { + Name: podNameKey, + Value: podLabel, + }, + { + Name: "other-label", + Value: "foo-bar", + }, + { + Name: namespaceKey, + Value: namespaceLabel, + }, + { + Name: nodeKey, + Value: nodeLabel, + }, + { + Name: containerKey, + Value: containerLabel, + }, + { + Name: "a-label", + Value: "baz-buz", + }, + } + + dp := createDataPointForLabels(labels) + assert.Equal(t, podLabel, dp.Pod) + assert.Equal(t, containerLabel, dp.Container) + assert.Equal(t, namespaceLabel, dp.Namespace) + assert.Equal(t, nodeLabel, dp.Node) + assert.Equal(t, "a-label=baz-buz,other-label=foo-bar", dp.Labels) +} diff --git a/pkg/parquet/writer.go b/pkg/parquet/writer.go index 5f0f89e..ca129c1 100644 --- a/pkg/parquet/writer.go +++ b/pkg/parquet/writer.go @@ -14,6 +14,8 @@ import ( "github.com/acrlabs/prom2parquet/pkg/backends" ) +const pageNum = 4 + type Prom2ParquetWriter struct { backend backends.StorageBackend root string @@ -70,7 +72,7 @@ func (self *Prom2ParquetWriter) Listen(stream <-chan prompb.TimeSeries) { return } - dp := createDatapointForLabels(ts.Labels) + dp := createDataPointForLabels(ts.Labels) for _, s := range ts.Samples { dp.Value = s.Value dp.Timestamp = s.Timestamp @@ -111,9 +113,9 @@ func (self *Prom2ParquetWriter) flush() error { return fmt.Errorf("can't create storage backend writer: %w", err) } - pw, err := writer.NewParquetWriter(fw, new(DataPoint), 4) + pw, err := writer.NewParquetWriter(fw, new(DataPoint), pageNum) if err != nil { - return fmt.Errorf("can't create parquet writerrr: %w", err) + return fmt.Errorf("can't create parquet writer: %w", err) } pw.CompressionType = parquet.CompressionCodec_SNAPPY diff --git a/pkg/parquet/writer_test.go b/pkg/parquet/writer_test.go new file mode 100644 index 0000000..8e9aace --- /dev/null +++ b/pkg/parquet/writer_test.go @@ -0,0 +1,60 @@ +package parquet + +import ( + "testing" + "time" + + "github.com/jonboulle/clockwork" + "github.com/spf13/afero" + "github.com/stretchr/testify/assert" + "github.com/xitongsys/parquet-go-source/mem" + + "github.com/acrlabs/prom2parquet/pkg/backends" +) + +func TestFlush(t *testing.T) { + fs := afero.NewMemMapFs() + mem.SetInMemFileFs(&fs) + + w := Prom2ParquetWriter{ + backend: backends.Memory, + root: "/test", + metric: "kube_node_stuff", + + clock: clockwork.NewFakeClockAt(time.Time{}), + } + + err := w.flush() + assert.Nil(t, err) + + exists, err := afero.Exists(fs, "/test/kube_node_stuff/0001010100.parquet") + if err != nil { + panic(err) + } + assert.True(t, exists) +} + +func TestAdvanceFlushTime(t *testing.T) { + w := Prom2ParquetWriter{} + + cases := map[string]struct { + now time.Time + expected time.Time + }{ + "same day": { + now: time.Date(2024, 02, 20, 21, 34, 45, 0, time.UTC), + expected: time.Date(2024, 02, 20, 22, 0, 0, 0, time.UTC), + }, + "next day": { + now: time.Date(2024, 02, 20, 23, 34, 45, 0, time.UTC), + expected: time.Date(2024, 02, 21, 0, 0, 0, 0, time.UTC), + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + w.advanceFlushTime(&tc.now) + assert.Equal(t, tc.expected, w.nextFlushTime) + }) + } +}