Skip to content

Commit

Permalink
update dimension spec fields and add ingestion spec options helpers (#6)
Browse files Browse the repository at this point in the history
* update dimension spec fields and add ingestion spec options helpers
  • Loading branch information
vzayts authored Oct 3, 2023
1 parent 5ce3b3a commit b1ed025
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 5 deletions.
53 changes: 49 additions & 4 deletions supervisor_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,32 @@ type Transform struct {
Expr string `json:"expression"`
}

// SpatialDimension describes one spatial dimension entry of a datasource.
// See https://druid.apache.org/docs/latest/querying/geo/#spatial-indexing
type SpatialDimension struct {
	// DimensionName is serialized under the "dimName" JSON key.
	DimensionName string `json:"dimName"`
	// Dimensions is serialized under the "dims" JSON key and left out of the
	// output when empty.
	Dimensions []string `json:"dims,omitempty"`
}

type (
	// TransformSet holds the transforms applied to the input. It is a plain
	// slice: callers are expected to keep its entries unique.
	TransformSet []Transform

	// DimensionSet holds the dimension (label) names of a druid datasource.
	// It is a plain slice: callers are expected to keep its entries unique.
	DimensionSet []string

	// SpatialDimensionSet holds the spatial dimensions of a druid datasource.
	// It is a plain slice: callers are expected to keep its entries unique.
	SpatialDimensionSet []SpatialDimension
)

// DimensionsSpec is responsible for configuring Druid's dimensions. They're a
// set of columns in Druid's data model that can be used for grouping, filtering
// or applying aggregations.
// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#dimensionsspec
//
// Every field is optional and is left out of the JSON output when it holds its
// zero value.
type DimensionsSpec struct {
	// Dimensions is serialized under the "dimensions" JSON key.
	Dimensions DimensionSet `json:"dimensions,omitempty"`
	// DimansionExclusions is serialized under the "dimensionExclusions" JSON
	// key, which is the spelling Druid's dimensionsSpec defines. The Go field
	// keeps its original (misspelled) name so existing callers keep compiling.
	DimansionExclusions DimensionSet `json:"dimensionExclusions,omitempty"`
	// SpatialDimensions is serialized under the "spatialDimensions" JSON key.
	SpatialDimensions SpatialDimensionSet `json:"spatialDimensions,omitempty"`
	// IncludeAllDimensions is serialized under the "includeAllDimensions" JSON key.
	IncludeAllDimensions bool `json:"includeAllDimensions,omitempty"`
	// UseSchemaDiscovery is serialized under the "useSchemaDiscovery" JSON key.
	UseSchemaDiscovery bool `json:"useSchemaDiscovery,omitempty"`
}

// GranularitySpec allows for configuring operations such as data segment
Expand Down Expand Up @@ -207,7 +221,7 @@ type MetadataStorageUpdaterJobSpec struct {
// IOConfig influences how data is read into Druid from a source system.
// https://druid.apache.org/docs/latest/ingestion/ingestion-spec/#ioconfig
type IOConfig struct {
Type string `json:"type"`
Type string `json:"type,omitempty"`

// IndexIOConfig / RealtimeIOConfig shared field
Firehose *Firehose `json:"firehose,omitempty"`
Expand Down Expand Up @@ -380,7 +394,7 @@ func defaultKafkaIngestionSpec() *InputIngestionSpec {
},
},
IOConfig: &IOConfig{
Type: "kafka",
Type: "",
Topic: "",
InputFormat: &InputFormat{
Type: "json",
Expand Down Expand Up @@ -415,7 +429,7 @@ type IngestionSpecOptions func(*InputIngestionSpec)
func SetType(stype string) IngestionSpecOptions {
	return func(spec *InputIngestionSpec) {
		// An empty type leaves whatever the spec already carries untouched.
		if stype == "" {
			return
		}
		// Only the top-level spec type is written. IOConfig.Type is left alone
		// so it stays empty (and therefore omitted from JSON), and so the
		// option does not dereference a nil IOConfig.
		spec.Type = stype
	}
}
Expand Down Expand Up @@ -473,10 +487,41 @@ func SetDimensions(dimensions DimensionSet) IngestionSpecOptions {
}
}

// SetDimensionsAutodiscovery returns an option that stores discover in the
// UseSchemaDiscovery flag of the spec's dimensions spec, switching druid's
// dimension autodiscovery on or off for the datasource.
func SetDimensionsAutodiscovery(discover bool) IngestionSpecOptions {
	option := func(target *InputIngestionSpec) {
		target.DataSchema.DimensionsSpec.UseSchemaDiscovery = discover
	}
	return option
}

// SetUseEarliestOffset returns an option that stores useEarliestOffset in the
// spec's IOConfig, telling the kafka druid ingestion supervisor whether to
// start reading from the earliest or the latest offsets in Kafka.
func SetUseEarliestOffset(useEarliestOffset bool) IngestionSpecOptions {
	option := func(target *InputIngestionSpec) {
		target.IOConfig.UseEarliestOffset = useEarliestOffset
	}
	return option
}

// SetTimestampColumn returns an option that replaces the data schema's
// timestamp spec with one reading the given column using the "auto" format.
// An empty column name makes the option a no-op.
func SetTimestampColumn(column string) IngestionSpecOptions {
	return func(spec *InputIngestionSpec) {
		if column == "" {
			return
		}
		timestamp := TimestampSpec{Format: "auto", Column: column}
		spec.DataSchema.TimeStampSpec = &timestamp
	}
}

// SetGranularitySpec returns an option that replaces the data schema's
// granularity spec with a fresh "uniform" one built from the given segment
// granularity, query granularity and rollup flag. These settings are applied
// at druid's ingestion partitioning stage.
func SetGranularitySpec(segmentGranularity, queryGranularity string, rollup bool) IngestionSpecOptions {
	return func(spec *InputIngestionSpec) {
		// Built per call so every spec gets its own GranularitySpec value.
		granularity := GranularitySpec{
			Type:               "uniform",
			SegmentGranularity: segmentGranularity,
			QueryGranularity:   queryGranularity,
			Rollup:             rollup,
		}
		spec.DataSchema.GranularitySpec = &granularity
	}
}
1 change: 0 additions & 1 deletion supervisor_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ var jsonBasic = `{
}
},
"ioConfig": {
"type": "kafka",
"topic": "test_topic",
"consumerProperties": {
"bootstrap.servers": "test_brokers"
Expand Down

0 comments on commit b1ed025

Please sign in to comment.