Skip to content

Commit

Permalink
Merge pull request #2320 from opengovern/fix-tasks
Browse files Browse the repository at this point in the history
Fix tasks
  • Loading branch information
artaasadi authored Dec 28, 2024
2 parents 837cd9d + 10bb644 commit 700a027
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 28 deletions.
2 changes: 1 addition & 1 deletion assets/tasks/container_grype_check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ ResultType: "oci_container_vulnerabilities"
WorkloadType: "deployment"
EnvVars: {}
Interval: 0 # Minutes
Timeout: 60 # Minutes
Timeout: 120 # Minutes
ScaleConfig:
LagThreshold: "1"
MinReplica: 0
Expand Down
10 changes: 9 additions & 1 deletion services/tasks/api/task-run.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package api

import "time"
import (
"github.com/opengovern/opencomply/services/tasks/db/models"
"time"
)

type TaskRun struct {
ID uint `json:"id"`
Expand All @@ -11,3 +14,8 @@ type TaskRun struct {
Result string `json:"result"`
FailureMessage string `json:"failure_message"`
}

type ListTaskRunsResponse struct {
TotalCount int `json:"total_count"`
Items []models.TaskRun `json:"items"`
}
21 changes: 4 additions & 17 deletions services/tasks/api/tasks.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,10 @@
package api

type TaskListResponse struct {
Tasks []TaskResponse `json:"tasks"`
TotalCount int `json:"total_count"`
}
import "github.com/opengovern/opencomply/services/tasks/db/models"

type TaskResponse struct {
ID uint `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
IsCompleted bool `json:"is_completed"`
CompletedDate string `json:"completed_date"`
LastRunDate string `json:"last_run_date"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
ImageUrl string `json:"image_url"`
Interval int `json:"interval"`
Status string `json:"status"`
Autorun bool `json:"autorun"`
type TaskListResponse struct {
Items []models.Task `json:"items"`
TotalCount int `json:"total_count"`
}

type RunTaskRequest struct {
Expand Down
13 changes: 13 additions & 0 deletions services/tasks/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,19 @@ func (db Database) GetTaskRunResult(id string) (*models.TaskRun, error) {
return task, nil
}

// ListTaskRunResult retrieves a task result by Task ID
func (db Database) ListTaskRunResult() ([]models.TaskRun, error) {
var task []models.TaskRun
tx := db.Orm.
Order("created_at desc").
Find(&task)
if tx.Error != nil {
return nil, tx.Error
}

return task, nil
}

// FetchCreatedTaskRunsByTaskID retrieves a list of task runs
func (db Database) FetchCreatedTaskRunsByTaskID(taskID string) ([]models.TaskRun, error) {
var tasks []models.TaskRun
Expand Down
150 changes: 141 additions & 9 deletions services/tasks/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"encoding/json"
api2 "github.com/opengovern/og-util/pkg/api"
"github.com/opengovern/og-util/pkg/httpserver"
"github.com/opengovern/opencomply/pkg/utils"
"github.com/opengovern/opencomply/services/tasks/api"
"github.com/opengovern/opencomply/services/tasks/db"
"github.com/opengovern/opencomply/services/tasks/db/models"
"net/http"
"strconv"

"github.com/labstack/echo/v4"
"go.uber.org/zap"
Expand All @@ -23,12 +25,16 @@ type httpRoutes struct {

func (r *httpRoutes) Register(e *echo.Echo) {
v1 := e.Group("/api/v1")
// Get all tasks
v1.GET("/tasks", httpserver.AuthorizeHandler(r.getTasks, api2.EditorRole))
// List all tasks
v1.GET("/tasks", httpserver.AuthorizeHandler(r.ListTasks, api2.ViewerRole))
// Get task
v1.GET("/tasks/:id", httpserver.AuthorizeHandler(r.GetTask, api2.ViewerRole))
// Create a new task
v1.POST("/tasks/run", httpserver.AuthorizeHandler(r.runTask, api2.EditorRole))
v1.POST("/tasks/run", httpserver.AuthorizeHandler(r.RunTask, api2.EditorRole))
// Get Task Result
v1.GET("/tasks/run/:id/result", httpserver.AuthorizeHandler(r.getTaskRunResult, api2.EditorRole))
v1.GET("/tasks/run/:id", httpserver.AuthorizeHandler(r.GetTaskRunResult, api2.ViewerRole))
// List Tasks Result
v1.GET("/tasks/run", httpserver.AuthorizeHandler(r.ListTaskRunResult, api2.ViewerRole))

}

Expand All @@ -44,18 +50,86 @@ func bindValidate(ctx echo.Context, i interface{}) error {
return nil
}

func (r *httpRoutes) getTasks(ctx echo.Context) error {
tasks, err := r.db.GetTaskList()
// ListTasks godoc
//
// @Summary List tasks
// @Security BearerToken
// @Tags scheduler
// @Param cursor query int false "cursor"
// @Param per_page query int false "per page"
// @Produce json
// @Success 200 {object} api.ListTaskRunsResponse
// @Router /tasks/api/v1/tasks [get]
func (r *httpRoutes) ListTasks(ctx echo.Context) error {
var cursor, perPage int64
var err error
cursorStr := ctx.QueryParam("cursor")
if cursorStr != "" {
cursor, err = strconv.ParseInt(cursorStr, 10, 64)
if err != nil {
return err
}
}
perPageStr := ctx.QueryParam("per_page")
if perPageStr != "" {
perPage, err = strconv.ParseInt(perPageStr, 10, 64)
if err != nil {
return err
}
}

items, err := r.db.GetTaskList()
if err != nil {
r.logger.Error("failed to get tasks", zap.Error(err))
return ctx.JSON(http.StatusInternalServerError, "failed to get tasks")

}

return ctx.JSON(http.StatusOK, tasks)
totalCount := len(items)
if perPage != 0 {
if cursor == 0 {
items = utils.Paginate(1, perPage, items)
} else {
items = utils.Paginate(cursor, perPage, items)
}
}

return ctx.JSON(http.StatusOK, api.TaskListResponse{
TotalCount: totalCount,
Items: items,
})
}

// GetTask godoc
//
// @Summary Get task by id
// @Security BearerToken
// @Tags scheduler
// @Param id path string true "run id"
// @Produce json
// @Success 200 {object} models.Task
// @Router /tasks/api/v1/tasks/:id [get]
func (r *httpRoutes) GetTask(ctx echo.Context) error {
id := ctx.Param("id")
task, err := r.db.GetTask(id)
if err != nil {
r.logger.Error("failed to get task results", zap.Error(err))
return ctx.JSON(http.StatusInternalServerError, "failed to get task results")
}

return ctx.JSON(http.StatusOK, task)
}

func (r *httpRoutes) runTask(ctx echo.Context) error {
// RunTask godoc
//
// @Summary Run a new task
// @Security BearerToken
// @Tags scheduler
// @Param request body api.RunTaskRequest true "Run task request"
// @Produce json
// @Success 200 {object} models.TaskRun
// @Router /tasks/api/v1/tasks/run [post]
func (r *httpRoutes) RunTask(ctx echo.Context) error {
var req api.RunTaskRequest
if err := bindValidate(ctx, &req); err != nil {
r.logger.Error("failed to bind task", zap.Error(err))
Expand Down Expand Up @@ -95,7 +169,16 @@ func (r *httpRoutes) runTask(ctx echo.Context) error {
return ctx.JSON(http.StatusCreated, run)
}

func (r *httpRoutes) getTaskRunResult(ctx echo.Context) error {
// GetTaskRunResult godoc
//
// @Summary Get task run
// @Security BearerToken
// @Tags scheduler
// @Param id path string true "run id"
// @Produce json
// @Success 200 {object} models.TaskRun
// @Router /tasks/api/v1/tasks/run/:id [get]
func (r *httpRoutes) GetTaskRunResult(ctx echo.Context) error {
id := ctx.Param("id")
taskResults, err := r.db.GetTaskRunResult(id)
if err != nil {
Expand All @@ -105,3 +188,52 @@ func (r *httpRoutes) getTaskRunResult(ctx echo.Context) error {

return ctx.JSON(http.StatusOK, taskResults)
}

// ListTaskRunResult godoc
//
// @Summary List task runs
// @Security BearerToken
// @Tags scheduler
// @Param cursor query int false "cursor"
// @Param per_page query int false "per page"
// @Produce json
// @Success 200 {object} api.ListTaskRunsResponse
// @Router /tasks/api/v1/tasks/run [get]
func (r *httpRoutes) ListTaskRunResult(ctx echo.Context) error {
var cursor, perPage int64
var err error
cursorStr := ctx.QueryParam("cursor")
if cursorStr != "" {
cursor, err = strconv.ParseInt(cursorStr, 10, 64)
if err != nil {
return err
}
}
perPageStr := ctx.QueryParam("per_page")
if perPageStr != "" {
perPage, err = strconv.ParseInt(perPageStr, 10, 64)
if err != nil {
return err
}
}

items, err := r.db.ListTaskRunResult()
if err != nil {
r.logger.Error("failed to get task results", zap.Error(err))
return ctx.JSON(http.StatusInternalServerError, "failed to get task results")
}

totalCount := len(items)
if perPage != 0 {
if cursor == 0 {
items = utils.Paginate(1, perPage, items)
} else {
items = utils.Paginate(cursor, perPage, items)
}
}

return ctx.JSON(http.StatusOK, api.ListTaskRunsResponse{
TotalCount: totalCount,
Items: items,
})
}
5 changes: 5 additions & 0 deletions services/tasks/scheduler/consumer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package scheduler

import (
"bytes"
"encoding/json"
"github.com/nats-io/nats.go/jetstream"
"github.com/opengovern/opencomply/services/tasks/db/models"
Expand Down Expand Up @@ -32,6 +33,10 @@ func (s *TaskScheduler) RunTaskResponseConsumer(ctx context.Context) error {
Status: response.Status,
FailureMessage: response.FailureMessage,
}
emptyResult := []byte("")
if response.Result == nil || len(response.Result) == 0 || bytes.Equal(response.Result, emptyResult) {
response.Result = []byte("{}")
}
err := taskRunUpdate.Result.Set(response.Result)
if err != nil {
s.logger.Error("failed to set result", zap.Error(err))
Expand Down

0 comments on commit 700a027

Please sign in to comment.