Skip to content

Commit

Permalink
Job run updates (#4295)
Browse files Browse the repository at this point in the history
  • Loading branch information
ianedwards authored Feb 19, 2024
1 parent a97946e commit cb17382
Show file tree
Hide file tree
Showing 14 changed files with 662 additions and 318 deletions.
100 changes: 57 additions & 43 deletions api/server/handlers/porter_app/job_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ package porter_app
import (
"net/http"

"connectrpc.com/connect"
porterv1 "github.com/porter-dev/api-contracts/generated/go/porter/v1"
"github.com/porter-dev/porter/api/server/authz"
"github.com/porter-dev/porter/api/server/handlers"
"github.com/porter-dev/porter/api/server/shared"
"github.com/porter-dev/porter/api/server/shared/apierrors"
"github.com/porter-dev/porter/api/server/shared/config"
"github.com/porter-dev/porter/api/server/shared/requestutils"
"github.com/porter-dev/porter/api/types"
"github.com/porter-dev/porter/internal/deployment_target"
"github.com/porter-dev/porter/internal/kubernetes"
"github.com/porter-dev/porter/internal/models"
"github.com/porter-dev/porter/internal/porter_app"
"github.com/porter-dev/porter/internal/telemetry"
)

Expand All @@ -36,8 +37,14 @@ func NewJobStatusHandler(

// JobStatusRequest is the expected format for a request body on GET /apps/jobs
type JobStatusRequest struct {
DeploymentTargetID string `schema:"deployment_target_id"`
JobName string `schema:"job_name"`
DeploymentTargetID string `schema:"deployment_target_id,omitempty"`
DeploymentTargetName string `schema:"deployment_target_name,omitempty"`
JobName string `schema:"job_name"`
}

// JobStatusResponse is the response format for GET /apps/jobs
type JobStatusResponse struct {
JobRuns []porter_app.JobRun `json:"job_runs"`
}

func (c *JobStatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Expand All @@ -63,57 +70,64 @@ func (c *JobStatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "app-name", Value: name})

if request.DeploymentTargetID == "" {
err := telemetry.Error(ctx, span, nil, "must provide deployment target id")
c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusBadRequest))
return
deploymentTargetName := request.DeploymentTargetName
if request.DeploymentTargetName == "" && request.DeploymentTargetID == "" {
defaultDeploymentTarget, err := defaultDeploymentTarget(ctx, defaultDeploymentTargetInput{
ProjectID: project.ID,
ClusterID: cluster.ID,
ClusterControlPlaneClient: c.Config().ClusterControlPlaneClient,
})
if err != nil {
err := telemetry.Error(ctx, span, err, "error getting default deployment target")
c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
return
}
deploymentTargetName = defaultDeploymentTarget.Name
}
telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "deployment-target-id", Value: request.DeploymentTargetID})

deploymentTarget, err := deployment_target.DeploymentTargetDetails(ctx, deployment_target.DeploymentTargetDetailsInput{
ProjectID: int64(project.ID),
ClusterID: int64(cluster.ID),
DeploymentTargetID: request.DeploymentTargetID,
CCPClient: c.Config().ClusterControlPlaneClient,
telemetry.WithAttributes(span,
telemetry.AttributeKV{Key: "deployment-target-name", Value: deploymentTargetName},
telemetry.AttributeKV{Key: "deployment-target-id", Value: request.DeploymentTargetID},
)

jobRunsRequest := connect.NewRequest(&porterv1.JobRunsRequest{
ProjectId: int64(project.ID),
DeploymentTargetIdentifier: &porterv1.DeploymentTargetIdentifier{
Id: request.DeploymentTargetID,
Name: deploymentTargetName,
},
AppName: name,
JobServiceName: request.JobName,
})

jobRunsResp, err := c.Config().ClusterControlPlaneClient.JobRuns(ctx, jobRunsRequest)
if err != nil {
err := telemetry.Error(ctx, span, err, "error getting deployment target details")
c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
err := telemetry.Error(ctx, span, err, "error getting job runs from cluster control plane client")
c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusBadRequest))
return
}

namespace := deploymentTarget.Namespace
telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "namespace", Value: namespace})

