Skip to content

Commit

Permalink
implement ListBackupSchedules. join schedules with backups for rpo info
Browse files Browse the repository at this point in the history
  • Loading branch information
qrort committed Sep 23, 2024
1 parent 1e9293a commit 6f65795
Show file tree
Hide file tree
Showing 13 changed files with 96 additions and 69 deletions.
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ S3_ACCESS_KEY = "ydbcp"
S3_SECRET_KEY = "password"
S3_BUCKET = "test-bucket"
S3_REGION = "us-east-1"
YDB_NAME = "local-ydb"
YDB_NAME = "local-ydb"
12 changes: 12 additions & 0 deletions internal/connectors/db/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,18 @@ func (c *MockDBConnector) GetBackup(
return types.Backup{}, fmt.Errorf("backup not found, id %s", backupID)
}

func (c *MockDBConnector) GetSchedule(
_ context.Context, scheduleID string,
) (types.BackupSchedule, error) {
c.guard.Lock()
defer c.guard.Unlock()

if schedule, exist := c.backupSchedules[scheduleID]; exist {
return schedule, nil
}
return types.BackupSchedule{}, fmt.Errorf("backupSchedule not found, id %s", scheduleID)
}

func (c *MockDBConnector) SelectOperations(
_ context.Context, _ queries.ReadTableQuery,
) ([]types.Operation, error) {
Expand Down
17 changes: 14 additions & 3 deletions internal/connectors/db/process_result_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ func ReadBackupScheduleFromResultSet(res result.Result) (*types.BackupSchedule,
recoveryPoint *time.Time
nextLaunch *time.Time
)
err := res.ScanNamed(

namedValues := []named.Value{
named.Required("id", &ID),
named.Required("container_id", &containerID),
named.Required("database", &databaseName),
Expand All @@ -283,11 +284,21 @@ func ReadBackupScheduleFromResultSet(res result.Result) (*types.BackupSchedule,
named.Optional("paths", &sourcePaths),
named.Optional("paths_to_exclude", &sourcePathsToExclude),
named.Optional("recovery_point_objective", &recoveryPointObjective),
named.Optional("next_launch", &nextLaunch),
named.Optional("last_backup_id", &lastBackupID),
named.Optional("last_successful_backup_id", &lastSuccessfulBackupID),
named.Optional("recovery_point", &recoveryPoint),
named.Optional("next_launch", &nextLaunch),
)
}

err := res.ScanNamed(namedValues...)

if err != nil && strings.Contains(err.Error(), "count of columns less then values") {
//this means the initial query was not a join but a simple select from BackupSchedules.
//unfortunately I did not find a great way to implement this behaviour with YDB Go SDK.
namedValues = namedValues[:len(namedValues)-3]
err = res.ScanNamed(namedValues...)
}

if err != nil {
return nil, err
}
Expand Down
40 changes: 25 additions & 15 deletions internal/connectors/db/yql/queries/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ var (
}
AllBackupScheduleFields = []string{
"id", "container_id", "database", "endpoint", "name", "active", "crontab", "ttl", "paths", "paths_to_exclude",
"initiated",
"created_at", "recovery_point_objective", "last_backup_id", "last_successful_backup_id", "recovery_point",
"next_launch",
"initiated", "created_at", "recovery_point_objective", "next_launch",
}
)

Expand All @@ -52,6 +50,7 @@ type ReadTableQuery interface {
}

type ReadTableQueryImpl struct {
rawQuery *string
tableName string
selectFields []string
filters [][]table_types.Value
Expand All @@ -73,6 +72,12 @@ func NewReadTableQuery(options ...ReadTableQueryOption) *ReadTableQueryImpl {
return d
}

func WithRawQuery(rawQuery string) ReadTableQueryOption {
return func(d *ReadTableQueryImpl) {
d.rawQuery = &rawQuery
}
}

func WithTableName(tableName string) ReadTableQueryOption {
return func(d *ReadTableQueryImpl) {
d.tableName = tableName
Expand Down Expand Up @@ -128,19 +133,24 @@ func (d *ReadTableQueryImpl) MakeFilterString() string {
}

func (d *ReadTableQueryImpl) FormatQuery(ctx context.Context) (*FormatQueryResult, error) {
if len(d.selectFields) == 0 {
return nil, errors.New("no fields to select")
}
if len(d.tableName) == 0 {
return nil, errors.New("no table")
}
var res string
filter := d.MakeFilterString()
res := fmt.Sprintf(
"SELECT %s FROM %s%s",
strings.Join(d.selectFields, ", "),
d.tableName,
filter,
)
if d.rawQuery == nil {
if len(d.selectFields) == 0 {
return nil, errors.New("no fields to select")
}
if len(d.tableName) == 0 {
return nil, errors.New("no table")
}
res = fmt.Sprintf(
"SELECT %s FROM %s%s",
strings.Join(d.selectFields, ", "),
d.tableName,
filter,
)
} else {
res = fmt.Sprintf("%s%s", *d.rawQuery, filter)
}
xlog.Debug(ctx, "read query", zap.String("yql", res))
return &FormatQueryResult{
QueryText: res,
Expand Down
27 changes: 4 additions & 23 deletions internal/connectors/db/yql/queries/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,17 +310,6 @@ func BuildCreateBackupScheduleQuery(schedule types.BackupSchedule, index int) Wr
"$recovery_point_objective",
table_types.IntervalValueFromDuration(schedule.ScheduleSettings.RecoveryPointObjective.AsDuration()),
)
if schedule.LastBackupID != nil {
d.AddValueParam("$last_backup_id", table_types.StringValueFromString(*schedule.LastBackupID))
}
if schedule.LastSuccessfulBackupID != nil {
d.AddValueParam(
"$last_successful_backup_id", table_types.StringValueFromString(*schedule.LastSuccessfulBackupID),
)
}
if schedule.RecoveryPoint != nil {
d.AddValueParam("$recovery_point", table_types.TimestampValueFromTime(*schedule.RecoveryPoint))
}
if schedule.NextLaunch != nil {
d.AddValueParam("$next_launch", table_types.TimestampValueFromTime(*schedule.NextLaunch))
}
Expand All @@ -344,21 +333,13 @@ func BuildUpdateBackupScheduleQuery(schedule types.BackupSchedule, index int) Wr
d.AddValueParam("$crontab", table_types.StringValueFromString(schedule.ScheduleSettings.SchedulePattern.Crontab))
d.AddValueParam("$ttl", table_types.IntervalValueFromDuration(schedule.ScheduleSettings.Ttl.AsDuration()))

d.AddValueParam(
"$recovery_point_objective",
table_types.IntervalValueFromDuration(schedule.ScheduleSettings.RecoveryPointObjective.AsDuration()),
)
if schedule.LastBackupID != nil {
d.AddValueParam("$last_backup_id", table_types.StringValueFromString(*schedule.LastBackupID))
}
if schedule.LastSuccessfulBackupID != nil {
if schedule.ScheduleSettings.RecoveryPointObjective != nil {
d.AddValueParam(
"$last_successful_backup_id", table_types.StringValueFromString(*schedule.LastSuccessfulBackupID),
"$recovery_point_objective",
table_types.IntervalValueFromDuration(schedule.ScheduleSettings.RecoveryPointObjective.AsDuration()),
)
}
if schedule.RecoveryPoint != nil {
d.AddValueParam("$recovery_point", table_types.TimestampValueFromTime(*schedule.RecoveryPoint))
}

if schedule.NextLaunch != nil {
d.AddValueParam("$next_launch", table_types.TimestampValueFromTime(*schedule.NextLaunch))
}
Expand Down
3 changes: 1 addition & 2 deletions internal/connectors/db/yql/queries/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ UPSERT INTO Operations (id, type, status, message, initiated, created_at, contai

func TestQueryBuilder_CreateBackupSchedule(t *testing.T) {
const (
queryString = `UPSERT INTO BackupSchedules (id, container_id, database, endpoint, name, active, crontab, ttl, paths, initiated, created_at, recovery_point_objective, last_backup_id, next_launch) VALUES ($id_0, $container_id_0, $database_0, $endpoint_0, $name_0, $active_0, $crontab_0, $ttl_0, $paths_0, $initiated_0, $created_at_0, $recovery_point_objective_0, $last_backup_id_0, $next_launch_0)`
queryString = `UPSERT INTO BackupSchedules (id, container_id, database, endpoint, name, active, crontab, ttl, paths, initiated, created_at, recovery_point_objective, next_launch) VALUES ($id_0, $container_id_0, $database_0, $endpoint_0, $name_0, $active_0, $crontab_0, $ttl_0, $paths_0, $initiated_0, $created_at_0, $recovery_point_objective_0, $next_launch_0)`
)
scID := types.GenerateObjectID()
bID := types.GenerateObjectID()
Expand Down Expand Up @@ -303,7 +303,6 @@ func TestQueryBuilder_CreateBackupSchedule(t *testing.T) {
"$recovery_point_objective_0",
table_types.IntervalValueFromDuration(schedule.ScheduleSettings.RecoveryPointObjective.AsDuration()),
),
table.ValueParam("$last_backup_id_0", table_types.StringValueFromString(*schedule.LastBackupID)),
table.ValueParam("$next_launch_0", table_types.TimestampValueFromTime(*schedule.NextLaunch)),
)
)
Expand Down
4 changes: 0 additions & 4 deletions internal/connectors/db/yql/schema/create_tables.yql
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,6 @@ CREATE TABLE BackupSchedules (

recovery_point_objective Interval,

last_backup_id String,
last_successful_backup_id String,
recovery_point Timestamp,

next_launch Timestamp,
PRIMARY KEY (id)
)
2 changes: 1 addition & 1 deletion internal/handlers/schedule_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func BackupScheduleHandler(
return nil
}
now := time.Now()
// do not handle last_backup_id status = (failed | deleted) for now, just do backups on cron.
if schedule.NextLaunch != nil && schedule.NextLaunch.Before(now) {
b, op, err := backup_operations.MakeBackup(
ctx, clientConn, s3, allowedEndpointDomains, allowInsecureEndpoint, &pb.MakeBackupRequest{
Expand All @@ -54,7 +55,6 @@ func BackupScheduleHandler(
if err != nil {
return err
}
schedule.LastBackupID = &op.BackupID
err = schedule.UpdateNextLaunch(now)
if err != nil {
return err
Expand Down
7 changes: 2 additions & 5 deletions internal/handlers/schedule_backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ func TestBackupScheduleHandler(t *testing.T) {
ScheduleSettings: &pb.BackupScheduleSettings{
SchedulePattern: &pb.BackupSchedulePattern{Crontab: "* * * * * *"},
},
NextLaunch: &now,
LastBackupID: nil,
LastSuccessfulBackupID: nil,
NextLaunch: &now,
}
opMap := make(map[string]types.Operation)
backupMap := make(map[string]types.Backup)
Expand Down Expand Up @@ -72,11 +70,10 @@ func TestBackupScheduleHandler(t *testing.T) {
assert.Equal(t, len(backups), 1)
assert.Equal(t, types.BackupStateRunning, backups[0].Status)

// check schedule
// check schedule next launch
schedules, err := dbConnector.SelectBackupSchedules(ctx, &queries.ReadTableQueryImpl{})
assert.Empty(t, err)
assert.NotEmpty(t, schedules)
assert.Equal(t, len(schedules), 1)
assert.Equal(t, *schedules[0].LastBackupID, backups[0].ID)
assert.Greater(t, *schedules[0].NextLaunch, now)
}
11 changes: 5 additions & 6 deletions internal/handlers/take_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package handlers
import (
"context"
"fmt"

"ydbcp/internal/config"
"ydbcp/internal/connectors/client"
"ydbcp/internal/connectors/db"
Expand Down Expand Up @@ -35,7 +34,7 @@ func TBOperationHandler(
client client.ClientConnector,
s3 s3.S3Connector,
config config.Config,
queryBulderFactory queries.WriteQueryBulderFactory,
queryBuilderFactory queries.WriteQueryBulderFactory,
) error {
xlog.Info(ctx, "TBOperationHandler", zap.String("OperationMessage", operation.GetMessage()))

Expand Down Expand Up @@ -76,7 +75,7 @@ func TBOperationHandler(
backupToWrite.Message = operation.GetMessage()
backupToWrite.AuditInfo.CompletedAt = now
return db.ExecuteUpsert(
ctx, queryBulderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite),
ctx, queryBuilderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite),
)
}
if ydbOpResponse.opResponse == nil {
Expand Down Expand Up @@ -158,7 +157,7 @@ func TBOperationHandler(
backupToWrite.Message = operation.GetMessage()
backupToWrite.AuditInfo.CompletedAt = operation.GetAudit().CompletedAt
return db.ExecuteUpsert(
ctx, queryBulderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite),
ctx, queryBuilderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite),
)
}
case types.OperationStateCancelling:
Expand All @@ -172,7 +171,7 @@ func TBOperationHandler(
operation.GetAudit().CompletedAt = now
backupToWrite.Message = operation.GetMessage()
return db.ExecuteUpsert(
ctx, queryBulderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite),
ctx, queryBuilderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite),
)
}

Expand Down Expand Up @@ -223,6 +222,6 @@ func TBOperationHandler(
backupToWrite.AuditInfo.CompletedAt = now
operation.GetAudit().CompletedAt = now
return db.ExecuteUpsert(
ctx, queryBulderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite),
ctx, queryBuilderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package backup_schedule

import (
"context"
"github.com/gorhill/cronexpr"
"time"

"ydbcp/internal/auth"
Expand All @@ -21,6 +22,16 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)

const (
ListSchedulesQuery = `$last_successful_backup_id = SELECT schedule_id, MAX(b.completed_at) as recovery_point, MAX_BY(b.id, b.completed_at) AS last_successful_backup_id from Backups as b WHERE b.status = 'AVAILABLE' GROUP BY schedule_id;
$last_backup_id = SELECT schedule_id as schedule_id_2, MAX_BY(b.id, b.completed_at) AS last_backup_id from Backups as b GROUP BY schedule_id;
select * from BackupSchedules as schedules
left join $last_successful_backup_id as b1 on schedules.id = b1.schedule_id
left join $last_backup_id as b2 on schedules.id = b2.schedule_id_2
`
)

type BackupScheduleService struct {
pb.UnimplementedBackupScheduleServiceServer
driver db.DBConnector
Expand All @@ -43,7 +54,13 @@ func (s *BackupScheduleService) CreateBackupSchedule(
ctx, "no backup schedule settings for CreateBackupSchedule", zap.String("request", request.String()),
)
return nil, status.Error(codes.FailedPrecondition, "no backup schedule settings for CreateBackupSchedule")

}
_, err = cronexpr.Parse(request.ScheduleSettings.SchedulePattern.Crontab)
if err != nil {
return nil, status.Error(codes.FailedPrecondition, "failed to parse crontab")
}
if request.ScheduleSettings.RecoveryPointObjective != nil && (request.ScheduleSettings.RecoveryPointObjective.Seconds == 0) {
return nil, status.Error(codes.FailedPrecondition, "recovery point objective shoulde be greater than 0")
}
now := time.Now()
schedule := types.BackupSchedule{
Expand Down Expand Up @@ -170,8 +187,7 @@ func (s *BackupScheduleService) ListBackupSchedules(

schedules, err := s.driver.SelectBackupSchedules(
ctx, queries.NewReadTableQuery(
queries.WithTableName("BackupSchedules"),
queries.WithSelectFields(queries.AllBackupScheduleFields...),
queries.WithRawQuery(ListSchedulesQuery),
queries.WithQueryFilters(queryFilters...),
),
)
Expand Down
6 changes: 5 additions & 1 deletion internal/types/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (o *Backup) String() string {
}

func (o *Backup) Proto() *pb.Backup {
return &pb.Backup{
backup := &pb.Backup{
Id: o.ID,
ContainerId: o.ContainerID,
DatabaseName: o.DatabaseName,
Expand All @@ -68,6 +68,10 @@ func (o *Backup) Proto() *pb.Backup {
Message: o.Message,
ExpireAt: nil,
}
if o.ScheduleID != nil {
backup.ScheduleId = *o.ScheduleID
}
return backup
}

func (o *Backup) CanBeDeleted() bool {
Expand Down
12 changes: 7 additions & 5 deletions internal/types/backup_schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ type BackupSchedule struct {
func (b *BackupSchedule) Proto() *pb.BackupSchedule {
var backupInfo *pb.ScheduledBackupInfo
if b.LastSuccessfulBackupID != nil {
rpoMargin := time.Since(*b.RecoveryPoint)
backupInfo = &pb.ScheduledBackupInfo{
BackupId: *b.LastSuccessfulBackupID,
RecoveryPoint: timestamppb.New(*b.RecoveryPoint),
LastBackupRpoMarginInterval: durationpb.New(rpoMargin),
LastBackupRpoMarginPercent: rpoMargin.Seconds() / float64(b.ScheduleSettings.RecoveryPointObjective.Seconds),
BackupId: *b.LastSuccessfulBackupID,
}
if b.ScheduleSettings.RecoveryPointObjective != nil && b.RecoveryPoint != nil {
rpoMargin := time.Since(*b.RecoveryPoint)
backupInfo.RecoveryPoint = timestamppb.New(*b.RecoveryPoint)
backupInfo.LastBackupRpoMarginInterval = durationpb.New(rpoMargin)
backupInfo.LastBackupRpoMarginPercent = rpoMargin.Seconds() / float64(b.ScheduleSettings.RecoveryPointObjective.Seconds)
}
}
var nextLaunchTs *timestamppb.Timestamp
Expand Down

0 comments on commit 6f65795

Please sign in to comment.