Skip to content

Commit

Permalink
reimplement GetBackupSchedule. Supply return value with RPO info.
Browse files Browse the repository at this point in the history
  • Loading branch information
qrort committed Sep 26, 2024
1 parent fe60c5d commit c4774d9
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 19 deletions.
56 changes: 56 additions & 0 deletions cmd/integration/list_schedules/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,4 +301,60 @@ func main() {
}
}
}
{
s, err := scheduleClient.GetBackupSchedule(ctx, &pb.GetBackupScheduleRequest{Id: "1"})
if err != nil {
log.Panicf("failed to get backup schedule: %v", err)
}
if s.LastSuccessfulBackupInfo.BackupId != "2" || s.LastSuccessfulBackupInfo.RecoveryPoint.AsTime() != fivePM {
log.Panicf(
"Expected BackupID = 2, RecoveryPoint = %s, got %s for scheduleID %s", fivePM.String(),
s.LastSuccessfulBackupInfo.String(),
s.Id,
)
}
}
{
s, err := scheduleClient.GetBackupSchedule(ctx, &pb.GetBackupScheduleRequest{Id: "2"})
if err != nil {
log.Panicf("failed to get backup schedule: %v", err)
}
if s.LastSuccessfulBackupInfo.BackupId != "4" || s.LastSuccessfulBackupInfo.RecoveryPoint.AsTime() != fourPM {
log.Panicf(
"Expected BackupID = 4, RecoveryPoint = %s, got %s for scheduleID %s", fourPM.String(),
s.LastSuccessfulBackupInfo.String(),
s.Id,
)

}
}
{
s, err := scheduleClient.GetBackupSchedule(ctx, &pb.GetBackupScheduleRequest{Id: "3"})
if err != nil {
log.Panicf("failed to get backup schedule: %v", err)
}
info := &pb.ScheduledBackupInfo{
BackupId: "6",
RecoveryPoint: timestamppb.New(fourPM),
}
if !proto.Equal(info, s.LastSuccessfulBackupInfo) {
log.Panicf(
"Expected %s, got %s for scheduleID %s", info.String(), s.LastSuccessfulBackupInfo.String(),
s.Id,
)
}
}
{
s, err := scheduleClient.GetBackupSchedule(ctx, &pb.GetBackupScheduleRequest{Id: "4"})
if err != nil {
log.Panicf("failed to get backup schedule: %v", err)
}
if s.LastSuccessfulBackupInfo != nil {
log.Panicf(
"Expected nil, got %s for scheduleID %s", s.LastSuccessfulBackupInfo.String(),
s.Id,
)
}
}

}
15 changes: 14 additions & 1 deletion internal/connectors/db/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ type DBConnector interface {
SelectOperations(ctx context.Context, queryBuilder queries.ReadTableQuery) (
[]types.Operation, error,
)
SelectBackupSchedules(ctx context.Context, queryBuilder queries.ReadTableQuery) (

SelectBackupSchedules(ctx context.Context, queryBuilder queries.ReadTableQuery) ([]*types.BackupSchedule, error)
SelectBackupSchedulesWithRPOInfo(ctx context.Context, queryBuilder queries.ReadTableQuery) (
[]*types.BackupSchedule, error,
)
ActiveOperations(context.Context) ([]types.Operation, error)
Expand Down Expand Up @@ -261,6 +263,17 @@ func (d *YdbConnector) SelectBackupSchedules(
)
}

func (d *YdbConnector) SelectBackupSchedulesWithRPOInfo(
ctx context.Context, queryBuilder queries.ReadTableQuery,
) ([]*types.BackupSchedule, error) {
return DoStructSelect[types.BackupSchedule](
ctx,
d,
queryBuilder,
ReadBackupScheduleWithRPOInfoFromResultSet,
)
}

func (d *YdbConnector) ActiveOperations(ctx context.Context) (
[]types.Operation, error,
) {
Expand Down
19 changes: 19 additions & 0 deletions internal/connectors/db/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,25 @@ func (c *MockDBConnector) SelectBackupSchedules(
return schedules, nil
}

func (c *MockDBConnector) SelectBackupSchedulesWithRPOInfo(
_ context.Context, _ queries.ReadTableQuery,
) ([]*types.BackupSchedule, error) {
panic("not implemented")
}

func (c *MockDBConnector) SelectBackupsByStatus(
_ context.Context, _ string,
) ([]*types.Backup, error) {
c.guard.Lock()
defer c.guard.Unlock()

backups := make([]*types.Backup, 0, len(c.backups))
for _, backup := range c.backups {
backups = append(backups, &backup)
}
return backups, nil
}

func (c *MockDBConnector) UpdateBackup(
_ context.Context, id string, backupStatus string,
) error {
Expand Down
97 changes: 90 additions & 7 deletions internal/connectors/db/process_result_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,96 @@ func ReadBackupScheduleFromResultSet(res result.Result) (*types.BackupSchedule,
nextLaunch *time.Time
)

namedValues := []named.Value{
named.Required("id", &ID),
named.Required("container_id", &containerID),
named.Required("database", &databaseName),
named.Required("endpoint", &databaseEndpoint),
named.Required("active", &active),
named.Required("crontab", &crontab),

named.Optional("initiated", &initiated),
named.Optional("created_at", &createdAt),
named.Optional("name", &name),
named.Optional("ttl", &ttl),
named.Optional("paths", &sourcePaths),
named.Optional("paths_to_exclude", &sourcePathsToExclude),
named.Optional("recovery_point_objective", &recoveryPointObjective),
named.Optional("next_launch", &nextLaunch),
}

err := res.ScanNamed(namedValues...)

if err != nil {
return nil, err
}

sourcePathsSlice := make([]string, 0)
sourcePathsToExcludeSlice := make([]string, 0)
if sourcePaths != nil {
sourcePathsSlice = strings.Split(*sourcePaths, ",")
}
if sourcePathsToExclude != nil {
sourcePathsToExcludeSlice = strings.Split(*sourcePathsToExclude, ",")
}

var ttlDuration *durationpb.Duration
var rpoDuration *durationpb.Duration

ttlDuration = nil
rpoDuration = nil
if ttl != nil {
ttlDuration = durationpb.New(*ttl)
}
if recoveryPointObjective != nil {
rpoDuration = durationpb.New(*recoveryPointObjective)
}

return &types.BackupSchedule{
ID: ID,
ContainerID: containerID,
DatabaseName: databaseName,
DatabaseEndpoint: databaseEndpoint,
SourcePaths: sourcePathsSlice,
SourcePathsToExclude: sourcePathsToExcludeSlice,
Audit: auditFromDb(initiated, createdAt, nil),
Name: StringOrEmpty(name),
Active: active,
ScheduleSettings: &pb.BackupScheduleSettings{
SchedulePattern: &pb.BackupSchedulePattern{Crontab: crontab},
Ttl: ttlDuration,
RecoveryPointObjective: rpoDuration,
},
NextLaunch: nextLaunch,
LastBackupID: lastBackupID,
LastSuccessfulBackupID: lastSuccessfulBackupID,
RecoveryPoint: recoveryPoint,
}, nil
}

func ReadBackupScheduleWithRPOInfoFromResultSet(res result.Result) (*types.BackupSchedule, error) {
var (
ID string
containerID string
databaseName string
databaseEndpoint string
active bool

crontab string

initiated *string
createdAt *time.Time
name *string
ttl *time.Duration
sourcePaths *string
sourcePathsToExclude *string
recoveryPointObjective *time.Duration
lastBackupID *string
lastSuccessfulBackupID *string
recoveryPoint *time.Time
nextLaunch *time.Time
)

namedValues := []named.Value{
named.Required("id", &ID),
named.Required("container_id", &containerID),
Expand All @@ -295,13 +385,6 @@ func ReadBackupScheduleFromResultSet(res result.Result) (*types.BackupSchedule,

err := res.ScanNamed(namedValues...)

if err != nil && strings.Contains(err.Error(), "count of columns less then values") {
//this means the initial query was not a join but a simple select from BackupSchedules.
//unfortunately I did not find a great way to implement this behaviour with YDB Go SDK.
namedValues = namedValues[:len(namedValues)-3]
err = res.ScanNamed(namedValues...)
}

if err != nil {
return nil, err
}
Expand Down
10 changes: 10 additions & 0 deletions internal/connectors/db/yql/queries/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ func WithRawQuery(rawQuery string) ReadTableQueryOption {
}
}

func WithParameters(params ...table.ParameterOption) ReadTableQueryOption {
return func(d *ReadTableQueryImpl) {
for _, param := range params {
d.tableQueryParams = append(
d.tableQueryParams, param,
)
}
}
}

func WithTableName(tableName string) ReadTableQueryOption {
return func(d *ReadTableQueryImpl) {
d.tableName = tableName
Expand Down
30 changes: 19 additions & 11 deletions internal/server/services/backup_schedule/backup_schedule_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/gorhill/cronexpr"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"time"

"ydbcp/internal/auth"
Expand Down Expand Up @@ -31,6 +32,18 @@ $last_backup_id = SELECT schedule_id AS schedule_id_2, MAX_BY(b.id, b.completed_
SELECT * FROM BackupSchedules AS schedules
LEFT JOIN $last_successful_backup_id AS b1 ON schedules.id = b1.schedule_id
LEFT JOIN $last_backup_id AS b2 ON schedules.id = b2.schedule_id_2
`, types.BackupStateAvailable,
)
GetScheduleQuery = fmt.Sprintf(
`$rpo_info = SELECT
<|
recovery_point: MAX(b.completed_at),
last_successful_backup_id: MAX_BY(b.id, b.completed_at)
|> FROM Backups AS b WHERE b.status = '%s' AND b.schedule_id = $schedule_id;
$last_backup_id = SELECT MAX_BY(b.id, b.completed_at) AS last_backup_id FROM Backups AS b WHERE b.schedule_id = $schedule_id;
SELECT s.*, $last_backup_id AS last_backup_id, $rpo_info.recovery_point AS recovery_point, $rpo_info.last_successful_backup_id AS last_successful_backup_id FROM BackupSchedules AS s WHERE s.id = $schedule_id
`, types.BackupStateAvailable,
)
)
Expand Down Expand Up @@ -115,19 +128,15 @@ func (s *BackupScheduleService) GetBackupSchedule(

xlog.Debug(ctx, "GetBackupSchedule", zap.Stringer("request", request))

schedules, err := s.driver.SelectBackupSchedules(
schedules, err := s.driver.SelectBackupSchedulesWithRPOInfo(
ctx, queries.NewReadTableQuery(
queries.WithTableName("BackupSchedules"),
queries.WithQueryFilters(
queries.QueryFilter{
Field: "id",
Values: []table_types.Value{
table_types.StringValueFromString(scheduleID),
},
},
queries.WithRawQuery(GetScheduleQuery),
queries.WithParameters(
table.ValueParam("$schedule_id", table_types.StringValueFromString(scheduleID)),
),
),
)

if err != nil {
xlog.Error(ctx, "error getting backup schedule", zap.Error(err))
return nil, status.Error(codes.Internal, "error getting backup schedule")
Expand Down Expand Up @@ -186,8 +195,7 @@ func (s *BackupScheduleService) ListBackupSchedules(
},
)
}

schedules, err := s.driver.SelectBackupSchedules(
schedules, err := s.driver.SelectBackupSchedulesWithRPOInfo(
ctx, queries.NewReadTableQuery(
queries.WithRawQuery(ListSchedulesQuery),
queries.WithQueryFilters(queryFilters...),
Expand Down

0 comments on commit c4774d9

Please sign in to comment.