agent, err := c.GetAgent(r, cluster, "")
if err != nil {
err = telemetry.Error(ctx, span, err, "unable to get agent")
if jobRunsResp == nil || jobRunsResp.Msg == nil {
err := telemetry.Error(ctx, span, nil, "job runs response is nil")
c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
return
}

labels := []kubernetes.Label{
{
Key: "porter.run/deployment-target-id",
Val: request.DeploymentTargetID,
},
{
Key: "porter.run/app-name",
Val: name,
},
}
if request.JobName != "" {
labels = append(labels, kubernetes.Label{
Key: "porter.run/service-name",
Val: request.JobName,
})
runs := []porter_app.JobRun{}
for _, jobRun := range jobRunsResp.Msg.JobRuns {
run, err := porter_app.JobRunFromProto(ctx, jobRun)
if err != nil {
err := telemetry.Error(ctx, span, err, "error converting job run from proto")
c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
return
}

runs = append(runs, run)
}
jobs, err := agent.ListJobsByLabel(namespace, labels...)
if err != nil {
err = telemetry.Error(ctx, span, err, "error listing jobs")
c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
return

res := JobStatusResponse{
JobRuns: runs,
}

c.WriteResult(w, r, jobs)
c.WriteResult(w, r, res)
}
133 changes: 133 additions & 0 deletions api/server/handlers/porter_app/job_status_by_name.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package porter_app

import (
"net/http"

"connectrpc.com/connect"
porterv1 "github.com/porter-dev/api-contracts/generated/go/porter/v1"
"github.com/porter-dev/porter/api/server/authz"
"github.com/porter-dev/porter/api/server/handlers"
"github.com/porter-dev/porter/api/server/shared"
"github.com/porter-dev/porter/api/server/shared/apierrors"
"github.com/porter-dev/porter/api/server/shared/config"
"github.com/porter-dev/porter/api/server/shared/requestutils"
"github.com/porter-dev/porter/api/types"
"github.com/porter-dev/porter/internal/models"
"github.com/porter-dev/porter/internal/porter_app"
"github.com/porter-dev/porter/internal/telemetry"
)

// JobStatusByNameHandler is the handler for GET /apps/jobs/{porter_app_name}/{job_run_name}
type JobStatusByNameHandler struct {
handlers.PorterHandlerReadWriter
authz.KubernetesAgentGetter
}

// NewJobStatusByNameHandler returns a new JobStatusByNameHandler
func NewJobStatusByNameHandler(
config *config.Config,
decoderValidator shared.RequestDecoderValidator,
writer shared.ResultWriter,
) *JobStatusByNameHandler {
return &JobStatusByNameHandler{
PorterHandlerReadWriter: handlers.NewDefaultPorterHandler(config, decoderValidator, writer),
KubernetesAgentGetter: authz.NewOutOfClusterAgentGetter(config),
}
}

// JobStatusByNameRequest is the expected format for a request body on GET /apps/jobs/{porter_app_name}/{job_run_name}
type JobStatusByNameRequest struct {
DeploymentTargetID string `schema:"deployment_target_id,omitempty"`
DeploymentTargetName string `schema:"deployment_target_name,omitempty"`
JobRunName string `schema:"job_run_name"`
}

// JobStatusByNameResponse is the response format for GET /apps/jobs/{porter_app_name}/{job_run_name}
type JobStatusByNameResponse struct {
JobRun porter_app.JobRun `json:"job_run"`
}

func (c *JobStatusByNameHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx, span := telemetry.NewSpan(r.Context(), "serve-job-status")
defer span.End()

request := &JobStatusByNameRequest{}
if ok := c.DecodeAndValidate(w, r, request); !ok {
err := telemetry.Error(ctx, span, nil, "invalid request")
c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusBadRequest))
return
}

cluster, _ := ctx.Value(types.ClusterScope).(*models.Cluster)
project, _ := ctx.Value(types.ProjectScope).(*models.Project)

name, reqErr := requestutils.GetURLParamString(r, types.URLParamPorterAppName)
if reqErr != nil {
err := telemetry.Error(ctx, span, reqErr, "invalid porter app name")
c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusBadRequest))
return
}
telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "app-name", Value: name})

jobRunName, reqErr := requestutils.GetURLParamString(r, types.URLParamJobRunName)
if reqErr != nil {
err := telemetry.Error(ctx, span, reqErr, "invalid job run name")
c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusBadRequest))
return
}

