address CR comments
vzayts committed Dec 11, 2023
1 parent 51b3ce0 commit bc46dc3
Showing 2 changed files with 70 additions and 47 deletions.
38 changes: 19 additions & 19 deletions common_spec_types.go
@@ -13,7 +13,7 @@ type BitmapFactory struct {
// StringEncodingStrategy type for specifying string encoding at the indexing stage.
type StringEncodingStrategy struct {
Type string `json:"type"`
// FrontCoded fields
// FrontCoded fields.
BucketSize int `json:"bucketSize,omitempty"`
FormatVersion int `json:"formatVersion,omitempty"`
}
@@ -180,12 +180,12 @@ type IdleConfig struct {
type Firehose struct {
Type string `json:"type,omitempty"`

// EventReceiverFirehoseFactory fields
// EventReceiverFirehoseFactory fields.
ServiceName string `json:"serviceName,omitempty"`
BufferSize int `json:"bufferSize,omitempty"`
MaxIdleTime int64 `json:"maxIdleTime,omitempty"`

// FixedCountFirehoseFactory / ClippedFirehoseFactory / TimedShutoffFirehoseFactory fields
// FixedCountFirehoseFactory / ClippedFirehoseFactory / TimedShutoffFirehoseFactory fields.
Delegate []Firehose `json:"delegate,omitempty"`
Count int `json:"count,omitempty"`
Interval string `json:"interval,omitempty"`
@@ -195,10 +195,10 @@ type Firehose struct {
// CompactionInputSpec is a specification for a compaction task.
type CompactionInputSpec struct {
Type string `json:"type"`
// CompactionIntervalSpec fields
// CompactionIntervalSpec fields.
Interval string `json:"interval,omitempty"`
Sha256OfSortedSegmentIds string `json:"sha256OfSortedSegmentIds,omitempty"`
// SpecificSegmentsSpec fields
// SpecificSegmentsSpec fields.
Segments []string `json:"segments,omitempty"`
}

@@ -225,20 +225,20 @@ type IOConfig struct {
// IndexIOConfig field
InputSource *InputSource `json:"inputSource,omitempty"`
AppendToExisting bool `json:"appendToExisting,omitempty"`
// IndexIOConfig / CompactionIOConfig shared field
// IndexIOConfig / CompactionIOConfig shared fields.
DropExisting bool `json:"dropExisting,omitempty"`

// CompactionIOConfig / HadoopIOConfig fields
// CompactionIOConfig / HadoopIOConfig fields.
InputSpec map[string]any `json:"inputSpec,omitempty"`

// CompactionIOConfig field
// CompactionIOConfig fields.
AllowNonAlignedInterval bool `json:"allowNonAlignedInterval,omitempty"`

// HadoopIOConfig fields
// HadoopIOConfig fields.
MetadataUpdateSpec *MetadataStorageUpdaterJobSpec `json:"metadataUpdateSpec,omitempty"`
SegmentOutputPath string `json:"segmentOutputPath,omitempty"`

// KafkaIndexTaskIOConfig / KinesisIndexTaskIOConfig fields
// KafkaIndexTaskIOConfig / KinesisIndexTaskIOConfig fields.
Topic string `json:"topic,omitempty"`
ConsumerProperties *ConsumerProperties `json:"consumerProperties,omitempty"`
TaskDuration string `json:"taskDuration,omitempty"`
@@ -255,7 +255,7 @@ type IOConfig struct {
Stream string `json:"stream,omitempty"`
UseEarliestSequenceNumber bool `json:"useEarliestSequenceNumber,omitempty"`

// common fields
// Common fields.
FlattenSpec *FlattenSpec `json:"flattenSpec,omitempty"`
InputFormat *InputFormat `json:"inputFormat,omitempty"`
IdleConfig *IdleConfig `json:"idleConfig,omitempty"`
@@ -273,18 +273,18 @@ type ConsumerProperties struct {
type InputFormat struct {
Type string `json:"type"`

// FlatTextInputFormat / DelimitedInputFormat fields
// FlatTextInputFormat / DelimitedInputFormat fields.
Delimiter string `json:"delimiter,omitempty"`
ListDelimiter string `json:"listDelimiter,omitempty"`
FindColumnsFromHeader string `json:"findColumnsFromHeader,omitempty"`
SkipHeaderRows int `json:"skipHeaderRows,omitempty"`
Columns []string `json:"columns,omitempty"`

// JsonInputFormat fields
// JsonInputFormat fields.
FlattenSpec *FlattenSpec `json:"flattenSpec,omitempty"`
FeatureSpec map[string]bool `json:"featureSpec,omitempty"`

// Common CsvInputFormat / JsonInputFormat fields
// Common CsvInputFormat / JsonInputFormat fields.
KeepNullColumns bool `json:"keepNullColumns,omitempty"`
AssumeNewlineDelimited bool `json:"assumeNewlineDelimited,omitempty"`
UseJsonNodeReader bool `json:"useJsonNodeReader,omitempty"`
@@ -312,24 +312,24 @@ type Database struct {
type InputSource struct {
Type string `json:"type"`

// LocalInputSource fields
// LocalInputSource fields.
BaseDir string `json:"baseDir,omitempty"`
Filter string `json:"filter,omitempty"`
Files []string `json:"files,omitempty"`

// HttpInputSource fields
// HttpInputSource fields.
URIs []string `json:"uris,omitempty"`
HttpAuthenticationUsername string `json:"httpAuthenticationUsername,omitempty"`
HttpAuthenticationPassword string `json:"httpAuthenticationPassword,omitempty"`
HttpSourceConfig *HttpInputSourceConfig `json:"config,omitempty"`

// InlineInputSource fields
// InlineInputSource fields.
Data string `json:"data,omitempty"`

// CombiningInputSource fields
// CombiningInputSource fields.
Delegates []InputSource `json:"delegates,omitempty"`

// SqlInputSource
// SqlInputSource.
SQLs []string `json:"sqls,omitempty"`
Database *Database `json:"database,omitempty"`
}
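
Aside: these spec structs are union-style. Type selects the concrete Druid spec, and every variant-specific field carries omitempty, so only the populated variant survives JSON marshaling. A minimal sketch of the idea (hypothetical usage of the InputSource struct above, not part of this commit):

	// Sketch only: build an inline input source and marshal it.
	// Assumes "encoding/json" and "fmt" are imported.
	src := InputSource{
		Type: "inline",
		Data: "id1,2023-12-11T00:00:00Z,{}\n", // hypothetical CSV row
	}
	b, err := json.Marshal(src)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
	// Prints: {"type":"inline","data":"id1,2023-12-11T00:00:00Z,{}\n"}

Because every other field is zero-valued, none of the HttpInputSource, CombiningInputSource, or SqlInputSource fields appear in the output.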
79 changes: 51 additions & 28 deletions tasks_test.go
@@ -15,14 +15,14 @@ import (
"github.com/testcontainers/testcontainers-go/wait"
)

// TestDO represents an entry with a payload.
type TestDO struct {
// testDO represents an entry with a payload.
type testDO struct {
Timestamp time.Time `db:"ts"`
Id uuid.UUID `db:"id"`
Payload types.JSONText `db:"payload"`
}

var testObjects = []TestDO{
var testObjects = []testDO{
{
Id: uuid.New(),
Timestamp: time.Now(),
@@ -35,7 +35,7 @@ var testObjects = []TestDO{
},
}

func triggerIngestionTask(d *Client, dataSourceName string, entries []TestDO) (string, error) {
func triggerIngestionTask(d *Client, dataSourceName string, entries []testDO) (string, error) {
csvEntriesBuff := &bytes.Buffer{}

err := gocsv.MarshalWithoutHeaders(entries, csvEntriesBuff)
@@ -55,42 +55,66 @@ func triggerIngestionTask(d *Client, dataSourceName string, entries []TestDO) (s
return taskID, err
}

func awaitTaskCompletion(client *Client, taskID string) error {
	for range time.Tick(100 * time.Millisecond) {
		res, err := client.Tasks().GetStatus(taskID)
		if err != nil {
			return err
		}

		if res.Status.Status == "RUNNING" {
			continue
		}
		break
	}
	return nil
}

type testFunction func() error

func awaitTaskCompletion(client *Client, taskID string, durationSeconds int, tickDurationMilliseconds time.Duration) error {
	// Cancel the context once the overall timeout elapses.
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		time.Sleep(time.Duration(durationSeconds) * time.Second)
		cancel()
	}()
	ticker := time.NewTicker(tickDurationMilliseconds * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			res, err := client.Tasks().GetStatus(taskID)
			if err != nil {
				return err
			}
			if res.Status.Status == "RUNNING" {
				continue
			}
			// Task left the RUNNING state: treat it as completed.
			return nil
		case <-ctx.Done():
			return errors.New("awaitTaskCompletion timeout")
		}
	}
}

func awaitTaskRunning(client *Client, taskID string) error {
	for range time.Tick(100 * time.Millisecond) {
		res, err := client.Tasks().GetStatus(taskID)
		if err != nil {
			return err
		}

		if res.Status.Status == "RUNNING" {
			return nil
		}
	}
	return errors.New("task has not started")
}

func awaitTaskRunning(client *Client, taskID string, durationSeconds int, tickDurationMilliseconds time.Duration) error {
	// Cancel the context once the overall timeout elapses.
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		time.Sleep(time.Duration(durationSeconds) * time.Second)
		cancel()
	}()
	ticker := time.NewTicker(tickDurationMilliseconds * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			res, err := client.Tasks().GetStatus(taskID)
			if err != nil {
				return err
			}
			if res.Status.Status == "RUNNING" {
				return nil
			}
		case <-ctx.Done():
			return errors.New("awaitTaskRunning timeout")
		}
	}
}

func runInlineIngestionTask(client *Client, dataSourceName string, entries []TestDO, recordsCount int) error {
func runInlineIngestionTask(client *Client, dataSourceName string, entries []testDO, recordsCount int) error {
taskID, err := triggerIngestionTask(client, dataSourceName, entries)
if err != nil {
return err
}

err = awaitTaskCompletion(client, taskID)
err = awaitTaskCompletion(client, taskID, 180, 500)
if err != nil {
return err
}
@@ -119,7 +143,7 @@ func TestTaskService(t *testing.T) {
})

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
defer cancel()

// Set up druid service and client.
var druidOpts []ClientOption
Expand Down Expand Up @@ -154,8 +178,7 @@ func TestTerminateTask(t *testing.T) {
t.Cleanup(cancel)

// Set up druid service and client.
var druidOpts []ClientOption
d, err := NewClient("http://localhost:8888", druidOpts...)
d, err := NewClient("http://localhost:8888")
require.NoError(t, err, "error should be nil")

// Waiting for druid services to start.
@@ -171,7 +194,7 @@ func TestTerminateTask(t *testing.T) {
taskID, err := triggerIngestionTask(d, "test-terminate-task-datasource", testObjects)
require.NoError(t, err, "error should be nil")

err = awaitTaskRunning(d, taskID)
err = awaitTaskRunning(d, taskID, 180, 500)
require.NoError(t, err, "error should be nil")

shutdownTaskID, err := d.Tasks().Shutdown(taskID)
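
Aside: the diff declares a testFunction type that none of the shown hunks use, which suggests the two polling helpers could share their ticker/timeout scaffolding. A speculative sketch of such a shared helper (an assumption, not code from this commit), using context.WithTimeout in place of the hand-rolled sleep goroutine:

	// awaitCondition polls check on every tick until it returns nil
	// or the overall timeout elapses. Speculative; builds on the
	// testFunction type declared in the diff.
	func awaitCondition(check testFunction, timeout, tick time.Duration) error {
		ctx, cancel := context.WithTimeout(context.Background(), timeout)
		defer cancel()
		ticker := time.NewTicker(tick)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				if check() == nil {
					return nil
				}
			case <-ctx.Done():
				return errors.New("awaitCondition timeout")
			}
		}
	}

With a helper like this, awaitTaskRunning would reduce to a closure that calls client.Tasks().GetStatus(taskID) and reports whether the status is "RUNNING".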
