diff --git a/.env b/.env index 6e04931b..d4a2a4d1 100644 --- a/.env +++ b/.env @@ -3,4 +3,4 @@ S3_ACCESS_KEY = "ydbcp" S3_SECRET_KEY = "password" S3_BUCKET = "test-bucket" S3_REGION = "us-east-1" -YDB_NAME = "local-ydb" \ No newline at end of file +YDB_NAME = "local-ydb" diff --git a/internal/connectors/db/mock.go b/internal/connectors/db/mock.go index ddb31e6e..e1afd632 100644 --- a/internal/connectors/db/mock.go +++ b/internal/connectors/db/mock.go @@ -195,6 +195,18 @@ func (c *MockDBConnector) GetBackup( return types.Backup{}, fmt.Errorf("backup not found, id %s", backupID) } +func (c *MockDBConnector) GetSchedule( + _ context.Context, scheduleID string, +) (types.BackupSchedule, error) { + c.guard.Lock() + defer c.guard.Unlock() + + if schedule, exist := c.backupSchedules[scheduleID]; exist { + return schedule, nil + } + return types.BackupSchedule{}, fmt.Errorf("backupSchedule not found, id %s", scheduleID) +} + func (c *MockDBConnector) SelectOperations( _ context.Context, _ queries.ReadTableQuery, ) ([]types.Operation, error) { diff --git a/internal/connectors/db/process_result_set.go b/internal/connectors/db/process_result_set.go index 6334122d..ec80fedc 100644 --- a/internal/connectors/db/process_result_set.go +++ b/internal/connectors/db/process_result_set.go @@ -268,7 +268,8 @@ func ReadBackupScheduleFromResultSet(res result.Result) (*types.BackupSchedule, recoveryPoint *time.Time nextLaunch *time.Time ) - err := res.ScanNamed( + + namedValues := []named.Value{ named.Required("id", &ID), named.Required("container_id", &containerID), named.Required("database", &databaseName), @@ -283,11 +284,21 @@ func ReadBackupScheduleFromResultSet(res result.Result) (*types.BackupSchedule, named.Optional("paths", &sourcePaths), named.Optional("paths_to_exclude", &sourcePathsToExclude), named.Optional("recovery_point_objective", &recoveryPointObjective), + named.Optional("next_launch", &nextLaunch), named.Optional("last_backup_id", &lastBackupID), named.Optional("last_successful_backup_id", &lastSuccessfulBackupID), named.Optional("recovery_point", &recoveryPoint), - named.Optional("next_launch", &nextLaunch), - ) + } + + err := res.ScanNamed(namedValues...) + + if err != nil && strings.Contains(err.Error(), "count of columns less then values") { + //this means the initial query was not a join but a simple select from BackupSchedules. + //unfortunately I did not find a great way to implement this behaviour with YDB Go SDK. + namedValues = namedValues[:len(namedValues)-3] + err = res.ScanNamed(namedValues...) + } + if err != nil { return nil, err } @@ -311,6 +322,8 @@ func ReadBackupScheduleFromResultSet(res result.Result) (*types.BackupSchedule, } if recoveryPointObjective != nil { rpoDuration = durationpb.New(*recoveryPointObjective) + } else { + rpoDuration = nil } return &types.BackupSchedule{ diff --git a/internal/connectors/db/yql/queries/read.go b/internal/connectors/db/yql/queries/read.go index 92264118..bd84d88f 100644 --- a/internal/connectors/db/yql/queries/read.go +++ b/internal/connectors/db/yql/queries/read.go @@ -29,9 +29,7 @@ var ( } AllBackupScheduleFields = []string{ "id", "container_id", "database", "endpoint", "name", "active", "crontab", "ttl", "paths", "paths_to_exclude", - "initiated", - "created_at", "recovery_point_objective", "last_backup_id", "last_successful_backup_id", "recovery_point", - "next_launch", + "initiated", "created_at", "recovery_point_objective", "next_launch", } ) @@ -52,6 +50,7 @@ type ReadTableQuery interface { } type ReadTableQueryImpl struct { + rawQuery *string tableName string selectFields []string filters [][]table_types.Value @@ -73,6 +72,12 @@ func NewReadTableQuery(options ...ReadTableQueryOption) *ReadTableQueryImpl { return d } +func WithRawQuery(rawQuery string) ReadTableQueryOption { + return func(d *ReadTableQueryImpl) { + d.rawQuery = &rawQuery + } +} + func WithTableName(tableName string) ReadTableQueryOption { return func(d *ReadTableQueryImpl) { d.tableName = tableName @@ -128,19 +133,24 @@ func (d *ReadTableQueryImpl) MakeFilterString() string { } func (d *ReadTableQueryImpl) FormatQuery(ctx context.Context) (*FormatQueryResult, error) { - if len(d.selectFields) == 0 { - return nil, errors.New("no fields to select") - } - if len(d.tableName) == 0 { - return nil, errors.New("no table") - } + var res string filter := d.MakeFilterString() - res := fmt.Sprintf( - "SELECT %s FROM %s%s", - strings.Join(d.selectFields, ", "), - d.tableName, - filter, - ) + if d.rawQuery == nil { + if len(d.selectFields) == 0 { + return nil, errors.New("no fields to select") + } + if len(d.tableName) == 0 { + return nil, errors.New("no table") + } + res = fmt.Sprintf( + "SELECT %s FROM %s%s", + strings.Join(d.selectFields, ", "), + d.tableName, + filter, + ) + } else { + res = fmt.Sprintf("%s%s", *d.rawQuery, filter) + } xlog.Debug(ctx, "read query", zap.String("yql", res)) return &FormatQueryResult{ QueryText: res, diff --git a/internal/connectors/db/yql/queries/write.go b/internal/connectors/db/yql/queries/write.go index 3fe601ea..29b9fd69 100644 --- a/internal/connectors/db/yql/queries/write.go +++ b/internal/connectors/db/yql/queries/write.go @@ -310,17 +310,6 @@ func BuildCreateBackupScheduleQuery(schedule types.BackupSchedule, index int) Wr "$recovery_point_objective", table_types.IntervalValueFromDuration(schedule.ScheduleSettings.RecoveryPointObjective.AsDuration()), ) - if schedule.LastBackupID != nil { - d.AddValueParam("$last_backup_id", table_types.StringValueFromString(*schedule.LastBackupID)) - } - if schedule.LastSuccessfulBackupID != nil { - d.AddValueParam( - "$last_successful_backup_id", table_types.StringValueFromString(*schedule.LastSuccessfulBackupID), - ) - } - if schedule.RecoveryPoint != nil { - d.AddValueParam("$recovery_point", table_types.TimestampValueFromTime(*schedule.RecoveryPoint)) - } if schedule.NextLaunch != nil { d.AddValueParam("$next_launch", table_types.TimestampValueFromTime(*schedule.NextLaunch)) } @@ -344,21 +333,13 @@ func BuildUpdateBackupScheduleQuery(schedule types.BackupSchedule, index int) Wr d.AddValueParam("$crontab", table_types.StringValueFromString(schedule.ScheduleSettings.SchedulePattern.Crontab)) d.AddValueParam("$ttl", table_types.IntervalValueFromDuration(schedule.ScheduleSettings.Ttl.AsDuration())) - d.AddValueParam( - "$recovery_point_objective", - table_types.IntervalValueFromDuration(schedule.ScheduleSettings.RecoveryPointObjective.AsDuration()), - ) - if schedule.LastBackupID != nil { - d.AddValueParam("$last_backup_id", table_types.StringValueFromString(*schedule.LastBackupID)) - } - if schedule.LastSuccessfulBackupID != nil { + if schedule.ScheduleSettings.RecoveryPointObjective != nil { d.AddValueParam( - "$last_successful_backup_id", table_types.StringValueFromString(*schedule.LastSuccessfulBackupID), + "$recovery_point_objective", + table_types.IntervalValueFromDuration(schedule.ScheduleSettings.RecoveryPointObjective.AsDuration()), ) } - if schedule.RecoveryPoint != nil { - d.AddValueParam("$recovery_point", table_types.TimestampValueFromTime(*schedule.RecoveryPoint)) - } + if schedule.NextLaunch != nil { d.AddValueParam("$next_launch", table_types.TimestampValueFromTime(*schedule.NextLaunch)) } diff --git a/internal/connectors/db/yql/schema/create_tables.yql b/internal/connectors/db/yql/schema/create_tables.yql index 71092efa..0cf8a60b 100644 --- a/internal/connectors/db/yql/schema/create_tables.yql +++ b/internal/connectors/db/yql/schema/create_tables.yql @@ -77,10 +77,6 @@ CREATE TABLE BackupSchedules ( recovery_point_objective Interval, - last_backup_id String, - last_successful_backup_id String, - recovery_point Timestamp, - next_launch Timestamp, PRIMARY KEY (id) ) diff --git a/internal/handlers/schedule_backup.go b/internal/handlers/schedule_backup.go index 81c9150c..ab502836 100644 --- a/internal/handlers/schedule_backup.go +++ b/internal/handlers/schedule_backup.go @@ -41,6 +41,7 @@ func BackupScheduleHandler( return nil } now := time.Now() + // do not handle last_backup_id status = (failed | deleted) for now, just do backups on cron. if schedule.NextLaunch != nil && schedule.NextLaunch.Before(now) { b, op, err := backup_operations.MakeBackup( ctx, clientConn, s3, allowedEndpointDomains, allowInsecureEndpoint, &pb.MakeBackupRequest{ @@ -54,7 +55,6 @@ func BackupScheduleHandler( if err != nil { return err } - schedule.LastBackupID = &op.BackupID err = schedule.UpdateNextLaunch(now) if err != nil { return err diff --git a/internal/handlers/schedule_backup_test.go b/internal/handlers/schedule_backup_test.go index e88bd3ab..b7ec4001 100644 --- a/internal/handlers/schedule_backup_test.go +++ b/internal/handlers/schedule_backup_test.go @@ -26,9 +26,7 @@ func TestBackupScheduleHandler(t *testing.T) { ScheduleSettings: &pb.BackupScheduleSettings{ SchedulePattern: &pb.BackupSchedulePattern{Crontab: "* * * * * *"}, }, - NextLaunch: &now, - LastBackupID: nil, - LastSuccessfulBackupID: nil, + NextLaunch: &now, } opMap := make(map[string]types.Operation) backupMap := make(map[string]types.Backup) @@ -72,11 +70,10 @@ func TestBackupScheduleHandler(t *testing.T) { assert.Equal(t, len(backups), 1) assert.Equal(t, types.BackupStateRunning, backups[0].Status) - // check schedule + // check schedule next launch schedules, err := dbConnector.SelectBackupSchedules(ctx, &queries.ReadTableQueryImpl{}) assert.Empty(t, err) assert.NotEmpty(t, schedules) assert.Equal(t, len(schedules), 1) - assert.Equal(t, *schedules[0].LastBackupID, backups[0].ID) assert.Greater(t, *schedules[0].NextLaunch, now) } diff --git a/internal/handlers/take_backup.go b/internal/handlers/take_backup.go index af597a68..f122615c 100644 --- a/internal/handlers/take_backup.go +++ b/internal/handlers/take_backup.go @@ -3,7 +3,6 @@ package handlers import ( "context" "fmt" - "ydbcp/internal/config" "ydbcp/internal/connectors/client" "ydbcp/internal/connectors/db" @@ -35,7 +34,7 @@ func TBOperationHandler( client client.ClientConnector, s3 s3.S3Connector, config config.Config, - queryBulderFactory queries.WriteQueryBulderFactory, + queryBuilderFactory queries.WriteQueryBulderFactory, ) error { xlog.Info(ctx, "TBOperationHandler", zap.String("OperationMessage", operation.GetMessage())) @@ -76,7 +75,7 @@ func TBOperationHandler( backupToWrite.Message = operation.GetMessage() backupToWrite.AuditInfo.CompletedAt = now return db.ExecuteUpsert( - ctx, queryBulderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), + ctx, queryBuilderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), ) } if ydbOpResponse.opResponse == nil { @@ -158,7 +157,7 @@ func TBOperationHandler( backupToWrite.Message = operation.GetMessage() backupToWrite.AuditInfo.CompletedAt = operation.GetAudit().CompletedAt return db.ExecuteUpsert( - ctx, queryBulderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), + ctx, queryBuilderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), ) } case types.OperationStateCancelling: @@ -172,7 +171,7 @@ func TBOperationHandler( operation.GetAudit().CompletedAt = now backupToWrite.Message = operation.GetMessage() return db.ExecuteUpsert( - ctx, queryBulderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), + ctx, queryBuilderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), ) } @@ -223,6 +222,6 @@ func TBOperationHandler( backupToWrite.AuditInfo.CompletedAt = now operation.GetAudit().CompletedAt = now return db.ExecuteUpsert( - ctx, queryBulderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), + ctx, queryBuilderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), ) } diff --git a/internal/server/services/backup_schedule/backup_schedule_service.go b/internal/server/services/backup_schedule/backup_schedule_service.go index db01ceff..218b2ba8 100644 --- a/internal/server/services/backup_schedule/backup_schedule_service.go +++ b/internal/server/services/backup_schedule/backup_schedule_service.go @@ -2,6 +2,7 @@ package backup_schedule import ( "context" + "github.com/gorhill/cronexpr" "time" "ydbcp/internal/auth" @@ -21,6 +22,16 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) +const ( + ListSchedulesQuery = `$last_successful_backup_id = SELECT schedule_id, MAX(b.completed_at) as recovery_point, MAX_BY(b.id, b.completed_at) AS last_successful_backup_id from Backups as b WHERE b.status = 'AVAILABLE' GROUP BY schedule_id; +$last_backup_id = SELECT schedule_id as schedule_id_2, MAX_BY(b.id, b.completed_at) AS last_backup_id from Backups as b GROUP BY schedule_id; + +select * from BackupSchedules as schedules +left join $last_successful_backup_id as b1 on schedules.id = b1.schedule_id +left join $last_backup_id as b2 on schedules.id = b2.schedule_id_2 +` +) + type BackupScheduleService struct { pb.UnimplementedBackupScheduleServiceServer driver db.DBConnector @@ -43,7 +54,13 @@ func (s *BackupScheduleService) CreateBackupSchedule( ctx, "no backup schedule settings for CreateBackupSchedule", zap.String("request", request.String()), ) return nil, status.Error(codes.FailedPrecondition, "no backup schedule settings for CreateBackupSchedule") - + } + _, err = cronexpr.Parse(request.ScheduleSettings.SchedulePattern.Crontab) + if err != nil { + return nil, status.Error(codes.FailedPrecondition, "failed to parse crontab") + } + if request.ScheduleSettings.RecoveryPointObjective != nil && (request.ScheduleSettings.RecoveryPointObjective.Seconds == 0) { + return nil, status.Error(codes.FailedPrecondition, "recovery point objective shoulde be greater than 0") } now := time.Now() schedule := types.BackupSchedule{ @@ -170,8 +187,7 @@ func (s *BackupScheduleService) ListBackupSchedules( schedules, err := s.driver.SelectBackupSchedules( ctx, queries.NewReadTableQuery( - queries.WithTableName("BackupSchedules"), - queries.WithSelectFields(queries.AllBackupScheduleFields...), + queries.WithRawQuery(ListSchedulesQuery), queries.WithQueryFilters(queryFilters...), ), ) diff --git a/internal/types/backup.go b/internal/types/backup.go index 143c2363..ad10503e 100644 --- a/internal/types/backup.go +++ b/internal/types/backup.go @@ -51,7 +51,7 @@ func (o *Backup) String() string { } func (o *Backup) Proto() *pb.Backup { - return &pb.Backup{ + backup := &pb.Backup{ Id: o.ID, ContainerId: o.ContainerID, DatabaseName: o.DatabaseName, @@ -68,6 +68,10 @@ func (o *Backup) Proto() *pb.Backup { Message: o.Message, ExpireAt: nil, } + if o.ScheduleID != nil { + backup.ScheduleId = *o.ScheduleID + } + return backup } func (o *Backup) CanBeDeleted() bool { diff --git a/internal/types/backup_schedule.go b/internal/types/backup_schedule.go index 5522b8a5..016d2cad 100644 --- a/internal/types/backup_schedule.go +++ b/internal/types/backup_schedule.go @@ -32,12 +32,14 @@ type BackupSchedule struct { func (b *BackupSchedule) Proto() *pb.BackupSchedule { var backupInfo *pb.ScheduledBackupInfo if b.LastSuccessfulBackupID != nil { - rpoMargin := time.Since(*b.RecoveryPoint) backupInfo = &pb.ScheduledBackupInfo{ - BackupId: *b.LastSuccessfulBackupID, - RecoveryPoint: timestamppb.New(*b.RecoveryPoint), - LastBackupRpoMarginInterval: durationpb.New(rpoMargin), - LastBackupRpoMarginPercent: rpoMargin.Seconds() / float64(b.ScheduleSettings.RecoveryPointObjective.Seconds), + BackupId: *b.LastSuccessfulBackupID, + } + if b.ScheduleSettings.RecoveryPointObjective != nil && b.RecoveryPoint != nil { + rpoMargin := time.Since(*b.RecoveryPoint) + backupInfo.RecoveryPoint = timestamppb.New(*b.RecoveryPoint) + backupInfo.LastBackupRpoMarginInterval = durationpb.New(rpoMargin) + backupInfo.LastBackupRpoMarginPercent = rpoMargin.Seconds() / float64(b.ScheduleSettings.RecoveryPointObjective.Seconds) } } var nextLaunchTs *timestamppb.Timestamp