Skip to content

Commit

Permalink
reimplement GetBackupSchedule. Supply return value with RPO info.
Browse files Browse the repository at this point in the history
  • Loading branch information
qrort committed Sep 26, 2024
1 parent fe60c5d commit c5488c6
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 25 deletions.
56 changes: 56 additions & 0 deletions cmd/integration/list_schedules/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,4 +301,60 @@ func main() {
}
}
}
{
s, err := scheduleClient.GetBackupSchedule(ctx, &pb.GetBackupScheduleRequest{Id: "1"})
if err != nil {
log.Panicf("failed to get backup schedule: %v", err)
}
if s.LastSuccessfulBackupInfo.BackupId != "2" || s.LastSuccessfulBackupInfo.RecoveryPoint.AsTime() != fivePM {
log.Panicf(
"Expected BackupID = 2, RecoveryPoint = %s, got %s for scheduleID %s", fivePM.String(),
s.LastSuccessfulBackupInfo.String(),
s.Id,
)
}
}
{
s, err := scheduleClient.GetBackupSchedule(ctx, &pb.GetBackupScheduleRequest{Id: "2"})
if err != nil {
log.Panicf("failed to get backup schedule: %v", err)
}
if s.LastSuccessfulBackupInfo.BackupId != "4" || s.LastSuccessfulBackupInfo.RecoveryPoint.AsTime() != fourPM {
log.Panicf(
"Expected BackupID = 4, RecoveryPoint = %s, got %s for scheduleID %s", fourPM.String(),
s.LastSuccessfulBackupInfo.String(),
s.Id,
)

}
}
{
s, err := scheduleClient.GetBackupSchedule(ctx, &pb.GetBackupScheduleRequest{Id: "3"})
if err != nil {
log.Panicf("failed to get backup schedule: %v", err)
}
info := &pb.ScheduledBackupInfo{
BackupId: "6",
RecoveryPoint: timestamppb.New(fourPM),
}
if !proto.Equal(info, s.LastSuccessfulBackupInfo) {
log.Panicf(
"Expected %s, got %s for scheduleID %s", info.String(), s.LastSuccessfulBackupInfo.String(),
s.Id,
)
}
}
{
s, err := scheduleClient.GetBackupSchedule(ctx, &pb.GetBackupScheduleRequest{Id: "4"})
if err != nil {
log.Panicf("failed to get backup schedule: %v", err)
}
if s.LastSuccessfulBackupInfo != nil {
log.Panicf(
"Expected nil, got %s for scheduleID %s", s.LastSuccessfulBackupInfo.String(),
s.Id,
)
}
}

}
21 changes: 19 additions & 2 deletions internal/connectors/db/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ type DBConnector interface {
SelectOperations(ctx context.Context, queryBuilder queries.ReadTableQuery) (
[]types.Operation, error,
)
SelectBackupSchedules(ctx context.Context, queryBuilder queries.ReadTableQuery) (

SelectBackupSchedules(ctx context.Context, queryBuilder queries.ReadTableQuery) ([]*types.BackupSchedule, error)
SelectBackupSchedulesWithRPOInfo(ctx context.Context, queryBuilder queries.ReadTableQuery) (
[]*types.BackupSchedule, error,
)
ActiveOperations(context.Context) ([]types.Operation, error)
Expand Down Expand Up @@ -257,7 +259,22 @@ func (d *YdbConnector) SelectBackupSchedules(
ctx,
d,
queryBuilder,
ReadBackupScheduleFromResultSet,
func(res result.Result) (*types.BackupSchedule, error) {
return ReadBackupScheduleFromResultSet(res, false)
},
)
}

func (d *YdbConnector) SelectBackupSchedulesWithRPOInfo(
ctx context.Context, queryBuilder queries.ReadTableQuery,
) ([]*types.BackupSchedule, error) {
return DoStructSelect[types.BackupSchedule](
ctx,
d,
queryBuilder,
func(res result.Result) (*types.BackupSchedule, error) {
return ReadBackupScheduleFromResultSet(res, true)
},
)
}

Expand Down
19 changes: 19 additions & 0 deletions internal/connectors/db/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,25 @@ func (c *MockDBConnector) SelectBackupSchedules(
return schedules, nil
}

func (c *MockDBConnector) SelectBackupSchedulesWithRPOInfo(
_ context.Context, _ queries.ReadTableQuery,
) ([]*types.BackupSchedule, error) {
panic("not implemented")
}

func (c *MockDBConnector) SelectBackupsByStatus(
_ context.Context, _ string,
) ([]*types.Backup, error) {
c.guard.Lock()
defer c.guard.Unlock()

backups := make([]*types.Backup, 0, len(c.backups))
for _, backup := range c.backups {
backups = append(backups, &backup)
}
return backups, nil
}

func (c *MockDBConnector) UpdateBackup(
_ context.Context, id string, backupStatus string,
) error {
Expand Down
18 changes: 6 additions & 12 deletions internal/connectors/db/process_result_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
return &types.GenericOperation{ID: operationId}, nil
}

func ReadBackupScheduleFromResultSet(res result.Result) (*types.BackupSchedule, error) {
func ReadBackupScheduleFromResultSet(res result.Result, withRPOInfo bool) (*types.BackupSchedule, error) {
var (
ID string
containerID string
Expand Down Expand Up @@ -287,21 +287,15 @@ func ReadBackupScheduleFromResultSet(res result.Result) (*types.BackupSchedule,
named.Optional("paths", &sourcePaths),
named.Optional("paths_to_exclude", &sourcePathsToExclude),
named.Optional("recovery_point_objective", &recoveryPointObjective),
named.Optional("next_launch", &nextLaunch),
named.Optional("last_backup_id", &lastBackupID),
named.Optional("last_successful_backup_id", &lastSuccessfulBackupID),
named.Optional("recovery_point", &recoveryPoint),
}
if withRPOInfo {
namedValues = append(namedValues, named.Optional("last_backup_id", &lastBackupID))
namedValues = append(namedValues, named.Optional("last_successful_backup_id", &lastSuccessfulBackupID))
namedValues = append(namedValues, named.Optional("recovery_point", &recoveryPoint))
}

err := res.ScanNamed(namedValues...)

if err != nil && strings.Contains(err.Error(), "count of columns less then values") {
//this means the initial query was not a join but a simple select from BackupSchedules.
//unfortunately I did not find a great way to implement this behaviour with YDB Go SDK.
namedValues = namedValues[:len(namedValues)-3]
err = res.ScanNamed(namedValues...)
}

if err != nil {
return nil, err
}
Expand Down
10 changes: 10 additions & 0 deletions internal/connectors/db/yql/queries/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ func WithRawQuery(rawQuery string) ReadTableQueryOption {
}
}

func WithParameters(params ...table.ParameterOption) ReadTableQueryOption {
return func(d *ReadTableQueryImpl) {
for _, param := range params {
d.tableQueryParams = append(
d.tableQueryParams, param,
)
}
}
}

func WithTableName(tableName string) ReadTableQueryOption {
return func(d *ReadTableQueryImpl) {
d.tableName = tableName
Expand Down
30 changes: 19 additions & 11 deletions internal/server/services/backup_schedule/backup_schedule_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/gorhill/cronexpr"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"time"

"ydbcp/internal/auth"
Expand Down Expand Up @@ -31,6 +32,18 @@ $last_backup_id = SELECT schedule_id AS schedule_id_2, MAX_BY(b.id, b.completed_
SELECT * FROM BackupSchedules AS schedules
LEFT JOIN $last_successful_backup_id AS b1 ON schedules.id = b1.schedule_id
LEFT JOIN $last_backup_id AS b2 ON schedules.id = b2.schedule_id_2
`, types.BackupStateAvailable,
)
GetScheduleQuery = fmt.Sprintf(
`$rpo_info = SELECT
<|
recovery_point: MAX(b.completed_at),
last_successful_backup_id: MAX_BY(b.id, b.completed_at)
|> FROM Backups AS b WHERE b.status = '%s' AND b.schedule_id = $schedule_id;
$last_backup_id = SELECT MAX_BY(b.id, b.completed_at) AS last_backup_id FROM Backups AS b WHERE b.schedule_id = $schedule_id;
SELECT s.*, $last_backup_id AS last_backup_id, $rpo_info.recovery_point AS recovery_point, $rpo_info.last_successful_backup_id AS last_successful_backup_id FROM BackupSchedules AS s WHERE s.id = $schedule_id
`, types.BackupStateAvailable,
)
)
Expand Down Expand Up @@ -115,19 +128,15 @@ func (s *BackupScheduleService) GetBackupSchedule(

xlog.Debug(ctx, "GetBackupSchedule", zap.Stringer("request", request))

schedules, err := s.driver.SelectBackupSchedules(
schedules, err := s.driver.SelectBackupSchedulesWithRPOInfo(
ctx, queries.NewReadTableQuery(
queries.WithTableName("BackupSchedules"),
queries.WithQueryFilters(
queries.QueryFilter{
Field: "id",
Values: []table_types.Value{
table_types.StringValueFromString(scheduleID),
},
},
queries.WithRawQuery(GetScheduleQuery),
queries.WithParameters(
table.ValueParam("$schedule_id", table_types.StringValueFromString(scheduleID)),
),
),
)

if err != nil {
xlog.Error(ctx, "error getting backup schedule", zap.Error(err))
return nil, status.Error(codes.Internal, "error getting backup schedule")
Expand Down Expand Up @@ -186,8 +195,7 @@ func (s *BackupScheduleService) ListBackupSchedules(
},
)
}

schedules, err := s.driver.SelectBackupSchedules(
schedules, err := s.driver.SelectBackupSchedulesWithRPOInfo(
ctx, queries.NewReadTableQuery(
queries.WithRawQuery(ListSchedulesQuery),
queries.WithQueryFilters(queryFilters...),
Expand Down

0 comments on commit c5488c6

Please sign in to comment.