From c9e554e916bea68b9dedf3db34389c3538fcb1d8 Mon Sep 17 00:00:00 2001 From: Aleksei Pleshakov Date: Fri, 25 Oct 2024 13:51:17 +0200 Subject: [PATCH] TBWR handler --- .github/workflows/unit-test.yml | 8 + cmd/integration/common/common.go | 22 + cmd/integration/list_entities/main.go | 9 +- cmd/integration/make_backup/main.go | 9 +- cmd/integration/orm/main.go | 155 +++++ cmd/ydbcp/main.go | 6 +- dockerfile | 3 +- internal/backup_operations/make_backup.go | 69 +- internal/connectors/db/mock.go | 18 + internal/connectors/db/process_result_set.go | 43 ++ internal/connectors/db/yql/queries/queries.go | 30 + internal/connectors/db/yql/queries/read.go | 17 +- .../connectors/db/yql/queries/read_test.go | 16 + internal/connectors/db/yql/queries/write.go | 70 +- .../db/yql/schema/create_tables.yql | 5 + internal/handlers/schedule_backup.go | 19 +- internal/handlers/schedule_backup_test.go | 3 +- internal/handlers/take_backup.go | 4 +- internal/handlers/take_backup_retry.go | 242 +++++++ internal/handlers/take_backup_retry_test.go | 643 ++++++++++++++++++ .../server/services/backup/backup_service.go | 5 +- .../backup_schedule_service.go | 35 +- internal/types/operation.go | 45 +- .../schedule_watcher/schedule_watcher.go | 7 +- .../schedule_watcher/schedule_watcher_test.go | 6 +- pkg/proto/ydbcp/v1alpha1/operation.pb.go | 137 ++-- pkg/proto/ydbcp/v1alpha1/operation.proto | 2 + pkg/proto/ydbcp/v1alpha1/retry.pb.go | 181 +++++ pkg/proto/ydbcp/v1alpha1/retry.proto | 15 + 29 files changed, 1651 insertions(+), 173 deletions(-) create mode 100644 cmd/integration/common/common.go create mode 100644 cmd/integration/orm/main.go create mode 100644 internal/connectors/db/yql/queries/queries.go create mode 100644 internal/handlers/take_backup_retry.go create mode 100644 internal/handlers/take_backup_retry_test.go create mode 100644 pkg/proto/ydbcp/v1alpha1/retry.pb.go create mode 100644 pkg/proto/ydbcp/v1alpha1/retry.proto diff --git a/.github/workflows/unit-test.yml 
b/.github/workflows/unit-test.yml index 482eddc1..938a5b6c 100644 --- a/.github/workflows/unit-test.yml +++ b/.github/workflows/unit-test.yml @@ -74,3 +74,11 @@ jobs: - name: docker compose down run: | docker compose down + - name: docker compose up + run: | + docker compose up -d + - name: run orm test + run: docker exec local-ydbcp sh -c './orm' + - name: docker compose down + run: | + docker compose down diff --git a/cmd/integration/common/common.go b/cmd/integration/common/common.go new file mode 100644 index 00000000..73f91adf --- /dev/null +++ b/cmd/integration/common/common.go @@ -0,0 +1,22 @@ +package common + +import ( + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "log" + "time" +) + +func CreateGRPCClient(endpoint string) *grpc.ClientConn { + var opts []grpc.DialOption + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + for range 5 { + conn, err := grpc.NewClient(endpoint, opts...) + if err == nil { + return conn + } + time.Sleep(time.Second) // Wait before retrying + } + log.Panicln("failed to dial") + return nil +} diff --git a/cmd/integration/list_entities/main.go b/cmd/integration/list_entities/main.go index b61c483a..c79e0021 100644 --- a/cmd/integration/list_entities/main.go +++ b/cmd/integration/list_entities/main.go @@ -8,6 +8,7 @@ import ( "log" "strconv" "time" + "ydbcp/cmd/integration/common" "ydbcp/internal/config" "ydbcp/internal/connectors/db" "ydbcp/internal/connectors/db/yql/queries" @@ -16,7 +17,6 @@ import ( pb "ydbcp/pkg/proto/ydbcp/v1alpha1" "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" ) const ( @@ -214,12 +214,7 @@ func SchedulesToInsert() []types.BackupSchedule { func main() { ctx := context.Background() - var opts []grpc.DialOption - opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) - conn, err := grpc.NewClient(ydbcpEndpoint, opts...) 
- if err != nil { - log.Panicln("failed to dial") - } + conn := common.CreateGRPCClient(ydbcpEndpoint) defer func(conn *grpc.ClientConn) { err := conn.Close() if err != nil { diff --git a/cmd/integration/make_backup/main.go b/cmd/integration/make_backup/main.go index e8d71178..0b2121a5 100644 --- a/cmd/integration/make_backup/main.go +++ b/cmd/integration/make_backup/main.go @@ -7,12 +7,12 @@ import ( "log" "strings" "time" + "ydbcp/cmd/integration/common" "ydbcp/internal/types" pb "ydbcp/pkg/proto/ydbcp/v1alpha1" "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" ) const ( @@ -23,12 +23,7 @@ const ( ) func main() { - var opts []grpc.DialOption - opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) - conn, err := grpc.NewClient(ydbcpEndpoint, opts...) - if err != nil { - log.Panicln("failed to dial") - } + conn := common.CreateGRPCClient(ydbcpEndpoint) defer func(conn *grpc.ClientConn) { err := conn.Close() if err != nil { diff --git a/cmd/integration/orm/main.go b/cmd/integration/orm/main.go new file mode 100644 index 00000000..e491d07d --- /dev/null +++ b/cmd/integration/orm/main.go @@ -0,0 +1,155 @@ +package main + +import ( + "context" + table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/timestamppb" + "log" + "reflect" + "time" + "ydbcp/cmd/integration/common" + "ydbcp/internal/config" + "ydbcp/internal/connectors/db" + "ydbcp/internal/connectors/db/yql/queries" + "ydbcp/internal/types" + pb "ydbcp/pkg/proto/ydbcp/v1alpha1" +) + +const ( + containerID = "abcde" + databaseName = "/local" + ydbcpEndpoint = "localhost:50051" + connectionString = "grpcs://local-ydb:2135/local" +) + +var ( + t1 = time.Date(2024, 01, 01, 00, 00, 00, 0, time.UTC) + t2 = time.Date(2024, 01, 02, 00, 00, 00, 0, time.UTC) +) + +func OperationsToInsert() []types.TakeBackupWithRetryOperation { + schedule := 
"schedule" + ttl := time.Hour * 24 + return []types.TakeBackupWithRetryOperation{ + { + TakeBackupOperation: types.TakeBackupOperation{ + ID: "1", + ContainerID: containerID, + State: "xz", + Message: "nz", + YdbConnectionParams: types.YdbConnectionParams{ + Endpoint: ydbcpEndpoint, + DatabaseName: databaseName, + }, + SourcePaths: []string{"path"}, + SourcePathsToExclude: []string{"exclude"}, + Audit: &pb.AuditInfo{ + Creator: "ydbcp", + CreatedAt: timestamppb.New(t1), + CompletedAt: timestamppb.New(t2), + }, + UpdatedAt: timestamppb.New(t2), + }, + ScheduleID: &schedule, + Ttl: &ttl, + RetryConfig: &pb.RetryConfig{Retries: &pb.RetryConfig_Count{Count: 5}}, + }, + { + TakeBackupOperation: types.TakeBackupOperation{ + ID: "1", + ContainerID: containerID, + State: "xz", + Message: "nz", + YdbConnectionParams: types.YdbConnectionParams{ + Endpoint: ydbcpEndpoint, + DatabaseName: databaseName, + }, + SourcePaths: []string{"path"}, + SourcePathsToExclude: []string{"exclude"}, + Audit: &pb.AuditInfo{ + Creator: "ydbcp", + CreatedAt: timestamppb.New(t1), + CompletedAt: timestamppb.New(t2), + }, + UpdatedAt: timestamppb.New(t2), + }, + ScheduleID: &schedule, + Ttl: &ttl, + RetryConfig: &pb.RetryConfig{Retries: &pb.RetryConfig_MaxBackoff{MaxBackoff: durationpb.New(ttl)}}, + }, + } +} + +func ReadTBWROperation(ctx context.Context, ydbConn *db.YdbConnector, id string) *types.TakeBackupWithRetryOperation { + operations, err := ydbConn.SelectOperations(ctx, queries.NewReadTableQuery( + queries.WithTableName("Operations"), + queries.WithQueryFilters(queries.QueryFilter{ + Field: "id", + Values: []table_types.Value{table_types.StringValueFromString(id)}, + }), + )) + if err != nil { + log.Panicf("failed to select operations: %v", err) + } + if len(operations) != 1 { + log.Panicf("expected 1 operation, got %d", len(operations)) + } + return operations[0].(*types.TakeBackupWithRetryOperation) +} + +func TestTBWROperationORM(ctx context.Context, ydbConn *db.YdbConnector, operation 
types.TakeBackupWithRetryOperation) { + err := ydbConn.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithCreateOperation(&operation)) + if err != nil { + log.Panicf("failed to insert operation: %v", err) + } + tbwr := ReadTBWROperation(ctx, ydbConn, operation.ID) + if !reflect.DeepEqual(operation, *tbwr) { + log.Panicf("operation %v corrupted after read and write\ngot %v", operation, *tbwr) + } + + err = ydbConn.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithUpdateOperation(&operation)) + if err != nil { + log.Panicf("failed to insert operation: %v", err) + } + tbwr = ReadTBWROperation(ctx, ydbConn, operation.ID) + if !reflect.DeepEqual(operation, *tbwr) { + log.Panicf("operation %v corrupted after update and read\ngot %v", operation, *tbwr) + } + operation.Message = "xxx" + err = ydbConn.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithUpdateOperation(&operation)) + if err != nil { + log.Panicf("failed to insert operation: %v", err) + } + tbwr = ReadTBWROperation(ctx, ydbConn, operation.ID) + if "xxx" != tbwr.Message { + log.Panicf("operation %v did not change after update", *tbwr) + } +} + +func main() { + ctx := context.Background() + conn := common.CreateGRPCClient(ydbcpEndpoint) + defer func(conn *grpc.ClientConn) { + err := conn.Close() + if err != nil { + log.Panicln("failed to close connection") + } + }(conn) + ydbConn, err := db.NewYdbConnector( + ctx, + config.YDBConnectionConfig{ + ConnectionString: connectionString, + Insecure: true, + Discovery: false, + DialTimeoutSeconds: 10, + }, + ) + if err != nil { + log.Panicf("failed to create ydb connector: %v", err) + } + for _, op := range OperationsToInsert() { + TestTBWROperationORM(ctx, ydbConn, op) + } +} diff --git a/cmd/ydbcp/main.go b/cmd/ydbcp/main.go index 084c3230..db80af67 100644 --- a/cmd/ydbcp/main.go +++ b/cmd/ydbcp/main.go @@ -153,10 +153,10 @@ func main() { ttl_watcher.NewTtlWatcher(ctx, &wg, dbConnector, queries.NewWriteTableQuery) backupScheduleHandler := 
handlers.NewBackupScheduleHandler( - clientConnector, configInstance.S3, configInstance.ClientConnection, queries.NewWriteTableQuery, + clientConnector, configInstance.S3, configInstance.ClientConnection, queries.NewWriteTableQuery, clockwork.NewRealClock(), ) - schedule_watcher.NewScheduleWatcher(ctx, &wg, clockwork.NewRealClock(), dbConnector, backupScheduleHandler) - + schedule_watcher.NewScheduleWatcher(ctx, &wg, dbConnector, backupScheduleHandler) + xlog.Info(ctx, "YDBCP started") wg.Add(1) go func() { defer wg.Done() diff --git a/dockerfile b/dockerfile index 562a6ae3..7efb1f13 100644 --- a/dockerfile +++ b/dockerfile @@ -18,9 +18,8 @@ RUN go build -o . ./cmd/ydbcp/main.go # Build integration test app RUN go build -o ./make_backup ./cmd/integration/make_backup/main.go - -# Build integration test app RUN go build -o ./list_entities ./cmd/integration/list_entities/main.go +RUN go build -o ./orm ./cmd/integration/orm/main.go # Command to run the executable CMD ["./main", "--config=local_config.yaml"] diff --git a/internal/backup_operations/make_backup.go b/internal/backup_operations/make_backup.go index 756d5609..f18bb50b 100644 --- a/internal/backup_operations/make_backup.go +++ b/internal/backup_operations/make_backup.go @@ -2,6 +2,7 @@ package backup_operations import ( "context" + "github.com/jonboulle/clockwork" "path" "regexp" "strings" @@ -23,6 +24,43 @@ var ( validEndpoint = regexp.MustCompile(`^(grpcs://|grpc://)?([A-Za-z0-9\-\.]+)(:[0-9]+)?$`) ) +type MakeBackupInternalRequest struct { + ContainerID string + DatabaseEndpoint string + DatabaseName string + SourcePaths []string + SourcePathsToExclude []string + ScheduleID *string + Ttl *time.Duration +} + +func FromGRPCRequest(request *pb.MakeBackupRequest, scheduleID *string) MakeBackupInternalRequest { + res := MakeBackupInternalRequest{ + ContainerID: request.ContainerId, + DatabaseEndpoint: request.DatabaseEndpoint, + DatabaseName: request.DatabaseName, + SourcePaths: request.SourcePaths, + 
SourcePathsToExclude: request.SourcePathsToExclude, + ScheduleID: scheduleID, + } + if ttl := request.Ttl.AsDuration(); request.Ttl != nil { + res.Ttl = &ttl + } + return res +} + +func FromTBWROperation(tbwr *types.TakeBackupWithRetryOperation) MakeBackupInternalRequest { + return MakeBackupInternalRequest{ + ContainerID: tbwr.ContainerID, + DatabaseEndpoint: tbwr.YdbConnectionParams.Endpoint, + DatabaseName: tbwr.YdbConnectionParams.DatabaseName, + SourcePaths: tbwr.SourcePaths, + SourcePathsToExclude: tbwr.SourcePathsToExclude, + ScheduleID: tbwr.ScheduleID, + Ttl: tbwr.Ttl, + } +} + func SafePathJoin(base string, relPath ...string) (fullPath string, ok bool) { paths := append([]string{base}, relPath...) fullPath = path.Join(paths...) @@ -61,8 +99,9 @@ func MakeBackup( s3 config.S3Config, allowedEndpointDomains []string, allowInsecureEndpoint bool, - req *pb.MakeBackupRequest, scheduleId *string, + req MakeBackupInternalRequest, subject string, + clock clockwork.Clock, ) (*types.Backup, *types.TakeBackupOperation, error) { if !IsAllowedEndpoint(req.DatabaseEndpoint, allowedEndpointDomains, allowInsecureEndpoint) { xlog.Error( @@ -110,7 +149,7 @@ func MakeBackup( destinationPrefix := path.Join( s3.PathPrefix, dbNamePath, - time.Now().Format(types.BackupTimestampFormat), + clock.Now().Format(types.BackupTimestampFormat), ) ctx = xlog.With(ctx, zap.String("S3DestinationPrefix", destinationPrefix)) @@ -124,7 +163,7 @@ func MakeBackup( sourcePaths = append(sourcePaths, fullPath) } - pathsForExport, err := clientConn.PreparePathsForExport(ctx, client, sourcePaths, req.GetSourcePathsToExclude()) + pathsForExport, err := clientConn.PreparePathsForExport(ctx, client, sourcePaths, req.SourcePathsToExclude) if err != nil { xlog.Error(ctx, "error preparing paths for export", zap.Error(err)) return nil, nil, status.Errorf(codes.Unknown, "error preparing paths for export, dsn %s", dsn) @@ -157,17 +196,17 @@ func MakeBackup( xlog.Info(ctx, "Export operation started") var 
expireAt *time.Time - if ttl := req.GetTtl(); ttl != nil { + if req.Ttl != nil { expireAt = new(time.Time) - *expireAt = time.Now().Add(ttl.AsDuration()) + *expireAt = clock.Now().Add(*req.Ttl) } - now := timestamppb.Now() + now := timestamppb.New(clock.Now()) backup := &types.Backup{ ID: types.GenerateObjectID(), - ContainerID: req.GetContainerId(), - DatabaseName: req.GetDatabaseName(), - DatabaseEndpoint: req.GetDatabaseEndpoint(), + ContainerID: req.ContainerID, + DatabaseName: req.DatabaseName, + DatabaseEndpoint: req.DatabaseEndpoint, S3Endpoint: s3.Endpoint, S3Region: s3.Region, S3Bucket: s3.Bucket, @@ -177,7 +216,7 @@ func MakeBackup( CreatedAt: now, Creator: subject, }, - ScheduleID: scheduleId, + ScheduleID: req.ScheduleID, ExpireAt: expireAt, SourcePaths: pathsForExport, } @@ -185,14 +224,14 @@ func MakeBackup( op := &types.TakeBackupOperation{ ID: types.GenerateObjectID(), BackupID: backup.ID, - ContainerID: req.ContainerId, + ContainerID: req.ContainerID, State: types.OperationStateRunning, YdbConnectionParams: types.YdbConnectionParams{ - Endpoint: req.GetDatabaseEndpoint(), - DatabaseName: req.GetDatabaseName(), + Endpoint: req.DatabaseEndpoint, + DatabaseName: req.DatabaseName, }, - SourcePaths: req.GetSourcePaths(), - SourcePathsToExclude: req.GetSourcePathsToExclude(), + SourcePaths: req.SourcePaths, + SourcePathsToExclude: req.SourcePathsToExclude, Audit: &pb.AuditInfo{ CreatedAt: now, Creator: subject, diff --git a/internal/connectors/db/mock.go b/internal/connectors/db/mock.go index 8103840c..8cdf7945 100644 --- a/internal/connectors/db/mock.go +++ b/internal/connectors/db/mock.go @@ -16,6 +16,7 @@ type MockDBConnector struct { operations map[string]types.Operation backups map[string]types.Backup backupSchedules map[string]types.BackupSchedule + operationIDs *[]string } type Option func(*MockDBConnector) @@ -213,9 +214,26 @@ func (c *MockDBConnector) GetSchedule( return types.BackupSchedule{}, fmt.Errorf("backupSchedule not found, id %s", 
scheduleID) } +func (c *MockDBConnector) SetOperationsIDSelector(ids []string) { + c.guard.Lock() + defer c.guard.Unlock() + + c.operationIDs = &ids +} + func (c *MockDBConnector) SelectOperations( _ context.Context, _ queries.ReadTableQuery, ) ([]types.Operation, error) { + c.guard.Lock() + defer c.guard.Unlock() + if c.operationIDs != nil { + res := make([]types.Operation, 0, len(*c.operationIDs)) + for _, id := range *c.operationIDs { + res = append(res, c.operations[id].Copy()) + } + c.operationIDs = nil + return res, nil + } res := make([]types.Operation, 0, len(c.operations)) for _, v := range c.operations { res = append(res, v.Copy()) diff --git a/internal/connectors/db/process_result_set.go b/internal/connectors/db/process_result_set.go index 2b6464ca..15587dfe 100644 --- a/internal/connectors/db/process_result_set.go +++ b/internal/connectors/db/process_result_set.go @@ -2,6 +2,7 @@ package db import ( "fmt" + "log" "strings" "time" @@ -150,6 +151,10 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { updatedAt *time.Time updatedTs *timestamppb.Timestamp parentOperationID *string + scheduleID *string + ttl *time.Duration + retriesCount *uint32 + maxBackoff *time.Duration ) err := res.ScanNamed( named.Required("id", &operationId), @@ -170,6 +175,10 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { named.Optional("initiated", &creator), named.Optional("updated_at", &updatedAt), named.Optional("parent_operation_id", &parentOperationID), + named.Optional("schedule_id", &scheduleID), + named.Optional("ttl", &ttl), + named.Optional("retries_count", &retriesCount), + named.Optional("retries_max_backoff", &maxBackoff), ) if err != nil { return nil, err @@ -188,6 +197,7 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { } if updatedAt != nil { + log.Print("updated at read from db") updatedTs = timestamppb.New(*updatedAt) } @@ -255,6 +265,39 @@ func 
ReadOperationFromResultSet(res result.Result) (types.Operation, error) { PathPrefix: pathPrefix, UpdatedAt: updatedTs, }, nil + } else if operationType == string(types.OperationTypeTBWR) { + var retryConfig *pb.RetryConfig = nil + if maxBackoff != nil { + retryConfig = &pb.RetryConfig{ + Retries: &pb.RetryConfig_MaxBackoff{ + MaxBackoff: durationpb.New(*maxBackoff), + }, + } + } + if retriesCount != nil { + retryConfig = &pb.RetryConfig{ + Retries: &pb.RetryConfig_Count{Count: *retriesCount}, + } + } + return &types.TakeBackupWithRetryOperation{ + TakeBackupOperation: types.TakeBackupOperation{ + ID: operationId, + ContainerID: containerId, + State: operationState, + Message: StringOrEmpty(message), + YdbConnectionParams: types.YdbConnectionParams{ + Endpoint: databaseEndpoint, + DatabaseName: databaseName, + }, + SourcePaths: sourcePathsSlice, + SourcePathsToExclude: sourcePathsToExcludeSlice, + Audit: auditFromDb(creator, createdAt, completedAt), + UpdatedAt: updatedTs, + }, + ScheduleID: scheduleID, + Ttl: ttl, + RetryConfig: retryConfig, + }, nil } return &types.GenericOperation{ID: operationId}, nil diff --git a/internal/connectors/db/yql/queries/queries.go b/internal/connectors/db/yql/queries/queries.go new file mode 100644 index 00000000..f36b4aa8 --- /dev/null +++ b/internal/connectors/db/yql/queries/queries.go @@ -0,0 +1,30 @@ +package queries + +import ( + "fmt" + "ydbcp/internal/types" +) + +var ( + ListSchedulesQuery = fmt.Sprintf( + `$last_successful_backup_id = SELECT schedule_id, MAX(b.completed_at) AS recovery_point, MAX_BY(b.id, b.completed_at) AS last_successful_backup_id FROM Backups AS b WHERE b.status = '%s' GROUP BY schedule_id; +$last_backup_id = SELECT schedule_id AS schedule_id_2, MAX_BY(b.id, b.completed_at) AS last_backup_id FROM Backups AS b GROUP BY schedule_id; + +SELECT * FROM BackupSchedules AS schedules +LEFT JOIN $last_successful_backup_id AS b1 ON schedules.id = b1.schedule_id +LEFT JOIN $last_backup_id AS b2 ON schedules.id = 
b2.schedule_id_2 +`, types.BackupStateAvailable, + ) + GetScheduleQuery = fmt.Sprintf( + `$rpo_info = SELECT + <| + recovery_point: MAX(b.completed_at), + last_successful_backup_id: MAX_BY(b.id, b.completed_at) + |> FROM Backups AS b WHERE b.status = '%s' AND b.schedule_id = $schedule_id; + +$last_backup_id = SELECT MAX_BY(b.id, b.completed_at) AS last_backup_id FROM Backups AS b WHERE b.schedule_id = $schedule_id; + +SELECT s.*, $last_backup_id AS last_backup_id, $rpo_info.recovery_point AS recovery_point, $rpo_info.last_successful_backup_id AS last_successful_backup_id FROM BackupSchedules AS s WHERE s.id = $schedule_id +`, types.BackupStateAvailable, + ) +) diff --git a/internal/connectors/db/yql/queries/read.go b/internal/connectors/db/yql/queries/read.go index ea75f76c..1a7eb8ec 100644 --- a/internal/connectors/db/yql/queries/read.go +++ b/internal/connectors/db/yql/queries/read.go @@ -98,6 +98,7 @@ type ReadTableQueryImpl struct { filters [][]table_types.Value filterFields []string isLikeFilter map[string]bool + index *string orderBy *OrderSpec pageSpec *PageSpec tableQueryParams []table.ParameterOption @@ -165,6 +166,12 @@ func WithPageSpec(spec PageSpec) ReadTableQueryOption { } } +func WithIndex(index string) ReadTableQueryOption { + return func(d *ReadTableQueryImpl) { + d.index = &index + } +} + func (d *ReadTableQueryImpl) AddTableQueryParam(paramValue table_types.Value) string { paramName := fmt.Sprintf("$param%d", len(d.tableQueryParams)) d.tableQueryParams = append( @@ -219,17 +226,23 @@ func (d *ReadTableQueryImpl) FormatPage() *string { return &page } +func (d *ReadTableQueryImpl) FormatTable() string { + if d.index == nil { + return d.tableName + } + return fmt.Sprintf("%s VIEW %s", d.tableName, *d.index) +} + func (d *ReadTableQueryImpl) FormatQuery(ctx context.Context) (*FormatQueryResult, error) { var res string filter := d.MakeFilterString() - if d.rawQuery == nil { if len(d.tableName) == 0 { return nil, errors.New("no table") } res = 
fmt.Sprintf( "SELECT * FROM %s%s", - d.tableName, + d.FormatTable(), filter, ) } else { diff --git a/internal/connectors/db/yql/queries/read_test.go b/internal/connectors/db/yql/queries/read_test.go index 1d577153..3a1bb3f1 100644 --- a/internal/connectors/db/yql/queries/read_test.go +++ b/internal/connectors/db/yql/queries/read_test.go @@ -196,3 +196,19 @@ func TestOrderSpec(t *testing.T) { "bad query format", ) } + +func TestIndex(t *testing.T) { + const ( + query = `SELECT * FROM table1 VIEW index` + ) + builder := NewReadTableQuery( + WithTableName("table1"), + WithIndex("index"), + ) + fq, err := builder.FormatQuery(context.Background()) + assert.Empty(t, err) + assert.Equal( + t, query, fq.QueryText, + "bad query format", + ) +} diff --git a/internal/connectors/db/yql/queries/write.go b/internal/connectors/db/yql/queries/write.go index 58a752af..d0ef7176 100644 --- a/internal/connectors/db/yql/queries/write.go +++ b/internal/connectors/db/yql/queries/write.go @@ -4,15 +4,14 @@ import ( "context" "errors" "fmt" + "github.com/ydb-platform/ydb-go-sdk/v3/table" + table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types" + "go.uber.org/zap" "log" "strings" - "ydbcp/internal/types" "ydbcp/internal/util/xlog" - - "github.com/ydb-platform/ydb-go-sdk/v3/table" - table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types" - "go.uber.org/zap" + pb "ydbcp/pkg/proto/ydbcp/v1alpha1" ) type WriteTableQuery interface { @@ -79,7 +78,7 @@ func BuildCreateOperationQuery(operation types.Operation, index int) WriteSingle ) if operation.GetAudit().CompletedAt != nil { d.AddValueParam( - "completed_at", + "$completed_at", table_types.TimestampValueFromTime(operation.GetAudit().CompletedAt.AsTime()), ) } @@ -128,6 +127,54 @@ func BuildCreateOperationQuery(operation types.Operation, index int) WriteSingle if tb.ParentOperationID != nil { d.AddValueParam("$parent_operation_id", table_types.StringValueFromString(*tb.ParentOperationID)) } + } else if operation.GetType() == 
types.OperationTypeTBWR { + tbwr, ok := operation.(*types.TakeBackupWithRetryOperation) + if !ok { + log.Fatalf("error cast operation to TakeBackupWithRetryOperation operation_id %s", operation.GetID()) + } + + d.AddValueParam( + "$container_id", table_types.StringValueFromString(tbwr.ContainerID), + ) + d.AddValueParam( + "$database", + table_types.StringValueFromString(tbwr.YdbConnectionParams.DatabaseName), + ) + d.AddValueParam( + "$endpoint", + table_types.StringValueFromString(tbwr.YdbConnectionParams.Endpoint), + ) + if len(tbwr.SourcePaths) > 0 { + d.AddValueParam("$paths", table_types.StringValueFromString(strings.Join(tbwr.SourcePaths, ","))) + } + if len(tbwr.SourcePathsToExclude) > 0 { + d.AddValueParam( + "$paths_to_exclude", + table_types.StringValueFromString(strings.Join(tbwr.SourcePathsToExclude, ",")), + ) + } + if tbwr.RetryConfig != nil { + switch r := tbwr.RetryConfig.Retries.(type) { + case *pb.RetryConfig_Count: + { + d.AddValueParam("$retries_count", table_types.Uint32Value(r.Count)) + d.AddValueParam("$retries_max_backoff", table_types.NullableIntervalValueFromDuration(nil)) + } + case *pb.RetryConfig_MaxBackoff: + { + d.AddValueParam("$retries_count", table_types.NullableUint32Value(nil)) + d.AddValueParam("$retries_max_backoff", table_types.IntervalValueFromDuration(r.MaxBackoff.AsDuration())) + } + default: + log.Fatalf("bad developers did not account for new oneof value") + } + } + if tbwr.ScheduleID != nil { + d.AddValueParam("$schedule_id", table_types.StringValueFromString(*tbwr.ScheduleID)) + } + if tbwr.Ttl != nil { + d.AddValueParam("$ttl", table_types.IntervalValueFromDuration(*tbwr.Ttl)) + } } else if operation.GetType() == types.OperationTypeRB { rb, ok := operation.(*types.RestoreBackupOperation) if !ok { @@ -183,7 +230,6 @@ func BuildCreateOperationQuery(operation types.Operation, index int) WriteSingle ) d.AddValueParam("$paths", table_types.StringValueFromString(db.PathPrefix)) - } else { log.Fatalf( "unknown operation type 
write to db operation_id %s, operation_type %s", @@ -204,10 +250,12 @@ func BuildUpdateOperationQuery(operation types.Operation, index int) WriteSingle d.AddValueParam( "$status", table_types.StringValueFromString(operation.GetState().String()), ) - d.AddValueParam( - "$message", - table_types.StringValueFromString(operation.GetMessage()), - ) + if operation.GetMessage() != "" { //so we don't override non-empty message + d.AddValueParam( + "$message", + table_types.StringValueFromString(operation.GetMessage()), + ) + } if operation.GetAudit() != nil && operation.GetAudit().CompletedAt != nil { d.AddValueParam( "$completed_at", diff --git a/internal/connectors/db/yql/schema/create_tables.yql b/internal/connectors/db/yql/schema/create_tables.yql index 0e68fc74..775db996 100644 --- a/internal/connectors/db/yql/schema/create_tables.yql +++ b/internal/connectors/db/yql/schema/create_tables.yql @@ -54,6 +54,11 @@ CREATE TABLE Operations ( paths_to_exclude String, operation_id String, parent_operation_id String, + --used only in TBWR + schedule_id String, + ttl Interval, + retries_count Uint32, + retries_max_backoff Interval, INDEX idx_cc GLOBAL ON (container_id, created_at, id), INDEX idx_cbc GLOBAL ON (container_id, backup_id, created_at, id), diff --git a/internal/handlers/schedule_backup.go b/internal/handlers/schedule_backup.go index 548097ba..799dadeb 100644 --- a/internal/handlers/schedule_backup.go +++ b/internal/handlers/schedule_backup.go @@ -3,8 +3,8 @@ package handlers import ( "context" "errors" + "github.com/jonboulle/clockwork" "go.uber.org/zap" - "time" "ydbcp/internal/backup_operations" "ydbcp/internal/config" "ydbcp/internal/connectors/client" @@ -15,18 +15,19 @@ import ( pb "ydbcp/pkg/proto/ydbcp/v1alpha1" ) -type BackupScheduleHandlerType func(context.Context, db.DBConnector, types.BackupSchedule, time.Time) error +type BackupScheduleHandlerType func(context.Context, db.DBConnector, types.BackupSchedule) error func NewBackupScheduleHandler( clientConn 
client.ClientConnector, s3 config.S3Config, clientConfig config.ClientConnectionConfig, queryBuilderFactory queries.WriteQueryBulderFactory, + clock clockwork.Clock, ) BackupScheduleHandlerType { - return func(ctx context.Context, driver db.DBConnector, schedule types.BackupSchedule, now time.Time) error { + return func(ctx context.Context, driver db.DBConnector, schedule types.BackupSchedule) error { return BackupScheduleHandler( - ctx, driver, schedule, now, clientConn, s3, clientConfig, - queryBuilderFactory, + ctx, driver, schedule, clientConn, s3, clientConfig, + queryBuilderFactory, clock, ) } } @@ -35,18 +36,18 @@ func BackupScheduleHandler( ctx context.Context, driver db.DBConnector, schedule types.BackupSchedule, - now time.Time, clientConn client.ClientConnector, s3 config.S3Config, clientConfig config.ClientConnectionConfig, queryBuilderFactory queries.WriteQueryBulderFactory, + clock clockwork.Clock, ) error { if schedule.Status != types.BackupScheduleStateActive { xlog.Error(ctx, "backup schedule is not active", zap.String("scheduleID", schedule.ID)) return errors.New("backup schedule is not active") } // do not handle last_backup_id status = (failed | deleted) for now, just do backups on cron. - if schedule.NextLaunch != nil && schedule.NextLaunch.Before(now) { + if schedule.NextLaunch != nil && schedule.NextLaunch.Before(clock.Now()) { backupRequest := &pb.MakeBackupRequest{ ContainerId: schedule.ContainerID, @@ -65,12 +66,12 @@ func BackupScheduleHandler( b, op, err := backup_operations.MakeBackup( ctx, clientConn, s3, clientConfig.AllowedEndpointDomains, clientConfig.AllowInsecureEndpoint, - backupRequest, &schedule.ID, types.OperationCreatorName, //TODO: who to put as subject here? 
+ backup_operations.FromGRPCRequest(backupRequest, &schedule.ID), types.OperationCreatorName, clock, ) if err != nil { return err } - err = schedule.UpdateNextLaunch(now) + err = schedule.UpdateNextLaunch(clock.Now()) if err != nil { return err } diff --git a/internal/handlers/schedule_backup_test.go b/internal/handlers/schedule_backup_test.go index 20e7d16b..8a2b7c06 100644 --- a/internal/handlers/schedule_backup_test.go +++ b/internal/handlers/schedule_backup_test.go @@ -57,8 +57,9 @@ func TestBackupScheduleHandler(t *testing.T) { AllowInsecureEndpoint: true, }, queries.NewWriteTableQueryMock, + clock, ) - err := handler(ctx, dbConnector, schedule, clock.Now()) + err := handler(ctx, dbConnector, schedule) assert.Empty(t, err) // check operation status (should be pending) diff --git a/internal/handlers/take_backup.go b/internal/handlers/take_backup.go index d18674ec..d1604b66 100644 --- a/internal/handlers/take_backup.go +++ b/internal/handlers/take_backup.go @@ -20,10 +20,10 @@ import ( func NewTBOperationHandler( db db.DBConnector, client client.ClientConnector, s3 s3.S3Connector, config config.Config, - queryBulderFactory queries.WriteQueryBulderFactory, + queryBuilderFactory queries.WriteQueryBulderFactory, ) types.OperationHandler { return func(ctx context.Context, op types.Operation) error { - return TBOperationHandler(ctx, op, db, client, s3, config, queryBulderFactory) + return TBOperationHandler(ctx, op, db, client, s3, config, queryBuilderFactory) } } diff --git a/internal/handlers/take_backup_retry.go b/internal/handlers/take_backup_retry.go new file mode 100644 index 00000000..8d5d4258 --- /dev/null +++ b/internal/handlers/take_backup_retry.go @@ -0,0 +1,242 @@ +package handlers + +import ( + "context" + "errors" + "fmt" + "github.com/jonboulle/clockwork" + table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types" + "go.uber.org/zap" + "math" + "time" + "ydbcp/internal/backup_operations" + "ydbcp/internal/config" + 
"ydbcp/internal/connectors/client"
+	"ydbcp/internal/connectors/db"
+	"ydbcp/internal/connectors/db/yql/queries"
+	"ydbcp/internal/types"
+	"ydbcp/internal/util/xlog"
+	pb "ydbcp/pkg/proto/ydbcp/v1alpha1"
+)
+
+// NewTBWROperationHandler binds the dependencies of TBWROperationHandler and
+// returns the result as a types.OperationHandler.
+func NewTBWROperationHandler(
+	db db.DBConnector,
+	client client.ClientConnector,
+	s3 config.S3Config,
+	clientConfig config.ClientConnectionConfig,
+	queryBuilderFactory queries.WriteQueryBulderFactory,
+	clock clockwork.Clock,
+) types.OperationHandler {
+	return func(ctx context.Context, op types.Operation) error {
+		return TBWROperationHandler(ctx, op, db, client, s3, clientConfig, queryBuilderFactory, clock)
+	}
+}
+
+const (
+	// INTERNAL_MAX_RETRIES caps the number of child attempts regardless of
+	// what the RetryConfig allows.
+	INTERNAL_MAX_RETRIES = 20
+	// MIN_BACKOFF is the unit that exp(attempts) is multiplied by.
+	MIN_BACKOFF = time.Minute
+	// BACKOFF_EXP is the base of the exponential backoff.
+	BACKOFF_EXP = 1.5
+	// Decisions returned by HandleTbOps. iota counts const specs from the top
+	// of this block, so the values are 3..6 rather than 0..3; the handler in
+	// this file only ever compares against the names.
+	Error    = iota
+	RunNewTb = iota
+	Skip     = iota
+	Success  = iota
+)
+
+// exp returns BACKOFF_EXP raised to the power p as a multiplier for
+// MIN_BACKOFF.
+//
+// NOTE(review): the float-to-Duration conversion truncates the fraction
+// (1.5^2 = 2.25 becomes 2), so the backoff grows in whole multiples of
+// MIN_BACKOFF. Left unchanged because callers rely on the current values.
+func exp(p int) time.Duration {
+	return time.Duration(math.Pow(BACKOFF_EXP, float64(p)))
+}
+
+// shouldRetry decides whether another child backup may be started.
+//
+// It returns nil when no further attempt is allowed: config is nil, tbOps is
+// empty, INTERNAL_MAX_RETRIES attempts were already made, the configured
+// Count is reached, the time since the first attempt was created is at least
+// MaxBackoff, or the config carries no retry policy. Otherwise it returns the
+// earliest time of the next attempt: completion time of the last attempt plus
+// exp(len(tbOps)) * MIN_BACKOFF.
+func shouldRetry(config *pb.RetryConfig, tbOps []*types.TakeBackupOperation, clock clockwork.Clock) *time.Time {
+	ops := len(tbOps)
+	// ">=" rather than "==": an attempt count that is already past a limit
+	// must stop retrying too, and an empty history has nothing to retry.
+	if config == nil || ops == 0 || ops >= INTERNAL_MAX_RETRIES {
+		return nil
+	}
+
+	switch r := config.Retries.(type) {
+	case *pb.RetryConfig_Count:
+		if ops >= int(r.Count) {
+			return nil
+		}
+	case *pb.RetryConfig_MaxBackoff:
+		firstStart := tbOps[0].Audit.CreatedAt.AsTime()
+		if clock.Now().Sub(firstStart) >= r.MaxBackoff.AsDuration() {
+			return nil
+		}
+	default:
+		return nil
+	}
+
+	lastEnd := tbOps[ops-1].Audit.CompletedAt.AsTime()
+	res := lastEnd.Add(exp(ops) * MIN_BACKOFF)
+	return &res
+}
+
+// HandleTbOps inspects the child backup attempts (the last element of tbOps
+// is treated as the latest) and returns what the parent should do next:
+//
+//   - no attempts yet: RunNewTb
+//   - last attempt done: Success
+//   - last attempt running: Skip
+//   - last attempt failed: RunNewTb when a retry is allowed and its backoff
+//     has elapsed, Skip when the backoff has not elapsed yet, Error when
+//     shouldRetry allows no further attempt
+//
+// Any other state of the last attempt yields Error together with a non-nil
+// error.
+func HandleTbOps(config *pb.RetryConfig, tbOps []*types.TakeBackupOperation, clock clockwork.Clock) (int, error) {
+	if len(tbOps) == 0 {
+		return RunNewTb, nil
+	}
+	last := tbOps[len(tbOps)-1]
+	switch last.State {
+	case types.OperationStateDone:
+		return Success, nil
+	case types.OperationStateRunning:
+		return Skip, nil
+	case types.OperationStateError:
+		t := shouldRetry(config, tbOps, clock)
+		if t == nil {
+			// nil means "do not retry anymore": the failure becomes final.
+			return Error, nil
+		}
+		if clock.Now().After(*t) {
+			return RunNewTb, nil
+		}
+		return Skip, nil
+	}
+	return Error, errors.New("unexpected tb op status")
+}
+
+// TBWROperationHandler advances one TakeBackupWithRetry operation.
+//
+// It loads the operations whose parent_operation_id equals the TBWR id,
+// ordered by created_at, and then acts on the TBWR state:
+//
+//   - Running: follows the decision of HandleTbOps - marks the TBWR done or
+//     failed, does nothing, or starts a new backup through
+//     backup_operations.MakeBackup and stores the new backup and operation.
+//   - Cancelling: marks the TBWR cancelled when there is no child or the last
+//     child is not active; otherwise moves a pending/running last child to
+//     StartCancelling.
+//   - any other state: marks the TBWR failed and returns an error.
+//
+// It returns an error when the operation is not a TBWR, when the children
+// cannot be loaded or are not TakeBackupOperations, when MakeBackup fails, or
+// when a database upsert fails.
+func TBWROperationHandler(
+	ctx context.Context,
+	operation types.Operation,
+	db db.DBConnector,
+	clientConn client.ClientConnector,
+	s3 config.S3Config,
+	clientConfig config.ClientConnectionConfig,
+	queryBuilderFactory queries.WriteQueryBulderFactory,
+	clock clockwork.Clock,
+) error {
+	xlog.Info(ctx, "TBWROperationHandler", zap.String("OperationID", operation.GetID()))
+
+	if operation.GetType() != types.OperationTypeTBWR {
+		return fmt.Errorf("wrong operation type %s != %s", operation.GetType(), types.OperationTypeTBWR)
+	}
+	tbwr, ok := operation.(*types.TakeBackupWithRetryOperation)
+	if !ok {
+		return fmt.Errorf("can't cast Operation to TakeBackupWithRetryOperation %s", types.OperationToString(operation))
+	}
+
+	ops, err := db.SelectOperations(ctx, queries.NewReadTableQuery(
+		queries.WithTableName("Operations"),
+		queries.WithIndex("idx_pc"),
+		queries.WithQueryFilters(queries.QueryFilter{
+			Field:  "parent_operation_id",
+			Values: []table_types.Value{table_types.StringValueFromString(tbwr.ID)},
+		}),
+		queries.WithOrderBy(queries.OrderSpec{
+			Field: "created_at",
+		}),
+	))
+	// Check the query error before touching its result, and keep the cause.
+	if err != nil {
+		return fmt.Errorf("can't select Operations for TBWR op %s: %w", tbwr.ID, err)
+	}
+	tbOps := make([]*types.TakeBackupOperation, 0, len(ops))
+	for _, op := range ops {
+		// A checked assertion: an unexpected child type is reported instead
+		// of panicking the handler.
+		tbOp, isTB := op.(*types.TakeBackupOperation)
+		if !isTB {
+			return fmt.Errorf("unexpected child operation type %s for TBWR op %s", op.GetType(), tbwr.ID)
+		}
+		tbOps = append(tbOps, tbOp)
+	}
+
+	switch tbwr.State {
+	case types.OperationStateRunning:
+		do, err := HandleTbOps(tbwr.RetryConfig, tbOps, clock)
+		if err != nil {
+			tbwr.State = types.OperationStateError
+			tbwr.Message = err.Error()
+			errup := db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(tbwr))
+			if errup != nil {
+				return errup
+			}
+			return err
+		}
+		switch do {
+		case Success:
+			tbwr.State = types.OperationStateDone
+			tbwr.Message = "Success"
+			return db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(tbwr))
+		case Skip:
+			return nil
+		case Error:
+			tbwr.State = types.OperationStateError
+			tbwr.Message = "retry attempts exhausted"
+			return db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(tbwr))
+		case RunNewTb:
+			backup, tb, err := backup_operations.MakeBackup(
+				ctx,
+				clientConn,
+				s3,
+				clientConfig.AllowedEndpointDomains,
+				clientConfig.AllowInsecureEndpoint,
+				backup_operations.FromTBWROperation(tbwr),
+				types.OperationCreatorName,
+				clock,
+			)
+			if err != nil {
+				return err
+			}
+			return db.ExecuteUpsert(ctx, queryBuilderFactory().WithCreateBackup(*backup).WithCreateOperation(tb))
+		default:
+			tbwr.State = types.OperationStateError
+			tbwr.Message = "unexpected operation state"
+			// Report a failed state update together with the cause rather
+			// than dropping it; errors.Join ignores a nil upsert error.
+			return errors.Join(
+				errors.New(tbwr.Message),
+				db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(tbwr)),
+			)
+		}
+	case types.OperationStateCancelling:
+		if len(tbOps) == 0 || !types.IsActive(tbOps[len(tbOps)-1]) {
+			// Nothing is left to cancel: the parent itself becomes cancelled.
+			tbwr.State = types.OperationStateCancelled
+			tbwr.Message = "Success"
+			return db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(tbwr))
+		}
+		last := tbOps[len(tbOps)-1]
+		if last.State == types.OperationStatePending || last.State == types.OperationStateRunning {
+			last.State = types.OperationStateStartCancelling
+			last.Message = "Cancelling by parent operation"
+			return db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(last))
+		}
+		// The last child is active but neither pending nor running
+		// (presumably already being cancelled): nothing to do this round.
+	default:
+		tbwr.State = types.OperationStateError
+		tbwr.Message = "unexpected operation state"
+		return errors.Join(
+			errors.New(tbwr.Message),
+			db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(tbwr)),
+		)
+	}
+
+	return nil
+}
diff --git a/internal/handlers/take_backup_retry_test.go b/internal/handlers/take_backup_retry_test.go
new file mode 100644
index 00000000..8097da46
--- /dev/null
+++ b/internal/handlers/take_backup_retry_test.go
@@ -0,0 +1,643 @@
+package handlers
+
+import (
+	"context"
+	"github.com/jonboulle/clockwork"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"google.golang.org/protobuf/types/known/durationpb"
+	"google.golang.org/protobuf/types/known/timestamppb"
+	"testing"
+	"time"
+	"ydbcp/internal/config"
+	"ydbcp/internal/connectors/client"
+	"ydbcp/internal/connectors/db"
+	"ydbcp/internal/connectors/db/yql/queries"
+	"ydbcp/internal/types"
+	pb "ydbcp/pkg/proto/ydbcp/v1alpha1"
+)
+
+var (
+	t1 = timestamppb.New(time.Date(2024, 01, 01, 0, 0, 0, 0, time.UTC))
+	t2 = timestamppb.New(time.Date(2024, 01, 01, 1, 0, 0, 0, time.UTC))
+	t3 = timestamppb.New(time.Date(2024, 01, 01, 2, 0, 0, 0, time.UTC))
+	t4 =
timestamppb.New(time.Date(2024, 01, 01, 3, 0, 0, 0, time.UTC))
+	// r1 and r2 are the retry times shouldRetry is expected to return for a
+	// history of two attempts ending at t3 and three attempts ending at t4.
+	r1 = t3.AsTime().Add(exp(2) * time.Minute)
+	r2 = t4.AsTime().Add(exp(3) * time.Minute)
+)
+
+// TestRetryLogic is a table test for shouldRetry: each case supplies a retry
+// config, a history of attempts and a fake clock time, and states the exact
+// pointer value (nil or a retry time) that must come back.
+func TestRetryLogic(t *testing.T) {
+	for _, tt := range []struct {
+		config    *pb.RetryConfig
+		ops       []*types.TakeBackupOperation
+		res       *time.Time
+		clockTime time.Time
+	}{
+		{
+			// no config at all: never retry
+			config: nil,
+			ops: []*types.TakeBackupOperation{
+				{
+					Audit: &pb.AuditInfo{
+						CreatedAt:   t1,
+						CompletedAt: t2,
+					},
+				},
+				{
+					Audit: &pb.AuditInfo{
+						CreatedAt:   t2,
+						CompletedAt: t3,
+					},
+				},
+			},
+			res: nil,
+		},
+		{
+			// config without a retry policy: never retry
+			config: &pb.RetryConfig{},
+			ops: []*types.TakeBackupOperation{
+				{
+					Audit: &pb.AuditInfo{
+						CreatedAt:   t1,
+						CompletedAt: t2,
+					},
+				},
+				{
+					Audit: &pb.AuditInfo{
+						CreatedAt:   t2,
+						CompletedAt: t3,
+					},
+				},
+			},
+			res: nil,
+		},
+		{
+			// Count equals the number of attempts already made: stop
+			config: &pb.RetryConfig{
+				Retries: &pb.RetryConfig_Count{Count: 2},
+			},
+			ops: []*types.TakeBackupOperation{
+				{
+					Audit: &pb.AuditInfo{
+						CreatedAt:   t1,
+						CompletedAt: t2,
+					},
+				},
+				{
+					Audit: &pb.AuditInfo{
+						CreatedAt:   t2,
+						CompletedAt: t3,
+					},
+				},
+			},
+			res: nil,
+		},
+		{
+			// two attempts out of three: retry at r1
+			config: &pb.RetryConfig{
+				Retries: &pb.RetryConfig_Count{Count: 3},
+			},
+			ops: []*types.TakeBackupOperation{
+				{
+					Audit: &pb.AuditInfo{
+						CreatedAt:   t1,
+						CompletedAt: t2,
+					},
+				},
+				{
+					Audit: &pb.AuditInfo{
+						CreatedAt:   t2,
+						CompletedAt: t3,
+					},
+				},
+			},
+			res: &r1,
+		},
+		{
+			// three attempts out of four: retry at r2
+			config: &pb.RetryConfig{
+				Retries: &pb.RetryConfig_Count{Count: 4},
+			},
+			ops: []*types.TakeBackupOperation{
+				{
+					Audit: &pb.AuditInfo{
+						CreatedAt:   t1,
+						CompletedAt: t2,
+					},
+				},
+				{
+					Audit: &pb.AuditInfo{
+						CreatedAt:   t2,
+						CompletedAt: t3,
+					},
+				},
+				{
+					Audit: &pb.AuditInfo{
+						CreatedAt:   t3,
+						CompletedAt: t4,
+					},
+				},
+			},
+			res: &r2,
+		},
+		{
+			// clock is three hours after the first attempt, limit is one hour: stop
+			config: &pb.RetryConfig{
+				Retries: &pb.RetryConfig_MaxBackoff{MaxBackoff: durationpb.New(time.Hour)},
+			},
+			ops: []*types.TakeBackupOperation{
+				{
+					Audit: &pb.AuditInfo{
+						CreatedAt:   t1,
+						CompletedAt: t2,
+					},
+				},
+			},
+			clockTime: t4.AsTime(),
+			res:       nil,
+		},
+		{
+			// still within the 24 hour limit: retry at r2
+			config: &pb.RetryConfig{
+				Retries: &pb.RetryConfig_MaxBackoff{MaxBackoff: durationpb.New(time.Hour * 24)},
+			},
+			ops: []*types.TakeBackupOperation{
+				{
+					Audit: &pb.AuditInfo{
+						CreatedAt:   t1,
+						CompletedAt: t2,
+					},
+				},
+				{
+					Audit: &pb.AuditInfo{
+						CreatedAt:   t2,
+						CompletedAt: t3,
+					},
+				},
+				{
+					Audit: &pb.AuditInfo{
+						CreatedAt:   t3,
+						CompletedAt: t4,
+					},
+				},
+			},
+			clockTime: t4.AsTime().Add(time.Second),
+			res:       &r2,
+		},
+	} {
+		t.Run("", func(t *testing.T) {
+			require.Equal(t, tt.res, shouldRetry(tt.config, tt.ops, clockwork.NewFakeClockAt(tt.clockTime)))
+		})
+	}
+}
+
+// toMap indexes the given operations by their ID, the shape passed to
+// db.WithOperations in the tests below.
+func toMap(list ...types.Operation) map[string]types.Operation {
+	res := make(map[string]types.Operation)
+	for _, op := range list {
+		res[op.GetID()] = op
+	}
+	return res
+}
+
+// TestTBWRHandlerSuccess: a running TBWR whose last child is Done must end
+// up Done itself.
+func TestTBWRHandlerSuccess(t *testing.T) {
+	ctx := context.Background()
+	tbwrID := types.GenerateObjectID()
+	tbwr := types.TakeBackupWithRetryOperation{
+		TakeBackupOperation: types.TakeBackupOperation{
+			ID:                  tbwrID,
+			ContainerID:         "abcde",
+			State:               types.OperationStateRunning,
+			Message:             "",
+			YdbConnectionParams: types.YdbConnectionParams{},
+		},
+		RetryConfig: nil,
+	}
+	ops := []types.Operation{
+		&types.TakeBackupOperation{
+			ID:    types.GenerateObjectID(),
+			State: types.OperationStateError,
+			Audit: &pb.AuditInfo{
+				CreatedAt:   t1,
+				CompletedAt: t2,
+			},
+			ParentOperationID: &tbwrID,
+		},
+		&types.TakeBackupOperation{
+			ID:    types.GenerateObjectID(),
+			State: types.OperationStateDone,
+			Audit: &pb.AuditInfo{
+				CreatedAt:   t2,
+				CompletedAt: t3,
+			},
+			ParentOperationID: &tbwrID,
+		},
+		&tbwr,
+	}
+
+	dbConnector := db.NewMockDBConnector(
+		db.WithOperations(toMap(ops...)),
+	)
+	// presumably fixes which operations, in which order, the mock returns
+	// from SelectOperations - confirm against the mock
+	dbConnector.SetOperationsIDSelector([]string{ops[0].GetID(), ops[1].GetID()})
+
+	clientConnector := client.NewMockClientConnector()
+
+	handler := NewTBWROperationHandler(
+		dbConnector,
+		clientConnector,
+		config.S3Config{},
+		config.ClientConnectionConfig{},
+		queries.NewWriteTableQueryMock,
+		clockwork.NewFakeClockAt(t1.AsTime()),
+	)
+	err := handler(ctx, &tbwr)
+	assert.Empty(t, err)
+
+	op, err := dbConnector.GetOperation(ctx, tbwrID)
+	assert.Empty(t, err)
+	assert.NotEmpty(t, op)
+	assert.Equal(t, types.OperationStateDone, op.GetState())
+}
+
+// TestTBWRHandlerSkipRunning: while the last child is still Running the
+// TBWR stays Running and no new operation is created.
+func TestTBWRHandlerSkipRunning(t *testing.T) {
+	ctx := context.Background()
+	tbwrID := types.GenerateObjectID()
+	tbwr := types.TakeBackupWithRetryOperation{
+		TakeBackupOperation: types.TakeBackupOperation{
+			ID:                  tbwrID,
+			ContainerID:         "abcde",
+			State:               types.OperationStateRunning,
+			Message:             "",
+			YdbConnectionParams: types.YdbConnectionParams{},
+		},
+		RetryConfig: nil,
+	}
+
+	ops := []types.Operation{
+		&types.TakeBackupOperation{
+			ID:    types.GenerateObjectID(),
+			State: types.OperationStateError,
+			Audit: &pb.AuditInfo{
+				CreatedAt:   t1,
+				CompletedAt: t2,
+			},
+			ParentOperationID: &tbwrID,
+		},
+		&types.TakeBackupOperation{
+			ID:    types.GenerateObjectID(),
+			State: types.OperationStateRunning,
+			Audit: &pb.AuditInfo{
+				CreatedAt:   t2,
+				CompletedAt: t3,
+			},
+			ParentOperationID: &tbwrID,
+		},
+		&tbwr,
+	}
+
+	dbConnector := db.NewMockDBConnector(
+		db.WithOperations(toMap(ops...)),
+	)
+	dbConnector.SetOperationsIDSelector([]string{ops[0].GetID(), ops[1].GetID()})
+
+	clientConnector := client.NewMockClientConnector()
+
+	handler := NewTBWROperationHandler(
+		dbConnector,
+		clientConnector,
+		config.S3Config{},
+		config.ClientConnectionConfig{},
+		queries.NewWriteTableQueryMock,
+		clockwork.NewFakeClockAt(t1.AsTime()),
+	)
+	err := handler(ctx, &tbwr)
+	assert.Empty(t, err)
+
+	op, err := dbConnector.GetOperation(ctx, tbwrID)
+	assert.Empty(t, err)
+	assert.NotEmpty(t, op)
+	assert.Equal(t, types.OperationStateRunning, op.GetState())
+	operations, err := dbConnector.SelectOperations(ctx, queries.NewReadTableQuery())
+	assert.Empty(t, err)
+	// still the three operations the test started with
+	assert.Equal(t, 3, len(operations))
+}
+
+// TestTBWRHandlerSkipError: the last child failed and a retry is allowed
+// (two attempts, Count 3), but the clock stands at the completion time of
+// the last attempt, so the backoff has not elapsed and nothing changes.
+func TestTBWRHandlerSkipError(t *testing.T) {
+	ctx := context.Background()
+	tbwrID := types.GenerateObjectID()
+	tbwr := types.TakeBackupWithRetryOperation{
+		TakeBackupOperation: types.TakeBackupOperation{
+			ID:                  tbwrID,
+			ContainerID:         "abcde",
+			State:               types.OperationStateRunning,
+			Message:             "",
+			YdbConnectionParams: types.YdbConnectionParams{},
+		},
+		RetryConfig: &pb.RetryConfig{Retries: &pb.RetryConfig_Count{Count: 3}},
+	}
+
+	ops := []types.Operation{
+		&types.TakeBackupOperation{
+			ID:    types.GenerateObjectID(),
+			State: types.OperationStateError,
+			Audit: &pb.AuditInfo{
+				CreatedAt:   t1,
+				CompletedAt: t2,
+			},
+			ParentOperationID: &tbwrID,
+		},
+		&types.TakeBackupOperation{
+			ID:    types.GenerateObjectID(),
+			State: types.OperationStateError,
+			Audit: &pb.AuditInfo{
+				CreatedAt:   t2,
+				CompletedAt: t3,
+			},
+			ParentOperationID: &tbwrID,
+		},
+		&tbwr,
+	}
+
+	dbConnector := db.NewMockDBConnector(
+		db.WithOperations(toMap(ops...)),
+	)
+	dbConnector.SetOperationsIDSelector([]string{ops[0].GetID(), ops[1].GetID()})
+
+	clientConnector := client.NewMockClientConnector()
+
+	handler := NewTBWROperationHandler(
+		dbConnector,
+		clientConnector,
+		config.S3Config{},
+		config.ClientConnectionConfig{},
+		queries.NewWriteTableQueryMock,
+		clockwork.NewFakeClockAt(t3.AsTime()),
+	)
+	err := handler(ctx, &tbwr)
+	assert.Empty(t, err)
+
+	op, err := dbConnector.GetOperation(ctx, tbwrID)
+	assert.Empty(t, err)
+	assert.NotEmpty(t, op)
+	assert.Equal(t, types.OperationStateRunning, op.GetState())
+	operations, err := dbConnector.SelectOperations(ctx, queries.NewReadTableQuery())
+	assert.Empty(t, err)
+	assert.Equal(t, 3, len(operations))
+}
+
+// TestTBWRHandlerError: the only child failed and there is no retry config,
+// so the TBWR must become Error with the "retry attempts exhausted" message.
+func TestTBWRHandlerError(t *testing.T) {
+	ctx := context.Background()
+	tbwrID := types.GenerateObjectID()
+	tbwr := types.TakeBackupWithRetryOperation{
+		TakeBackupOperation: types.TakeBackupOperation{
+			ID:                  tbwrID,
+			ContainerID:         "abcde",
+			State:               types.OperationStateRunning,
+			Message:             "",
+			YdbConnectionParams: types.YdbConnectionParams{},
+		},
+		RetryConfig: nil,
+	}
+
+	ops := []types.Operation{
+		&types.TakeBackupOperation{
+			ID:    types.GenerateObjectID(),
+			State: types.OperationStateError,
+			Audit: &pb.AuditInfo{
+				CreatedAt:   t1,
+				CompletedAt: t2,
+			},
+			ParentOperationID: &tbwrID,
+		},
+		&tbwr,
+	}
+
+	dbConnector := db.NewMockDBConnector(
+		db.WithOperations(toMap(ops...)),
+	)
+	dbConnector.SetOperationsIDSelector([]string{ops[0].GetID()})
+
+	clientConnector := client.NewMockClientConnector()
+
+	handler := NewTBWROperationHandler(
+		dbConnector,
+		clientConnector,
+		config.S3Config{},
+		config.ClientConnectionConfig{},
+		queries.NewWriteTableQueryMock,
+		clockwork.NewFakeClockAt(t2.AsTime()),
+	)
+	err := handler(ctx, &tbwr)
+	assert.Empty(t, err)
+
+	op, err := dbConnector.GetOperation(ctx, tbwrID)
+	assert.Empty(t, err)
+	assert.NotEmpty(t, op)
+	assert.Equal(t, types.OperationStateError, op.GetState())
+	assert.Equal(t, "retry attempts exhausted", op.GetMessage())
+}
+
+// TestTBWRHandlerAlwaysRunOnce: a running TBWR without children starts a
+// first backup even though it has no retry config; the new TB operation is
+// Running and created at the fake clock time.
+func TestTBWRHandlerAlwaysRunOnce(t *testing.T) {
+	ctx := context.Background()
+	tbwrID := types.GenerateObjectID()
+	tbwr := types.TakeBackupWithRetryOperation{
+		TakeBackupOperation: types.TakeBackupOperation{
+			ID:          tbwrID,
+			ContainerID: "abcde",
+			State:       types.OperationStateRunning,
+			Message:     "",
+			SourcePaths: []string{"path"},
+			YdbConnectionParams: types.YdbConnectionParams{
+				Endpoint:     "i.valid.com",
+				DatabaseName: "/mydb",
+			},
+		},
+		RetryConfig: nil,
+	}
+
+	ops := []types.Operation{
+		&tbwr,
+	}
+
+	dbConnector := db.NewMockDBConnector(
+		db.WithOperations(toMap(ops...)),
+	)
+	dbConnector.SetOperationsIDSelector([]string{})
+
+	clientConnector := client.NewMockClientConnector()
+
+	handler := NewTBWROperationHandler(
+		dbConnector,
+		clientConnector,
+		config.S3Config{
+			IsMock: true,
+		},
+		config.ClientConnectionConfig{
+			AllowedEndpointDomains: []string{".valid.com"},
+			AllowInsecureEndpoint:  true,
+		},
+		queries.NewWriteTableQueryMock,
+		clockwork.NewFakeClockAt(t1.AsTime()),
+	)
+	err := handler(ctx, &tbwr)
+	assert.Empty(t, err)
+
+	op, err := dbConnector.GetOperation(ctx, tbwrID)
+	assert.Empty(t, err)
+	assert.NotEmpty(t, op)
+	assert.Equal(t, types.OperationStateRunning, op.GetState())
+	operations, err := dbConnector.SelectOperations(ctx, queries.NewReadTableQuery())
+	assert.Empty(t, err)
+	// the TBWR plus the newly created TB operation
+	assert.Equal(t, 2, len(operations))
+	tbwr = *op.(*types.TakeBackupWithRetryOperation)
+	var tb *types.TakeBackupOperation
+	for _, op = range operations {
+		if op.GetType() == types.OperationTypeTB {
+			tb = op.(*types.TakeBackupOperation)
+			break
+		}
+	}
+	assert.NotNil(t, tb)
+	assert.Equal(t, types.OperationStateRunning, tb.State)
+	assert.Equal(t, t1, tb.Audit.CreatedAt)
+}
+
+// TestTBWRHandlerStartCancel: a cancelling TBWR with a Running child moves
+// the child to StartCancelling and itself stays Cancelling.
+func TestTBWRHandlerStartCancel(t *testing.T) {
+	ctx := context.Background()
+	tbwrID := types.GenerateObjectID()
+	tbwr := types.TakeBackupWithRetryOperation{
+		TakeBackupOperation: types.TakeBackupOperation{
+			ID:          tbwrID,
+			ContainerID: "abcde",
+			State:       types.OperationStateCancelling,
+			Message:     "",
+			SourcePaths: []string{"path"},
+			YdbConnectionParams: types.YdbConnectionParams{
+				Endpoint:     "i.valid.com",
+				DatabaseName: "/mydb",
+			},
+		},
+		RetryConfig: nil,
+	}
+
+	ops := []types.Operation{
+		&types.TakeBackupOperation{
+			ID:    types.GenerateObjectID(),
+			State: types.OperationStateRunning,
+			Audit: &pb.AuditInfo{
+				CreatedAt: t1,
+			},
+			ParentOperationID: &tbwrID,
+		},
+		&tbwr,
+	}
+
+	dbConnector := db.NewMockDBConnector(
+		db.WithOperations(toMap(ops...)),
+	)
+	dbConnector.SetOperationsIDSelector([]string{ops[0].GetID()})
+
+	clientConnector := client.NewMockClientConnector()
+
+	handler := NewTBWROperationHandler(
+		dbConnector,
+		clientConnector,
+		config.S3Config{
+			IsMock: true,
+		},
+		config.ClientConnectionConfig{
+			AllowedEndpointDomains: []string{".valid.com"},
+			AllowInsecureEndpoint:  true,
+		},
+		queries.NewWriteTableQueryMock,
+		clockwork.NewFakeClockAt(t1.AsTime()),
+	)
+	err := handler(ctx, &tbwr)
+	assert.Empty(t, err)
+
+	op, err := dbConnector.GetOperation(ctx, tbwrID)
+	assert.Empty(t, err)
+	assert.NotEmpty(t, op)
+	assert.Equal(t, types.OperationStateCancelling, op.GetState())
+	operations, err := dbConnector.SelectOperations(ctx, queries.NewReadTableQuery())
+	assert.Empty(t, err)
+	assert.Equal(t, 2, len(operations))
+	tbwr = *op.(*types.TakeBackupWithRetryOperation)
+	var tb *types.TakeBackupOperation
+	for _, op = range operations {
+		if op.GetType() == types.OperationTypeTB {
+			tb = op.(*types.TakeBackupOperation)
+			break
+		}
+	}
+	assert.NotNil(t, tb)
+	assert.Equal(t, types.OperationStateStartCancelling, tb.State)
+	assert.Equal(t, "Cancelling by parent operation", tb.Message)
+	assert.Equal(t, types.OperationStateCancelling, tbwr.State)
+}
+
+// TestTBWRHandlerFullCancel: a cancelling TBWR whose child is already
+// Cancelled becomes Cancelled itself with the message "Success".
+func TestTBWRHandlerFullCancel(t *testing.T) {
+	ctx := context.Background()
+	tbwrID := types.GenerateObjectID()
+	tbwr := types.TakeBackupWithRetryOperation{
+		TakeBackupOperation: types.TakeBackupOperation{
+			ID:          tbwrID,
+			ContainerID: "abcde",
+			State:       types.OperationStateCancelling,
+			Message:     "",
+			SourcePaths: []string{"path"},
+			YdbConnectionParams: types.YdbConnectionParams{
+				Endpoint:     "i.valid.com",
+				DatabaseName: "/mydb",
+			},
+		},
+		RetryConfig: nil,
+	}
+
+	ops := []types.Operation{
+		&types.TakeBackupOperation{
+			ID:    types.GenerateObjectID(),
+			State: types.OperationStateCancelled,
+			Audit: &pb.AuditInfo{
+				CreatedAt: t1,
+			},
+			ParentOperationID: &tbwrID,
+		},
+		&tbwr,
+	}
+
+	dbConnector := db.NewMockDBConnector(
+		db.WithOperations(toMap(ops...)),
+	)
+	dbConnector.SetOperationsIDSelector([]string{ops[0].GetID()})
+
+	clientConnector := client.NewMockClientConnector()
+
+	handler := NewTBWROperationHandler(
+		dbConnector,
+		clientConnector,
+		config.S3Config{
+			IsMock: true,
+		},
+		config.ClientConnectionConfig{
+			AllowedEndpointDomains: []string{".valid.com"},
+			AllowInsecureEndpoint:  true,
+		},
+		queries.NewWriteTableQueryMock,
+		clockwork.NewFakeClockAt(t1.AsTime()),
+	)
+	err := handler(ctx, &tbwr)
+	assert.Empty(t, err)
+
+	op, err := dbConnector.GetOperation(ctx, tbwrID)
+	assert.Empty(t, err)
+	assert.NotEmpty(t, op)
+	assert.Equal(t, types.OperationStateCancelled, op.GetState())
+	operations, err := dbConnector.SelectOperations(ctx, queries.NewReadTableQuery())
+	assert.Empty(t, err)
+	assert.Equal(t, 2, len(operations))
+	tbwr = *op.(*types.TakeBackupWithRetryOperation)
+	var tb *types.TakeBackupOperation
+	for _, op = range operations {
+		if op.GetType() == types.OperationTypeTB {
+			tb = op.(*types.TakeBackupOperation)
+			break
+		}
+	}
+	assert.NotNil(t, tb)
+	assert.Equal(t, types.OperationStateCancelled, tb.State)
+	assert.Equal(t, types.OperationStateCancelled, tbwr.State)
+	assert.Equal(t, "Success", tbwr.Message)
+}
diff --git a/internal/server/services/backup/backup_service.go b/internal/server/services/backup/backup_service.go
index 98e30517..76c402e5 100644
--- a/internal/server/services/backup/backup_service.go
+++ b/internal/server/services/backup/backup_service.go
@@ -2,6 +2,7 @@ package backup
 
 import (
 	"context"
+	"github.com/jonboulle/clockwork"
 	"strconv"
 
 	"ydbcp/internal/auth"
@@ -32,6 +33,7 @@ type BackupService struct {
 	auth ap.AuthProvider
 	allowedEndpointDomains []string
 	allowInsecureEndpoint bool
+	clock clockwork.Clock
 }
 
 func (s *BackupService) GetBackup(ctx context.Context, request *pb.GetBackupRequest) (*pb.Backup, error) {
@@ -87,7 +89,7 @@ func (s *BackupService) MakeBackup(ctx context.Context, req *pb.MakeBackupReques
 	ctx = xlog.With(ctx, zap.String("SubjectID", subject))
 
 	backup, op, err := backup_operations.MakeBackup(
-		ctx, s.clientConn, s.s3, s.allowedEndpointDomains, s.allowInsecureEndpoint, req, nil, subject,
+		ctx, s.clientConn, s.s3, s.allowedEndpointDomains, s.allowInsecureEndpoint, backup_operations.FromGRPCRequest(req, nil), subject, s.clock,
 	)
 
 	if err != nil {
@@ -442,5 +444,6 @@ func NewBackupService(
 		auth: auth,
 		allowedEndpointDomains: allowedEndpointDomains,
 		allowInsecureEndpoint: allowInsecureEndpoint,
+		clock: clockwork.NewRealClock(),
 	}
 }
diff --git a/internal/server/services/backup_schedule/backup_schedule_service.go
b/internal/server/services/backup_schedule/backup_schedule_service.go index e763f86e..84f436bf 100644 --- a/internal/server/services/backup_schedule/backup_schedule_service.go +++ b/internal/server/services/backup_schedule/backup_schedule_service.go @@ -2,7 +2,6 @@ package backup_schedule import ( "context" - "fmt" "github.com/jonboulle/clockwork" "github.com/ydb-platform/ydb-go-sdk/v3/table" "strconv" @@ -25,30 +24,6 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) -var ( - ListSchedulesQuery = fmt.Sprintf( - `$last_successful_backup_id = SELECT schedule_id, MAX(b.completed_at) AS recovery_point, MAX_BY(b.id, b.completed_at) AS last_successful_backup_id FROM Backups AS b WHERE b.status = '%s' GROUP BY schedule_id; -$last_backup_id = SELECT schedule_id AS schedule_id_2, MAX_BY(b.id, b.completed_at) AS last_backup_id FROM Backups AS b GROUP BY schedule_id; - -SELECT * FROM BackupSchedules AS schedules -LEFT JOIN $last_successful_backup_id AS b1 ON schedules.id = b1.schedule_id -LEFT JOIN $last_backup_id AS b2 ON schedules.id = b2.schedule_id_2 -`, types.BackupStateAvailable, - ) - GetScheduleQuery = fmt.Sprintf( - `$rpo_info = SELECT - <| - recovery_point: MAX(b.completed_at), - last_successful_backup_id: MAX_BY(b.id, b.completed_at) - |> FROM Backups AS b WHERE b.status = '%s' AND b.schedule_id = $schedule_id; - -$last_backup_id = SELECT MAX_BY(b.id, b.completed_at) AS last_backup_id FROM Backups AS b WHERE b.schedule_id = $schedule_id; - -SELECT s.*, $last_backup_id AS last_backup_id, $rpo_info.recovery_point AS recovery_point, $rpo_info.last_successful_backup_id AS last_successful_backup_id FROM BackupSchedules AS s WHERE s.id = $schedule_id -`, types.BackupStateAvailable, - ) -) - type BackupScheduleService struct { pb.UnimplementedBackupScheduleServiceServer driver db.DBConnector @@ -150,7 +125,7 @@ func (s *BackupScheduleService) UpdateBackupSchedule( schedules, err := s.driver.SelectBackupSchedulesWithRPOInfo( ctx, queries.NewReadTableQuery( 
- queries.WithRawQuery(GetScheduleQuery), + queries.WithRawQuery(queries.GetScheduleQuery), queries.WithParameters( table.ValueParam("$schedule_id", table_types.StringValueFromString(scheduleID)), ), @@ -237,7 +212,7 @@ func (s *BackupScheduleService) GetBackupSchedule( schedules, err := s.driver.SelectBackupSchedulesWithRPOInfo( ctx, queries.NewReadTableQuery( - queries.WithRawQuery(GetScheduleQuery), + queries.WithRawQuery(queries.GetScheduleQuery), queries.WithParameters( table.ValueParam("$schedule_id", table_types.StringValueFromString(scheduleID)), ), @@ -310,7 +285,7 @@ func (s *BackupScheduleService) ListBackupSchedules( schedules, err := s.driver.SelectBackupSchedulesWithRPOInfo( ctx, queries.NewReadTableQuery( - queries.WithRawQuery(ListSchedulesQuery), + queries.WithRawQuery(queries.ListSchedulesQuery), queries.WithQueryFilters(queryFilters...), queries.WithOrderBy( queries.OrderSpec{ @@ -349,7 +324,7 @@ func (s *BackupScheduleService) ToggleBackupSchedule( schedules, err := s.driver.SelectBackupSchedulesWithRPOInfo( ctx, queries.NewReadTableQuery( - queries.WithRawQuery(GetScheduleQuery), + queries.WithRawQuery(queries.GetScheduleQuery), queries.WithParameters( table.ValueParam("$schedule_id", table_types.StringValueFromString(scheduleID)), ), @@ -422,7 +397,7 @@ func (s *BackupScheduleService) DeleteBackupSchedule( schedules, err := s.driver.SelectBackupSchedulesWithRPOInfo( ctx, queries.NewReadTableQuery( - queries.WithRawQuery(GetScheduleQuery), + queries.WithRawQuery(queries.GetScheduleQuery), queries.WithParameters( table.ValueParam("$schedule_id", table_types.StringValueFromString(scheduleID)), ), diff --git a/internal/types/operation.go b/internal/types/operation.go index 6a3c37c6..36a4613f 100644 --- a/internal/types/operation.go +++ b/internal/types/operation.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "strings" + "time" "ydbcp/internal/util/xlog" pb "ydbcp/pkg/proto/ydbcp/v1alpha1" @@ -260,17 +261,10 @@ func (o *DeleteBackupOperation) Proto() 
*pb.Operation { } type TakeBackupWithRetryOperation struct { - ID string - ContainerID string - BackupID string - State OperationState - Message string - YdbConnectionParams YdbConnectionParams - YdbOperationId string - SourcePaths []string - SourcePathsToExclude []string - Audit *pb.AuditInfo - UpdatedAt *timestamppb.Timestamp + TakeBackupOperation + ScheduleID *string + Ttl *time.Duration + RetryConfig *pb.RetryConfig } func (o *TakeBackupWithRetryOperation) GetID() string { @@ -283,7 +277,7 @@ func (o *TakeBackupWithRetryOperation) GetContainerID() string { return o.ContainerID } func (o *TakeBackupWithRetryOperation) GetType() OperationType { - return OperationTypeTBR + return OperationTypeTBWR } func (o *TakeBackupWithRetryOperation) SetType(_ OperationType) { } @@ -300,10 +294,10 @@ func (o *TakeBackupWithRetryOperation) SetMessage(m string) { o.Message = m } func (o *TakeBackupWithRetryOperation) GetAudit() *pb.AuditInfo { - return nil + return o.Audit } func (o *TakeBackupWithRetryOperation) GetUpdatedAt() *timestamppb.Timestamp { - return nil + return o.UpdatedAt } func (o *TakeBackupWithRetryOperation) SetUpdatedAt(t *timestamppb.Timestamp) { o.UpdatedAt = t @@ -329,6 +323,27 @@ func (o *TakeBackupWithRetryOperation) Proto() *pb.Operation { Status: o.State.Enum(), Message: o.Message, UpdatedAt: o.UpdatedAt, + RetryConfig: o.RetryConfig, + } +} + +func (o *TakeBackupWithRetryOperation) SpawnNewTBOperation(backupID string, subject string, ydbOperationId string) TakeBackupOperation { + return TakeBackupOperation{ + ID: GenerateObjectID(), + ContainerID: o.ContainerID, + BackupID: backupID, + State: OperationStateRunning, + Message: "", + YdbConnectionParams: o.YdbConnectionParams, + YdbOperationId: ydbOperationId, + SourcePaths: o.SourcePaths, + SourcePathsToExclude: o.SourcePathsToExclude, + Audit: &pb.AuditInfo{ + Creator: subject, + CreatedAt: timestamppb.Now(), + }, + UpdatedAt: timestamppb.Now(), + ParentOperationID: &o.ID, } } @@ -402,7 +417,7 @@ const 
( OperationTypeTB = OperationType("TB") OperationTypeRB = OperationType("RB") OperationTypeDB = OperationType("DB") - OperationTypeTBR = OperationType("TBR") + OperationTypeTBWR = OperationType("TBWR") BackupTimestampFormat = "20060102_150405" OperationCreatorName = "ydbcp" ) diff --git a/internal/watchers/schedule_watcher/schedule_watcher.go b/internal/watchers/schedule_watcher/schedule_watcher.go index e45a653b..f4290450 100644 --- a/internal/watchers/schedule_watcher/schedule_watcher.go +++ b/internal/watchers/schedule_watcher/schedule_watcher.go @@ -2,7 +2,6 @@ package schedule_watcher import ( "context" - "github.com/jonboulle/clockwork" table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types" "go.uber.org/zap" "sync" @@ -18,7 +17,6 @@ import ( func NewScheduleWatcher( ctx context.Context, wg *sync.WaitGroup, - clock clockwork.Clock, db db.DBConnector, handler handlers.BackupScheduleHandlerType, options ...watchers.Option, @@ -27,7 +25,7 @@ func NewScheduleWatcher( ctx, wg, func(ctx context.Context, period time.Duration) { - ScheduleWatcherAction(ctx, period, clock, db, handler) + ScheduleWatcherAction(ctx, period, db, handler) }, time.Minute, "BackupSchedule", @@ -38,7 +36,6 @@ func NewScheduleWatcher( func ScheduleWatcherAction( baseCtx context.Context, period time.Duration, - clock clockwork.Clock, db db.DBConnector, handler handlers.BackupScheduleHandlerType, ) { @@ -66,7 +63,7 @@ func ScheduleWatcherAction( } for _, schedule := range schedules { - err = handler(ctx, db, *schedule, clock.Now()) + err = handler(ctx, db, *schedule) if err != nil { xlog.Error(ctx, "error handling backup schedule", zap.String("scheduleID", schedule.ID), zap.Error(err)) } diff --git a/internal/watchers/schedule_watcher/schedule_watcher_test.go b/internal/watchers/schedule_watcher/schedule_watcher_test.go index 18f33935..00ccc96e 100644 --- a/internal/watchers/schedule_watcher/schedule_watcher_test.go +++ b/internal/watchers/schedule_watcher/schedule_watcher_test.go @@ 
-77,13 +77,13 @@ func TestScheduleWatcherSimple(t *testing.T) { AllowInsecureEndpoint: true, }, queries.NewWriteTableQueryMock, + clock, ) scheduleWatcherActionCompleted := make(chan struct{}) _ = NewScheduleWatcher( ctx, &wg, - clock, dbConnector, handler, watchers.WithTickerProvider(tickerProvider), @@ -201,13 +201,13 @@ func TestScheduleWatcherTwoSchedulesOneBackup(t *testing.T) { AllowInsecureEndpoint: true, }, queries.NewWriteTableQueryMock, + clock, ) scheduleWatcherActionCompleted := make(chan struct{}) _ = NewScheduleWatcher( ctx, &wg, - clock, dbConnector, handler, watchers.WithTickerProvider(tickerProvider), @@ -331,13 +331,13 @@ func TestScheduleWatcherTwoBackups(t *testing.T) { AllowInsecureEndpoint: true, }, queries.NewWriteTableQueryMock, + clock, ) scheduleWatcherActionCompleted := make(chan struct{}) _ = NewScheduleWatcher( ctx, &wg, - clock, dbConnector, handler, watchers.WithTickerProvider(tickerProvider), diff --git a/pkg/proto/ydbcp/v1alpha1/operation.pb.go b/pkg/proto/ydbcp/v1alpha1/operation.pb.go index ba08bad0..49743077 100644 --- a/pkg/proto/ydbcp/v1alpha1/operation.pb.go +++ b/pkg/proto/ydbcp/v1alpha1/operation.pb.go @@ -105,6 +105,7 @@ type Operation struct { Message string `protobuf:"bytes,13,opt,name=message,proto3" json:"message,omitempty"` UpdatedAt *timestamppb.Timestamp `protobuf:"bytes,14,opt,name=updated_at,json=updatedAt,proto3" json:"updated_at,omitempty"` ParentOperationId string `protobuf:"bytes,15,opt,name=parent_operation_id,json=parentOperationId,proto3" json:"parent_operation_id,omitempty"` + RetryConfig *RetryConfig `protobuf:"bytes,16,opt,name=retry_config,json=retryConfig,proto3" json:"retry_config,omitempty"` } func (x *Operation) Reset() { @@ -242,6 +243,13 @@ func (x *Operation) GetParentOperationId() string { return "" } +func (x *Operation) GetRetryConfig() *RetryConfig { + if x != nil { + return x.RetryConfig + } + return nil +} + var File_ydbcp_v1alpha1_operation_proto protoreflect.FileDescriptor var 
file_ydbcp_v1alpha1_operation_proto_rawDesc = []byte{ @@ -249,61 +257,67 @@ var file_ydbcp_v1alpha1_operation_proto_rawDesc = []byte{ 0x2f, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0e, 0x79, 0x64, 0x62, 0x63, 0x70, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x1a, 0x1b, 0x79, 0x64, 0x62, 0x63, 0x70, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, - 0x2f, 0x62, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, - 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, - 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xed, - 0x05, 0x0a, 0x09, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x0e, 0x0a, 0x02, - 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x21, 0x0a, 0x0c, - 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x0b, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x49, 0x64, 0x12, - 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, - 0x79, 0x70, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x5f, - 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x64, 0x61, 0x74, 0x61, - 0x62, 0x61, 0x73, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x2b, 0x0a, 0x11, 0x64, 0x61, 0x74, 0x61, - 0x62, 0x61, 0x73, 0x65, 0x5f, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x18, 0x05, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x10, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x45, 0x6e, 0x64, - 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x35, 0x0a, 0x17, 0x79, 0x64, 0x62, 0x5f, 0x73, 0x65, 0x72, - 0x76, 0x65, 0x72, 0x5f, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, - 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x14, 0x79, 0x64, 0x62, 0x53, 0x65, 0x72, 0x76, 0x65, - 0x72, 
0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, - 0x62, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x5f, 0x69, 0x64, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x08, 0x62, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x73, 0x6f, 0x75, - 0x72, 0x63, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x73, 0x18, 0x08, 0x20, 0x03, 0x28, 0x09, 0x52, - 0x0b, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x50, 0x61, 0x74, 0x68, 0x73, 0x12, 0x35, 0x0a, 0x17, - 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x73, 0x5f, 0x74, 0x6f, 0x5f, - 0x65, 0x78, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x18, 0x09, 0x20, 0x03, 0x28, 0x09, 0x52, 0x14, 0x73, - 0x6f, 0x75, 0x72, 0x63, 0x65, 0x50, 0x61, 0x74, 0x68, 0x73, 0x54, 0x6f, 0x45, 0x78, 0x63, 0x6c, - 0x75, 0x64, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x5f, 0x70, - 0x61, 0x74, 0x68, 0x73, 0x18, 0x0a, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x74, - 0x6f, 0x72, 0x65, 0x50, 0x61, 0x74, 0x68, 0x73, 0x12, 0x2f, 0x0a, 0x05, 0x61, 0x75, 0x64, 0x69, - 0x74, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x79, 0x64, 0x62, 0x63, 0x70, 0x2e, - 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x41, 0x75, 0x64, 0x69, 0x74, 0x49, 0x6e, - 0x66, 0x6f, 0x52, 0x05, 0x61, 0x75, 0x64, 0x69, 0x74, 0x12, 0x38, 0x0a, 0x06, 0x73, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x20, 0x2e, 0x79, 0x64, 0x62, 0x63, - 0x70, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4f, 0x70, 0x65, 0x72, 0x61, - 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x0d, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x39, 0x0a, - 0x0a, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x0e, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 
0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x75, - 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x12, 0x2e, 0x0a, 0x13, 0x70, 0x61, 0x72, 0x65, - 0x6e, 0x74, 0x5f, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, - 0x0f, 0x20, 0x01, 0x28, 0x09, 0x52, 0x11, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x4f, 0x70, 0x65, - 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x22, 0x83, 0x01, 0x0a, 0x06, 0x53, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x12, 0x16, 0x0a, 0x12, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x55, 0x4e, - 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x50, - 0x45, 0x4e, 0x44, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x44, 0x4f, 0x4e, 0x45, - 0x10, 0x02, 0x12, 0x09, 0x0a, 0x05, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, 0x03, 0x12, 0x0e, 0x0a, - 0x0a, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x4c, 0x49, 0x4e, 0x47, 0x10, 0x04, 0x12, 0x0c, 0x0a, - 0x08, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x45, 0x44, 0x10, 0x05, 0x12, 0x14, 0x0a, 0x10, 0x53, - 0x54, 0x41, 0x52, 0x54, 0x5f, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x4c, 0x49, 0x4e, 0x47, 0x10, - 0x06, 0x12, 0x0b, 0x0a, 0x07, 0x52, 0x55, 0x4e, 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x07, 0x42, 0x3e, - 0x5a, 0x3c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x79, 0x64, 0x62, - 0x2d, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x2f, 0x79, 0x64, 0x62, 0x63, 0x70, 0x2f, - 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x79, 0x64, 0x62, 0x63, 0x70, 0x2f, - 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x3b, 0x79, 0x64, 0x62, 0x63, 0x70, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x2f, 0x62, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1a, 0x79, + 0x64, 0x62, 0x63, 0x70, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2f, 0x72, 0x65, + 0x74, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 
0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, + 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xad, 0x06, 0x0a, 0x09, 0x4f, + 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x63, 0x6f, 0x6e, 0x74, + 0x61, 0x69, 0x6e, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, + 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x74, + 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, + 0x23, 0x0a, 0x0d, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, + 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x2b, 0x0a, 0x11, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, + 0x5f, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x10, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, + 0x74, 0x12, 0x35, 0x0a, 0x17, 0x79, 0x64, 0x62, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, + 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x06, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x14, 0x79, 0x64, 0x62, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x4f, 0x70, 0x65, + 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x62, 0x61, 0x63, 0x6b, + 0x75, 0x70, 0x5f, 0x69, 0x64, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x62, 0x61, 0x63, + 0x6b, 0x75, 0x70, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, + 0x70, 0x61, 0x74, 0x68, 0x73, 0x18, 0x08, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0b, 0x73, 0x6f, 0x75, + 0x72, 0x63, 0x65, 0x50, 0x61, 0x74, 0x68, 0x73, 0x12, 0x35, 0x0a, 0x17, 0x73, 
0x6f, 0x75, 0x72, + 0x63, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x73, 0x5f, 0x74, 0x6f, 0x5f, 0x65, 0x78, 0x63, 0x6c, + 0x75, 0x64, 0x65, 0x18, 0x09, 0x20, 0x03, 0x28, 0x09, 0x52, 0x14, 0x73, 0x6f, 0x75, 0x72, 0x63, + 0x65, 0x50, 0x61, 0x74, 0x68, 0x73, 0x54, 0x6f, 0x45, 0x78, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x12, + 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x73, + 0x18, 0x0a, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x50, + 0x61, 0x74, 0x68, 0x73, 0x12, 0x2f, 0x0a, 0x05, 0x61, 0x75, 0x64, 0x69, 0x74, 0x18, 0x0b, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x79, 0x64, 0x62, 0x63, 0x70, 0x2e, 0x76, 0x31, 0x61, 0x6c, + 0x70, 0x68, 0x61, 0x31, 0x2e, 0x41, 0x75, 0x64, 0x69, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x05, + 0x61, 0x75, 0x64, 0x69, 0x74, 0x12, 0x38, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, + 0x0c, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x20, 0x2e, 0x79, 0x64, 0x62, 0x63, 0x70, 0x2e, 0x76, 0x31, + 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, + 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x75, 0x70, 0x64, + 0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x0e, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, + 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, + 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x75, 0x70, 0x64, 0x61, 0x74, + 0x65, 0x64, 0x41, 0x74, 0x12, 0x2e, 0x0a, 0x13, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x6f, + 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x0f, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x11, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x49, 
0x64, 0x12, 0x3e, 0x0a, 0x0c, 0x72, 0x65, 0x74, 0x72, 0x79, 0x5f, 0x63, 0x6f, + 0x6e, 0x66, 0x69, 0x67, 0x18, 0x10, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x79, 0x64, 0x62, + 0x63, 0x70, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x52, 0x65, 0x74, 0x72, + 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0b, 0x72, 0x65, 0x74, 0x72, 0x79, 0x43, 0x6f, + 0x6e, 0x66, 0x69, 0x67, 0x22, 0x83, 0x01, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, + 0x16, 0x0a, 0x12, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, + 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x50, 0x45, 0x4e, 0x44, 0x49, + 0x4e, 0x47, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x44, 0x4f, 0x4e, 0x45, 0x10, 0x02, 0x12, 0x09, + 0x0a, 0x05, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, 0x03, 0x12, 0x0e, 0x0a, 0x0a, 0x43, 0x41, 0x4e, + 0x43, 0x45, 0x4c, 0x4c, 0x49, 0x4e, 0x47, 0x10, 0x04, 0x12, 0x0c, 0x0a, 0x08, 0x43, 0x41, 0x4e, + 0x43, 0x45, 0x4c, 0x45, 0x44, 0x10, 0x05, 0x12, 0x14, 0x0a, 0x10, 0x53, 0x54, 0x41, 0x52, 0x54, + 0x5f, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x4c, 0x49, 0x4e, 0x47, 0x10, 0x06, 0x12, 0x0b, 0x0a, + 0x07, 0x52, 0x55, 0x4e, 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x07, 0x42, 0x3e, 0x5a, 0x3c, 0x67, 0x69, + 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x79, 0x64, 0x62, 0x2d, 0x70, 0x6c, 0x61, + 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x2f, 0x79, 0x64, 0x62, 0x63, 0x70, 0x2f, 0x70, 0x6b, 0x67, 0x2f, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x79, 0x64, 0x62, 0x63, 0x70, 0x2f, 0x76, 0x31, 0x61, 0x6c, + 0x70, 0x68, 0x61, 0x31, 0x3b, 0x79, 0x64, 0x62, 0x63, 0x70, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( @@ -325,16 +339,18 @@ var file_ydbcp_v1alpha1_operation_proto_goTypes = []any{ (*Operation)(nil), // 1: ydbcp.v1alpha1.Operation (*AuditInfo)(nil), // 2: ydbcp.v1alpha1.AuditInfo (*timestamppb.Timestamp)(nil), // 3: google.protobuf.Timestamp + (*RetryConfig)(nil), // 4: ydbcp.v1alpha1.RetryConfig } var 
file_ydbcp_v1alpha1_operation_proto_depIdxs = []int32{ 2, // 0: ydbcp.v1alpha1.Operation.audit:type_name -> ydbcp.v1alpha1.AuditInfo 0, // 1: ydbcp.v1alpha1.Operation.status:type_name -> ydbcp.v1alpha1.Operation.Status 3, // 2: ydbcp.v1alpha1.Operation.updated_at:type_name -> google.protobuf.Timestamp - 3, // [3:3] is the sub-list for method output_type - 3, // [3:3] is the sub-list for method input_type - 3, // [3:3] is the sub-list for extension type_name - 3, // [3:3] is the sub-list for extension extendee - 0, // [0:3] is the sub-list for field type_name + 4, // 3: ydbcp.v1alpha1.Operation.retry_config:type_name -> ydbcp.v1alpha1.RetryConfig + 4, // [4:4] is the sub-list for method output_type + 4, // [4:4] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name } func init() { file_ydbcp_v1alpha1_operation_proto_init() } @@ -343,6 +359,7 @@ func file_ydbcp_v1alpha1_operation_proto_init() { return } file_ydbcp_v1alpha1_backup_proto_init() + file_ydbcp_v1alpha1_retry_proto_init() type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ diff --git a/pkg/proto/ydbcp/v1alpha1/operation.proto b/pkg/proto/ydbcp/v1alpha1/operation.proto index ac6e2942..569aa998 100644 --- a/pkg/proto/ydbcp/v1alpha1/operation.proto +++ b/pkg/proto/ydbcp/v1alpha1/operation.proto @@ -4,6 +4,7 @@ package ydbcp.v1alpha1; option go_package = "github.com/ydb-platform/ydbcp/pkg/proto/ydbcp/v1alpha1;ydbcp"; import "ydbcp/v1alpha1/backup.proto"; +import "ydbcp/v1alpha1/retry.proto"; import "google/protobuf/timestamp.proto"; message Operation { @@ -32,4 +33,5 @@ message Operation { string message = 13; google.protobuf.Timestamp updated_at = 14; string parent_operation_id = 15; + RetryConfig retry_config = 16; } diff --git a/pkg/proto/ydbcp/v1alpha1/retry.pb.go b/pkg/proto/ydbcp/v1alpha1/retry.pb.go new file mode 100644 index 
00000000..f5b83b14 --- /dev/null +++ b/pkg/proto/ydbcp/v1alpha1/retry.pb.go @@ -0,0 +1,181 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.35.1 +// protoc v5.28.2 +// source: ydbcp/v1alpha1/retry.proto + +package ydbcp + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + durationpb "google.golang.org/protobuf/types/known/durationpb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type RetryConfig struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // user can set number of retries or max_backoff interval. + // if one of them is reached, retries stop. + // + // Types that are assignable to Retries: + // + // *RetryConfig_Count + // *RetryConfig_MaxBackoff + Retries isRetryConfig_Retries `protobuf_oneof:"retries"` +} + +func (x *RetryConfig) Reset() { + *x = RetryConfig{} + mi := &file_ydbcp_v1alpha1_retry_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *RetryConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RetryConfig) ProtoMessage() {} + +func (x *RetryConfig) ProtoReflect() protoreflect.Message { + mi := &file_ydbcp_v1alpha1_retry_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RetryConfig.ProtoReflect.Descriptor instead. 
+func (*RetryConfig) Descriptor() ([]byte, []int) { + return file_ydbcp_v1alpha1_retry_proto_rawDescGZIP(), []int{0} +} + +func (m *RetryConfig) GetRetries() isRetryConfig_Retries { + if m != nil { + return m.Retries + } + return nil +} + +func (x *RetryConfig) GetCount() uint32 { + if x, ok := x.GetRetries().(*RetryConfig_Count); ok { + return x.Count + } + return 0 +} + +func (x *RetryConfig) GetMaxBackoff() *durationpb.Duration { + if x, ok := x.GetRetries().(*RetryConfig_MaxBackoff); ok { + return x.MaxBackoff + } + return nil +} + +type isRetryConfig_Retries interface { + isRetryConfig_Retries() +} + +type RetryConfig_Count struct { + Count uint32 `protobuf:"varint,1,opt,name=count,proto3,oneof"` +} + +type RetryConfig_MaxBackoff struct { + MaxBackoff *durationpb.Duration `protobuf:"bytes,2,opt,name=max_backoff,json=maxBackoff,proto3,oneof"` +} + +func (*RetryConfig_Count) isRetryConfig_Retries() {} + +func (*RetryConfig_MaxBackoff) isRetryConfig_Retries() {} + +var File_ydbcp_v1alpha1_retry_proto protoreflect.FileDescriptor + +var file_ydbcp_v1alpha1_retry_proto_rawDesc = []byte{ + 0x0a, 0x1a, 0x79, 0x64, 0x62, 0x63, 0x70, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, + 0x2f, 0x72, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0e, 0x79, 0x64, + 0x62, 0x63, 0x70, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x1a, 0x1e, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x64, 0x75, + 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x6e, 0x0a, 0x0b, + 0x52, 0x65, 0x74, 0x72, 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x16, 0x0a, 0x05, 0x63, + 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x48, 0x00, 0x52, 0x05, 0x63, 0x6f, + 0x75, 0x6e, 0x74, 0x12, 0x3c, 0x0a, 0x0b, 0x6d, 0x61, 0x78, 0x5f, 0x62, 0x61, 0x63, 0x6b, 0x6f, + 0x66, 0x66, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x48, 0x00, 0x52, 0x0a, 0x6d, 0x61, 0x78, 0x42, 0x61, 0x63, 0x6b, 0x6f, 0x66, + 0x66, 0x42, 0x09, 0x0a, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x73, 0x42, 0x3e, 0x5a, 0x3c, + 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x79, 0x64, 0x62, 0x2d, 0x70, + 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x2f, 0x79, 0x64, 0x62, 0x63, 0x70, 0x2f, 0x70, 0x6b, + 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x79, 0x64, 0x62, 0x63, 0x70, 0x2f, 0x76, 0x31, + 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x3b, 0x79, 0x64, 0x62, 0x63, 0x70, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_ydbcp_v1alpha1_retry_proto_rawDescOnce sync.Once + file_ydbcp_v1alpha1_retry_proto_rawDescData = file_ydbcp_v1alpha1_retry_proto_rawDesc +) + +func file_ydbcp_v1alpha1_retry_proto_rawDescGZIP() []byte { + file_ydbcp_v1alpha1_retry_proto_rawDescOnce.Do(func() { + file_ydbcp_v1alpha1_retry_proto_rawDescData = protoimpl.X.CompressGZIP(file_ydbcp_v1alpha1_retry_proto_rawDescData) + }) + return file_ydbcp_v1alpha1_retry_proto_rawDescData +} + +var file_ydbcp_v1alpha1_retry_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_ydbcp_v1alpha1_retry_proto_goTypes = []any{ + (*RetryConfig)(nil), // 0: ydbcp.v1alpha1.RetryConfig + (*durationpb.Duration)(nil), // 1: google.protobuf.Duration +} +var file_ydbcp_v1alpha1_retry_proto_depIdxs = []int32{ + 1, // 0: ydbcp.v1alpha1.RetryConfig.max_backoff:type_name -> google.protobuf.Duration + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_ydbcp_v1alpha1_retry_proto_init() } +func file_ydbcp_v1alpha1_retry_proto_init() { + if File_ydbcp_v1alpha1_retry_proto != nil { + return + } + 
file_ydbcp_v1alpha1_retry_proto_msgTypes[0].OneofWrappers = []any{ + (*RetryConfig_Count)(nil), + (*RetryConfig_MaxBackoff)(nil), + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_ydbcp_v1alpha1_retry_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_ydbcp_v1alpha1_retry_proto_goTypes, + DependencyIndexes: file_ydbcp_v1alpha1_retry_proto_depIdxs, + MessageInfos: file_ydbcp_v1alpha1_retry_proto_msgTypes, + }.Build() + File_ydbcp_v1alpha1_retry_proto = out.File + file_ydbcp_v1alpha1_retry_proto_rawDesc = nil + file_ydbcp_v1alpha1_retry_proto_goTypes = nil + file_ydbcp_v1alpha1_retry_proto_depIdxs = nil +} diff --git a/pkg/proto/ydbcp/v1alpha1/retry.proto b/pkg/proto/ydbcp/v1alpha1/retry.proto new file mode 100644 index 00000000..c5923346 --- /dev/null +++ b/pkg/proto/ydbcp/v1alpha1/retry.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +package ydbcp.v1alpha1; +option go_package = "github.com/ydb-platform/ydbcp/pkg/proto/ydbcp/v1alpha1;ydbcp"; + +import "google/protobuf/duration.proto"; + +message RetryConfig { + //user can set number of retries or max_backoff interval. + //if one of them is reached, retries stop. + oneof retries { + uint32 count = 1; + google.protobuf.Duration max_backoff = 2; + } +}