Skip to content

Commit

Permalink
add ScheduleBackup handler
Browse files Browse the repository at this point in the history
  • Loading branch information
qrort committed Sep 17, 2024
1 parent 02af8fd commit 08cc852
Show file tree
Hide file tree
Showing 13 changed files with 239 additions and 48 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ toolchain go1.22.1

require (
github.com/aws/aws-sdk-go v1.55.5
github.com/golang/protobuf v1.5.4
github.com/google/uuid v1.6.0
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75
github.com/jonboulle/clockwork v0.3.0
github.com/stretchr/testify v1.8.1
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240821162910-6cb364b2ccc8
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
Expand All @@ -54,6 +52,8 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 h1:f0n1xnMSmBLzVfsMMvriDyA75NB/oBgILX2GcHXIQzY=
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75/go.mod h1:g2644b03hfBX9Ov0ZBDgXXens4rxSxmqFBbhvKv2yVA=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
Expand Down Expand Up @@ -82,8 +82,6 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240528144234-5d5a685e41f7 h1:nL8XwD6fSst7xFUirkaWJmE7kM0CdWRYgu6+YQer1d4=
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240528144234-5d5a685e41f7/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240821162910-6cb364b2ccc8 h1:ZWxYw6L51aNAMLbTpC/VbXP0rcnvsCAJqx7EI/CjWmc=
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240821162910-6cb364b2ccc8/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
github.com/ydb-platform/ydb-go-sdk/v3 v3.75.2 h1:thrbvktqKA6LFZTnZrGuQi8LQVel1J2dDfoQFsgvcYs=
Expand Down
12 changes: 11 additions & 1 deletion internal/auth/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ type Option func(*MockAuthProvider)

func WithToken(token string, subject string, authCode auth.AuthCode) Option {
return func(p *MockAuthProvider) {
if p.tokens == nil {
p.tokens = make(map[string]*MockTokenInfo)
}
p.tokens[token] = &MockTokenInfo{
subject: MockSubjectID(subject),
authCode: authCode,
Expand All @@ -83,12 +86,18 @@ func WithToken(token string, subject string, authCode auth.AuthCode) Option {

func WithContainer(id MockContainerID, container *MockContainer) Option {
return func(p *MockAuthProvider) {
if p.containers == nil {
p.containers = make(map[MockContainerID]*MockContainer)
}
p.containers[id] = container
}
}

func WithResource(id MockResourceID, res *MockResource) Option {
return func(p *MockAuthProvider) {
if p.resources == nil {
p.resources = make(map[MockResourceID]*MockResource)
}
p.resources[id] = res
}
}
Expand Down Expand Up @@ -178,7 +187,8 @@ func (p *MockAuthProvider) Authorize(
for _, c := range checks {
results = append(results, p.checkSubjectPermission(subject, c))
}
xlog.Info(ctx, "MockAuthProvider Authorize result",
xlog.Info(
ctx, "MockAuthProvider Authorize result",
zap.String("AuthResults", fmt.Sprintf("%v", results)),
zap.String("SubjectID", anonymousSubject),
)
Expand Down
7 changes: 7 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type S3Config struct {
AccessKeyIDPath string `yaml:"access_key_id_path"`
SecretAccessKeyPath string `yaml:"secret_access_key_path"`
S3ForcePathStyle bool `yaml:"s3_force_path_style"`
IsMock bool
}

type YDBConnectionConfig struct {
Expand Down Expand Up @@ -112,10 +113,16 @@ func readSecret(filename string) (string, error) {
}

func (c *S3Config) AccessKey() (string, error) {
if c.IsMock {
return "", nil
}
return readSecret(c.AccessKeyIDPath)
}

func (c *S3Config) SecretKey() (string, error) {
if c.IsMock {
return "", nil
}
return readSecret(c.SecretAccessKeyPath)

}
Expand Down
13 changes: 9 additions & 4 deletions internal/connectors/db/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package db

import (
"context"
"errors"
"fmt"
"sync"

Expand All @@ -23,8 +22,9 @@ type Option func(*MockDBConnector)

func NewMockDBConnector(options ...Option) *MockDBConnector {
connector := &MockDBConnector{
operations: make(map[string]types.Operation),
backups: make(map[string]types.Backup),
operations: make(map[string]types.Operation),
backups: make(map[string]types.Backup),
backupSchedules: make(map[string]types.BackupSchedule),
}
for _, opt := range options {
opt(connector)
Expand Down Expand Up @@ -198,7 +198,11 @@ func (c *MockDBConnector) GetBackup(
func (c *MockDBConnector) SelectOperations(
_ context.Context, _ queries.ReadTableQuery,
) ([]types.Operation, error) {
return nil, errors.New("do not call this method")
res := make([]types.Operation, 0, len(c.operations))
for _, v := range c.operations {
res = append(res, v.Copy())
}
return res, nil
}

func (c *MockDBConnector) ExecuteUpsert(_ context.Context, queryBuilder queries.WriteTableQuery) error {
Expand All @@ -208,5 +212,6 @@ func (c *MockDBConnector) ExecuteUpsert(_ context.Context, queryBuilder queries.
queryBuilderMock := queryBuilder.(*queries.WriteTableQueryMock)
c.operations[queryBuilderMock.Operation.GetID()] = queryBuilderMock.Operation
c.backups[queryBuilderMock.Backup.ID] = queryBuilderMock.Backup
c.backupSchedules[queryBuilderMock.BackupSchedule.ID] = queryBuilderMock.BackupSchedule
return nil
}
3 changes: 3 additions & 0 deletions internal/connectors/db/process_result_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) {
status *string
message *string
size *int64
scheduleId *string

creator *string
completedAt *time.Time
Expand All @@ -91,6 +92,7 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) {
named.Optional("status", &status),
named.Optional("message", &message),
named.Optional("size", &size),
named.Optional("schedule_id", &scheduleId),

named.Optional("created_at", &createdAt),
named.Optional("completed_at", &completedAt),
Expand All @@ -113,6 +115,7 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) {
Message: StringOrEmpty(message),
AuditInfo: auditFromDb(creator, createdAt, completedAt),
Size: Int64OrZero(size),
ScheduleID: scheduleId,
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/connectors/db/yql/queries/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var (
"initiated", "created_at", "completed_at",
"s3_endpoint", "s3_region", "s3_bucket",
"s3_path_prefix", "status", "paths", "message",
"size",
"size", "schedule_id",
}
AllOperationFields = []string{
"id", "type", "container_id", "database", "endpoint", "backup_id",
Expand Down
3 changes: 3 additions & 0 deletions internal/connectors/db/yql/queries/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ func BuildCreateBackupQuery(b types.Backup, index int) WriteSingleTableQueryImpl
d.AddValueParam("$status", table_types.StringValueFromString(b.Status))
d.AddValueParam("$message", table_types.StringValueFromString(b.Message))
d.AddValueParam("$size", table_types.Int64Value(b.Size))
if b.ScheduleID != nil {
d.AddValueParam("$schedule_id", table_types.StringValueFromString(*b.ScheduleID))
}
if b.AuditInfo != nil {
d.AddValueParam("$initiated", table_types.StringValueFromString(b.AuditInfo.Creator))
if b.AuditInfo.CreatedAt != nil {
Expand Down
53 changes: 53 additions & 0 deletions internal/handlers/schedule_backup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package handlers

import (
"context"
"time"
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/server/services/backup"
"ydbcp/internal/types"
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"
)

func NewBackupScheduleHandler(
service *backup.BackupService,
queryBuilderFactory queries.WriteQueryBulderFactory,
) types.BackupScheduleHandler {
return func(ctx context.Context, schedule types.BackupSchedule) error {
return BackupScheduleHandler(ctx, schedule, service, queryBuilderFactory)
}
}

func BackupScheduleHandler(
ctx context.Context, schedule types.BackupSchedule, s *backup.BackupService,
queryBuilderFactory queries.WriteQueryBulderFactory,
) error {
if !schedule.Active { //maybe just select active schedules; will be done in processor
return nil
}
now := time.Now()
if schedule.NextLaunch != nil && schedule.NextLaunch.Before(now) {
b, op, err := s.MakeBackupImpl(
ctx, &pb.MakeBackupRequest{
ContainerId: schedule.ContainerID,
DatabaseName: schedule.DatabaseName,
DatabaseEndpoint: schedule.DatabaseEndpoint,
SourcePaths: schedule.SourcePaths,
SourcePathsToExclude: schedule.SourcePathsToExclude,
}, &schedule.ID,
)
if err != nil {
return err
}
schedule.LastBackupID = &op.BackupID
err = schedule.UpdateNextLaunch(now)
if err != nil {
return err
}
return s.Driver.ExecuteUpsert(
ctx,
queryBuilderFactory().WithCreateBackup(*b).WithCreateOperation(op).WithUpdateBackupSchedule(schedule),
)
}
return nil
}
94 changes: 94 additions & 0 deletions internal/handlers/schedule_backup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package handlers

import (
"context"
"github.com/stretchr/testify/assert"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations"
"testing"
"time"
"ydbcp/internal/auth"
"ydbcp/internal/config"
"ydbcp/internal/connectors/client"
"ydbcp/internal/connectors/db"
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/server/services/backup"
"ydbcp/internal/types"
auth2 "ydbcp/pkg/plugins/auth"
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"
)

func TestBackupScheduleHandler(t *testing.T) {
ctx := context.Background()
now := time.Now()
schedule := types.BackupSchedule{
ID: "12345",
ContainerID: "abcde",
Active: true,
DatabaseName: "mydb",
DatabaseEndpoint: "mydb.valid.com",
ScheduleSettings: &pb.BackupScheduleSettings{
SchedulePattern: &pb.BackupSchedulePattern{Crontab: "* * * * * *"},
},
NextLaunch: &now,
LastBackupID: nil,
LastSuccessfulBackupID: nil,
}
opMap := make(map[string]types.Operation)
backupMap := make(map[string]types.Backup)
ydbOpMap := make(map[string]*Ydb_Operations.Operation)
scheduleMap := make(map[string]types.BackupSchedule)
scheduleMap[schedule.ID] = schedule
dbConnector := db.NewMockDBConnector(
db.WithBackups(backupMap),
db.WithOperations(opMap),
db.WithBackupSchedules(scheduleMap),
)
clientConnector := client.NewMockClientConnector(
client.WithOperations(ydbOpMap),
)

mockAuth := auth.NewMockAuthProvider(
auth.WithToken("", "abcde", auth2.AuthCodeSuccess),
auth.WithContainer(
"abcde",
&auth.MockContainer{Permissions: map[auth.MockSubjectID]auth.MockPermissionsList{"abcde": map[string]bool{"ydb.databases.backup": true}}},
),
)
backupService := backup.NewBackupService(
dbConnector,
clientConnector,
config.S3Config{
S3ForcePathStyle: false,
IsMock: true,
},
mockAuth,
[]string{".valid.com"},
true,
)

handler := NewBackupScheduleHandler(backupService, queries.NewWriteTableQueryMock)
err := handler(ctx, schedule)
assert.Empty(t, err)

// check operation status (should be pending)
ops, err := dbConnector.SelectOperations(ctx, &queries.ReadTableQueryImpl{})
assert.Empty(t, err)
assert.NotEmpty(t, ops)
assert.Equal(t, len(ops), 1)
assert.Equal(t, types.OperationStateRunning, ops[0].GetState())

// check backup status (should be running)
backups, err := dbConnector.SelectBackups(ctx, &queries.ReadTableQueryImpl{})
assert.Empty(t, err)
assert.NotEmpty(t, backups)
assert.Equal(t, len(backups), 1)
assert.Equal(t, types.BackupStateRunning, backups[0].Status)

// check schedule
schedules, err := dbConnector.SelectBackupSchedules(ctx, &queries.ReadTableQueryImpl{})
assert.Empty(t, err)
assert.NotEmpty(t, schedules)
assert.Equal(t, len(schedules), 1)
assert.Equal(t, *schedules[0].LastBackupID, backups[0].ID)
assert.Greater(t, *schedules[0].NextLaunch, now)
}
Loading

0 comments on commit 08cc852

Please sign in to comment.