Merge branch 'master' into tomasz/implement-suspend-resume
vzayts authored Dec 5, 2023
2 parents b2369aa + c00829b commit d481f9b
Showing 2 changed files with 224 additions and 55 deletions.
80 changes: 55 additions & 25 deletions supervisor_types.go
@@ -1,7 +1,5 @@
package druid

import "time"

// InputIngestionSpec is the root-level type defining an ingestion spec used
// by Apache Druid.
type InputIngestionSpec struct {
@@ -106,7 +104,7 @@ type FieldList []Field
type Field struct {
Type string `json:"type"`
Name string `json:"name"`
Expr string `json:"expression"`
Expr string `json:"expr"`
}
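The renamed tag matters because Druid's flattenSpec reads the expression from an "expr" key, which is what the corrected tag now emits. A minimal sketch of a field using it (the JSONPath value here is hypothetical):

f := Field{Type: "path", Name: "payload_id", Expr: "$.payload.id"}
// with the corrected tag this marshals to:
// {"type":"path","name":"payload_id","expr":"$.payload.id"}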

// Transform defines a single field transformation of the TransformSpec.
@@ -126,17 +124,15 @@ type SpatialDimension struct {
// TransformSet is a unique set of transforms applied to the input.
type TransformSet []Transform

-// Dimension represents druid dimension.
+// DimensionSet is a unique set of druid datasource dimensions(labels).
+type DimensionSet []any
+
+// Dimension is a typed definition of a datasource dimension.
type Dimension struct {
-Type string `json:"type,omitempty"`
-Name string `json:"name,omitempty"`
-MultiValueHandling string `json:"multiValueHandling,omitempty"`
-CreateBitmapIndex bool `json:"createBitmapIndex,omitempty"`
+Type string `json:"type"`
+Name string `json:"name"`
}

-// DimensionsSet is a unique set of druid datasource dimensions(labels).
-type DimensionsSet []Dimension
-
// SpatialDimensionSet is a unique set of druid datasource spatial dimensions.
type SpatialDimensionSet []SpatialDimension
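Because DimensionSet is now []any, one dimension list can mix Druid's plain-string shorthand with typed Dimension entries — a small sketch with hypothetical names:

dims := DimensionSet{
	"user_name",                              // string shorthand
	Dimension{Type: "json", Name: "payload"}, // typed definition
}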

@@ -313,6 +309,19 @@ type HttpInputSourceConfig struct {
AllowedProtocols []string `json:"allowedProtocols,omitempty"`
}

+// ConnectorConfig is connection configuration for Database.
+type ConnectorConfig struct {
+ConnectURI string `json:"connectURI"`
+User string `json:"user"`
+Password string `json:"password"`
+}
+
+// Database configuration for InputSource "sql".
+type Database struct {
+Type string `json:"type"`
+ConnectorConfig *ConnectorConfig `json:"connectorConfig"`
+}
+
// InputSource is a specification of the storage system where input data is stored.
type InputSource struct {
Type string `json:"type"`
@@ -333,6 +342,10 @@ type InputSource struct {

// CombiningInputSource fields
Delegates []InputSource `json:"delegates,omitempty"`
+
+// SqlInputSource
+SQLs []string `json:"sqls,omitempty"`
+Database *Database `json:"database,omitempty"`
}

// TransformSpec is responsible for transforming druid input data
@@ -357,7 +370,7 @@ type SupervisorStatusPayload struct {
// with the response metadata.
type SupervisorStatus struct {
SupervisorId string `json:"id"`
-GenerationTime time.Time `json:"generationTime"`
+GenerationTime string `json:"generationTime"`
Payload *SupervisorStatusPayload `json:"payload"`
}
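With GenerationTime now carried as a plain string, a caller that wants a time.Time can parse it on demand — a hypothetical helper, assuming Druid emits RFC 3339 / ISO-8601 timestamps:

// GeneratedAt parses the supervisor status timestamp.
// Assumption: the generationTime string is RFC 3339.
func GeneratedAt(s *SupervisorStatus) (time.Time, error) {
	return time.Parse(time.RFC3339, s.GenerationTime)
}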

@@ -387,20 +400,10 @@ func defaultKafkaIngestionSpec() *InputIngestionSpec {
Format: "auto",
},
TransformSpec: &TransformSpec{
-Transforms: []Transform{
-{
-Type: "expression",
-Name: "payload",
-Expr: "parse_json(payload)",
-},
-},
+Transforms: []Transform{},
},
DimensionsSpec: &DimensionsSpec{
-Dimensions: DimensionsSet{
-{Name: "id"},
-{Name: "ts"},
-{Name: "payload"},
-},
+Dimensions: DimensionSet{},
},
GranularitySpec: &GranularitySpec{
Type: "uniform",
@@ -415,7 +418,7 @@ func defaultKafkaIngestionSpec() *InputIngestionSpec {
InputFormat: &InputFormat{
Type: "json",
},
TaskDuration: "PT30M",
TaskDuration: "PT1H",
ConsumerProperties: &ConsumerProperties{
BootstrapServers: "",
},
@@ -450,6 +453,15 @@ func SetType(stype string) IngestionSpecOptions {
}
}

+// SetIOConfigType sets the type of the supervisor IOConfig.
+func SetIOConfigType(ioctype string) IngestionSpecOptions {
+return func(spec *InputIngestionSpec) {
+if ioctype != "" {
+spec.IOConfig.Type = ioctype
+}
+}
+}
+
// SetTopic sets the Kafka topic to consume data from.
func SetTopic(topic string) IngestionSpecOptions {
return func(spec *InputIngestionSpec) {
@@ -541,3 +553,21 @@ func SetGranularitySpec(segmentGranularity string, queryGranularity QueryGranula
}
}
}
+
+// SetSQLInputSource configures sql input source.
+func SetSQLInputSource(dbType, connectURI, user, password string, sqls []string) IngestionSpecOptions {
+return func(spec *InputIngestionSpec) {
+spec.IOConfig.InputSource = &InputSource{
+Type: "sql",
+SQLs: sqls,
+Database: &Database{
+Type: dbType,
+ConnectorConfig: &ConnectorConfig{
+ConnectURI: connectURI,
+User: user,
+Password: password,
+},
+},
+}
+}
+}
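Taken together, the new options compose with the existing ones to build a parallel-index spec that ingests from MySQL — a usage sketch with placeholder connection values:

spec := NewIngestionSpec(
	SetType("index_parallel"),
	SetIOConfigType("index_parallel"),
	SetDataSource("example_datasource"),
	SetSQLInputSource("mysql",
		"jdbc:mysql://db-host:3306/example_schema",
		"example_user", "example_password",
		[]string{"SELECT * FROM events"},
	),
)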
199 changes: 169 additions & 30 deletions supervisor_types_test.go
@@ -6,6 +6,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestKafkaIngestionSpec(t *testing.T) {
@@ -34,11 +35,11 @@ func TestKafkaIngestionSpec(t *testing.T) {
{
name: "set labels",
options: []IngestionSpecOptions{
-SetDimensions([]Dimension{{Name: "ts"}, {Name: "user_name"}, {Name: "payload"}}),
+SetDimensions([]any{"ts", "user_name", "payload"}),
},
expected: func() *InputIngestionSpec {
out := defaultKafkaIngestionSpec()
-out.DataSchema.DimensionsSpec.Dimensions = []Dimension{{Name: "ts"}, {Name: "user_name"}, {Name: "payload"}}
+out.DataSchema.DimensionsSpec.Dimensions = []any{"ts", "user_name", "payload"}
return out
}(),
},
@@ -63,13 +64,7 @@ var jsonBasic = `{
"format": "auto"
},
"transformSpec": {
"transforms": [
{
"type": "expression",
"name": "payload",
"expression": "parse_json(payload)"
}
]
"transforms": []
},
"dimensionsSpec": {
"dimensions": [
@@ -89,7 +84,7 @@
"consumerProperties": {
"bootstrap.servers": "test_brokers"
},
"taskDuration": "PT30M",
"taskDuration": "PT1H",
"useEarliestOffset": false,
"flattenSpec": {
"fields": []
@@ -101,25 +96,169 @@
}`

func TestKafkaIngestionSpec_MarshalJSON(t *testing.T) {
t.Run("jsonBasic", func(t *testing.T) {
spec := NewIngestionSpec(
SetDataSource("test_datasource"),
SetTopic("test_topic"),
SetBrokers("test_brokers"),
SetDimensions([]Dimension{{Name: "ts"}, {Name: "user_name"}, {Name: "payload"}}),
)
actual, err := json.MarshalIndent(spec, "", " ")
if err != nil {
t.Fatalf("unexpected error while marshalling: %v", err)
}
expected := []byte(jsonBasic)
assert.Equal(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))
+spec := NewIngestionSpec(
+SetDataSource("test_datasource"),
+SetTopic("test_topic"),
+SetBrokers("test_brokers"),
+SetDimensions([]any{"ts", "user_name", "payload"}),
+)
+actual, err := json.Marshal(spec)
+if err != nil {
+t.Fatalf("unexpected error while marshalling: %v", err)
+}
+expected := []byte(jsonBasic)
+require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))

+var checkSpec *InputIngestionSpec
+err = json.Unmarshal(actual, &checkSpec)
+if err != nil {
+t.Fatalf("unexpected error while unmarshalling: %v", err)
+}
+require.Equal(t, spec, checkSpec)
+}
+
+var jsonWithTypedDimensions = `{
+"type": "kafka",
+"dataSchema": {
+"dataSource": "test_datasource",
+"timestampSpec": {
+"column": "ts",
+"format": "auto"
+},
+"transformSpec": {
+"transforms": []
+},
+"dimensionsSpec": {
+"dimensions": [
+{
+"type": "string",
+"name": "ts"
+},
+{
+"type": "json",
+"name": "payload"
+}
+]
+},
+"granularitySpec": {
+"type": "uniform",
+"segmentGranularity": "DAY",
+"queryGranularity": "none"
+}
+},
+"ioConfig": {
+"topic": "test_topic",
+"consumerProperties": {
+"bootstrap.servers": "test_brokers"
+},
+"taskDuration": "PT1H",
+"useEarliestOffset": false,
+"flattenSpec": {
+"fields": []
+},
+"inputFormat": {
+"type": "json"
+}
+}
+}`
+
+func TestIngestionSpecWithTypedDimensions_MarshalJSON(t *testing.T) {
+spec := NewIngestionSpec(
+SetDataSource("test_datasource"),
+SetTopic("test_topic"),
+SetBrokers("test_brokers"),
+SetDimensions([]any{
+Dimension{Type: "string", Name: "ts"},
+Dimension{Type: "json", Name: "payload"},
+}),
+)
+actual, err := json.Marshal(spec)
+if err != nil {
+t.Fatalf("unexpected error while marshalling: %v", err)
+}
+expected := []byte(jsonWithTypedDimensions)
+require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))
+}
+
+var jsonWithSqlInputSource = `{
+"type": "index_parallel",
+"dataSchema": {
+"dataSource": "test_datasource",
+"timestampSpec": {
+"column": "ts",
+"format": "auto"
+},
+"transformSpec": {
+"transforms": []
+},
+"dimensionsSpec": {
+"dimensions": [
+"ts",
+"user_name",
+"payload"
+]
+},
+"granularitySpec": {
+"type": "uniform",
+"segmentGranularity": "DAY",
+"queryGranularity": "none"
+}
+},
+"ioConfig": {
+"type": "index_parallel",
+"inputSource": {
+"type": "sql",
+"sqls": [
+"SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'",
+"SELECT * FROM table2 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'"
+],
+"database": {
+"type": "mysql",
+"connectorConfig": {
+"connectURI": "jdbc:mysql://host:port/schema",
+"user": "username",
+"password": "password"
+}
+}
+},
+"consumerProperties": {},
+"taskDuration": "PT1H",
+"useEarliestOffset": false,
+"flattenSpec": {
+"fields": []
+},
+"inputFormat": {
+"type": "json"
+}
+}
+}`
+
-var checkSpec *InputIngestionSpec
-err = json.Unmarshal(actual, &checkSpec)
-if err != nil {
-t.Fatalf("unexpected error while unmarshalling: %v", err)
-}
-assert.Equal(t, spec, checkSpec)
-})
+func TestIngestionSpecWithSqlInputSource_MarshalJSON(t *testing.T) {
+spec := NewIngestionSpec(
+SetType("index_parallel"),
+SetIOConfigType("index_parallel"),
+SetDataSource("test_datasource"),
+SetDimensions([]any{"ts", "user_name", "payload"}),
+SetSQLInputSource("mysql",
+"jdbc:mysql://host:port/schema",
+"username",
+"password",
+[]string{
+"SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'",
+"SELECT * FROM table2 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'",
+}),
+)
+actual, err := json.Marshal(spec)
+if err != nil {
+t.Fatalf("unexpected error while marshalling: %v", err)
+}
+expected := []byte(jsonWithSqlInputSource)
+require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))
+
+var checkSpec *InputIngestionSpec
+err = json.Unmarshal(actual, &checkSpec)
+if err != nil {
+t.Fatalf("unexpected error while unmarshalling: %v", err)
+}
+require.Equal(t, spec, checkSpec)
}
