Skip to content

Commit

Permalink
Merge pull request #13 from h2oai/tomasz/options-to-submit-reindex-task
Browse files Browse the repository at this point in the history
Added TaskIngestionSpecOptions functions to define druid reindexing task
  • Loading branch information
tomasz-h2o authored Jan 15, 2024
2 parents c263cd7 + 75513a5 commit aaa0d22
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 9 deletions.
56 changes: 47 additions & 9 deletions task_types.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
package druid

import (
"fmt"
"time"
)

const (
iso8601Format = "2006-01-02T15:04:05"
)

// TaskStatusResponse is a response object containing status of a task.
type TaskStatusResponse struct {
Task string `json:"task"`
Expand Down Expand Up @@ -55,17 +64,12 @@ func defaultTaskIngestionSpec() *TaskIngestionSpec {
TransformSpec: &TransformSpec{
Transforms: []Transform{},
},
TimeStampSpec: &TimestampSpec{},
},
IOConfig: &IOConfig{
Type: "index_parallel",
InputSource: &InputSource{
Type: "sql",
Database: &Database{},
SQLs: []string{},
},
InputFormat: &InputFormat{
Type: "json",
},
Type: "index_parallel",
InputSource: &InputSource{},
InputFormat: &InputFormat{},
},
TuningConfig: &TuningConfig{
Type: "index_parallel",
Expand Down Expand Up @@ -170,6 +174,40 @@ func SetTaskInlineInputData(data string) TaskIngestionSpecOptions {
}
}

// SetTaskDruidInputSource configures druid reindex input source for the task based ingestion.
func SetTaskDruidInputSource(datasource string, startTime time.Time, endTime time.Time) TaskIngestionSpecOptions {
return func(spec *TaskIngestionSpec) {
spec.Spec.IOConfig.InputSource = &InputSource{
Type: "druid",
Datasource: datasource,
Interval: fmt.Sprintf(
"%s/%s",
startTime.Format(iso8601Format),
endTime.Format(iso8601Format),
),
}
}
}

// SetTaskSchemaDiscovery sets auto discovery of dimensions.
func SetTaskSchemaDiscovery(discovery bool) TaskIngestionSpecOptions {
return func(spec *TaskIngestionSpec) {
spec.Spec.DataSchema.DimensionsSpec.UseSchemaDiscovery = discovery
}
}

// SetTaskGranularitySpec sets granularity spec settings that are applied at druid ingestion partitioning stage.
func SetTaskGranularitySpec(segmentGranularity string, queryGranularity *QueryGranularitySpec, rollup bool) TaskIngestionSpecOptions {
return func(spec *TaskIngestionSpec) {
spec.Spec.DataSchema.GranularitySpec = &GranularitySpec{
Type: "uniform",
SegmentGranularity: segmentGranularity,
QueryGranularity: queryGranularity,
Rollup: rollup,
}
}
}

// NewTaskIngestionSpec returns a default TaskIngestionSpec and applies any options passed to it.
func NewTaskIngestionSpec(options ...TaskIngestionSpecOptions) *TaskIngestionSpec {
spec := defaultTaskIngestionSpec()
Expand Down
57 changes: 57 additions & 0 deletions task_types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package druid

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestTaskIngestionSpec(t *testing.T) {
var testData = []struct {
name string
options []TaskIngestionSpecOptions
expected *TaskIngestionSpec
}{
{
name: "druid reindex ingestion task",
options: []TaskIngestionSpecOptions{
SetTaskDataSource("telemetry-test"),
SetTaskDataDimensions(DimensionSet{{"id"}, {"kind"}, {"test"}}),
SetTaskDruidInputSource(
"telemetry-test",
time.Date(2023, 12, 12, 0, 0, 0, 0, time.UTC),
time.Date(2023, 12, 24, 0, 0, 0, 0, time.UTC),
),
SetTaskSchemaDiscovery(false),
SetTaskTimestampColumn("__time"),
SetTaskGranularitySpec("DAY", &QueryGranularitySpec{"none"}, true),
},
expected: func() *TaskIngestionSpec {
out := NewTaskIngestionSpec()
out.Spec.DataSchema.DataSource = "telemetry-test"
out.Spec.DataSchema.DimensionsSpec = &DimensionsSpec{
Dimensions: DimensionSet{{"id"}, {"kind"}, {"test"}},
}
out.Spec.IOConfig.InputSource.Type = "druid"
out.Spec.IOConfig.InputSource.Datasource = "telemetry-test"
out.Spec.IOConfig.InputSource.Interval = "2023-12-12T00:00:00/2023-12-24T00:00:00"
out.Spec.DataSchema.TimeStampSpec.Column = "__time"
out.Spec.DataSchema.TimeStampSpec.Format = "auto"
out.Spec.DataSchema.GranularitySpec.SegmentGranularity = "DAY"
out.Spec.DataSchema.GranularitySpec.QueryGranularity = &QueryGranularitySpec{"none"}
out.Spec.DataSchema.GranularitySpec.Rollup = true
return out
}(),
},
}

for _, test := range testData {
t.Run(test.name, func(t *testing.T) {
actual := NewTaskIngestionSpec(
test.options...,
)
assert.Equal(t, test.expected, actual)
})
}
}

0 comments on commit aaa0d22

Please sign in to comment.