Skip to content

Commit

Permalink
Introduced GetRunningTasks endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasz-h2o committed Jan 15, 2024
1 parent dd3e4a6 commit c4d3b21
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 51 deletions.
24 changes: 21 additions & 3 deletions task_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,22 @@ type TaskIngestionSpec struct {
Spec *IngestionSpecData `json:"spec"`
}

// RunningTask defines running task returned by GetRunningTasks method.
// https://druid.apache.org/docs/latest/api-reference/tasks-api#sample-response-2
type RunningTask struct {
ID string `json:"id"`
Type string `json:"type"`
Status string `json:"status"`
Datasource string `json:"dataSource"`
}

// RunningTasksOptions defines supported options which can be passed to GetRunningTasks method.
// https://druid.apache.org/docs/latest/api-reference/tasks-api#query-parameters-2
type RunningTasksOptions struct {
Datasource string `url:"datasource"`
Type string `url:"type"`
}

// defaultTaskIngestionSpec returns a default TaskIngestionSpec with basic ingestion
// specification fields initialized.
func defaultTaskIngestionSpec() *TaskIngestionSpec {
Expand Down Expand Up @@ -156,9 +172,11 @@ func SetTaskIOConfigType(typ string) TaskIngestionSpecOptions {
// SetTaskInputFormat configures input format for the task based ingestion.
func SetTaskInputFormat(typ string, findColumnsHeader string, columns []string) TaskIngestionSpecOptions {
return func(spec *TaskIngestionSpec) {
spec.Spec.IOConfig.InputFormat.Type = typ
spec.Spec.IOConfig.InputFormat.FindColumnsFromHeader = findColumnsHeader
spec.Spec.IOConfig.InputFormat.Columns = columns
spec.Spec.IOConfig.InputFormat = &InputFormat{
Type: typ,
FindColumnsFromHeader: findColumnsHeader,
Columns: columns,
}
}
}

Expand Down
20 changes: 19 additions & 1 deletion tasks.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package druid

import "strings"
import (
"net/http"
"strings"
)

const (
tasksEndpoint = "druid/indexer/v1/tasks"
Expand Down Expand Up @@ -82,3 +85,18 @@ func (s *TasksService) Shutdown(taskId string) (string, error) {
func applyTaskId(input string, taskId string) string {
return strings.Replace(input, ":taskId", taskId, 1)
}

// GetRunningTasks calls druid task service's running tasks API.
// https://druid.apache.org/docs/latest/api-reference/tasks-api#get-an-array-of-running-tasks
func (s *TasksService) GetRunningTasks(options RunningTasksOptions) ([]*RunningTask, error) {
r, err := s.client.NewRequest(http.MethodGet, tasksRunningEndpoint, options)
var result []*RunningTask
if err != nil {
return nil, err
}
_, err = s.client.Do(r, &result)
if err != nil {
return result, err
}
return result, nil
}
122 changes: 75 additions & 47 deletions tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/gocarina/gocsv"
"github.com/google/uuid"
"github.com/jmoiron/sqlx/types"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
tc "github.com/testcontainers/testcontainers-go/modules/compose"
"github.com/testcontainers/testcontainers-go/wait"
)
Expand All @@ -22,6 +22,17 @@ type testDO struct {
Payload types.JSONText `db:"payload"`
}

type TasksServiceTestSuite struct {
suite.Suite

compose tc.ComposeStack
client *Client
}

func TestTaskService(t *testing.T) {
suite.Run(t, &TasksServiceTestSuite{})
}

var testObjects = []testDO{
{
Id: uuid.New(),
Expand Down Expand Up @@ -127,71 +138,88 @@ func runInlineIngestionTask[T any](client *Client, dataSourceName string, entrie
return nil
}

func TestTaskService(t *testing.T) {
// Set up druid containers using docker-compose.
compose, err := tc.NewDockerCompose("testdata/docker-compose.yaml")
require.NoError(t, err, "NewDockerComposeAPI()")

// Set up cleanup for druid containers.
t.Cleanup(func() {
require.NoError(t, compose.Down(context.Background(), tc.RemoveOrphans(true), tc.RemoveVolumes(true), tc.RemoveImagesLocal), "compose.Down()")
})
func (s *TasksServiceTestSuite) SetupSuite() {
var err error
s.compose, err = tc.NewDockerCompose("testdata/docker-compose.yaml")
s.Require().NoError(err, "NewDockerComposeAPI()")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Set up druid service and client.
d, err := NewClient("http://localhost:8888")
require.NoError(t, err, "error should be nil")
s.client, err = NewClient("http://localhost:8888")
s.Require().NoError(err, "error should be nil")

// Waiting for druid services to start.
err = compose.
err = s.compose.
WaitForService("coordinator", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8081/tcp").WithStartupTimeout(180*time.Second)).
WaitForService("router", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8888/tcp").WithStartupTimeout(180*time.Second)).
WaitForService("broker", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8082/tcp").WithStartupTimeout(180*time.Second)).
WaitForService("middlemanager", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8091/tcp").WithStartupTimeout(180*time.Second)).
Up(ctx, tc.Wait(true))
require.NoError(t, err, "druid services should be up with no error")

// Test create ingestion task -> get status -> complete sequence.
runInlineIngestionTask(d, "test-submit-task-datasource", testObjects, 2)
require.NoError(t, err, "error should be nil")
s.Require().NoError(err, "druid services should be up with no error")
}

func TestTerminateTask(t *testing.T) {
// Set up druid containers using docker-compose.
compose, err := tc.NewDockerCompose("testdata/docker-compose.yaml")
require.NoError(t, err, "NewDockerComposeAPI()")
func (s *TasksServiceTestSuite) TearDownSuite() {
s.Require().NoError(
s.compose.Down(
context.Background(),
tc.RemoveOrphans(true),
tc.RemoveVolumes(true),
tc.RemoveImagesLocal),
"compose.Down()",
)
}

// Set up cleanup for druid containers.
t.Cleanup(func() {
require.NoError(t, compose.Down(context.Background(), tc.RemoveOrphans(true), tc.RemoveVolumes(true), tc.RemoveImagesLocal), "compose.Down()")
})
func (s *TasksServiceTestSuite) TestSubmit() {
// Test create ingestion task -> get status -> complete sequence.
err := runInlineIngestionTask(s.client, "test-submit-task-datasource", testObjects, 2)
s.Require().NoError(err, "error should be nil")
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
func (s *TasksServiceTestSuite) TestTerminate() {
// Test create ingestion task -> get status -> terminate sequence.
taskID, err := triggerIngestionTask(s.client, "test-terminate-task-datasource", testObjects)
s.Require().NoError(err, "error should be nil")

// Set up druid service and client.
d, err := NewClient("http://localhost:8888")
require.NoError(t, err, "error should be nil")
err = awaitTaskStatus(s.client, taskID, "RUNNING", 180*time.Second, 200*time.Millisecond)
s.Require().NoError(err, "error should be nil")

// Waiting for druid services to start.
err = compose.
WaitForService("coordinator", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8081/tcp").WithStartupTimeout(180*time.Second)).
WaitForService("router", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8888/tcp").WithStartupTimeout(180*time.Second)).
WaitForService("broker", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8082/tcp").WithStartupTimeout(180*time.Second)).
WaitForService("middlemanager", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8091/tcp").WithStartupTimeout(180*time.Second)).
Up(ctx, tc.Wait(true))
require.NoError(t, err, "druid services should be up with no error")
shutdownTaskID, err := s.client.Tasks().Shutdown(taskID)
s.Require().NoError(err, "error should be nil")
s.Require().Equal(shutdownTaskID, taskID)
}

// Test create ingestion task -> get status -> terminate sequence.
taskID, err := triggerIngestionTask(d, "test-terminate-task-datasource", testObjects)
require.NoError(t, err, "error should be nil")
func (s *TasksServiceTestSuite) TestGetRunningTasks() {
tasks, err := s.client.Tasks().GetRunningTasks(RunningTasksOptions{
Datasource: "test-get-tasks-datasource",
Type: "index_parallel",
})
s.Require().NoError(err)
initialTasksLen := len(tasks)

err = awaitTaskStatus(d, taskID, "RUNNING", 180*time.Second, 200*time.Millisecond)
require.NoError(t, err, "error should be nil")
_, err = triggerIngestionTask(s.client, "test-get-tasks-datasource", testObjects)
s.Require().NoError(err, "error should be nil")

shutdownTaskID, err := d.Tasks().Shutdown(taskID)
require.NoError(t, err, "error should be nil")
require.Equal(t, shutdownTaskID, taskID)
// ingestion tasks will be available in short time window, after some delay,
// so we need to actively wait for it
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
timer := time.NewTimer(2 * time.Minute)
defer timer.Stop()
for {
select {
case <-ticker.C:
tasks, err = s.client.Tasks().GetRunningTasks(RunningTasksOptions{
Datasource: "test-get-tasks-datasource",
Type: "index_parallel",
})
s.Require().NoError(err)
if len(tasks) > initialTasksLen {
return
}
case <-timer.C:
s.FailNow("unable to get running tasks in time")
}
}
}

0 comments on commit c4d3b21

Please sign in to comment.