Skip to content

Commit

Permalink
add detailed dimension specification (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
vzayts authored Oct 6, 2023
1 parent 5019c6c commit b0fa8ee
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 9 deletions.
12 changes: 8 additions & 4 deletions supervisor_types.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package druid

import "time"

// InputIngestionSpec is the root-level type defining an ingestion spec used
// by Apache Druid.
type InputIngestionSpec struct {
Expand Down Expand Up @@ -127,7 +125,13 @@ type SpatialDimension struct {
type TransformSet []Transform

// DimensionSet is a unique set of druid datasource dimensions(labels).
type DimensionSet []string
type DimensionSet []any

// Dimension is a typed definition of a datasource dimension.
type Dimension struct {
Type string `json:"type"`
Name string `json:"name"`
}

// SpatialDimensionSet is a unique set of druid datasource spatial dimensions.
type SpatialDimensionSet []SpatialDimension
Expand Down Expand Up @@ -341,7 +345,7 @@ type SupervisorStatusPayload struct {
// with the response metadata.
type SupervisorStatus struct {
SupervisorId string `json:"id"`
GenerationTime time.Time `json:"generationTime"`
GenerationTime string `json:"generationTime"`
Payload *SupervisorStatusPayload `json:"payload"`
}

Expand Down
76 changes: 71 additions & 5 deletions supervisor_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestKafkaIngestionSpec(t *testing.T) {
Expand Down Expand Up @@ -34,11 +35,11 @@ func TestKafkaIngestionSpec(t *testing.T) {
{
name: "set labels",
options: []IngestionSpecOptions{
SetDimensions([]string{"ts", "user_name", "payload"}),
SetDimensions([]any{"ts", "user_name", "payload"}),
},
expected: func() *InputIngestionSpec {
out := defaultKafkaIngestionSpec()
out.DataSchema.DimensionsSpec.Dimensions = []string{"ts", "user_name", "payload"}
out.DataSchema.DimensionsSpec.Dimensions = []any{"ts", "user_name", "payload"}
return out
}(),
},
Expand Down Expand Up @@ -100,20 +101,85 @@ func TestKafkaIngestionSpec_MarshalJSON(t *testing.T) {
SetDataSource("test_datasource"),
SetTopic("test_topic"),
SetBrokers("test_brokers"),
SetDimensions([]string{"ts", "user_name", "payload"}),
SetDimensions([]any{"ts", "user_name", "payload"}),
)
actual, err := json.MarshalIndent(spec, "", " ")
if err != nil {
t.Fatalf("unexpected error while marshalling: %v", err)
}
expected := []byte(jsonBasic)
assert.Equal(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))
require.Equal(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))

var checkSpec *InputIngestionSpec
err = json.Unmarshal(actual, &checkSpec)
if err != nil {
t.Fatalf("unexpected error while unmarshalling: %v", err)
}
assert.Equal(t, spec, checkSpec)
require.Equal(t, spec, checkSpec)
})
}

var jsonWithTypedDimensions = `{
"type": "kafka",
"dataSchema": {
"dataSource": "test_datasource",
"timestampSpec": {
"column": "ts",
"format": "auto"
},
"transformSpec": {
"transforms": []
},
"dimensionsSpec": {
"dimensions": [
{
"type": "string",
"name": "ts"
},
{
"type": "json",
"name": "payload"
}
]
},
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "none"
}
},
"ioConfig": {
"topic": "test_topic",
"consumerProperties": {
"bootstrap.servers": "test_brokers"
},
"taskDuration": "PT1H",
"useEarliestOffset": false,
"flattenSpec": {
"fields": []
},
"inputFormat": {
"type": "json"
}
}
}`

func TestIngestionSpecWithTypedDimensions_MarshalJSON(t *testing.T) {
t.Run("jsonWithTypedDimensions", func(t *testing.T) {
spec := NewIngestionSpec(
SetDataSource("test_datasource"),
SetTopic("test_topic"),
SetBrokers("test_brokers"),
SetDimensions([]any{
Dimension{Type: "string", Name: "ts"},
Dimension{Type: "json", Name: "payload"},
}),
)
actual, err := json.MarshalIndent(spec, "", " ")
if err != nil {
t.Fatalf("unexpected error while marshalling: %v", err)
}
expected := []byte(jsonWithTypedDimensions)
require.Equal(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))
})
}

0 comments on commit b0fa8ee

Please sign in to comment.