From 7db1b6d8eb1ee581fc99895d83dd53a9ae0ccef3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Garc=C3=ADa=20Crespo?= Date: Fri, 1 Sep 2023 12:57:07 +0000 Subject: [PATCH] Add batch option to determine depth of SIPs Co-authored-by: camlyall --- internal/api/design/batch.go | 4 ++ internal/api/gen/batch/service.go | 1 + internal/api/gen/http/batch/client/cli.go | 16 ++++++- internal/api/gen/http/batch/client/types.go | 8 ++++ internal/api/gen/http/batch/server/types.go | 12 +++++ internal/api/gen/http/cli/enduro/cli.go | 2 + internal/api/gen/http/openapi.json | 7 +++ internal/api/gen/http/openapi.yaml | 6 +++ internal/api/gen/http/openapi3.json | 8 ++++ internal/api/gen/http/openapi3.yaml | 7 +++ internal/batch/service.go | 1 + internal/batch/workflow.go | 48 +++++++++++++++---- internal/batch/workflow_test.go | 2 +- .../models/SubmitRequestBody.ts | 8 ++++ ui/src/views/Batch.vue | 13 ++++- 15 files changed, 130 insertions(+), 13 deletions(-) diff --git a/internal/api/design/batch.go b/internal/api/design/batch.go index 8fc70005..06c8f62d 100644 --- a/internal/api/design/batch.go +++ b/internal/api/design/batch.go @@ -20,6 +20,10 @@ var _ = Service("batch", func() { Attribute("reject_duplicates", Boolean, func() { Default(false) }) Attribute("transfer_type", String) Attribute("process_name_metadata", Boolean, func() { Default(false) }) + Attribute("depth", Int, func() { + Default(0) + Minimum(0) + }) Required("path") }) Result(BatchResult) diff --git a/internal/api/gen/batch/service.go b/internal/api/gen/batch/service.go index eaace8b8..0936926f 100644 --- a/internal/api/gen/batch/service.go +++ b/internal/api/gen/batch/service.go @@ -64,6 +64,7 @@ type SubmitPayload struct { RejectDuplicates bool TransferType *string ProcessNameMetadata bool + Depth int } // MakeNotAvailable builds a goa.ServiceError from an error. diff --git a/internal/api/gen/http/batch/client/cli.go b/internal/api/gen/http/batch/client/cli.go index cf65ba21..b671a632 100644 --- a/internal/api/gen/http/batch/client/cli.go +++ b/internal/api/gen/http/batch/client/cli.go @@ -13,6 +13,7 @@ import ( "fmt" batch "github.com/artefactual-labs/enduro/internal/api/gen/batch" + goa "goa.design/goa/v3/pkg" ) // BuildSubmitPayload builds the payload for the batch submit endpoint from CLI @@ -23,7 +24,13 @@ func BuildSubmitPayload(batchSubmitBody string) (*batch.SubmitPayload, error) { { err = json.Unmarshal([]byte(batchSubmitBody), &body) if err != nil { - return nil, fmt.Errorf("invalid JSON for body, \nerror: %s, \nexample of valid JSON:\n%s", err, "'{\n \"completed_dir\": \"abc123\",\n \"path\": \"abc123\",\n \"pipeline\": \"abc123\",\n \"process_name_metadata\": false,\n \"processing_config\": \"abc123\",\n \"reject_duplicates\": false,\n \"retention_period\": \"abc123\",\n \"transfer_type\": \"abc123\"\n }'") + return nil, fmt.Errorf("invalid JSON for body, \nerror: %s, \nexample of valid JSON:\n%s", err, "'{\n \"completed_dir\": \"abc123\",\n \"depth\": 1,\n \"path\": \"abc123\",\n \"pipeline\": \"abc123\",\n \"process_name_metadata\": false,\n \"processing_config\": \"abc123\",\n \"reject_duplicates\": false,\n \"retention_period\": \"abc123\",\n \"transfer_type\": \"abc123\"\n }'") + } + if body.Depth < 0 { + err = goa.MergeErrors(err, goa.InvalidRangeError("body.depth", body.Depth, 0, true)) + } + if err != nil { + return nil, err } } v := &batch.SubmitPayload{ @@ -35,6 +42,7 @@ func BuildSubmitPayload(batchSubmitBody string) (*batch.SubmitPayload, error) { RejectDuplicates: body.RejectDuplicates, TransferType: body.TransferType, ProcessNameMetadata: body.ProcessNameMetadata, + Depth: body.Depth, } { var zero bool @@ -48,6 +56,12 @@ func BuildSubmitPayload(batchSubmitBody string) (*batch.SubmitPayload, error) { v.ProcessNameMetadata = false } } + { + var zero int + if v.Depth == zero { + v.Depth = 0 + } + } return v, nil } diff --git a/internal/api/gen/http/batch/client/types.go b/internal/api/gen/http/batch/client/types.go index 12f1a60a..d7becbaf 100644 --- a/internal/api/gen/http/batch/client/types.go +++ b/internal/api/gen/http/batch/client/types.go @@ -24,6 +24,7 @@ type SubmitRequestBody struct { RejectDuplicates bool `form:"reject_duplicates" json:"reject_duplicates" xml:"reject_duplicates"` TransferType *string `form:"transfer_type,omitempty" json:"transfer_type,omitempty" xml:"transfer_type,omitempty"` ProcessNameMetadata bool `form:"process_name_metadata" json:"process_name_metadata" xml:"process_name_metadata"` + Depth int `form:"depth" json:"depth" xml:"depth"` } // SubmitResponseBody is the type of the "batch" service "submit" endpoint HTTP @@ -97,6 +98,7 @@ func NewSubmitRequestBody(p *batch.SubmitPayload) *SubmitRequestBody { RejectDuplicates: p.RejectDuplicates, TransferType: p.TransferType, ProcessNameMetadata: p.ProcessNameMetadata, + Depth: p.Depth, } { var zero bool @@ -110,6 +112,12 @@ func NewSubmitRequestBody(p *batch.SubmitPayload) *SubmitRequestBody { body.ProcessNameMetadata = false } } + { + var zero int + if body.Depth == zero { + body.Depth = 0 + } + } return body } diff --git a/internal/api/gen/http/batch/server/types.go b/internal/api/gen/http/batch/server/types.go index 46d82aed..81842c1f 100644 --- a/internal/api/gen/http/batch/server/types.go +++ b/internal/api/gen/http/batch/server/types.go @@ -24,6 +24,7 @@ type SubmitRequestBody struct { RejectDuplicates *bool `form:"reject_duplicates,omitempty" json:"reject_duplicates,omitempty" xml:"reject_duplicates,omitempty"` TransferType *string `form:"transfer_type,omitempty" json:"transfer_type,omitempty" xml:"transfer_type,omitempty"` ProcessNameMetadata *bool `form:"process_name_metadata,omitempty" json:"process_name_metadata,omitempty" xml:"process_name_metadata,omitempty"` + Depth *int `form:"depth,omitempty" json:"depth,omitempty" xml:"depth,omitempty"` } // SubmitResponseBody is the type of the "batch" service "submit" endpoint HTTP @@ -164,12 +165,18 @@ func NewSubmitPayload(body *SubmitRequestBody) *batch.SubmitPayload { if body.ProcessNameMetadata != nil { v.ProcessNameMetadata = *body.ProcessNameMetadata } + if body.Depth != nil { + v.Depth = *body.Depth + } if body.RejectDuplicates == nil { v.RejectDuplicates = false } if body.ProcessNameMetadata == nil { v.ProcessNameMetadata = false } + if body.Depth == nil { + v.Depth = 0 + } return v } @@ -179,5 +186,10 @@ func ValidateSubmitRequestBody(body *SubmitRequestBody) (err error) { if body.Path == nil { err = goa.MergeErrors(err, goa.MissingFieldError("path", "body")) } + if body.Depth != nil { + if *body.Depth < 0 { + err = goa.MergeErrors(err, goa.InvalidRangeError("body.depth", *body.Depth, 0, true)) + } + } return } diff --git a/internal/api/gen/http/cli/enduro/cli.go b/internal/api/gen/http/cli/enduro/cli.go index 9d724f54..0d82b391 100644 --- a/internal/api/gen/http/cli/enduro/cli.go +++ b/internal/api/gen/http/cli/enduro/cli.go @@ -36,6 +36,7 @@ func UsageExamples() string { return os.Args[0] + ` pipeline list --name "abc123" --status false` + "\n" + os.Args[0] + ` batch submit --body '{ "completed_dir": "abc123", + "depth": 1, "path": "abc123", "pipeline": "abc123", "process_name_metadata": false, @@ -410,6 +411,7 @@ Submit a new batch Example: %[1]s batch submit --body '{ "completed_dir": "abc123", + "depth": 1, "path": "abc123", "pipeline": "abc123", "process_name_metadata": false, diff --git a/internal/api/gen/http/openapi.json b/internal/api/gen/http/openapi.json index c51c778e..90df0b66 100644 --- a/internal/api/gen/http/openapi.json +++ b/internal/api/gen/http/openapi.json @@ -167,6 +167,7 @@ "BatchSubmitRequestBody": { "example": { "completed_dir": "abc123", + "depth": 1, "path": "abc123", "pipeline": "abc123", "process_name_metadata": false, @@ -180,6 +181,12 @@ "example": "abc123", "type": "string" }, + "depth": { + "default": 0, + "example": 1, + "minimum": 0, + "type": "integer" + }, "path": { "example": "abc123", "type": "string" diff --git a/internal/api/gen/http/openapi.yaml b/internal/api/gen/http/openapi.yaml index 4aa1ae74..2dbda671 100644 --- a/internal/api/gen/http/openapi.yaml +++ b/internal/api/gen/http/openapi.yaml @@ -645,6 +645,11 @@ definitions: completed_dir: type: string example: abc123 + depth: + type: integer + default: 0 + example: 1 + minimum: 0 path: type: string example: abc123 @@ -670,6 +675,7 @@ definitions: example: abc123 example: completed_dir: abc123 + depth: 1 path: abc123 pipeline: abc123 process_name_metadata: false diff --git a/internal/api/gen/http/openapi3.json b/internal/api/gen/http/openapi3.json index 904a1c1f..4d582181 100644 --- a/internal/api/gen/http/openapi3.json +++ b/internal/api/gen/http/openapi3.json @@ -564,6 +564,7 @@ "SubmitRequestBody": { "example": { "completed_dir": "abc123", + "depth": 1, "path": "abc123", "pipeline": "abc123", "process_name_metadata": false, @@ -577,6 +578,12 @@ "example": "abc123", "type": "string" }, + "depth": { + "default": 0, + "example": 1, + "minimum": 0, + "type": "integer" + }, "path": { "example": "abc123", "type": "string" @@ -656,6 +663,7 @@ "application/json": { "example": { "completed_dir": "abc123", + "depth": 1, "path": "abc123", "pipeline": "abc123", "process_name_metadata": false, diff --git a/internal/api/gen/http/openapi3.yaml b/internal/api/gen/http/openapi3.yaml index a53cfa00..d18f2e2d 100644 --- a/internal/api/gen/http/openapi3.yaml +++ b/internal/api/gen/http/openapi3.yaml @@ -38,6 +38,7 @@ paths: $ref: '#/components/schemas/SubmitRequestBody' example: completed_dir: abc123 + depth: 1 path: abc123 pipeline: abc123 process_name_metadata: false @@ -1148,6 +1149,11 @@ components: completed_dir: type: string example: abc123 + depth: + type: integer + default: 0 + example: 1 + minimum: 0 path: type: string example: abc123 @@ -1173,6 +1179,7 @@ components: example: abc123 example: completed_dir: abc123 + depth: 1 path: abc123 pipeline: abc123 process_name_metadata: false diff --git a/internal/batch/service.go b/internal/batch/service.go index d276f66f..6642f5c6 100644 --- a/internal/batch/service.go +++ b/internal/batch/service.go @@ -77,6 +77,7 @@ func (s *batchImpl) Submit(ctx context.Context, payload *goabatch.SubmitPayload) input.TransferType = *payload.TransferType } input.MetadataConfig.ProcessNameMetadata = payload.ProcessNameMetadata + input.Depth = int32(payload.Depth) opts := temporalsdk_client.StartWorkflowOptions{ ID: BatchWorkflowID, WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, diff --git a/internal/batch/workflow.go b/internal/batch/workflow.go index a6da2c91..0d9d2e03 100644 --- a/internal/batch/workflow.go +++ b/internal/batch/workflow.go @@ -2,7 +2,9 @@ package batch import ( "context" - "os" + "io/fs" + "path/filepath" + "strings" "time" temporalsdk_temporal "go.temporal.io/sdk/temporal" @@ -32,6 +34,7 @@ type BatchWorkflowInput struct { RejectDuplicates bool TransferType string MetadataConfig metadata.Config + Depth int32 } func BatchWorkflow(ctx temporalsdk_workflow.Context, params BatchWorkflowInput) error { @@ -56,19 +59,39 @@ func NewBatchActivity(batchsvc Service) *BatchActivity { } func (a *BatchActivity) Execute(ctx context.Context, params BatchWorkflowInput) error { - files, err := os.ReadDir(params.Path) - if err != nil { - return temporal.NewNonRetryableError(err) - } pipelines := []string{} if params.PipelineName != "" { pipelines = append(pipelines, params.PipelineName) } - for _, file := range files { + + if params.Depth < 0 { + params.Depth = 0 + } + + root := params.Path + err := filepath.WalkDir(root, func(path string, entry fs.DirEntry, err error) error { + if err != nil { + return err + } + + rel, err := filepath.Rel(root, path) + if err != nil { + return err + } + + if rel == "." { + return nil // Ignore root. + } + + depth := len(strings.Split(rel, string(filepath.Separator))) - 1 + if depth != int(params.Depth) { + return nil // Keep walking. + } + req := collection.ProcessingWorkflowRequest{ - BatchDir: params.Path, - Key: file.Name(), - IsDir: file.IsDir(), + BatchDir: filepath.Dir(path), + Key: entry.Name(), + IsDir: entry.IsDir(), PipelineNames: pipelines, ProcessingConfig: params.ProcessingConfig, CompletedDir: params.CompletedDir, @@ -77,7 +100,14 @@ func (a *BatchActivity) Execute(ctx context.Context, params BatchWorkflowInput) TransferType: params.TransferType, MetadataConfig: params.MetadataConfig, } + _ = a.batchsvc.InitProcessingWorkflow(ctx, &req) + + return fs.SkipDir + }) + if err != nil { + return temporal.NewNonRetryableError(err) } + return nil } diff --git a/internal/batch/workflow_test.go b/internal/batch/workflow_test.go index 4b26a796..b41e4193 100644 --- a/internal/batch/workflow_test.go +++ b/internal/batch/workflow_test.go @@ -69,5 +69,5 @@ func TestBatchActivityFailsWithBogusBatchPath(t *testing.T) { Path: "/non/existent/path", PipelineName: "am", }) - assert.Error(t, err, "open /non/existent/path: no such file or directory") + assert.ErrorContains(t, err, "no such file or directory") } diff --git a/ui/src/openapi-generator/models/SubmitRequestBody.ts b/ui/src/openapi-generator/models/SubmitRequestBody.ts index 1d1968b1..86a4ae2f 100644 --- a/ui/src/openapi-generator/models/SubmitRequestBody.ts +++ b/ui/src/openapi-generator/models/SubmitRequestBody.ts @@ -25,6 +25,12 @@ export interface SubmitRequestBody { * @memberof SubmitRequestBody */ completedDir?: string; + /** + * + * @type {number} + * @memberof SubmitRequestBody + */ + depth?: number; /** * * @type {string} @@ -90,6 +96,7 @@ export function SubmitRequestBodyFromJSONTyped(json: any, ignoreDiscriminator: b return { 'completedDir': !exists(json, 'completed_dir') ? undefined : json['completed_dir'], + 'depth': !exists(json, 'depth') ? undefined : json['depth'], 'path': json['path'], 'pipeline': !exists(json, 'pipeline') ? undefined : json['pipeline'], 'processNameMetadata': !exists(json, 'process_name_metadata') ? undefined : json['process_name_metadata'], @@ -110,6 +117,7 @@ export function SubmitRequestBodyToJSON(value?: SubmitRequestBody | null): any { return { 'completed_dir': value.completedDir, + 'depth': value.depth, 'path': value.path, 'pipeline': value.pipeline, 'process_name_metadata': value.processNameMetadata, diff --git a/ui/src/views/Batch.vue b/ui/src/views/Batch.vue index c3623c88..eee574f2 100644 --- a/ui/src/views/Batch.vue +++ b/ui/src/views/Batch.vue @@ -35,18 +35,22 @@ - + Reject transfers with duplicate names. - + Process transfer name metadata. + + + +
@@ -102,6 +106,7 @@ type UserDefaults = { processNameMetadata: boolean; completedDir: string | null; retentionPeriod: string | null; + depth: number; } const batchDefaultsStorageKey = "batchDefaults"; @@ -123,6 +128,7 @@ export default class Batch extends Vue { processNameMetadata: false, completedDir: null, retentionPeriod: null, + depth: 0, }; private tabIndex: number = 0; @@ -181,6 +187,7 @@ export default class Batch extends Vue { this.form.processNameMetadata = defaults.processNameMetadata; this.form.completedDir = defaults.completedDir; this.form.retentionPeriod = defaults.retentionPeriod; + this.form.depth = defaults.depth; } // Save choices made by the user that can be used as defaults next time. @@ -191,6 +198,7 @@ export default class Batch extends Vue { processNameMetadata: this.form.processNameMetadata, completedDir: this.form.completedDir, retentionPeriod: this.form.retentionPeriod, + depth: this.form.depth, }; localStorage.setItem(batchDefaultsStorageKey, JSON.stringify(defaults)); } @@ -240,6 +248,7 @@ export default class Batch extends Vue { request.submitRequestBody.transferType = this.form.transferType; } request.submitRequestBody.processNameMetadata = this.form.processNameMetadata; + request.submitRequestBody.depth = Number(this.form.depth); return EnduroBatchClient.batchSubmit(request).then((response: api.BatchSubmitResponseBody) => { this.saveDefaults(); this.loadStatus();