Skip to content

Commit

Permalink
workflow: multiple improvements
Browse files Browse the repository at this point in the history
* Started working towards dedicated input/output structs for activities. It is
  more verbose but also cleaner.
* Move each activity to its own file.
* Try to identify the format of the package also by header.
* Do not stop workflow when the archive filename does not match the regular
  expression. Be more forgiving, enable activities to work with default values.
  • Loading branch information
sevein committed Nov 7, 2019
1 parent 5fafe70 commit c528595
Show file tree
Hide file tree
Showing 16 changed files with 578 additions and 375 deletions.
12 changes: 7 additions & 5 deletions internal/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,24 +57,26 @@ func (p PipelineRegistry) Config(name string) (*Config, error) {
func (p PipelineRegistry) Client(name string) (*amclient.Client, error) {
cfg, err := p.Config(name)
if err != nil {
return nil, fmt.Errorf("Error fetching pipeline configuration: %w", err)
return nil, fmt.Errorf("error fetching pipeline configuration: %w", err)
}

client, err := amclient.New(httpClient(), cfg.BaseURL, cfg.User, cfg.Key)
if err != nil {
return nil, fmt.Errorf("Error creating Archivematica API client: %w", err)
return nil, fmt.Errorf("error creating Archivematica API client: %w", err)
}

return client, nil
}

func (p PipelineRegistry) TempFile(name, key string) (*os.File, error) {
// TempFile returns a new temporary file inside the processing directory of the
// given pipeline.
func (p PipelineRegistry) TempFile(name string) (*os.File, error) {
cfg, err := p.Config(name)
if err != nil {
return nil, fmt.Errorf("Error fetching pipeline configuration: %w", err)
return nil, fmt.Errorf("error fetching pipeline configuration: %w", err)
}

return ioutil.TempFile(cfg.ProcessingDir, fmt.Sprintf("*-%s", key))
return ioutil.TempFile(cfg.ProcessingDir, "blob-*")
}

func expandPath(path string) string {
Expand Down
2 changes: 1 addition & 1 deletion internal/pipeline/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

var (
ErrStatusNonRetryable = errors.New("non-retryable error")
ErrStatusNonRetryable = errors.New("non retryable error")
ErrStatusRetryable = errors.New("retryable error")
ErrStatusInProgress = errors.New("waitable error")
)
Expand Down
144 changes: 144 additions & 0 deletions internal/workflow/bundle_activity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package workflow

import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"regexp"

"github.com/artefactual-labs/enduro/internal/amclient/bundler"
"github.com/mholt/archiver"
)

// BundleActivity is the workflow activity that turns a downloaded blob into a
// transfer. It holds no state of its own.
type BundleActivity struct{}

// NewBundleActivity returns a new BundleActivity.
func NewBundleActivity() *BundleActivity {
return &BundleActivity{}
}

// BundleActivityParams is the input of BundleActivity.Execute.
type BundleActivityParams struct {
// TransferDir is the directory under which the transfer is created.
TransferDir string
// Key identifies the blob (presumably its object name in the watched
// source). Its extension is used to pick an unarchiver and the transfer
// name and kind are derived from it.
Key string
// TempFile is the path of the downloaded blob on disk.
TempFile string
}

// BundleActivityResult is the output of BundleActivity.Execute.
type BundleActivityResult struct {
Name string // Name of the transfer.
Kind string // Client specific, obtained from name, e.g. "DPJ-SIP".
RelPath string // Path of the transfer relative to the transfer directory.
FullPath string // Full path to the transfer in the worker running the session.
}

// Execute turns the downloaded blob into a transfer inside
// params.TransferDir. When an unarchiver can be determined for the blob its
// contents are extracted into a new directory; otherwise the blob is bundled
// as a single-file transfer.
//
// On success it returns the name, kind and location of the transfer. Every
// error, including the one reported for nil params, is converted with
// nonRetryableError before it reaches the caller, and the result is nil.
//
// The results are named on purpose: the deferred conversion below must change
// the values that are actually returned. With unnamed results it would only
// modify a local variable and the caller would receive the raw error.
func (a *BundleActivity) Execute(ctx context.Context, params *BundleActivityParams) (res *BundleActivityResult, err error) {
	defer func() {
		if err != nil {
			res, err = nil, nonRetryableError(err)
		}
	}()

	if params == nil {
		return nil, errMissingParameters
	}

	res = &BundleActivityResult{}

	if unar := a.Unarchiver(params.Key, params.TempFile); unar == nil {
		res.FullPath, err = a.SingleFile(ctx, params.TransferDir, params.Key, params.TempFile)
	} else {
		res.FullPath, err = a.Bundle(ctx, unar, params.TransferDir, params.Key, params.TempFile)
	}
	if err != nil {
		return nil, err
	}

	res.RelPath, err = filepath.Rel(params.TransferDir, res.FullPath)
	if err != nil {
		return nil, fmt.Errorf("error calculating relative path to transfer (base=%q, target=%q): %v", params.TransferDir, res.FullPath, err)
	}

	res.Name, res.Kind = a.NameKind(params.Key)

	return res, nil
}

// Unarchiver picks the unarchiver to use for the blob, or returns nil when
// none can be determined.
//
// The extension found in key is tried first. If that does not yield an
// unarchiver, the file at filename is opened and identified by its header.
// A file that cannot be opened is treated the same as an unknown format.
func (a *BundleActivity) Unarchiver(key, filename string) archiver.Unarchiver {
	// First attempt: trust the extension carried by the key.
	format, extErr := archiver.ByExtension(key)
	if extErr == nil {
		if byExt, isUnarchiver := format.(archiver.Unarchiver); isUnarchiver {
			return byExt
		}
	}

	// Second attempt: look at the header of the file on disk.
	fh, openErr := os.Open(filename)
	if openErr != nil {
		return nil
	}
	defer fh.Close()

	byHeader, headerErr := archiver.ByHeader(fh)
	if headerErr != nil {
		return nil
	}

	return byHeader
}

// SingleFile builds a transfer that contains only the downloaded blob.
//
// A bundle is created under transferDir, an "objects/<key>" entry is created
// in it, and tempFile is then moved over that entry before the bundle is
// finalized. The returned string is whatever the bundler reports as its full
// base path (presumably the absolute directory of the new transfer).
func (a *BundleActivity) SingleFile(ctx context.Context, transferDir, key, tempFile string) (string, error) {
	bndl, err := bundler.NewBundlerWithTempDir(transferDir)
	if err != nil {
		return "", fmt.Errorf("error creating bundle: %v", err)
	}

	objectPath := filepath.Join("objects", key)
	placeholder, err := bndl.Create(objectPath)
	if err != nil {
		return "", fmt.Errorf("error creating file: %v", err)
	}
	defer placeholder.Close()

	// The blob replaces the placeholder instead of being copied into it.
	target := filepath.Join(transferDir, placeholder.Name())
	if err = os.Rename(tempFile, target); err != nil {
		return "", fmt.Errorf("error moving file (from %s to %s): %v", tempFile, placeholder.Name(), err)
	}

	if err = bndl.Bundle(); err != nil {
		return "", fmt.Errorf("error bundling the transfer: %v", err)
	}

	return bndl.FullBaseFsPath(), nil
}

// Bundle creates a transfer from the contents of the archive at tempFile.
//
// A new, randomly named directory is created under transferDir and the
// archive is extracted into it with unar. On success the archive is deleted
// and the path of the new directory is returned. If extraction fails, the
// directory is removed again so that no partially extracted transfer is left
// behind; the caller never learns its path and could not clean it up itself.
func (a *BundleActivity) Bundle(ctx context.Context, unar archiver.Unarchiver, transferDir, key, tempFile string) (string, error) {
	// Create a new directory for our transfer with the name randomized.
	const prefix = "enduro"
	tempDir, err := ioutil.TempDir(transferDir, prefix)
	if err != nil {
		return "", fmt.Errorf("error creating temporary directory: %v", err)
	}

	// Best effort: widen the permissions of the new directory. A failure
	// here is not fatal for the extraction that follows.
	_ = os.Chmod(tempDir, os.FileMode(0o755))

	if err := unar.Unarchive(tempFile, tempDir); err != nil {
		// Best effort: do not leak the half-populated directory.
		_ = os.RemoveAll(tempDir)
		return "", fmt.Errorf("error unarchiving file: %v", err)
	}

	// Delete the archive. We still have a copy in the watched source.
	_ = os.Remove(tempFile)

	return tempDir, nil
}

// regex matches keys shaped like "<kind>-<uuid>[.<ext>]" and captures the
// kind, the UUID and the optional file extension.
var regex = regexp.MustCompile(`^(?P<kind>.*)-(?P<uuid>[a-z0-9]{8}-[a-z0-9]{4}-[1-5][a-z0-9]{3}-[a-z0-9]{4}-[a-z0-9]{12})(?P<fileext>\..*)?$`)

// NameKind derives the name and the kind of the transfer from key.
//
// When key matches regex, kind is the part before the UUID and name is the
// kind followed by the first 13 characters of the UUID. When it does not
// match, name is key itself and kind is empty.
func (a *BundleActivity) NameKind(key string) (name, kind string) {
	switch groups := regex.FindStringSubmatch(key); len(groups) {
	case 0:
		name = key
	case 4:
		kind = groups[1]
		name = fmt.Sprintf("%s-%s", kind, groups[2][0:13])
	}

	return name, kind
}
26 changes: 26 additions & 0 deletions internal/workflow/bundle_activity_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package workflow

import "testing"

// TestBundleActivity_NameKind checks the name and kind derived from keys
// with and without the "<kind>-<uuid>" shape.
func TestBundleActivity_NameKind(t *testing.T) {
	type testCase struct {
		key      string
		wantName string
		wantKind string
	}

	cases := []testCase{
		{"foobar.jpg", "foobar.jpg", ""},
		{"c5ecddb0-7a61-4234-80a9-fa7993e97867.tar", "c5ecddb0-7a61-4234-80a9-fa7993e97867.tar", ""},
		{"DPJ-SIP-c5ecddb0-7a61-4234-80a9-fa7993e97867", "DPJ-SIP-c5ecddb0-7a61", "DPJ-SIP"},
		{"DPJ-SIP-c5ecddb0-7a61-4234-80a9-fa7993e97867.tar", "DPJ-SIP-c5ecddb0-7a61", "DPJ-SIP"},
	}

	activity := &BundleActivity{}
	for i := range cases {
		tc := cases[i]
		name, kind := activity.NameKind(tc.key)
		if name != tc.wantName {
			t.Errorf("BundleActivity.NameKind() gotName = %v, want %v", name, tc.wantName)
		}
		if kind != tc.wantKind {
			t.Errorf("BundleActivity.NameKind() gotKind = %v, want %v", kind, tc.wantKind)
		}
	}
}
31 changes: 31 additions & 0 deletions internal/workflow/cleanup_activity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package workflow

import (
"context"
"fmt"
"os"
)

// CleanUpActivity is the workflow activity that removes the transfer
// directory once it is no longer needed.
type CleanUpActivity struct {
// manager is stored by the constructor; Execute does not use it.
manager *Manager
}

// NewCleanUpActivity returns a CleanUpActivity bound to the given manager.
func NewCleanUpActivity(m *Manager) *CleanUpActivity {
return &CleanUpActivity{manager: m}
}

// CleanUpActivityParams is the input of CleanUpActivity.Execute.
type CleanUpActivityParams struct {
// FullPath is the path that Execute removes recursively. It must not be
// empty.
FullPath string
}

// Execute removes params.FullPath and everything below it.
//
// It returns errMissingParameters when params is nil or FullPath is empty,
// and a descriptive error when the removal fails.
func (a *CleanUpActivity) Execute(ctx context.Context, params *CleanUpActivityParams) error {
	if params == nil {
		return errMissingParameters
	}

	target := params.FullPath
	if target == "" {
		return errMissingParameters
	}

	removeErr := os.RemoveAll(target)
	if removeErr == nil {
		return nil
	}

	return fmt.Errorf("error removing transfer directory: %v", removeErr)
}
19 changes: 19 additions & 0 deletions internal/workflow/delete_original_activity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package workflow

import (
"context"

"github.com/artefactual-labs/enduro/internal/watcher"
)

// DeleteOriginalActivity is the workflow activity that deletes the original
// blob through the manager's watcher.
type DeleteOriginalActivity struct {
// manager provides the watcher used by Execute.
manager *Manager
}

// NewDeleteOriginalActivity returns a DeleteOriginalActivity bound to the
// given manager.
func NewDeleteOriginalActivity(m *Manager) *DeleteOriginalActivity {
return &DeleteOriginalActivity{manager: m}
}

// Execute asks the manager's watcher to delete the blob described by event
// and returns the watcher's error unchanged. The event is passed through
// without validation.
func (a *DeleteOriginalActivity) Execute(ctx context.Context, event *watcher.BlobEvent) error {
return a.manager.Watcher.Delete(ctx, event)
}
36 changes: 36 additions & 0 deletions internal/workflow/download_activity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package workflow

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/artefactual-labs/enduro/internal/watcher"
)

// DownloadActivity downloads the blob into the pipeline processing directory.
type DownloadActivity struct {
// manager provides the pipeline registry and the watcher used by Execute.
manager *Manager
}

// NewDownloadActivity returns a DownloadActivity bound to the given manager.
func NewDownloadActivity(m *Manager) *DownloadActivity {
return &DownloadActivity{manager: m}
}

// Execute downloads the blob described by event into a new temporary file
// obtained from the pipeline named by event.PipelineName and returns the
// path of that file.
//
// All errors are reported as non-retryable. When the download fails, or the
// file cannot be closed afterwards, the temporary file is removed so that a
// failed attempt does not leave a partial blob behind.
func (a *DownloadActivity) Execute(ctx context.Context, event *watcher.BlobEvent) (string, error) {
	if event == nil {
		return "", nonRetryableError(errors.New("error reading parameters"))
	}

	file, err := a.manager.Pipelines.TempFile(event.PipelineName)
	if err != nil {
		return "", nonRetryableError(fmt.Errorf("error creating temporary file in processing directory: %v", err))
	}

	if err := a.manager.Watcher.Download(ctx, file, event); err != nil {
		// Best effort clean-up; the download error is the one worth reporting.
		_ = file.Close()
		_ = os.Remove(file.Name())
		return "", nonRetryableError(fmt.Errorf("error downloading blob: %v", err))
	}

	// A failed Close can mean that written data did not reach the disk, so
	// the file must not be handed over as a complete download.
	if err := file.Close(); err != nil {
		_ = os.Remove(file.Name())
		return "", nonRetryableError(fmt.Errorf("error closing temporary file: %v", err))
	}

	return file.Name(), nil
}
4 changes: 4 additions & 0 deletions internal/workflow/errors.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package workflow

import (
"errors"

"go.uber.org/cadence"
"go.uber.org/cadence/.gen/go/shared"
"go.uber.org/cadence/workflow"
Expand Down Expand Up @@ -28,3 +30,5 @@ const NRE = "non retryable error"
// nonRetryableError converts err into a Cadence custom error built with NRE
// and the message of err. Only the message text is carried over, not the
// original error value. err must not be nil: its Error method is called
// unconditionally.
func nonRetryableError(err error) error {
return cadence.NewCustomError(NRE, err.Error())
}

// errMissingParameters is returned by activities that are invoked without the
// parameters they require.
var errMissingParameters = errors.New("missing parameters")
4 changes: 2 additions & 2 deletions internal/workflow/hari_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (a UpdateHARIActivity) url() (string, error) {

func (a UpdateHARIActivity) sendRequest(ctx context.Context, apiURL string, tinfo *TransferInfo) error {
// Location of AVLXML, e.g. `/transfer-path/<uuid>/DPJ/journal/<uuid>.xml`.
var path = filepath.Join(tinfo.FullPath, tinfo.OriginalID, "DPJ", "journal", tinfo.OriginalID+".xml")
var path = filepath.Join(tinfo.Bundle.FullPath, tinfo.OriginalID, "DPJ", "journal", tinfo.OriginalID+".xml")

// Is there a better way to do this? We need to build the JSON document but
// maybe this can be done with a buffer?
Expand All @@ -88,7 +88,7 @@ func (a UpdateHARIActivity) sendRequest(ctx context.Context, apiURL string, tinf
payload := &avlRequest{
XML: blob,
Message: "AVLXML was processed by DPJ Archivematica pipeline",
Type: strings.ToLower(tinfo.Kind),
Type: strings.ToLower(tinfo.Bundle.Kind),
Timestamp: tinfo.StoredAt,
AIPID: tinfo.SIPID,
}
Expand Down
47 changes: 47 additions & 0 deletions internal/workflow/hide_package_activity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package workflow

import (
"context"
"fmt"
)

// HidePackageActivity is the workflow activity that hides a transfer or an
// ingest unit using the API client of a pipeline.
type HidePackageActivity struct {
// manager provides the pipeline registry used to obtain the API client.
manager *Manager
}

// NewHidePackageActivity returns a HidePackageActivity bound to the given
// manager.
func NewHidePackageActivity(m *Manager) *HidePackageActivity {
return &HidePackageActivity{manager: m}
}

// Execute hides the unit identified by unitID in the pipeline named
// pipelineName. unitType selects the endpoint and must be either "transfer"
// or "ingest".
//
// A failure to obtain the client and an unknown unit type are reported as
// non-retryable errors. A failed request, or a response that does not
// confirm the removal, is returned as a plain error.
func (a *HidePackageActivity) Execute(ctx context.Context, unitID, unitType, pipelineName string) error {
	client, err := a.manager.Pipelines.Client(pipelineName)
	if err != nil {
		return nonRetryableError(fmt.Errorf("error looking up pipeline config: %v", err))
	}

	switch unitType {
	case "transfer":
		result, _, hideErr := client.Transfer.Hide(ctx, unitID)
		if hideErr != nil {
			return fmt.Errorf("error hiding transfer: %v", hideErr)
		}
		if !result.Removed {
			return fmt.Errorf("error hiding transfer: not removed")
		}

	case "ingest":
		result, _, hideErr := client.Ingest.Hide(ctx, unitID)
		if hideErr != nil {
			return fmt.Errorf("error hiding sip: %v", hideErr)
		}
		if !result.Removed {
			return fmt.Errorf("error hiding sip: not removed")
		}

	default:
		return nonRetryableError(fmt.Errorf("unexpected unit type: %s", unitType))
	}

	return nil
}
Loading

0 comments on commit c528595

Please sign in to comment.