Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(backup_service): add source paths to backup #87

Merged
merged 1 commit into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions internal/backup_operations/make_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,23 @@ func MakeBackup(
sourcePaths = append(sourcePaths, fullPath)
}

pathsForExport, err := clientConn.PreparePathsForExport(ctx, client, sourcePaths, req.GetSourcePathsToExclude())
if err != nil {
xlog.Error(ctx, "error preparing paths for export", zap.Error(err))
return nil, nil, status.Errorf(codes.Unknown, "error preparing paths for export, dsn %s", dsn)
}

s3Settings := types.ExportSettings{
Endpoint: s3.Endpoint,
Region: s3.Region,
Bucket: s3.Bucket,
AccessKey: accessKey,
SecretKey: secretKey,
Description: "ydbcp backup", // TODO: the description shoud be better
NumberOfRetries: 10, // TODO: get it from configuration
SourcePaths: sourcePaths,
SourcePathToExclude: req.GetSourcePathsToExclude(),
DestinationPrefix: destinationPrefix,
S3ForcePathStyle: s3.S3ForcePathStyle,
Endpoint: s3.Endpoint,
Region: s3.Region,
Bucket: s3.Bucket,
AccessKey: accessKey,
SecretKey: secretKey,
Description: "ydbcp backup", // TODO: the description shoud be better
NumberOfRetries: 10, // TODO: get it from configuration
SourcePaths: pathsForExport,
DestinationPrefix: destinationPrefix,
S3ForcePathStyle: s3.S3ForcePathStyle,
}

clientOperationID, err := clientConn.ExportToS3(ctx, client, s3Settings)
Expand Down Expand Up @@ -165,8 +170,9 @@ func MakeBackup(
CreatedAt: now,
Creator: subject,
},
ScheduleID: scheduleId,
ExpireAt: expireAt,
ScheduleID: scheduleId,
ExpireAt: expireAt,
SourcePaths: pathsForExport,
}