deploymentTargetName := request.DeploymentTargetName
if request.DeploymentTargetName == "" && request.DeploymentTargetID == "" {
defaultDeploymentTarget, err := defaultDeploymentTarget(ctx, defaultDeploymentTargetInput{
ProjectID: project.ID,
ClusterID: cluster.ID,
ClusterControlPlaneClient: c.Config().ClusterControlPlaneClient,
})
if err != nil {
err := telemetry.Error(ctx, span, err, "error getting default deployment target")
c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
return
}
deploymentTargetName = defaultDeploymentTarget.Name
}

telemetry.WithAttributes(span,
telemetry.AttributeKV{Key: "deployment-target-name", Value: deploymentTargetName},
telemetry.AttributeKV{Key: "deployment-target-id", Value: request.DeploymentTargetID},
)

jobRunsRequest := connect.NewRequest(&porterv1.JobRunStatusRequest{
ProjectId: int64(project.ID),
DeploymentTargetIdentifier: &porterv1.DeploymentTargetIdentifier{
Id: request.DeploymentTargetID,
Name: deploymentTargetName,
},
JobRunName: jobRunName,
})

jobRunResp, err := c.Config().ClusterControlPlaneClient.JobRunStatus(ctx, jobRunsRequest)
if err != nil {
err := telemetry.Error(ctx, span, err, "error getting job run from cluster control plane client")
c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusBadRequest))
return
}

if jobRunResp == nil || jobRunResp.Msg == nil {
err := telemetry.Error(ctx, span, nil, "job run response is nil")
c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
return
}

run, err := porter_app.JobRunFromProto(ctx, jobRunResp.Msg.JobRun)
if err != nil {
err := telemetry.Error(ctx, span, err, "error converting job run from proto")
c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
return
}

res := JobStatusByNameResponse{
JobRun: run,
}

c.WriteResult(w, r, res)
}
6 changes: 4 additions & 2 deletions api/server/handlers/porter_app/run_app_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ type RunAppJobRequest struct {

// RunAppJobResponse is the response object for the /apps/{porter_app_name}/run endpoint
type RunAppJobResponse struct {
JobRunID string `json:"job_run_id"`
JobRunID string `json:"job_run_id"`
JobRunName string `json:"job_run_name"`
}

// ServeHTTP runs a one-off command in the same environment as the provided service, app and deployment target
Expand Down Expand Up @@ -149,7 +150,8 @@ func (c *RunAppJobHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

response := RunAppJobResponse{
JobRunID: serviceResp.Msg.JobRunId,
JobRunID: serviceResp.Msg.JobRunId,
JobRunName: serviceResp.Msg.JobRunName,
}

c.WriteResult(w, r, response)
Expand Down
29 changes: 29 additions & 0 deletions api/server/router/porter_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -1270,6 +1270,35 @@ func getPorterAppRoutes(
Router: r,
})

// GET /api/projects/{project_id}/clusters/{cluster_id}/apps/{porter_app_name}/jobs/{job_run_name} -> porter_app.JobStatusByNameHandler
appJobStatusByNameEndpoint := factory.NewAPIEndpoint(
&types.APIRequestMetadata{
Verb: types.APIVerbGet,
Method: types.HTTPVerbGet,
Path: &types.Path{
Parent: basePath,
RelativePath: fmt.Sprintf("%s/{%s}/jobs/{%s}", relPathV2, types.URLParamPorterAppName, types.URLParamJobRunName),
},
Scopes: []types.PermissionScope{
types.UserScope,
types.ProjectScope,
types.ClusterScope,
},
},
)

appJobStatusByNameHandler := porter_app.NewJobStatusByNameHandler(
config,
factory.GetDecoderValidator(),
factory.GetResultWriter(),
)

routes = append(routes, &router.Route{
Endpoint: appJobStatusByNameEndpoint,
Handler: appJobStatusByNameHandler,
Router: r,
})

// GET /api/projects/{project_id}/clusters/{cluster_id}/apps/{porter_app_name}/revisions/{app_revision_id} -> porter_app.NewGetAppRevisionHandler
getAppRevisionEndpoint := factory.NewAPIEndpoint(
&types.APIRequestMetadata{
Expand Down
1 change: 1 addition & 0 deletions api/types/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ const (
// URLParamDeploymentTargetIdentifier can be either the deployment target id or deployment target name
URLParamDeploymentTargetIdentifier URLParam = "deployment_target_identifier"
URLParamWebhookID URLParam = "webhook_id"
URLParamJobRunName URLParam = "job_run_name"
)

type Path struct {
Expand Down
Loading

0 comments on commit cb17382

Please sign in to comment.