Skip to content

Commit

Permalink
adding tests, some refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
drmorr0 committed Feb 22, 2024
1 parent 01b4b70 commit 774e71f
Show file tree
Hide file tree
Showing 10 changed files with 261 additions and 38 deletions.
37 changes: 26 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
![build status](https://github.com/acrlabs/prom2parquet/actions/workflows/verify.yml/badge.svg)

# prom2parquet

Remote write target for Prometheus that saves metrics to parquet files

**This should be considered an alpha project**
**⚠️ This should be considered an alpha project. ⚠️**
In particular, the schema for the saved Parquet files is likely to change in the future.

## Overview

Expand All @@ -25,30 +28,31 @@ Usage:
prom2parquet [flags]
Flags:
--clean-local-storage delete pod-local parquet files upon flush
--backend backend supported remote backends for saving parquet files
(valid options: none, s3/aws) (default local)
--backend-root string root path/location for the specified backend (e.g. bucket name for AWS S3)
(default "/data")
-h, --help help for prom2parquet
--prefix string directory prefix for saving parquet files
--remote remote supported remote endpoints for saving parquet files
(valid options: none, s3/aws) (default none)
-p, --server-port int port for the remote write endpoint to listen on (default 1234)
-v, --verbosity verbosity log level (valid options: debug, error, fatal, info, panic, trace, warning/warn)
(default info)
```

Here is a brief overview of the options:

### clean-local-storage
### backend

To reduce pod-local storage, you can configure prom2parquet to remove all parquet files after they've been written
(currently once per hour). This is generally not very useful unless you've also configured a remote storage option.
Where to store the Parquet files;; currently supports pod-local storage and AWS S3.

### prefix
### backend-root

This option provides a prefix that can be used to differentiate between metrics collections.
"Root" location for the backend storage. For pod-local storage this is the base directory, for AWS S3 this is the
bucket name.

### remote
### prefix

Whether to save the parquet files to some remote storage; currently the only supported remote storage option is AWS S3.
This option provides a prefix that can be used to differentiate between metrics collections.

### server-port

Expand Down Expand Up @@ -89,6 +93,17 @@ the executable, create and push the Docker images, and deploy to the configured
All build artifacts are placed in the `.build/` subdirectory. You can remove this directory or run `make clean` to
clean up.

### Testing

Run `make test` to run all the unit/integration tests. If you want to test using pod-local storage, and you want to
flush the Parquet files to disk without terminating the pod (e.g., so you can copy them elsewhere), you can send the
process a SIGUSR1:

```
> kubectl exec prom2parquet-pod -- kill -s SIGUSR1 <pid>
> kubectl cp prom2parquet-pod:/path/to/files ./
```

### Code of Conduct

Applied Computing Research Labs has a strict code of conduct we expect all contributors to adhere to. Please read the
Expand Down
2 changes: 1 addition & 1 deletion cmd/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (

//nolint:gochecknoglobals
var supportedBackendIDs = map[backends.StorageBackend][]string{
backends.Local: {"none"},
backends.Local: {"local"},
backends.S3: {"s3", "aws"},
}

Expand Down
33 changes: 15 additions & 18 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ type promserver struct {
opts *options
channels map[string]chan prompb.TimeSeries

m sync.RWMutex
m sync.RWMutex
flushChannel chan os.Signal
killChannel chan os.Signal
}

func newServer(opts *options) *promserver {
Expand All @@ -37,18 +39,18 @@ func newServer(opts *options) *promserver {
httpserv: &http.Server{Addr: fulladdr, Handler: mux, ReadHeaderTimeout: 10 * time.Second},
opts: opts,
channels: map[string]chan prompb.TimeSeries{},

flushChannel: make(chan os.Signal, 1),
killChannel: make(chan os.Signal, 1),
}
mux.HandleFunc("/receive", s.metricsReceive)

return s
}

func (self *promserver) run() {
flushChannel := make(chan os.Signal, 1)
signal.Notify(flushChannel, syscall.SIGUSR1)

killChannel := make(chan os.Signal, 1)
signal.Notify(killChannel, syscall.SIGTERM)
signal.Notify(self.flushChannel, syscall.SIGUSR1)
signal.Notify(self.killChannel, syscall.SIGTERM)

endChannel := make(chan struct{}, 1)

Expand All @@ -59,15 +61,15 @@ func (self *promserver) run() {
}()

go func() {
<-killChannel
<-self.killChannel
self.handleShutdown()
close(endChannel)
}()

go func() {
<-flushChannel
log.Infof("SIGUSR1 received")
self.stopServer(true)
<-self.flushChannel
log.Infof("SIGUSR1 received; sleeping indefinitely")
self.stopServer()
}()

log.Infof("server listening on %s", self.httpserv.Addr)
Expand All @@ -82,15 +84,15 @@ func (self *promserver) handleShutdown() {
}
}()

self.stopServer(false)
self.stopServer()
timer := time.AfterFunc(shutdownTime, func() {
os.Exit(0)
})

<-timer.C
}

func (self *promserver) stopServer(stayAlive bool) {
func (self *promserver) stopServer() {
log.Infof("flushing all data files")
for _, ch := range self.channels {
close(ch)
Expand All @@ -101,11 +103,6 @@ func (self *promserver) stopServer(stayAlive bool) {
if err := self.httpserv.Shutdown(ctxTimeout); err != nil {
log.Errorf("failed shutting server down: %v", err)
}

if stayAlive {
log.Infof("sleeping indefinitely")
select {}
}
}

func (self *promserver) metricsReceive(w http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -159,7 +156,7 @@ func (self *promserver) spawnWriter(ctx context.Context, metricName string) (cha
ch := make(chan prompb.TimeSeries)
self.channels[metricName] = ch

go writer.Listen(ch)
go writer.Listen(ch) //nolint:contextcheck // the req context and the backend creation context should be separate

Check warning on line 159 in cmd/server.go

View check run for this annotation

Codecov / codecov/patch

cmd/server.go#L159

Added line #L159 was not covered by tests

return ch, nil
}
89 changes: 89 additions & 0 deletions cmd/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package main

import (
"strings"
"testing"
"time"

"github.com/prometheus/prometheus/prompb"
"github.com/samber/lo"
log "github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
)

// These tests are actually super-hacky and brittle, but I'm not too sure how to do these tests a different way. It
// would be nice to not just be comparing log outputs to see which code paths were taken, for one thing...
//
// Also, these tests are sortof inherently race-y, as evidenced by this delightful constant:
const sleepTime = 100 * time.Millisecond

func TestServerRun(t *testing.T) {
cases := map[string]struct {
operations func(*promserver)
expectedLogEntries []string
forbiddenLogEntries []string
}{
"kill": {
operations: func(s *promserver) { close(s.killChannel) },
expectedLogEntries: []string{
"server failed",
"flushing all data files",
"shutting down",
},
forbiddenLogEntries: []string{
"SIGUSR1 received",
"recovered from panic",
},
},
"flush": {
operations: func(s *promserver) { close(s.flushChannel) },
expectedLogEntries: []string{
"server failed",
"flushing all data files",
"SIGUSR1 received",
},
forbiddenLogEntries: []string{
"shutting down",
"recovered from panic",
},
},
"flush then kill": {
operations: func(s *promserver) { close(s.flushChannel); time.Sleep(sleepTime); close(s.killChannel) },
expectedLogEntries: []string{
"server failed",
"flushing all data files",
"SIGUSR1 received",
"shutting down",
"recovered from panic",
},
forbiddenLogEntries: []string{},
},
}

for name, tc := range cases {
t.Run(name, func(t *testing.T) {
logs := test.NewGlobal()
srv := newServer(&options{})
srv.channels["foo"] = make(chan prompb.TimeSeries)
go srv.run()

tc.operations(srv)
time.Sleep(sleepTime)

entries := logs.AllEntries()

for _, expected := range tc.expectedLogEntries {
assert.GreaterOrEqual(t, len(lo.Filter(entries, func(e *log.Entry, _ int) bool {
return strings.Contains(e.Message, expected)
})), 1)
}

for _, forbidden := range tc.forbiddenLogEntries {
assert.Len(t, lo.Filter(entries, func(e *log.Entry, _ int) bool {
return strings.Contains(e.Message, forbidden)
}), 0)
}
})
}
}
2 changes: 0 additions & 2 deletions k8s/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ def __init__(self):
args=[
"/prom2parquet",
"--prefix", "testing",
"--backend", "s3",
"--backend-root", "simkube",
],
).with_env(env).with_ports(SERVER_PORT).with_security_context(Capability.DEBUG)

Expand Down
6 changes: 5 additions & 1 deletion pkg/backends/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,12 @@ func ConstructBackendForFile( //nolint:ireturn // this is fine

case Memory:
log.Warnf("using in-memory backend, this is intended for testing only!")
fullPath, err := filepath.Abs(fmt.Sprintf("%s/%s", root, file))
if err != nil {
return nil, fmt.Errorf("can't construct local path %s/%s: %w", root, file, err)
}

Check warning on line 55 in pkg/backends/backend.go

View check run for this annotation

Codecov / codecov/patch

pkg/backends/backend.go#L54-L55

Added lines #L54 - L55 were not covered by tests

fw, err := mem.NewMemFileWriter(file, nil)
fw, err := mem.NewMemFileWriter(fullPath, nil)
if err != nil {
return nil, fmt.Errorf("can't create in-memory writer: %w", err)
}

Check warning on line 60 in pkg/backends/backend.go

View check run for this annotation

Codecov / codecov/patch

pkg/backends/backend.go#L59-L60

Added lines #L59 - L60 were not covered by tests
Expand Down
6 changes: 4 additions & 2 deletions pkg/parquet/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package parquet

import (
"fmt"
"sort"
"strings"

"github.com/prometheus/common/model"
Expand All @@ -15,7 +16,7 @@ const (
nodeKey = "node"
)

func createDatapointForLabels(labels []prompb.Label) DataPoint {
func createDataPointForLabels(labels []prompb.Label) DataPoint {
dp := DataPoint{}

label_strs := []string{}
Expand All @@ -30,11 +31,12 @@ func createDatapointForLabels(labels []prompb.Label) DataPoint {
case containerKey:
dp.Container = l.Value
case nodeKey:
dp.Container = l.Value
dp.Node = l.Value
default:
label_strs = append(label_strs, fmt.Sprintf("%s=%s", l.Name, l.Value))
}
}
sort.Strings(label_strs)
dp.Labels = strings.Join(label_strs, ",")

return dp
Expand Down
56 changes: 56 additions & 0 deletions pkg/parquet/labels_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package parquet

import (
"testing"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
)

const (
podLabel = "testing-12345-asdf"
namespaceLabel = "the-namespace"
nodeLabel = "the-node"
containerLabel = "the-container"
)

func TestCreateDataPointForLabels(t *testing.T) {
labels := []prompb.Label{
{
Name: model.MetricNameLabel,
Value: "kube_node_stuff",
},
{
Name: podNameKey,
Value: podLabel,
},
{
Name: "other-label",
Value: "foo-bar",
},
{
Name: namespaceKey,
Value: namespaceLabel,
},
{
Name: nodeKey,
Value: nodeLabel,
},
{
Name: containerKey,
Value: containerLabel,
},
{
Name: "a-label",
Value: "baz-buz",
},
}

dp := createDataPointForLabels(labels)
assert.Equal(t, podLabel, dp.Pod)
assert.Equal(t, containerLabel, dp.Container)
assert.Equal(t, namespaceLabel, dp.Namespace)
assert.Equal(t, nodeLabel, dp.Node)
assert.Equal(t, "a-label=baz-buz,other-label=foo-bar", dp.Labels)
}
Loading

0 comments on commit 774e71f

Please sign in to comment.