op := &types.TakeBackupOperation{
Expand Down
47 changes: 23 additions & 24 deletions internal/connectors/client/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type ClientConnector interface {
Open(ctx context.Context, dsn string) (*ydb.Driver, error)
Close(ctx context.Context, clientDb *ydb.Driver) error

PreparePathsForExport(ctx context.Context, clientDb *ydb.Driver, sourcePaths []string, sourcePathsToExclude []string) ([]string, error)
ExportToS3(ctx context.Context, clientDb *ydb.Driver, s3Settings types.ExportSettings) (string, error)
ImportFromS3(ctx context.Context, clientDb *ydb.Driver, s3Settings types.ImportSettings) (string, error)
GetOperationStatus(
Expand Down Expand Up @@ -173,13 +174,17 @@ func listDirectory(ctx context.Context, clientDb *ydb.Driver, initialPath string
return result, nil
}

func prepareItemsForExport(
ctx context.Context, clientDb *ydb.Driver, s3Settings types.ExportSettings,
) ([]*Ydb_Export.ExportToS3Settings_Item, error) {
func (d *ClientYdbConnector) PreparePathsForExport(
ctx context.Context, clientDb *ydb.Driver, sourcePaths []string, sourcePathsToExclude []string,
) ([]string, error) {
if clientDb == nil {
return nil, fmt.Errorf("unititialized client db driver")
}

sources := make([]string, 0)
exclusions := make([]regexp.Regexp, len(s3Settings.SourcePathToExclude))
exclusions := make([]regexp.Regexp, len(sourcePathsToExclude))

for i, excludePath := range s3Settings.SourcePathToExclude {
for i, excludePath := range sourcePathsToExclude {
reg, err := regexp.Compile(excludePath)
if err != nil {
return nil, fmt.Errorf("error compiling exclude path regexp: %s", err.Error())
Expand All @@ -188,8 +193,8 @@ func prepareItemsForExport(
exclusions[i] = *reg
}

if len(s3Settings.SourcePaths) > 0 {
for _, sourcePath := range s3Settings.SourcePaths {
if len(sourcePaths) > 0 {
for _, sourcePath := range sourcePaths {
list, err := listDirectory(ctx, clientDb, sourcePath, exclusions)
if err != nil {
return nil, err
Expand All @@ -207,9 +212,18 @@ func prepareItemsForExport(
sources = append(sources, list...)
}

items := make([]*Ydb_Export.ExportToS3Settings_Item, len(sources))
return sources, nil
}

func (d *ClientYdbConnector) ExportToS3(
ctx context.Context, clientDb *ydb.Driver, s3Settings types.ExportSettings,
) (string, error) {
if clientDb == nil {
return "", fmt.Errorf("unititialized client db driver")
}

for i, source := range sources {
items := make([]*Ydb_Export.ExportToS3Settings_Item, len(s3Settings.SourcePaths))
for i, source := range s3Settings.SourcePaths {
// Destination prefix format: s3_destination_prefix/rel_source_path
destinationPrefix := path.Join(
s3Settings.DestinationPrefix,
Expand All @@ -222,21 +236,6 @@ func prepareItemsForExport(
}
}

return items, nil
}

func (d *ClientYdbConnector) ExportToS3(
ctx context.Context, clientDb *ydb.Driver, s3Settings types.ExportSettings,
) (string, error) {
if clientDb == nil {
return "", fmt.Errorf("unititialized client db driver")
}

items, err := prepareItemsForExport(ctx, clientDb, s3Settings)
if err != nil {
return "", fmt.Errorf("error preparing list of items for export: %s", err.Error())
}

exportClient := Ydb_Export_V1.NewExportServiceClient(ydb.GRPCConn(clientDb))
xlog.Info(
ctx, "Exporting data to s3",
Expand Down
6 changes: 6 additions & 0 deletions internal/connectors/client/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ func (m *MockClientConnector) Close(_ context.Context, _ *ydb.Driver) error {
return nil
}

func (m *MockClientConnector) PreparePathsForExport(
_ context.Context, _ *ydb.Driver, sourcePaths []string, _ []string,
) ([]string, error) {
return sourcePaths, nil
}

func (m *MockClientConnector) ExportToS3(_ context.Context, _ *ydb.Driver, s3Settings types.ExportSettings) (string, error) {
objects := make([]ObjectPath, 0)
for _, source := range s3Settings.SourcePaths {
Expand Down
8 changes: 8 additions & 0 deletions internal/connectors/db/process_result_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) {
message *string
size *int64
scheduleId *string
sourcePaths *string

creator *string
completedAt *time.Time
Expand All @@ -95,6 +96,7 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) {
named.Optional("size", &size),
named.Optional("schedule_id", &scheduleId),
named.Optional("expire_at", &expireAt),
named.Optional("paths", &sourcePaths),

named.Optional("created_at", &createdAt),
named.Optional("completed_at", &completedAt),
Expand All @@ -104,6 +106,11 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) {
return nil, err
}

sourcePathsSlice := make([]string, 0)
if sourcePaths != nil {
sourcePathsSlice = strings.Split(*sourcePaths, ",")
}

return &types.Backup{
ID: backupId,
ContainerID: containerId,
Expand All @@ -119,6 +126,7 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) {
Size: Int64OrZero(size),
ScheduleID: scheduleId,
ExpireAt: expireAt,
SourcePaths: sourcePathsSlice,
}, nil
}

Expand Down
1 change: 1 addition & 0 deletions internal/connectors/db/yql/queries/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ func BuildCreateBackupQuery(b types.Backup, index int) WriteSingleTableQueryImpl
d.AddValueParam("$status", table_types.StringValueFromString(b.Status))
d.AddValueParam("$message", table_types.StringValueFromString(b.Message))
d.AddValueParam("$size", table_types.Int64Value(b.Size))
d.AddValueParam("$paths", table_types.StringValueFromString(strings.Join(b.SourcePaths, ",")))
if b.ScheduleID != nil {
d.AddValueParam("$schedule_id", table_types.StringValueFromString(*b.ScheduleID))
}
Expand Down
3 changes: 2 additions & 1 deletion internal/connectors/db/yql/queries/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ UPDATE Operations SET status = $status_1, message = $message_1 WHERE id = $id_1`

func TestQueryBuilder_CreateCreate(t *testing.T) {
const (
queryString = `UPSERT INTO Backups (id, container_id, database, endpoint, s3_endpoint, s3_region, s3_bucket, s3_path_prefix, status, message, size, initiated, created_at) VALUES ($id_0, $container_id_0, $database_0, $endpoint_0, $s3_endpoint_0, $s3_region_0, $s3_bucket_0, $s3_path_prefix_0, $status_0, $message_0, $size_0, $initiated_0, $created_at_0);
queryString = `UPSERT INTO Backups (id, container_id, database, endpoint, s3_endpoint, s3_region, s3_bucket, s3_path_prefix, status, message, size, paths, initiated, created_at) VALUES ($id_0, $container_id_0, $database_0, $endpoint_0, $s3_endpoint_0, $s3_region_0, $s3_bucket_0, $s3_path_prefix_0, $status_0, $message_0, $size_0, $paths_0, $initiated_0, $created_at_0);
UPSERT INTO Operations (id, type, status, message, initiated, created_at, container_id, database, endpoint, backup_id, operation_id) VALUES ($id_1, $type_1, $status_1, $message_1, $initiated_1, $created_at_1, $container_id_1, $database_1, $endpoint_1, $backup_id_1, $operation_id_1)`
)
opId := types.GenerateObjectID()
Expand Down Expand Up @@ -112,6 +112,7 @@ UPSERT INTO Operations (id, type, status, message, initiated, created_at, contai
table.ValueParam("$status_0", table_types.StringValueFromString(types.BackupStateAvailable)),
table.ValueParam("$message_0", table_types.StringValueFromString("msg backup")),
table.ValueParam("$size_0", table_types.Int64Value(0)),
table.ValueParam("$paths_0", table_types.StringValueFromString("")),
table.ValueParam("$initiated_0", table_types.StringValueFromString("author")),
table.ValueParam(
"$created_at_0",
Expand Down
12 changes: 7 additions & 5 deletions internal/types/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type Backup struct {
Size int64
ScheduleID *string
ExpireAt *time.Time
SourcePaths []string
}

func (o *Backup) String() string {
Expand All @@ -76,11 +77,12 @@ func (o *Backup) Proto() *pb.Backup {
Bucket: o.S3Bucket,
PathPrefix: o.S3PathPrefix,
},
Audit: o.AuditInfo,
Size: o.Size,
Status: pb.Backup_Status(pb.Backup_Status_value[o.Status]),
Message: o.Message,
ExpireAt: nil,
Audit: o.AuditInfo,
Size: o.Size,
Status: pb.Backup_Status(pb.Backup_Status_value[o.Status]),
Message: o.Message,
ExpireAt: nil,
SourcePaths: o.SourcePaths,
}
if o.ScheduleID != nil {
backup.ScheduleId = *o.ScheduleID
Expand Down
21 changes: 10 additions & 11 deletions internal/types/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,16 @@ func MakeYdbConnectionString(params YdbConnectionParams) string {
}

type ExportSettings struct {
Endpoint string
Region string
Bucket string
AccessKey string
SecretKey string
Description string
NumberOfRetries uint32
SourcePaths []string
SourcePathToExclude []string
DestinationPrefix string
S3ForcePathStyle bool
Endpoint string
Region string
Bucket string
AccessKey string
SecretKey string
Description string
NumberOfRetries uint32
SourcePaths []string
DestinationPrefix string
S3ForcePathStyle bool
}

type ImportSettings struct {
Expand Down
36 changes: 33 additions & 3 deletions internal/watchers/schedule_watcher/schedule_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,15 @@ func TestScheduleWatcherSimple(t *testing.T) {
queries.NewWriteTableQueryMock,
)

scheduleWatcherActionCompleted := make(chan struct{})
_ = NewScheduleWatcher(
ctx,
&wg,
clock,
dbConnector,
handler,
watchers.WithTickerProvider(tickerProvider),
watchers.WithActionCompletedChannel(&scheduleWatcherActionCompleted),
)

// Wait for the ticker to be initialized
Expand All @@ -96,7 +98,15 @@ func TestScheduleWatcherSimple(t *testing.T) {
}

fakeTicker.Send(clock.Now())
cancel()

// Wait for the watcher action to be completed
select {
case <-ctx.Done():
t.Error("action wasn't completed")
case <-scheduleWatcherActionCompleted:
cancel()
}

wg.Wait()

// check operation status (should be pending)
Expand Down Expand Up @@ -190,13 +200,15 @@ func TestScheduleWatcherTwoSchedulesOneBackup(t *testing.T) {
queries.NewWriteTableQueryMock,
)

scheduleWatcherActionCompleted := make(chan struct{})
_ = NewScheduleWatcher(
ctx,
&wg,
clock,
dbConnector,
handler,
watchers.WithTickerProvider(tickerProvider),
watchers.WithActionCompletedChannel(&scheduleWatcherActionCompleted),
)

// Wait for the ticker to be initialized
Expand All @@ -208,7 +220,15 @@ func TestScheduleWatcherTwoSchedulesOneBackup(t *testing.T) {
}

fakeTicker.Send(clock.Now())
cancel()

// Wait for the watcher action to be completed
select {
case <-ctx.Done():
t.Error("action wasn't completed")
case <-scheduleWatcherActionCompleted:
cancel()
}

wg.Wait()

// check operation status (should be pending)
Expand Down Expand Up @@ -309,13 +329,15 @@ func TestScheduleWatcherTwoBackups(t *testing.T) {
queries.NewWriteTableQueryMock,
)

scheduleWatcherActionCompleted := make(chan struct{})
_ = NewScheduleWatcher(
ctx,
&wg,
clock,
dbConnector,
handler,
watchers.WithTickerProvider(tickerProvider),
watchers.WithActionCompletedChannel(&scheduleWatcherActionCompleted),
)

// Wait for the ticker to be initialized
Expand All @@ -327,7 +349,15 @@ func TestScheduleWatcherTwoBackups(t *testing.T) {
}

fakeTicker.Send(clock.Now())
cancel()

// Wait for the watcher action to be completed
select {
case <-ctx.Done():
t.Error("action wasn't completed")
case <-scheduleWatcherActionCompleted:
cancel()
}

wg.Wait()

// check operation status (should be pending)
Expand Down
12 changes: 11 additions & 1 deletion internal/watchers/ttl_watcher/ttl_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,15 @@ func TestTtlWatcher(t *testing.T) {
db := db.NewMockDBConnector(
db.WithBackups(backupMap),
)

ttlWatcherActionCompleted := make(chan struct{})
_ = NewTtlWatcher(
ctx,
&wg,
db,
queries.NewWriteTableQueryMock,
watchers.WithTickerProvider(tickerProvider),
watchers.WithActionCompletedChannel(&ttlWatcherActionCompleted),
)

// Wait for the ticker to be initialized
Expand All @@ -65,7 +68,14 @@ func TestTtlWatcher(t *testing.T) {
t0 := clock.Now().Add(time.Hour)
fakeTicker.Send(t0)

cancel()
// Wait for the watcher action to be completed
select {
case <-ctx.Done():
t.Error("action wasn't completed")
case <-ttlWatcherActionCompleted:
cancel()
}

wg.Wait()

// Check that DeleteBackup operation was created
Expand Down
Loading