Skip to content

Commit

Permalink
WIP: TBWR handler
Browse files Browse the repository at this point in the history
  • Loading branch information
Aleksei Pleshakov committed Oct 29, 2024
1 parent fa9a2d6 commit cf3da7e
Show file tree
Hide file tree
Showing 17 changed files with 834 additions and 124 deletions.
22 changes: 22 additions & 0 deletions cmd/integration/common/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package common

import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"log"
"time"
)

func CreateGRPCClient(endpoint string) *grpc.ClientConn {
var opts []grpc.DialOption
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
for range 5 {
conn, err := grpc.NewClient(endpoint, opts...)
if err == nil {
return conn
}
time.Sleep(time.Second) // Wait before retrying
}
log.Panicln("failed to dial")
return nil
}
9 changes: 2 additions & 7 deletions cmd/integration/list_entities/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"log"
"strconv"
"time"
"ydbcp/cmd/integration/common"
"ydbcp/internal/config"
"ydbcp/internal/connectors/db"
"ydbcp/internal/connectors/db/yql/queries"
Expand All @@ -16,7 +17,6 @@ import (
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const (
Expand Down Expand Up @@ -214,12 +214,7 @@ func SchedulesToInsert() []types.BackupSchedule {

func main() {
ctx := context.Background()
var opts []grpc.DialOption
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.NewClient(ydbcpEndpoint, opts...)
if err != nil {
log.Panicln("failed to dial")
}
conn := common.CreateGRPCClient(ydbcpEndpoint)
defer func(conn *grpc.ClientConn) {
err := conn.Close()
if err != nil {
Expand Down
9 changes: 2 additions & 7 deletions cmd/integration/make_backup/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
"log"
"strings"
"time"
"ydbcp/cmd/integration/common"

"ydbcp/internal/types"
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const (
Expand All @@ -23,12 +23,7 @@ const (
)

func main() {
var opts []grpc.DialOption
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.NewClient(ydbcpEndpoint, opts...)
if err != nil {
log.Panicln("failed to dial")
}
conn := common.CreateGRPCClient(ydbcpEndpoint)
defer func(conn *grpc.ClientConn) {
err := conn.Close()
if err != nil {
Expand Down
128 changes: 128 additions & 0 deletions internal/backup_operations/make_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,131 @@ func MakeBackup(

return backup, op, nil
}

func RetryBackup(
ctx context.Context,
clientConn client.ClientConnector,
s3 config.S3Config,
allowedEndpointDomains []string,
allowInsecureEndpoint bool,
tbwr types.TakeBackupWithRetryOperation,
subject string,
) (*types.Backup, *types.TakeBackupOperation, error) {
if !IsAllowedEndpoint(tbwr.YdbConnectionParams.Endpoint, allowedEndpointDomains, allowInsecureEndpoint) {
xlog.Error(
ctx,
"endpoint of database is invalid or not allowed",
zap.String("DatabaseEndpoint", tbwr.YdbConnectionParams.Endpoint),
)
return nil, nil, status.Errorf(
codes.InvalidArgument,
"endpoint of database is invalid or not allowed, endpoint %s", tbwr.YdbConnectionParams.Endpoint,
)
}

dsn := types.MakeYdbConnectionString(tbwr.YdbConnectionParams)
ctx = xlog.With(ctx, zap.String("ClientDSN", dsn))
client, err := clientConn.Open(ctx, dsn)
if err != nil {
xlog.Error(ctx, "can't open client connection", zap.Error(err))
return nil, nil, status.Errorf(codes.Unknown, "can't open client connection, dsn %s", dsn)
}
defer func() {
if err := clientConn.Close(ctx, client); err != nil {
xlog.Error(ctx, "can't close client connection", zap.Error(err))
}
}()

accessKey, err := s3.AccessKey()
if err != nil {
xlog.Error(ctx, "can't get S3AccessKey", zap.Error(err))
return nil, nil, status.Error(codes.Internal, "can't get S3AccessKey")
}
secretKey, err := s3.SecretKey()
if err != nil {
xlog.Error(ctx, "can't get S3SecretKey", zap.Error(err))
return nil, nil, status.Error(codes.Internal, "can't get S3SecretKey")
}

dbNamePath := strings.Replace(tbwr.YdbConnectionParams.DatabaseName, "/", "_", -1) // TODO: checking user input
dbNamePath = strings.Trim(dbNamePath, "_")

destinationPrefix := path.Join(
s3.PathPrefix,
dbNamePath,
time.Now().Format(types.BackupTimestampFormat),
)
ctx = xlog.With(ctx, zap.String("S3DestinationPrefix", destinationPrefix))

sourcePaths := make([]string, 0, len(tbwr.SourcePaths))
for _, p := range tbwr.SourcePaths {
fullPath, ok := SafePathJoin(tbwr.YdbConnectionParams.DatabaseName, p)
if !ok {
xlog.Error(ctx, "incorrect source path", zap.String("path", p))
return nil, nil, status.Errorf(codes.InvalidArgument, "incorrect source path %s", p)
}
sourcePaths = append(sourcePaths, fullPath)
}

pathsForExport, err := clientConn.PreparePathsForExport(ctx, client, sourcePaths, tbwr.SourcePathsToExclude)
if err != nil {
xlog.Error(ctx, "error preparing paths for export", zap.Error(err))
return nil, nil, status.Errorf(codes.Unknown, "error preparing paths for export, dsn %s", dsn)
}

if len(pathsForExport) == 0 {
xlog.Error(ctx, "empty list of paths for export")
return nil, nil, status.Error(codes.FailedPrecondition, "empty list of paths for export")
}

s3Settings := types.ExportSettings{
Endpoint: s3.Endpoint,
Region: s3.Region,
Bucket: s3.Bucket,
AccessKey: accessKey,
SecretKey: secretKey,
Description: "ydbcp backup", // TODO: the description shoud be better
NumberOfRetries: 10, // TODO: get it from configuration
SourcePaths: pathsForExport,
DestinationPrefix: destinationPrefix,
S3ForcePathStyle: s3.S3ForcePathStyle,
}

clientOperationID, err := clientConn.ExportToS3(ctx, client, s3Settings)
if err != nil {
xlog.Error(ctx, "can't start export operation", zap.Error(err))
return nil, nil, status.Errorf(codes.Unknown, "can't start export operation, dsn %s", dsn)
}
ctx = xlog.With(ctx, zap.String("ClientOperationID", clientOperationID))
xlog.Info(ctx, "Export operation started")

var expireAt *time.Time
if tbwr.Ttl != nil {
expireAt = new(time.Time)
*expireAt = time.Now().Add(*tbwr.Ttl)
}

now := timestamppb.Now()
backup := &types.Backup{
ID: types.GenerateObjectID(),
ContainerID: tbwr.ContainerID,
DatabaseName: tbwr.YdbConnectionParams.DatabaseName,
DatabaseEndpoint: tbwr.YdbConnectionParams.Endpoint,
S3Endpoint: s3.Endpoint,
S3Region: s3.Region,
S3Bucket: s3.Bucket,
S3PathPrefix: destinationPrefix,
Status: types.BackupStateRunning,
AuditInfo: &pb.AuditInfo{
CreatedAt: now,
Creator: subject,
},
ScheduleID: tbwr.ScheduleID,
ExpireAt: expireAt,
SourcePaths: pathsForExport,
}

op := tbwr.SpawnNewTBOperation(backup.ID, subject, clientOperationID)

return backup, &op, nil
}
44 changes: 44 additions & 0 deletions internal/connectors/db/process_result_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
updatedAt *time.Time
updatedTs *timestamppb.Timestamp
parentOperationID *string
scheduleID *string
ttl *time.Duration
retriesCount *uint32
maxBackoff *time.Duration
)
err := res.ScanNamed(
named.Required("id", &operationId),
Expand All @@ -170,6 +174,10 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
named.Optional("initiated", &creator),
named.Optional("updated_at", &updatedAt),
named.Optional("parent_operation_id", &parentOperationID),
named.Optional("schedule_id", &scheduleID),
named.Optional("ttl", &ttl),
named.Optional("retries_count", &retriesCount),
named.Optional("retries_max_backoff", &maxBackoff),
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -255,6 +263,42 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
PathPrefix: pathPrefix,
UpdatedAt: updatedTs,
}, nil
} else if operationType == string(types.OperationTypeTBWR) {
if backupId == nil {
return nil, fmt.Errorf("failed to read backup_id for TBWR operation: %s", operationId)
}
var retryConfig *pb.RetryConfig = nil
if maxBackoff != nil {
retryConfig = &pb.RetryConfig{
Retries: &pb.RetryConfig_MaxBackoff{
MaxBackoff: durationpb.New(*maxBackoff),
},
}
}
if retriesCount != nil {
retryConfig = &pb.RetryConfig{
Retries: &pb.RetryConfig_Count{Count: *retriesCount},
}
}
return &types.TakeBackupWithRetryOperation{
TakeBackupOperation: types.TakeBackupOperation{
ID: operationId,
ContainerID: containerId,
State: operationState,
Message: StringOrEmpty(message),
YdbConnectionParams: types.YdbConnectionParams{
Endpoint: databaseEndpoint,
DatabaseName: databaseName,
},
SourcePaths: sourcePathsSlice,
SourcePathsToExclude: sourcePathsToExcludeSlice,
Audit: auditFromDb(creator, createdAt, completedAt),
UpdatedAt: updatedTs,
},
ScheduleID: scheduleID,
Ttl: ttl,
RetryConfig: retryConfig,
}, nil
}

return &types.GenericOperation{ID: operationId}, nil
Expand Down
30 changes: 30 additions & 0 deletions internal/connectors/db/yql/queries/queries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package queries

import (
"fmt"
"ydbcp/internal/types"
)

var (
ListSchedulesQuery = fmt.Sprintf(
`$last_successful_backup_id = SELECT schedule_id, MAX(b.completed_at) AS recovery_point, MAX_BY(b.id, b.completed_at) AS last_successful_backup_id FROM Backups AS b WHERE b.status = '%s' GROUP BY schedule_id;
$last_backup_id = SELECT schedule_id AS schedule_id_2, MAX_BY(b.id, b.completed_at) AS last_backup_id FROM Backups AS b GROUP BY schedule_id;
SELECT * FROM BackupSchedules AS schedules
LEFT JOIN $last_successful_backup_id AS b1 ON schedules.id = b1.schedule_id
LEFT JOIN $last_backup_id AS b2 ON schedules.id = b2.schedule_id_2
`, types.BackupStateAvailable,
)
GetScheduleQuery = fmt.Sprintf(
`$rpo_info = SELECT
<|
recovery_point: MAX(b.completed_at),
last_successful_backup_id: MAX_BY(b.id, b.completed_at)
|> FROM Backups AS b WHERE b.status = '%s' AND b.schedule_id = $schedule_id;
$last_backup_id = SELECT MAX_BY(b.id, b.completed_at) AS last_backup_id FROM Backups AS b WHERE b.schedule_id = $schedule_id;
SELECT s.*, $last_backup_id AS last_backup_id, $rpo_info.recovery_point AS recovery_point, $rpo_info.last_successful_backup_id AS last_successful_backup_id FROM BackupSchedules AS s WHERE s.id = $schedule_id
`, types.BackupStateAvailable,
)
)
17 changes: 15 additions & 2 deletions internal/connectors/db/yql/queries/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ type ReadTableQueryImpl struct {
filters [][]table_types.Value
filterFields []string
isLikeFilter map[string]bool
index *string
orderBy *OrderSpec
pageSpec *PageSpec
tableQueryParams []table.ParameterOption
Expand Down Expand Up @@ -165,6 +166,12 @@ func WithPageSpec(spec PageSpec) ReadTableQueryOption {
}
}

func WithIndex(index string) ReadTableQueryOption {
return func(d *ReadTableQueryImpl) {
d.index = &index
}
}

func (d *ReadTableQueryImpl) AddTableQueryParam(paramValue table_types.Value) string {
paramName := fmt.Sprintf("$param%d", len(d.tableQueryParams))
d.tableQueryParams = append(
Expand Down Expand Up @@ -219,17 +226,23 @@ func (d *ReadTableQueryImpl) FormatPage() *string {
return &page
}

func (d *ReadTableQueryImpl) FormatTable() string {
if d.index == nil {
return d.tableName
}
return fmt.Sprintf("%s VIEW %s", d.tableName, *d.index)
}

func (d *ReadTableQueryImpl) FormatQuery(ctx context.Context) (*FormatQueryResult, error) {
var res string
filter := d.MakeFilterString()

if d.rawQuery == nil {
if len(d.tableName) == 0 {
return nil, errors.New("no table")
}
res = fmt.Sprintf(
"SELECT * FROM %s%s",
d.tableName,
d.FormatTable(),
filter,
)
} else {
Expand Down
16 changes: 16 additions & 0 deletions internal/connectors/db/yql/queries/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,19 @@ func TestOrderSpec(t *testing.T) {
"bad query format",
)
}

func TestIndex(t *testing.T) {
const (
query = `SELECT * FROM table1 VIEW index`
)
builder := NewReadTableQuery(
WithTableName("table1"),
WithIndex("index"),
)
fq, err := builder.FormatQuery(context.Background())
assert.Empty(t, err)
assert.Equal(
t, query, fq.QueryText,
"bad query format",
)
}
Loading

0 comments on commit cf3da7e

Please sign in to comment.