refactor: unify clickhouse database and table names #290
sunface committed Oct 31, 2023
1 parent 62a9dac commit f1de7e8
Showing 96 changed files with 395 additions and 430 deletions.
8 changes: 4 additions & 4 deletions otel-collector/components/components.go
@@ -116,8 +116,8 @@ import (
"github.com/DataObserve/datav/otel-collector/exporter/clickhousemetricsexporter"
"github.com/DataObserve/datav/otel-collector/exporter/clickhousetracesexporter"
_ "github.com/DataObserve/datav/otel-collector/pkg/parser/grok"
"github.com/DataObserve/datav/otel-collector/processor/signozspanmetricsprocessor"
"github.com/DataObserve/datav/otel-collector/processor/signoztailsampler"
"github.com/DataObserve/datav/otel-collector/processor/datavspanmetricsprocessor"
"github.com/DataObserve/datav/otel-collector/processor/datavtailsampler"
"github.com/DataObserve/datav/otel-collector/receiver/httpreceiver"
)

@@ -239,13 +239,13 @@ func Components() (otelcol.Factories, error) {
routingprocessor.NewFactory(),
schemaprocessor.NewFactory(),
servicegraphprocessor.NewFactory(),
-signozspanmetricsprocessor.NewFactory(),
+datavspanmetricsprocessor.NewFactory(),
spanmetricsprocessor.NewFactory(),
spanprocessor.NewFactory(),
tailsamplingprocessor.NewFactory(),
transformprocessor.NewFactory(),
logstransformprocessor.NewFactory(),
-signoztailsampler.NewFactory(),
+datavtailsampler.NewFactory(),
}
for _, pr := range factories.Processors {
processors = append(processors, pr)
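The rename is not cosmetic: the collector keys its factories by component type, so pipelines must now reference the datav* processor names instead of the signoz* ones (see the config.yaml hunks below). A minimal smoke-test sketch of that assumption, using only the `Components()` entry point shown above:

```go
package main

import (
	"fmt"

	"github.com/DataObserve/datav/otel-collector/components"
)

// Sketch: list the registered processor types. After this commit the
// output should include the datav* processors and no signoz* ones.
func main() {
	factories, err := components.Components()
	if err != nil {
		panic(err)
	}
	for typ := range factories.Processors {
		fmt.Println("registered processor:", typ)
	}
}
```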
16 changes: 8 additions & 8 deletions otel-collector/config/config.yaml
@@ -56,9 +56,9 @@ receivers:
from: attributes.container_name
to: resource.container_name
# please remove names from below if you want to collect logs from them
-- type: filter
-id: signoz_logs_filter
-expr: 'resource.container_name matches "^signoz-(logspout|frontend|alertmanager|query-service|otel-collector|otel-collector-metrics|clickhouse|zookeeper)"'
+# - type: filter
+#   id: datav_logs_filter
+#   expr: 'resource.container_name matches "^datav-(logspout|frontend|alertmanager|query-service|otel-collector|otel-collector-metrics|clickhouse|zookeeper)"'
opencensus:
endpoint: 0.0.0.0:55678
otlp/spanmetrics:
@@ -143,7 +143,7 @@ processors:
send_batch_size: 10000
send_batch_max_size: 11000
timeout: 10s
-signozspanmetrics/prometheus:
+datavspanmetrics/prometheus:
metrics_exporter: prometheus
latency_histogram_buckets: [100us, 1ms, 2ms, 6ms, 10ms, 50ms, 100ms, 250ms, 500ms, 1000ms, 1400ms, 2000ms, 5s, 10s, 20s, 40s, 60s ]
dimensions_cache_size: 100000
@@ -155,7 +155,7 @@ processors:
# This is added to ensure the uniqueness of the timeseries
# Otherwise, identical timeseries produced by multiple replicas of
# collectors result in incorrect APM metrics
-- name: 'signoz.collector.id'
+- name: 'datav.collector.id'
# memory_limiter:
# # 80% of maximum memory up to 2G
# limit_mib: 1500
@@ -186,7 +186,7 @@ extensions:

exporters:
clickhousetraces:
-datasource: tcp://localhost:9000/?database=signoz_traces
+dsn: tcp://localhost:9000/?database=datav_traces
docker_multi_node_cluster: ${DOCKER_MULTI_NODE_CLUSTER}
low_cardinal_exception_grouping: ${LOW_CARDINAL_EXCEPTION_GROUPING}
clickhousemetricswrite:
@@ -210,7 +210,7 @@ exporters:
# logging: {}

clickhouselogsexporter:
-dsn: tcp://localhost:9000/
+dsn: tcp://localhost:9000/?database=datav_logs
docker_multi_node_cluster: ${DOCKER_MULTI_NODE_CLUSTER}
timeout: 5s
sending_queue:
@@ -227,7 +227,7 @@ service:
pipelines:
traces:
receivers: [jaeger, otlp]
-processors: [signozspanmetrics/prometheus, batch]
+processors: [datavspanmetrics/prometheus, batch]
exporters: [clickhousetraces]
metrics:
receivers: [otlp]
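Note the shape of the unified connection strings above: every exporter now names its ClickHouse database explicitly via the `?database=` query parameter. A stdlib-only sketch of what those DSNs carry (not the exporters' actual parsing code):

```go
package main

import (
	"fmt"
	"net/url"
)

func main() {
	// The two DSNs from the exporters section above.
	for _, dsn := range []string{
		"tcp://localhost:9000/?database=datav_traces",
		"tcp://localhost:9000/?database=datav_logs",
	} {
		u, err := url.Parse(dsn)
		if err != nil {
			panic(err)
		}
		fmt.Printf("host=%s database=%s\n", u.Host, u.Query().Get("database"))
	}
}
```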
@@ -1,7 +1,2 @@
# collect docker container logs and syslog using filelog reciever

-Steps
-* Make sure you have clickhouse from signoz running on your system.
-* run `make build-signoz-collector` in the root directory of this project
-* run the `docker compose up -d`
-* generate logs `docker run --rm mingrammer/flog:0.4.3 --format=json --sleep=0.5s --loop`
18 changes: 6 additions & 12 deletions otel-collector/exporter/clickhouselogsexporter/exporter.go
@@ -40,12 +40,6 @@ import (
"go.uber.org/zap"
)

-const (
-CLUSTER = "cluster"
-DISTRIBUTED_LOGS_TABLE = "distributed_logs"
-DISTRIBUTED_TAG_ATTRIBUTES = "distributed_tag_attributes"
-)

type clickhouseLogsExporter struct {
db clickhouse.Conn
insertLogsSQL string
@@ -77,7 +71,7 @@ func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, error) {
usage.Options{
ReportingInterval: usage.DefaultCollectionInterval,
},
"signoz_logs",
"datav_logs",
UsageExporter,
)
if err != nil {
@@ -133,7 +127,7 @@ func (e *clickhouseLogsExporter) pushLogsData(ctx context.Context, ld plog.Logs)
return fmt.Errorf("PrepareBatch:%w", err)
}

-tagStatement, err := e.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", databaseName, DISTRIBUTED_TAG_ATTRIBUTES))
+tagStatement, err := e.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", DefaultLogDatabase, DefaultLogTagAttributes))
if err != nil {
return fmt.Errorf("PrepareTagBatch:%w", err)
}
@@ -218,7 +212,7 @@ func (e *clickhouseLogsExporter) pushLogsData(ctx context.Context, ld plog.Logs)
stats.RecordWithTags(ctx,
[]tag.Mutator{
tag.Upsert(exporterKey, string(component.DataTypeLogs)),
-tag.Upsert(tableKey, DISTRIBUTED_LOGS_TABLE),
+tag.Upsert(tableKey, DefaultLogsTable),
},
writeLatencyMillis.M(int64(time.Since(dbWriteStart).Milliseconds())),
)
@@ -230,7 +224,7 @@ func (e *clickhouseLogsExporter) pushLogsData(ctx context.Context, ld plog.Logs)
zap.String("cost", duration.String()))

for k, v := range metrics {
-stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(usage.TagTenantKey, k)}, ExporterSigNozSentLogRecords.M(int64(v.Count)), ExporterSigNozSentLogRecordsBytes.M(int64(v.Size)))
+stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(usage.TagTenantKey, k)}, ExporterSentLogRecords.M(int64(v.Count)), ExporterSentLogRecordsBytes.M(int64(v.Size)))
}

// push tag attributes
@@ -239,7 +233,7 @@ func (e *clickhouseLogsExporter) pushLogsData(ctx context.Context, ld plog.Logs)
stats.RecordWithTags(ctx,
[]tag.Mutator{
tag.Upsert(exporterKey, string(component.DataTypeLogs)),
-tag.Upsert(tableKey, DISTRIBUTED_TAG_ATTRIBUTES),
+tag.Upsert(tableKey, DefaultLogTagAttributes),
},
writeLatencyMillis.M(int64(time.Since(tagWriteStart).Milliseconds())),
)
@@ -440,5 +434,5 @@ func newClickhouseClient(logger *zap.Logger, cfg *Config) (clickhouse.Conn, error) {
}

func renderInsertLogsSQL(cfg *Config) string {
-return fmt.Sprintf(insertLogsSQLTemplate, databaseName, DISTRIBUTED_LOGS_TABLE)
+return fmt.Sprintf(insertLogsSQLTemplate, DefaultLogDatabase, DefaultLogsTable)
}
2 changes: 0 additions & 2 deletions otel-collector/exporter/clickhouselogsexporter/factory.go
@@ -31,8 +31,6 @@ const (
typeStr = "clickhouselogsexporter"
primaryNamespace = "clickhouselogs"
archiveNamespace = "clickhouselogs-archive"
-databaseName = "signoz_logs"
-tableName = "logs"
migrationsFolder = "./migrations"
)

8 changes: 8 additions & 0 deletions otel-collector/exporter/clickhouselogsexporter/options.go
@@ -0,0 +1,8 @@
+package clickhouselogsexporter
+
+const (
+DefaultLogDatabase string = "datav_logs"
+DefaultLogDatasource string = "tcp://127.0.0.1:9000/?database=datav_logs"
+DefaultLogsTable string = "distributed_logs"
+DefaultLogTagAttributes string = "distributed_log_tag_attributes"
+)
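These exported constants replace the unexported `databaseName`/`tableName` constants deleted above, and exporter.go composes them into its insert targets. A hypothetical sketch of that composition with clickhouse-go v2 (the helper and the connection options are illustrative, not part of the commit; the real column schema lives in the exporter's migrations):

```go
package clickhouselogsexporter

import (
	"context"
	"fmt"

	"github.com/ClickHouse/clickhouse-go/v2"
)

// insertExample is a hypothetical helper, not part of the commit.
func insertExample(ctx context.Context) error {
	conn, err := clickhouse.Open(&clickhouse.Options{
		Addr: []string{"127.0.0.1:9000"},
		Auth: clickhouse.Auth{Database: DefaultLogDatabase},
	})
	if err != nil {
		return err
	}
	defer conn.Close()

	// Same fmt.Sprintf pattern as pushLogsData in exporter.go.
	batch, err := conn.PrepareBatch(ctx,
		fmt.Sprintf("INSERT INTO %s.%s", DefaultLogDatabase, DefaultLogsTable))
	if err != nil {
		return err
	}
	// Placeholder call: Append's arguments must match the real
	// distributed_logs column schema for the insert to succeed.
	if err := batch.Append( /* column values here */ ); err != nil {
		return err
	}
	return batch.Send()
}
```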
32 changes: 16 additions & 16 deletions otel-collector/exporter/clickhouselogsexporter/usage.go
@@ -11,33 +11,33 @@ import (
)

const (
-SigNozSentLogRecordsKey = "singoz_sent_log_records"
-SigNozSentLogRecordsBytesKey = "singoz_sent_log_records_bytes"
+SentLogRecordsKey = "singoz_sent_log_records"
+SentLogRecordsBytesKey = "singoz_sent_log_records_bytes"
)

var (
// Measures for usage
-ExporterSigNozSentLogRecords = stats.Int64(
-SigNozSentLogRecordsKey,
-"Number of signoz log records successfully sent to destination.",
+ExporterSentLogRecords = stats.Int64(
+SentLogRecordsKey,
+"Number of datav log records successfully sent to destination.",
stats.UnitDimensionless)
-ExporterSigNozSentLogRecordsBytes = stats.Int64(
-SigNozSentLogRecordsBytesKey,
-"Total size of signoz log records successfully sent to destination.",
+ExporterSentLogRecordsBytes = stats.Int64(
+SentLogRecordsBytesKey,
+"Total size of datav log records successfully sent to destination.",
stats.UnitDimensionless)

// Views for usage
LogsCountView = &view.View{
Name: "signoz_logs_count",
Measure: ExporterSigNozSentLogRecords,
Description: "The number of logs exported to signoz",
Name: "datav_logs_count",
Measure: ExporterSentLogRecords,
Description: "The number of logs exported to datav",
Aggregation: view.Sum(),
TagKeys: []tag.Key{usage.TagTenantKey},
}
LogsSizeView = &view.View{
Name: "signoz_logs_bytes",
Measure: ExporterSigNozSentLogRecordsBytes,
Description: "The size of logs exported to signoz",
Name: "datav_logs_bytes",
Measure: ExporterSentLogRecordsBytes,
Description: "The size of logs exported to datav",
Aggregation: view.Sum(),
TagKeys: []tag.Key{usage.TagTenantKey},
}
@@ -46,7 +46,7 @@ var (
func UsageExporter(metrics []*metricdata.Metric) (map[string]usage.Usage, error) {
data := map[string]usage.Usage{}
for _, metric := range metrics {
if strings.Contains(metric.Descriptor.Name, "signoz_logs_count") {
if strings.Contains(metric.Descriptor.Name, "datav_logs_count") {
for _, v := range metric.TimeSeries {
tenant := v.LabelValues[0].Value
if d, ok := data[tenant]; ok {
@@ -58,7 +58,7 @@ func UsageExporter(metrics []*metricdata.Metric) (map[string]usage.Usage, error)
}
}
}
-} else if strings.Contains(metric.Descriptor.Name, "signoz_logs_bytes") {
+} else if strings.Contains(metric.Descriptor.Name, "datav_logs_bytes") {
for _, v := range metric.TimeSeries {
tenant := v.LabelValues[0].Value
if d, ok := data[tenant]; ok {
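The renamed views only produce data once they are registered with OpenCensus; `UsageExporter` above then matches on the new `datav_logs_count`/`datav_logs_bytes` names. A short sketch of that wiring (hypothetical helper; the real registration lives in the collector's usage plumbing):

```go
package clickhouselogsexporter

import "go.opencensus.io/stats/view"

// registerUsageViews is a hypothetical helper: after this commit the
// registered view names are datav_logs_count and datav_logs_bytes.
func registerUsageViews() error {
	return view.Register(LogsCountView, LogsSizeView)
}
```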
7 changes: 0 additions & 7 deletions otel-collector/exporter/clickhousetracesexporter/README.md
@@ -1,7 +0,0 @@
-Archive storage?
-hold/cold to clickhouse? Use of S3 for cold?
-
-support other configs/flags?
-Is Operations Table needed?
-
-cat ./opentelemetry-collector/exporter/clickhouseexporter/sql-schema/signoz-index.sql | sudo ./clickhouse client -h 18.220.17.59 -mn
@@ -41,7 +41,7 @@ func newExporter(cfg component.Config, logger *zap.Logger) (*storage, error) {

configClickHouse := cfg.(*Config)

-f := ClickHouseNewFactory(configClickHouse.Migrations, configClickHouse.Datasource, configClickHouse.DockerMultiNodeCluster)
+f := ClickHouseNewFactory(configClickHouse.Migrations, configClickHouse.DSN, configClickHouse.DockerMultiNodeCluster)

err := f.Initialize(logger)
if err != nil {
@@ -55,7 +55,7 @@ func newExporter(cfg component.Config, logger *zap.Logger) (*storage, error) {
collector := usage.NewUsageCollector(
f.db,
usage.Options{ReportingInterval: usage.DefaultCollectionInterval},
"signoz_traces",
"datav_traces",
UsageExporter,
)
if err != nil {
18 changes: 14 additions & 4 deletions otel-collector/exporter/clickhousetracesexporter/config.go
@@ -15,13 +15,16 @@
package clickhousetracesexporter

import (
"errors"

"go.opentelemetry.io/collector/component"
"go.uber.org/multierr"
)

// Config defines configuration for tracing exporter.
type Config struct {
Options `mapstructure:",squash"`
-Datasource string `mapstructure:"datasource"`
+DSN string `mapstructure:"dsn"`
Migrations string `mapstructure:"migrations"`
// Docker Multi Node Cluster is a flag to enable the docker multi node cluster. Default is false.
DockerMultiNodeCluster bool `mapstructure:"docker_multi_node_cluster"`
@@ -31,7 +34,14 @@ type Config struct {

var _ component.Config = (*Config)(nil)

-// Validate checks if the exporter configuration is valid
-func (cfg *Config) Validate() error {
-return nil
+var (
+errConfigNoDSN = errors.New("dsn must be specified")
+)
+
+// Validate validates the clickhouse server configuration.
+func (cfg *Config) Validate() (err error) {
+if cfg.DSN == "" {
+err = multierr.Append(err, errConfigNoDSN)
+}
+return err
}
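A quick sketch of the new behaviour as a test (hypothetical, not part of the commit): an empty `dsn` is now rejected instead of passing validation silently.

```go
package clickhousetracesexporter

import "testing"

func TestValidateRequiresDSN(t *testing.T) {
	cfg := &Config{}
	if err := cfg.Validate(); err == nil {
		t.Fatal("expected an error when dsn is empty")
	}

	cfg.DSN = "tcp://localhost:9000/?database=datav_traces"
	if err := cfg.Validate(); err != nil {
		t.Fatalf("unexpected error for a valid dsn: %v", err)
	}
}
```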