From 5019c6c32698bcbcf951d0e806a71f174821cd79 Mon Sep 17 00:00:00 2001 From: Victor Zaytsev <94850767+vzaytsev1981@users.noreply.github.com> Date: Thu, 5 Oct 2023 11:34:16 -0400 Subject: [PATCH 1/3] fix flattenSpec field names (#8) --- supervisor_types.go | 18 ++++-------------- supervisor_types_test.go | 10 ++-------- 2 files changed, 6 insertions(+), 22 deletions(-) diff --git a/supervisor_types.go b/supervisor_types.go index 8fe2ae6..c0c41c7 100644 --- a/supervisor_types.go +++ b/supervisor_types.go @@ -106,7 +106,7 @@ type FieldList []Field type Field struct { Type string `json:"type"` Name string `json:"name"` - Expr string `json:"expression"` + Expr string `json:"expr"` } // Transform defines a single filed transformation of the TransformSpec. @@ -371,20 +371,10 @@ func defaultKafkaIngestionSpec() *InputIngestionSpec { Format: "auto", }, TransformSpec: &TransformSpec{ - Transforms: []Transform{ - { - Type: "expression", - Name: "payload", - Expr: "parse_json(payload)", - }, - }, + Transforms: []Transform{}, }, DimensionsSpec: &DimensionsSpec{ - Dimensions: DimensionSet{ - "id", - "ts", - "payload", - }, + Dimensions: DimensionSet{}, }, GranularitySpec: &GranularitySpec{ Type: "uniform", @@ -399,7 +389,7 @@ func defaultKafkaIngestionSpec() *InputIngestionSpec { InputFormat: &InputFormat{ Type: "json", }, - TaskDuration: "PT30M", + TaskDuration: "PT1H", ConsumerProperties: &ConsumerProperties{ BootstrapServers: "", }, diff --git a/supervisor_types_test.go b/supervisor_types_test.go index 759d540..010db16 100644 --- a/supervisor_types_test.go +++ b/supervisor_types_test.go @@ -63,13 +63,7 @@ var jsonBasic = `{ "format": "auto" }, "transformSpec": { - "transforms": [ - { - "type": "expression", - "name": "payload", - "expression": "parse_json(payload)" - } - ] + "transforms": [] }, "dimensionsSpec": { "dimensions": [ @@ -89,7 +83,7 @@ var jsonBasic = `{ "consumerProperties": { "bootstrap.servers": "test_brokers" }, - "taskDuration": "PT30M", + "taskDuration": "PT1H", "useEarliestOffset": false, "flattenSpec": { "fields": [] From b0fa8ee2f631c24823cde0093b9c783f105763f1 Mon Sep 17 00:00:00 2001 From: Victor Zaytsev <94850767+vzaytsev1981@users.noreply.github.com> Date: Fri, 6 Oct 2023 11:47:28 -0400 Subject: [PATCH 2/3] add detailed dimension specification (#9) --- supervisor_types.go | 12 ++++--- supervisor_types_test.go | 76 +++++++++++++++++++++++++++++++++++++--- 2 files changed, 79 insertions(+), 9 deletions(-) diff --git a/supervisor_types.go b/supervisor_types.go index c0c41c7..576b571 100644 --- a/supervisor_types.go +++ b/supervisor_types.go @@ -1,7 +1,5 @@ package druid -import "time" - // InputIngestionSpec is the root-level type defining an ingestion spec used // by Apache Druid. type InputIngestionSpec struct { @@ -127,7 +125,13 @@ type SpatialDimension struct { type TransformSet []Transform // DimensionSet is a unique set of druid datasource dimensions(labels). -type DimensionSet []string +type DimensionSet []any + +// Dimension is a typed definition of a datasource dimension. +type Dimension struct { + Type string `json:"type"` + Name string `json:"name"` +} // SpatialDimensionSet is a unique set of druid datasource spatial dimensions. type SpatialDimensionSet []SpatialDimension @@ -341,7 +345,7 @@ type SupervisorStatusPayload struct { // with the response metadata. type SupervisorStatus struct { SupervisorId string `json:"id"` - GenerationTime time.Time `json:"generationTime"` + GenerationTime string `json:"generationTime"` Payload *SupervisorStatusPayload `json:"payload"` } diff --git a/supervisor_types_test.go b/supervisor_types_test.go index 010db16..360d837 100644 --- a/supervisor_types_test.go +++ b/supervisor_types_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestKafkaIngestionSpec(t *testing.T) { @@ -34,11 +35,11 @@ func TestKafkaIngestionSpec(t *testing.T) { { name: "set labels", options: []IngestionSpecOptions{ - SetDimensions([]string{"ts", "user_name", "payload"}), + SetDimensions([]any{"ts", "user_name", "payload"}), }, expected: func() *InputIngestionSpec { out := defaultKafkaIngestionSpec() - out.DataSchema.DimensionsSpec.Dimensions = []string{"ts", "user_name", "payload"} + out.DataSchema.DimensionsSpec.Dimensions = []any{"ts", "user_name", "payload"} return out }(), }, @@ -100,20 +101,85 @@ func TestKafkaIngestionSpec_MarshalJSON(t *testing.T) { SetDataSource("test_datasource"), SetTopic("test_topic"), SetBrokers("test_brokers"), - SetDimensions([]string{"ts", "user_name", "payload"}), + SetDimensions([]any{"ts", "user_name", "payload"}), ) actual, err := json.MarshalIndent(spec, "", " ") if err != nil { t.Fatalf("unexpected error while marshalling: %v", err) } expected := []byte(jsonBasic) - assert.Equal(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual))) + require.Equal(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual))) var checkSpec *InputIngestionSpec err = json.Unmarshal(actual, &checkSpec) if err != nil { t.Fatalf("unexpected error while unmarshalling: %v", err) } - assert.Equal(t, spec, checkSpec) + require.Equal(t, spec, checkSpec) + }) +} + +var jsonWithTypedDimensions = `{ + "type": "kafka", + "dataSchema": { + "dataSource": "test_datasource", + "timestampSpec": { + "column": "ts", + "format": "auto" + }, + "transformSpec": { + "transforms": [] + }, + "dimensionsSpec": { + "dimensions": [ + { + "type": "string", + "name": "ts" + }, + { + "type": "json", + "name": "payload" + } + ] + }, + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "DAY", + "queryGranularity": "none" + } + }, + "ioConfig": { + "topic": "test_topic", + "consumerProperties": { + "bootstrap.servers": "test_brokers" + }, + "taskDuration": "PT1H", + "useEarliestOffset": false, + "flattenSpec": { + "fields": [] + }, + "inputFormat": { + "type": "json" + } + } +}` + +func TestIngestionSpecWithTypedDimensions_MarshalJSON(t *testing.T) { + t.Run("jsonWithTypedDimensions", func(t *testing.T) { + spec := NewIngestionSpec( + SetDataSource("test_datasource"), + SetTopic("test_topic"), + SetBrokers("test_brokers"), + SetDimensions([]any{ + Dimension{Type: "string", Name: "ts"}, + Dimension{Type: "json", Name: "payload"}, + }), + ) + actual, err := json.MarshalIndent(spec, "", " ") + if err != nil { + t.Fatalf("unexpected error while marshalling: %v", err) + } + expected := []byte(jsonWithTypedDimensions) + require.Equal(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual))) }) } From c00829bfc08c37042915f66334d09b8ca61785d5 Mon Sep 17 00:00:00 2001 From: Victor Zaytsev <94850767+vzaytsev1981@users.noreply.github.com> Date: Thu, 30 Nov 2023 10:48:47 -0500 Subject: [PATCH 3/3] add sql input source config and relevant IngestionSpecOptions (#10) --- supervisor_types.go | 44 ++++++++++++ supervisor_types_test.go | 151 +++++++++++++++++++++++++++++---------- 2 files changed, 159 insertions(+), 36 deletions(-) diff --git a/supervisor_types.go b/supervisor_types.go index 576b571..d6fb624 100644 --- a/supervisor_types.go +++ b/supervisor_types.go @@ -302,6 +302,19 @@ type HttpInputSourceConfig struct { AllowedProtocols []string `json:" allowedProtocols,omitempty"` } +// ConnectorConfig is connection configuration for Database. +type ConnectorConfig struct { + ConnectURI string `json:"connectURI"` + User string `json:"user"` + Password string `json:"password"` +} + +// Database configuration for InputSource "sql". +type Database struct { + Type string `json:"type"` + ConnectorConfig *ConnectorConfig `json:"connectorConfig"` +} + // InputSource is a specification of the storage system where input data is stored. type InputSource struct { Type string `json:"type"` @@ -322,6 +335,10 @@ type InputSource struct { // CombiningInputSource fields Delegates []InputSource `json:"delegates,omitempty"` + + // SqlInputSource + SQLs []string `json:"sqls,omitempty"` + Database *Database `json:"database,omitempty"` } // TransformSpec is responsible for transforming druid input data @@ -428,6 +445,15 @@ func SetType(stype string) IngestionSpecOptions { } } +// SetIOConfigType sets the type of the supervisor IOConfig. +func SetIOConfigType(ioctype string) IngestionSpecOptions { + return func(spec *InputIngestionSpec) { + if ioctype != "" { + spec.IOConfig.Type = ioctype + } + } +} + // SetTopic sets the Kafka topic to consume data from. func SetTopic(topic string) IngestionSpecOptions { return func(spec *InputIngestionSpec) { @@ -519,3 +545,21 @@ func SetGranularitySpec(segmentGranularity, queryGranularity string, rollup bool } } } + +// SetSQLInputSource configures sql input source. +func SetSQLInputSource(dbType, connectURI, user, password string, sqls []string) IngestionSpecOptions { + return func(spec *InputIngestionSpec) { + spec.IOConfig.InputSource = &InputSource{ + Type: "sql", + SQLs: sqls, + Database: &Database{ + Type: dbType, + ConnectorConfig: &ConnectorConfig{ + ConnectURI: connectURI, + User: user, + Password: password, + }, + }, + } + } +} diff --git a/supervisor_types_test.go b/supervisor_types_test.go index 360d837..80a9716 100644 --- a/supervisor_types_test.go +++ b/supervisor_types_test.go @@ -96,27 +96,25 @@ var jsonBasic = `{ }` func TestKafkaIngestionSpec_MarshalJSON(t *testing.T) { - t.Run("jsonBasic", func(t *testing.T) { - spec := NewIngestionSpec( - SetDataSource("test_datasource"), - SetTopic("test_topic"), - SetBrokers("test_brokers"), - SetDimensions([]any{"ts", "user_name", "payload"}), - ) - actual, err := json.MarshalIndent(spec, "", " ") - if err != nil { - t.Fatalf("unexpected error while marshalling: %v", err) - } - expected := []byte(jsonBasic) - require.Equal(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual))) + spec := NewIngestionSpec( + SetDataSource("test_datasource"), + SetTopic("test_topic"), + SetBrokers("test_brokers"), + SetDimensions([]any{"ts", "user_name", "payload"}), + ) + actual, err := json.Marshal(spec) + if err != nil { + t.Fatalf("unexpected error while marshalling: %v", err) + } + expected := []byte(jsonBasic) + require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual))) - var checkSpec *InputIngestionSpec - err = json.Unmarshal(actual, &checkSpec) - if err != nil { - t.Fatalf("unexpected error while unmarshalling: %v", err) - } - require.Equal(t, spec, checkSpec) - }) + var checkSpec *InputIngestionSpec + err = json.Unmarshal(actual, &checkSpec) + if err != nil { + t.Fatalf("unexpected error while unmarshalling: %v", err) + } + require.Equal(t, spec, checkSpec) } var jsonWithTypedDimensions = `{ @@ -165,21 +163,102 @@ var jsonWithTypedDimensions = `{ }` func TestIngestionSpecWithTypedDimensions_MarshalJSON(t *testing.T) { - t.Run("jsonWithTypedDimensions", func(t *testing.T) { - spec := NewIngestionSpec( - SetDataSource("test_datasource"), - SetTopic("test_topic"), - SetBrokers("test_brokers"), - SetDimensions([]any{ - Dimension{Type: "string", Name: "ts"}, - Dimension{Type: "json", Name: "payload"}, + spec := NewIngestionSpec( + SetDataSource("test_datasource"), + SetTopic("test_topic"), + SetBrokers("test_brokers"), + SetDimensions([]any{ + Dimension{Type: "string", Name: "ts"}, + Dimension{Type: "json", Name: "payload"}, + }), + ) + actual, err := json.Marshal(spec) + if err != nil { + t.Fatalf("unexpected error while marshalling: %v", err) + } + expected := []byte(jsonWithTypedDimensions) + require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual))) +} + +var jsonWithSqlInputSource = `{ + "type": "index_parallel", + "dataSchema": { + "dataSource": "test_datasource", + "timestampSpec": { + "column": "ts", + "format": "auto" + }, + "transformSpec": { + "transforms": [] + }, + "dimensionsSpec": { + "dimensions": [ + "ts", + "user_name", + "payload" + ] + }, + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "DAY", + "queryGranularity": "none" + } + }, + "ioConfig": { + "type": "index_parallel", + "inputSource": { + "type": "sql", + "sqls": [ + "SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'", + "SELECT * FROM table2 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'" + ], + "database": { + "type": "mysql", + "connectorConfig": { + "connectURI": "jdbc:mysql://host:port/schema", + "user": "username", + "password": "password" + } + } + }, + "consumerProperties": {}, + "taskDuration": "PT1H", + "useEarliestOffset": false, + "flattenSpec": { + "fields": [] + }, + "inputFormat": { + "type": "json" + } + } +}` + +func TestIngestionSpecWithSqlInputSource_MarshalJSON(t *testing.T) { + spec := NewIngestionSpec( + SetType("index_parallel"), + SetIOConfigType("index_parallel"), + SetDataSource("test_datasource"), + SetDimensions([]any{"ts", "user_name", "payload"}), + SetSQLInputSource("mysql", + "jdbc:mysql://host:port/schema", + "username", + "password", + []string{ + "SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'", + "SELECT * FROM table2 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'", }), - ) - actual, err := json.MarshalIndent(spec, "", " ") - if err != nil { - t.Fatalf("unexpected error while marshalling: %v", err) - } - expected := []byte(jsonWithTypedDimensions) - require.Equal(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual))) - }) + ) + actual, err := json.Marshal(spec) + if err != nil { + t.Fatalf("unexpected error while marshalling: %v", err) + } + expected := []byte(jsonWithSqlInputSource) + require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual))) + + var checkSpec *InputIngestionSpec + err = json.Unmarshal(actual, &checkSpec) + if err != nil { + t.Fatalf("unexpected error while unmarshalling: %v", err) + } + require.Equal(t, spec, checkSpec) }