diff --git a/task_types.go b/task_types.go index 45fc926..f270f32 100644 --- a/task_types.go +++ b/task_types.go @@ -40,6 +40,22 @@ type TaskIngestionSpec struct { Spec *IngestionSpecData `json:"spec"` } +// RunningTask defines running task returned by GetRunningTasks method. +// https://druid.apache.org/docs/latest/api-reference/tasks-api#sample-response-2 +type RunningTask struct { + ID string `json:"id"` + Type string `json:"type"` + Status string `json:"status"` + Datasource string `json:"dataSource"` +} + +// RunningTasksOptions defines supported options which can be passed to GetRunningTasks method. +// https://druid.apache.org/docs/latest/api-reference/tasks-api#query-parameters-2 +type RunningTasksOptions struct { + Datasource string `url:"datasource"` + Type string `url:"type"` +} + // defaultTaskIngestionSpec returns a default TaskIngestionSpec with basic ingestion // specification fields initialized. func defaultTaskIngestionSpec() *TaskIngestionSpec { @@ -156,9 +172,11 @@ func SetTaskIOConfigType(typ string) TaskIngestionSpecOptions { // SetTaskInputFormat configures input format for the task based ingestion. func SetTaskInputFormat(typ string, findColumnsHeader string, columns []string) TaskIngestionSpecOptions { return func(spec *TaskIngestionSpec) { - spec.Spec.IOConfig.InputFormat.Type = typ - spec.Spec.IOConfig.InputFormat.FindColumnsFromHeader = findColumnsHeader - spec.Spec.IOConfig.InputFormat.Columns = columns + spec.Spec.IOConfig.InputFormat = &InputFormat{ + Type: typ, + FindColumnsFromHeader: findColumnsHeader, + Columns: columns, + } } } diff --git a/tasks.go b/tasks.go index 3bb6724..0fcf4ee 100644 --- a/tasks.go +++ b/tasks.go @@ -1,6 +1,9 @@ package druid -import "strings" +import ( + "net/http" + "strings" +) const ( tasksEndpoint = "druid/indexer/v1/tasks" @@ -82,3 +85,18 @@ func (s *TasksService) Shutdown(taskId string) (string, error) { func applyTaskId(input string, taskId string) string { return strings.Replace(input, ":taskId", taskId, 1) } + +// GetRunningTasks calls druid task service's running tasks API. +// https://druid.apache.org/docs/latest/api-reference/tasks-api#get-an-array-of-running-tasks +func (s *TasksService) GetRunningTasks(options RunningTasksOptions) ([]*RunningTask, error) { + r, err := s.client.NewRequest(http.MethodGet, tasksRunningEndpoint, options) + var result []*RunningTask + if err != nil { + return nil, err + } + _, err = s.client.Do(r, &result) + if err != nil { + return result, err + } + return result, nil +} diff --git a/tasks_test.go b/tasks_test.go index bb0531e..936608f 100644 --- a/tasks_test.go +++ b/tasks_test.go @@ -10,7 +10,7 @@ import ( "github.com/gocarina/gocsv" "github.com/google/uuid" "github.com/jmoiron/sqlx/types" - "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" tc "github.com/testcontainers/testcontainers-go/modules/compose" "github.com/testcontainers/testcontainers-go/wait" ) @@ -22,6 +22,17 @@ type testDO struct { Payload types.JSONText `db:"payload"` } +type TasksServiceTestSuite struct { + suite.Suite + + compose tc.ComposeStack + client *Client +} + +func TestTaskService(t *testing.T) { + suite.Run(t, &TasksServiceTestSuite{}) +} + var testObjects = []testDO{ { Id: uuid.New(), @@ -127,71 +138,88 @@ func runInlineIngestionTask[T any](client *Client, dataSourceName string, entrie return nil } -func TestTaskService(t *testing.T) { - // Set up druid containers using docker-compose. - compose, err := tc.NewDockerCompose("testdata/docker-compose.yaml") - require.NoError(t, err, "NewDockerComposeAPI()") - - // Set up cleanup for druid containers. - t.Cleanup(func() { - require.NoError(t, compose.Down(context.Background(), tc.RemoveOrphans(true), tc.RemoveVolumes(true), tc.RemoveImagesLocal), "compose.Down()") - }) +func (s *TasksServiceTestSuite) SetupSuite() { + var err error + s.compose, err = tc.NewDockerCompose("testdata/docker-compose.yaml") + s.Require().NoError(err, "NewDockerComposeAPI()") ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Set up druid service and client. - d, err := NewClient("http://localhost:8888") - require.NoError(t, err, "error should be nil") + s.client, err = NewClient("http://localhost:8888") + s.Require().NoError(err, "error should be nil") // Waiting for druid services to start. - err = compose. + err = s.compose. WaitForService("coordinator", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8081/tcp").WithStartupTimeout(180*time.Second)). WaitForService("router", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8888/tcp").WithStartupTimeout(180*time.Second)). WaitForService("broker", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8082/tcp").WithStartupTimeout(180*time.Second)). WaitForService("middlemanager", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8091/tcp").WithStartupTimeout(180*time.Second)). Up(ctx, tc.Wait(true)) - require.NoError(t, err, "druid services should be up with no error") - - // Test create ingestion task -> get status -> complete sequence. - runInlineIngestionTask(d, "test-submit-task-datasource", testObjects, 2) - require.NoError(t, err, "error should be nil") + s.Require().NoError(err, "druid services should be up with no error") } -func TestTerminateTask(t *testing.T) { - // Set up druid containers using docker-compose. - compose, err := tc.NewDockerCompose("testdata/docker-compose.yaml") - require.NoError(t, err, "NewDockerComposeAPI()") +func (s *TasksServiceTestSuite) TearDownSuite() { + s.Require().NoError( + s.compose.Down( + context.Background(), + tc.RemoveOrphans(true), + tc.RemoveVolumes(true), + tc.RemoveImagesLocal), + "compose.Down()", + ) +} - // Set up cleanup for druid containers. - t.Cleanup(func() { - require.NoError(t, compose.Down(context.Background(), tc.RemoveOrphans(true), tc.RemoveVolumes(true), tc.RemoveImagesLocal), "compose.Down()") - }) +func (s *TasksServiceTestSuite) TestSubmit() { + // Test create ingestion task -> get status -> complete sequence. + err := runInlineIngestionTask(s.client, "test-submit-task-datasource", testObjects, 2) + s.Require().NoError(err, "error should be nil") +} - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() +func (s *TasksServiceTestSuite) TestTerminate() { + // Test create ingestion task -> get status -> terminate sequence. + taskID, err := triggerIngestionTask(s.client, "test-terminate-task-datasource", testObjects) + s.Require().NoError(err, "error should be nil") - // Set up druid service and client. - d, err := NewClient("http://localhost:8888") - require.NoError(t, err, "error should be nil") + err = awaitTaskStatus(s.client, taskID, "RUNNING", 180*time.Second, 200*time.Millisecond) + s.Require().NoError(err, "error should be nil") - // Waiting for druid services to start. - err = compose. - WaitForService("coordinator", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8081/tcp").WithStartupTimeout(180*time.Second)). - WaitForService("router", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8888/tcp").WithStartupTimeout(180*time.Second)). - WaitForService("broker", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8082/tcp").WithStartupTimeout(180*time.Second)). - WaitForService("middlemanager", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8091/tcp").WithStartupTimeout(180*time.Second)). - Up(ctx, tc.Wait(true)) - require.NoError(t, err, "druid services should be up with no error") + shutdownTaskID, err := s.client.Tasks().Shutdown(taskID) + s.Require().NoError(err, "error should be nil") + s.Require().Equal(shutdownTaskID, taskID) +} - // Test create ingestion task -> get status -> terminate sequence. - taskID, err := triggerIngestionTask(d, "test-terminate-task-datasource", testObjects) - require.NoError(t, err, "error should be nil") +func (s *TasksServiceTestSuite) TestGetRunningTasks() { + tasks, err := s.client.Tasks().GetRunningTasks(RunningTasksOptions{ + Datasource: "test-get-tasks-datasource", + Type: "index_parallel", + }) + s.Require().NoError(err) + initialTasksLen := len(tasks) - err = awaitTaskStatus(d, taskID, "RUNNING", 180*time.Second, 200*time.Millisecond) - require.NoError(t, err, "error should be nil") + _, err = triggerIngestionTask(s.client, "test-get-tasks-datasource", testObjects) + s.Require().NoError(err, "error should be nil") - shutdownTaskID, err := d.Tasks().Shutdown(taskID) - require.NoError(t, err, "error should be nil") - require.Equal(t, shutdownTaskID, taskID) + // ingestion tasks will be available in short time window, after some delay, + // so we need to actively wait for it + ticker := time.NewTicker(50 * time.Millisecond) + defer ticker.Stop() + timer := time.NewTimer(2 * time.Minute) + defer timer.Stop() + for { + select { + case <-ticker.C: + tasks, err = s.client.Tasks().GetRunningTasks(RunningTasksOptions{ + Datasource: "test-get-tasks-datasource", + Type: "index_parallel", + }) + s.Require().NoError(err) + if len(tasks) > initialTasksLen { + return + } + case <-timer.C: + s.FailNow("unable to get running tasks in time") + } + } }