Skip to content

Commit

Permalink
feat: add Timescale DB with Hypertable and Retention support (#1517)
Browse files Browse the repository at this point in the history
  • Loading branch information
m4rcs authored Jul 17, 2024
1 parent ddebd34 commit 6259e13
Show file tree
Hide file tree
Showing 12 changed files with 193 additions and 49 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ Blocky is a DNS proxy and ad-blocker for the local network written in Go with fo

- [Prometheus](https://prometheus.io/) metrics
- Prepared [Grafana](https://grafana.com/) dashboards (Prometheus and database)
- Logging of DNS queries per day / per client in CSV format or MySQL/MariaDB/PostgreSQL database - easy to analyze
- Logging of DNS queries per day / per client in CSV format or MySQL/MariaDB/PostgreSQL/Timescale database - easy to
analyze
- Various REST API endpoints
- CLI tool

Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func (v *TLSVersion) validate(logger *logrus.Entry) {
// postgresql // PostgreSQL database
// csv // CSV file per day
// csv-client // CSV file per day and client
// timescale // Timescale database
// )
type QueryLogType int16

Expand Down Expand Up @@ -466,7 +467,6 @@ func loadConfig(logger *logrus.Entry, path string, mandatory bool) (rCfg *Config
prettyPath = filepath.Join(path, "*")

data, err = readFromDir(path, data)

if err != nil {
return nil, fmt.Errorf("can't read config files: %w", err)
}
Expand Down
9 changes: 8 additions & 1 deletion config/config_enum.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion docs/api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ info:
- [Prometheus](https://prometheus.io/) metrics
- Prepared [Grafana](https://grafana.com/) dashboards (Prometheus and database)
- Logging of DNS queries per day / per client in CSV format or MySQL/MariaDB/PostgreSQL database - easy to analyze
- Logging of DNS queries per day / per client in CSV format or MySQL/MariaDB/PostgreSQL/Timescale database - easy to
analyze
- Various REST API endpoints
- CLI tool
Expand Down
2 changes: 1 addition & 1 deletion docs/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ prometheus:

# optional: write query information (question, answer, client, duration etc.) to daily csv file
queryLog:
# optional one of: mysql, postgresql, csv, csv-client. If empty, log to console
# optional one of: mysql, postgresql, timescale, csv, csv-client. If empty, log to console
type: mysql
# directory (should be mounted as volume in docker) for csv, db connection string for mysql/postgresql
target: db_user:db_password@tcp(db_host_or_ip:3306)/db_name?charset=utf8mb4&parseTime=True&loc=Local
Expand Down
19 changes: 10 additions & 9 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@ You can select one of following query log types:

- `mysql` - log each query in the external MySQL/MariaDB database
- `postgresql` - log each query in the external PostgreSQL database
- `timescale` - log each query in the external Timescale database
- `csv` - log into CSV file (one per day)
- `csv-client` - log into CSV file (one per day and per client)
- `console` - log into console output
Expand All @@ -671,15 +672,15 @@ You can choose which information from processed DNS request and response should

Configuration parameters:

| Parameter | Type | Mandatory | Default value | Description |
| ------------------------- | ------------------------------------------------------------------------------------ | --------- | ------------- | ---------------------------------------------------------------------------------- |
| queryLog.type | enum (mysql, postgresql, csv, csv-client, console, none (see above)) | no | | Type of logging target. Console if empty |
| queryLog.target | string | no | | directory for writing the logs (for csv) or database url (for mysql or postgresql) |
| queryLog.logRetentionDays | int | no | 0 | if > 0, deletes log files/database entries which are older than ... days |
| queryLog.creationAttempts | int | no | 3 | Max attempts to create specific query log writer |
| queryLog.creationCooldown | duration format | no | 2s | Time between the creation attempts |
| queryLog.fields | list enum (clientIP, clientName, responseReason, responseAnswer, question, duration) | no | all | which information should be logged |
| queryLog.flushInterval | duration format | no | 30s | Interval to write data in bulk to the external database |
| Parameter | Type | Mandatory | Default value | Description |
| ------------------------- | ------------------------------------------------------------------------------------ | --------- | ------------- | --------------------------------------------------------------------------------------------- |
| queryLog.type | enum (mysql, postgresql, timescale, csv, csv-client, console, none (see above)) | no | | Type of logging target. Console if empty |
| queryLog.target | string | no | | directory for writing the logs (for csv) or database url (for mysql, postgresql or timescale) |
| queryLog.logRetentionDays | int | no | 0 | if > 0, deletes log files/database entries which are older than ... days |
| queryLog.creationAttempts | int | no | 3 | Max attempts to create specific query log writer |
| queryLog.creationCooldown | duration format | no | 2s | Time between the creation attempts |
| queryLog.fields | list enum (clientIP, clientName, responseReason, responseAnswer, question, duration) | no | all | which information should be logged |
| queryLog.flushInterval | duration format | no | 30s | Interval to write data in bulk to the external database |

!!! hint

Expand Down
3 changes: 2 additions & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ Blocky is a DNS proxy and ad-blocker for the local network written in Go with fo

* [Prometheus](https://prometheus.io/) metrics
* Prepared [Grafana](https://grafana.com/) dashboards (Prometheus and database)
* Logging of DNS queries per day / per client in CSV format or MySQL/MariaDB/PostgreSQL database - easy to analyze
* Logging of DNS queries per day / per client in CSV format or MySQL/MariaDB/PostgreSQL/Timescale database - easy to
analyze
* Various REST API endpoints
* CLI tool

Expand Down
22 changes: 22 additions & 0 deletions e2e/containers.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
const (
redisImage = "redis:7"
postgresImage = "postgres:15.2-alpine"
timescaleImage = "timescale/timescaledb:latest-pg15"
mariaDBImage = "mariadb:11"
mokaImage = "ghcr.io/0xerr0r/dns-mokka:0.2.0"
staticServerImage = "halverneus/static-file-server:latest"
Expand Down Expand Up @@ -122,6 +123,27 @@ func createPostgresContainer(ctx context.Context, e2eNet *testcontainers.DockerN
))
}

// createTimescaleContainer creates a PostgreSQL container with the TimescaleDB extension
// (image timescale/timescaledb, see timescaleImage) attached to the test network under the
// alias 'timescale'. It creates a database 'user' with user 'user' and password 'user'.
// The container is registered via deferTerminate, so it is automatically terminated when
// the test is finished.
//
// Returns the started container, or an error if startup/wait failed.
func createTimescaleContainer(ctx context.Context, e2eNet *testcontainers.DockerNetwork,
) (*postgres.PostgresContainer, error) {
// The official Postgres entrypoint starts the server twice (once during init, then the
// real start), so the "ready" log line must be seen twice before the DB is truly usable.
const waitLogOccurrence = 2

return deferTerminate(postgres.RunContainer(ctx,
testcontainers.WithImage(timescaleImage),

postgres.WithDatabase("user"),
postgres.WithUsername("user"),
postgres.WithPassword("user"),
testcontainers.WithWaitStrategy(
wait.ForLog("database system is ready to accept connections").
WithOccurrence(waitLogOccurrence).
WithStartupTimeout(startupTimeout)),
withNetwork("timescale", e2eNet),
))
}

// createMariaDBContainer creates a mariadb container attached to the test network under the alias 'mariaDB'.
// It creates a database 'user' with user 'user' and password 'user'.
// It is automatically terminated when the test is finished.
Expand Down
73 changes: 73 additions & 0 deletions e2e/querylog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,79 @@ var _ = Describe("Query logs functional tests", func() {
})
})
})

// E2E coverage for queryLog.type == "timescale": blocky writes query log rows into a
// TimescaleDB (Postgres-compatible) database; structure mirrors the postgresql test above.
Describe("Query logging into the timescale database", func() {
BeforeEach(func(ctx context.Context) {
// Timescale speaks the Postgres wire protocol, so the container handle is
// reused via the shared postgresDB suite variable.
postgresDB, err = createTimescaleContainer(ctx, e2eNet)
Expect(err).Should(Succeed())

// Start blocky configured to flush query log entries to the 'timescale'
// network alias every second (short interval keeps the test fast).
blocky, err = createBlockyContainer(ctx, e2eNet,
"log:",
"  level: warn",
"upstreams:",
"  groups:",
"    default:",
"      - moka1",
"queryLog:",
"  type: timescale",
"  target: postgres://user:user@timescale:5432/user",
"  flushInterval: 1s",
)
Expect(err).Should(Succeed())

connectionString, err := postgresDB.ConnectionString(ctx, "sslmode=disable")
Expect(err).Should(Succeed())

// database might be slow on first start, retry here if necessary
Eventually(gorm.Open, "10s", "1s").
WithArguments(postgresDriver.Open(connectionString), &gorm.Config{}).ShouldNot(BeNil())

db, err = gorm.Open(postgresDriver.Open(connectionString), &gorm.Config{})
Expect(err).Should(Succeed())

// Sanity check: the log table must start empty before any DNS traffic.
Eventually(countEntries).WithArguments(db).Should(BeNumerically("==", 0))
})
When("Some queries were performed", func() {
msg := util.NewMsgWithQuestion("google.de.", dns.Type(dns.TypeA))
It("Should store query log in the timescale database", func(ctx context.Context) {
// Two identical queries: the first should be RESOLVED upstream,
// the second served from blocky's cache (asserted below).
By("Performing 2 queries", func() {
Expect(doDNSRequest(ctx, blocky, msg)).ShouldNot(BeNil())
Expect(doDNSRequest(ctx, blocky, msg)).ShouldNot(BeNil())
})

By("check entries count asynchronously, since blocky flushes log entries in bulk", func() {
Eventually(countEntries, "60s", "1s").WithArguments(db).Should(BeNumerically("==", 2))
})

By("check entry content", func() {
entries, err := queryEntries(db)
Expect(err).Should(Succeed())

Expect(entries).Should(HaveLen(2))

// First query: answered by the upstream resolver (moka1).
Expect(entries[0]).
Should(
SatisfyAll(
HaveField("ResponseType", "RESOLVED"),
HaveField("QuestionType", "A"),
HaveField("QuestionName", "google.de"),
HaveField("Answer", "A (1.2.3.4)"),
HaveField("ResponseCode", "NOERROR"),
))

// Second query: identical question, so it must hit blocky's cache.
Expect(entries[1]).
Should(
SatisfyAll(
HaveField("ResponseType", "CACHED"),
HaveField("QuestionType", "A"),
HaveField("QuestionName", "google.de"),
HaveField("Answer", "A (1.2.3.4)"),
HaveField("ResponseCode", "NOERROR"),
))
})
})
})
})
})

type logEntry struct {
Expand Down
40 changes: 32 additions & 8 deletions querylog/database_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"reflect"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -49,16 +50,16 @@ func NewDatabaseWriter(ctx context.Context, dbType, target string, logRetentionD
) (*DatabaseWriter, error) {
switch dbType {
case "mysql":
return newDatabaseWriter(ctx, mysql.Open(target), logRetentionDays, dbFlushPeriod)
case "postgresql":
return newDatabaseWriter(ctx, postgres.Open(target), logRetentionDays, dbFlushPeriod)
return newDatabaseWriter(ctx, mysql.Open(target), logRetentionDays, dbFlushPeriod, dbType)
case "postgresql", "timescale":
return newDatabaseWriter(ctx, postgres.Open(target), logRetentionDays, dbFlushPeriod, dbType)
}

return nil, fmt.Errorf("incorrect database type provided: %s", dbType)
}

func newDatabaseWriter(ctx context.Context, target gorm.Dialector, logRetentionDays uint64,
dbFlushPeriod time.Duration,
dbFlushPeriod time.Duration, dbType string,
) (*DatabaseWriter, error) {
db, err := gorm.Open(target, &gorm.Config{
Logger: logger.New(
Expand All @@ -75,7 +76,7 @@ func newDatabaseWriter(ctx context.Context, target gorm.Dialector, logRetentionD
}

// Migrate the schema
if err := databaseMigration(db); err != nil {
if err := databaseMigration(db, dbType, logRetentionDays); err != nil {
return nil, fmt.Errorf("can't perform auto migration: %w", err)
}

Expand All @@ -90,15 +91,15 @@ func newDatabaseWriter(ctx context.Context, target gorm.Dialector, logRetentionD
return w, nil
}

func databaseMigration(db *gorm.DB) error {
func databaseMigration(db *gorm.DB, dbType string, logRetentionDays uint64) error {
if err := db.AutoMigrate(&logEntry{}); err != nil {
return err
}

tableName := db.NamingStrategy.TableName(reflect.TypeOf(logEntry{}).Name())

// create unmapped primary key
switch db.Config.Name() {
switch dbType {
case "mysql":
tx := db.Exec("ALTER TABLE `" + tableName + "` ADD `id` INT PRIMARY KEY AUTO_INCREMENT")
if tx.Error != nil {
Expand All @@ -113,7 +114,30 @@ func databaseMigration(db *gorm.DB) error {
}

case "postgres":
return db.Exec("ALTER TABLE " + tableName + " ADD column if not exists id serial primary key").Error
return db.Exec("ALTER TABLE " + tableName + " ADD column if not exists id bigserial primary key").Error

case "timescale":
requestTSColName := db.NamingStrategy.ColumnName(reflect.TypeOf(logEntry{}).Name(), "RequestTS")

// Create a Timescale hypertable
tx := db.Exec(`SELECT create_hypertable(
'` + tableName + `',
by_range('` + requestTSColName + `'),
if_not_exists => TRUE
)`)
if tx.Error != nil {
return tx.Error
}

// Create a retention policy for the hypertable
tx = db.Exec(`SELECT add_retention_policy(
'` + tableName + `',
drop_after => INTERVAL '` + strconv.FormatUint(logRetentionDays, 10) + ` days',
if_not_exists => TRUE
)`)
if tx.Error != nil {
return tx.Error
}
}

return nil
Expand Down
Loading

0 comments on commit 6259e13

Please sign in to comment.