Skip to content

Commit

Permalink
Add batch option to determine depth of SIPs
Browse files Browse the repository at this point in the history
Co-authored-by: camlyall <[email protected]>
  • Loading branch information
sevein and camlyall committed Sep 14, 2023
1 parent c3d0726 commit 7db1b6d
Show file tree
Hide file tree
Showing 15 changed files with 130 additions and 13 deletions.
4 changes: 4 additions & 0 deletions internal/api/design/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ var _ = Service("batch", func() {
Attribute("reject_duplicates", Boolean, func() { Default(false) })
Attribute("transfer_type", String)
Attribute("process_name_metadata", Boolean, func() { Default(false) })
Attribute("depth", Int, func() {
Default(0)
Minimum(0)
})
Required("path")
})
Result(BatchResult)
Expand Down
1 change: 1 addition & 0 deletions internal/api/gen/batch/service.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 15 additions & 1 deletion internal/api/gen/http/batch/client/cli.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions internal/api/gen/http/batch/client/types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions internal/api/gen/http/batch/server/types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions internal/api/gen/http/cli/enduro/cli.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions internal/api/gen/http/openapi.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions internal/api/gen/http/openapi.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions internal/api/gen/http/openapi3.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions internal/api/gen/http/openapi3.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions internal/batch/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func (s *batchImpl) Submit(ctx context.Context, payload *goabatch.SubmitPayload)
input.TransferType = *payload.TransferType
}
input.MetadataConfig.ProcessNameMetadata = payload.ProcessNameMetadata
input.Depth = int32(payload.Depth)
opts := temporalsdk_client.StartWorkflowOptions{
ID: BatchWorkflowID,
WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
Expand Down
48 changes: 39 additions & 9 deletions internal/batch/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package batch

import (
"context"
"os"
"io/fs"
"path/filepath"
"strings"
"time"

temporalsdk_temporal "go.temporal.io/sdk/temporal"
Expand Down Expand Up @@ -32,6 +34,7 @@ type BatchWorkflowInput struct {
RejectDuplicates bool
TransferType string
MetadataConfig metadata.Config
Depth int32
}

func BatchWorkflow(ctx temporalsdk_workflow.Context, params BatchWorkflowInput) error {
Expand All @@ -56,19 +59,39 @@ func NewBatchActivity(batchsvc Service) *BatchActivity {
}

func (a *BatchActivity) Execute(ctx context.Context, params BatchWorkflowInput) error {
files, err := os.ReadDir(params.Path)
if err != nil {
return temporal.NewNonRetryableError(err)
}
pipelines := []string{}
if params.PipelineName != "" {
pipelines = append(pipelines, params.PipelineName)
}
for _, file := range files {

if params.Depth < 0 {
params.Depth = 0
}

root := params.Path
err := filepath.WalkDir(root, func(path string, entry fs.DirEntry, err error) error {
if err != nil {
return err
}

rel, err := filepath.Rel(root, path)
if err != nil {
return err
}

if rel == "." {
return nil // Ignore root.
}

depth := len(strings.Split(rel, string(filepath.Separator))) - 1
if depth != int(params.Depth) {
return nil // Keep walking.
}

req := collection.ProcessingWorkflowRequest{
BatchDir: params.Path,
Key: file.Name(),
IsDir: file.IsDir(),
BatchDir: filepath.Dir(path),
Key: entry.Name(),
IsDir: entry.IsDir(),
PipelineNames: pipelines,
ProcessingConfig: params.ProcessingConfig,
CompletedDir: params.CompletedDir,
Expand All @@ -77,7 +100,14 @@ func (a *BatchActivity) Execute(ctx context.Context, params BatchWorkflowInput)
TransferType: params.TransferType,
MetadataConfig: params.MetadataConfig,
}

_ = a.batchsvc.InitProcessingWorkflow(ctx, &req)

return fs.SkipDir
})
if err != nil {
return temporal.NewNonRetryableError(err)
}

return nil
}
2 changes: 1 addition & 1 deletion internal/batch/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,5 @@ func TestBatchActivityFailsWithBogusBatchPath(t *testing.T) {
Path: "/non/existent/path",
PipelineName: "am",
})
assert.Error(t, err, "open /non/existent/path: no such file or directory")
assert.ErrorContains(t, err, "no such file or directory")
}
8 changes: 8 additions & 0 deletions ui/src/openapi-generator/models/SubmitRequestBody.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 7db1b6d

Please sign in to comment.