Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

98 utilize streams #101

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 108 additions & 10 deletions cmd/internal/backup/backup.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package backup

import (
"bytes"
"context"
"fmt"
"io"
"log/slog"
"os"
"path"
Expand All @@ -13,6 +15,7 @@ import (
"github.com/metal-stack/backup-restore-sidecar/cmd/internal/encryption"
"github.com/metal-stack/backup-restore-sidecar/cmd/internal/metrics"
"github.com/metal-stack/backup-restore-sidecar/pkg/constants"
"github.com/mholt/archives"
cron "github.com/robfig/cron/v3"
"golang.org/x/sync/semaphore"
)
Expand Down Expand Up @@ -80,6 +83,7 @@ func (b *Backuper) Start(ctx context.Context) error {
}

func (b *Backuper) CreateBackup(ctx context.Context) error {
fmt.Println("CreateBackup")
if !b.sem.TryAcquire(1) {
return constants.ErrBackupAlreadyInProgress
}
Expand All @@ -101,32 +105,113 @@ func (b *Backuper) CreateBackup(ctx context.Context) error {
return fmt.Errorf("could not delete priorly uploaded backup: %w", err)
}

filename, err := b.comp.Compress(backupFilePath)
filename := path.Base(backupFilePath) + b.comp.Extension()

files, _ := os.ReadDir(constants.BackupDir)
for _, file := range files {
b.log.Info("backup file", "file", file.Name())
}

// pipe to compress and buffer compressed data in order to prevent deadlock of pipe and error-handling
reader1, writer1 := io.Pipe()
compressErr := make(chan error, 1)
compressBuffer := &bytes.Buffer{}
go func() {
defer writer1.Close()
defer close(compressErr)

files, err := archives.FilesFromDisk(ctx, &archives.FromDiskOptions{}, map[string]string{constants.BackupDir: backupFilePath + b.comp.Extension()})
fmt.Println("files", files)
if err != nil {
b.metrics.CountError("build_files")
compressErr <- err
return
}

err = b.comp.Compress(ctx, writer1, files)
if err != nil {
b.metrics.CountError("compress")
b.log.Error("error compressing backup", "error", err)
compressErr <- err
return
} else {
compressErr <- nil
}
}()

// buffer compressed data in order to prevent deadlock of pipe and error-handling
go func() {
_, err := io.Copy(compressBuffer, reader1)
if err != nil {
b.metrics.CountError("buffering")
b.log.Error("error buffering compressed data", "error", err)
}
}()

err = <-compressErr
if err != nil {
b.metrics.CountError("compress")
return fmt.Errorf("unable to compress backup: %w", err)
return fmt.Errorf("error compressing backup: %w", err)
}

b.log.Info("compressed backup")

if b.encrypter != nil {
filename, err = b.encrypter.Encrypt(filename)
filename = filename + encryption.Suffix
}

// pipe to encrypt and buffer encrypted data
reader2, writer2 := io.Pipe()
encryptErr := make(chan error)
encryptBuffer := &bytes.Buffer{}
go func() {
defer writer2.Close()
defer close(encryptErr)

if b.encrypter != nil {
err = b.encrypter.Encrypt(compressBuffer, writer2)
if err != nil {
b.metrics.CountError("encrypt")
b.log.Error("error encrypting backup", "error", err)
encryptErr <- err
} else {
encryptErr <- nil
}
} else {
_, err = io.Copy(writer2, compressBuffer)
if err != nil {
b.metrics.CountError("streaming")
b.log.Error("error copying backup", "error", err)
encryptErr <- err
} else {
encryptErr <- nil
}
}
}()

// buffer compressed data in order to prevent deadlock of pipe and error-handling
go func() {
_, err := io.Copy(encryptBuffer, reader2)
if err != nil {
b.metrics.CountError("encrypt")
return fmt.Errorf("error encrypting backup: %w", err)
b.metrics.CountError("buffering")
b.log.Error("error buffering compressed data", "error", err)
}
b.log.Info("encrypted backup")
}()

err = <-encryptErr
if err != nil {
return fmt.Errorf("error encrypting backup: %w", err)
}

err = b.bp.UploadBackup(ctx, filename)
countingReader := &CountingReader{Reader: encryptBuffer}
err = b.bp.UploadBackup(ctx, countingReader, filename)
if err != nil {
b.metrics.CountError("upload")
return fmt.Errorf("error uploading backup: %w", err)
}

b.log.Info("uploaded backup to backup provider bucket")
b.log.Info("uploaded backup to backup provider bucket", "size", countingReader.BytesRead)

b.metrics.CountBackup(filename)
b.metrics.CountBackup(countingReader.BytesRead)

err = b.bp.CleanupBackups(ctx)
if err != nil {
Expand All @@ -138,3 +223,16 @@ func (b *Backuper) CreateBackup(ctx context.Context) error {

return nil
}

// CountingReader is a wrapper around io.Reader that counts the number of bytes read
type CountingReader struct {
io.Reader
BytesRead float64
}

// Read reads from the underlying reader and counts the number of bytes read
func (r *CountingReader) Read(p []byte) (int, error) {
n, err := r.Reader.Read(p)
r.BytesRead += float64(n)
return n, err
}
5 changes: 3 additions & 2 deletions cmd/internal/backup/providers/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package providers

import (
"context"
"io"
"time"
)

Expand All @@ -10,8 +11,8 @@ type BackupProvider interface {
ListBackups(ctx context.Context) (BackupVersions, error)
CleanupBackups(ctx context.Context) error
GetNextBackupName(ctx context.Context) string
DownloadBackup(ctx context.Context, version *BackupVersion, outDir string) (string, error)
UploadBackup(ctx context.Context, sourcePath string) error
DownloadBackup(ctx context.Context, version *BackupVersion, outputWriter io.Writer) error
UploadBackup(ctx context.Context, inputReader io.Reader, sourcePath string) error
}

type BackupVersions interface {
Expand Down
38 changes: 8 additions & 30 deletions cmd/internal/backup/providers/gcp/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"net/http"
"path/filepath"
"strconv"
"strings"

"errors"

Expand Down Expand Up @@ -147,53 +146,32 @@ func (b *BackupProviderGCP) CleanupBackups(_ context.Context) error {
}

// DownloadBackup downloads the given backup version to the specified folder
func (b *BackupProviderGCP) DownloadBackup(ctx context.Context, version *providers.BackupVersion, outDir string) (string, error) {
func (b *BackupProviderGCP) DownloadBackup(ctx context.Context, version *providers.BackupVersion, outputWriter io.Writer) error {
gen, err := strconv.ParseInt(version.Version, 10, 64)
if err != nil {
return "", err
return err
}

bucket := b.c.Bucket(b.config.BucketName)

downloadFileName := version.Name
if strings.Contains(downloadFileName, "/") {
downloadFileName = filepath.Base(downloadFileName)
}

backupFilePath := filepath.Join(outDir, downloadFileName)

b.log.Info("downloading", "object", version.Name, "gen", gen, "to", backupFilePath)

r, err := bucket.Object(version.Name).Generation(gen).NewReader(ctx)
if err != nil {
return "", fmt.Errorf("backup not found: %w", err)
return fmt.Errorf("backup not found: %w", err)
}
defer r.Close()

f, err := b.fs.Create(backupFilePath)
if err != nil {
return "", err
}
defer f.Close()

_, err = io.Copy(f, r)
_, err = io.Copy(outputWriter, r)
if err != nil {
return "", fmt.Errorf("error writing file from gcp to filesystem: %w", err)
return fmt.Errorf("error writing file from gcp to filesystem: %w", err)
}

return backupFilePath, nil
return nil
}

// UploadBackup uploads a backup to the backup provider
func (b *BackupProviderGCP) UploadBackup(ctx context.Context, sourcePath string) error {
func (b *BackupProviderGCP) UploadBackup(ctx context.Context, inputReader io.Reader, sourcePath string) error {
bucket := b.c.Bucket(b.config.BucketName)

r, err := b.fs.Open(sourcePath)
if err != nil {
return err
}
defer r.Close()

destination := filepath.Base(sourcePath)
if b.config.ObjectPrefix != "" {
destination = b.config.ObjectPrefix + "/" + destination
Expand All @@ -203,7 +181,7 @@ func (b *BackupProviderGCP) UploadBackup(ctx context.Context, sourcePath string)

obj := bucket.Object(destination)
w := obj.NewWriter(ctx)
if _, err := io.Copy(w, r); err != nil {
if _, err := io.Copy(w, inputReader); err != nil {
return err
}
defer w.Close()
Expand Down
13 changes: 9 additions & 4 deletions cmd/internal/backup/providers/gcp/gcp_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ func Test_BackupProviderGCP(t *testing.T) {
err = afero.WriteFile(fs, backupPath, []byte(backupContent), 0600)
require.NoError(t, err)

err = p.UploadBackup(ctx, backupPath)
backupFile, err := fs.Open(backupPath)
require.NoError(t, err)
err = p.UploadBackup(ctx, backupFile, backupPath)
require.NoError(t, err)

// cleaning up after test
Expand Down Expand Up @@ -153,17 +155,20 @@ func Test_BackupProviderGCP(t *testing.T) {
latestVersion := versions.Latest()
require.NotNil(t, latestVersion)

backupFilePath, err := p.DownloadBackup(ctx, latestVersion, "")
outputfile, err := fs.Create("outputfile")
require.NoError(t, err)

err = p.DownloadBackup(ctx, latestVersion, outputfile)
require.NoError(t, err)

gotContent, err := afero.ReadFile(fs, backupFilePath)
gotContent, err := afero.ReadFile(fs, outputfile.Name())
require.NoError(t, err)

backupContent := fmt.Sprintf("precious data %d", backupAmount-1)
require.Equal(t, backupContent, string(gotContent))

// cleaning up after test
err = fs.Remove(backupFilePath)
err = fs.Remove(outputfile.Name())
require.NoError(t, err)
})

Expand Down
24 changes: 15 additions & 9 deletions cmd/internal/backup/providers/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package local
import (
"context"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
Expand All @@ -11,7 +12,6 @@ import (
"errors"

"github.com/metal-stack/backup-restore-sidecar/cmd/internal/backup/providers"
"github.com/metal-stack/backup-restore-sidecar/cmd/internal/utils"
"github.com/metal-stack/backup-restore-sidecar/pkg/constants"
"github.com/spf13/afero"
)
Expand Down Expand Up @@ -86,28 +86,34 @@ func (b *BackupProviderLocal) CleanupBackups(_ context.Context) error {
}

// DownloadBackup downloads the given backup version to the specified folder
func (b *BackupProviderLocal) DownloadBackup(_ context.Context, version *providers.BackupVersion, outDir string) (string, error) {
func (b *BackupProviderLocal) DownloadBackup(_ context.Context, version *providers.BackupVersion, outputWriter io.Writer) error {
b.log.Info("download backup called for provider local")

source := filepath.Join(b.config.LocalBackupPath, version.Name)

backupFilePath := filepath.Join(outDir, version.Name)

err := utils.Copy(b.fs, source, backupFilePath)
infile, err := b.fs.Open(source)
if err != nil {
return fmt.Errorf("error while opening source-file in download: %w", err)
}
_, err = io.Copy(outputWriter, infile)
if err != nil {
return "", err
return err
}

return backupFilePath, err
return err
}

// UploadBackup uploads a backup to the backup provider
func (b *BackupProviderLocal) UploadBackup(_ context.Context, sourcePath string) error {
func (b *BackupProviderLocal) UploadBackup(_ context.Context, inputReader io.Reader, sourcePath string) error {
b.log.Info("upload backups called for provider local")

destination := filepath.Join(b.config.LocalBackupPath, filepath.Base(sourcePath))
output, err := b.fs.Create(destination)
if err != nil {
return fmt.Errorf("error while creating destination of upload: %w", err)
}

err := utils.Copy(b.fs, sourcePath, destination)
_, err = io.Copy(output, inputReader)
if err != nil {
return err
}
Expand Down
Loading
Loading