Skip to content

Commit

Permalink
feat: add replay (#753)
Browse files Browse the repository at this point in the history
* feat: add replay entity and repository

* feat: add replay service

* feat: add replay handler

* feat: add replay manager and worker

* test: fix job run service and replay service test cases

* test: fix scheduler status test case

* refactor: add JobReplayRunService in ReplayWorker

* refactor: fix lint issues

* refactor: remove logical time from JobRunStatus

* feat: implement UpdateReplay in repository

* refactor: fetch a single replay request to be processed

* refactor: change implementation of GetReplayToExecute

* refactor: update replay state to in progress once it is picked

* refactor: pass job cron value to replay validate

* feat: implement replay validator

* refactor: change replay entity structure

* refactor: validator to accept replay request

* feat: implement GetReplayRequestsByStatus

* fix: GetReplayRequestByStatus invalid query

* feat: add replay timeout in server config

* refactor: update replay and update replay status in repository

* feat: add replay time out check

* refactor: remove replay id from ReplayWithRun

* feat: add replay create cli

* refactor: unexport replay entity fields

* fix: resolve migration files conflict

* test: add replay entity unit tests

* feat: add replay loop timeout in server config

* fix: replay repository to return scheduled time in utc

* fix: issue where sequential replays were not processed properly

* fix: error log in replay service

* feat: register replay to optimus server

* chore: add todos in replay worker

* refactor: remove replay's worker and loop timeout configuration

* chore: reduce replay create client timeout from 1 hour to 1 minute

* fix: log level, log message and replay message

* refactor: reword error message when unable to parse cron interval

* fix: add missing description flag in replay create

* refactor: rename var and simplify processing new replay in replay worker

* refactor: remove start and end time index in replay request table

* chore: add note and todo in scheduler status and replay repository

* fix: update conflicted migration files number

* chore: fix lint issues

* chore: remove unnecessary error not found catch in replay repository

* fix: issue when replay worker is unable to get job details, and add more tests

* test: add more replay test cases

* chore: fix lint issue

* test: add replay manager unit tests

* test: add scheduler BC status unit tests

* feat: replay config overriding (#765)

* feat: support replay job config proton

* feat: add flag for replay job config

* feat: add job config on the replay

* feat: adding job replay repo placeholder

* feat: implement job replay repo

* fix: get replay config by tenant and job name

* refactor: rename get replay job config

* refactor: avoid magic number

* fix: linter

* fix: query for get replay job config

* chore: update proton commit

* fix: issue on replay creation when job config is not set

---------

Co-authored-by: Dery Rahman Ahaddienata <[email protected]>
  • Loading branch information
arinda-arif and deryrahman authored Mar 27, 2023
1 parent 8a15d90 commit d2225ee
Show file tree
Hide file tree
Showing 37 changed files with 3,242 additions and 1,811 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ NAME = "github.com/odpf/optimus"
LAST_COMMIT := $(shell git rev-parse --short HEAD)
LAST_TAG := "$(shell git rev-list --tags --max-count=1)"
OPMS_VERSION := "$(shell git describe --tags ${LAST_TAG})-next"
PROTON_COMMIT := "fae8287656b163ae07a7f03edd3ea3f5df499dcb"
PROTON_COMMIT := "31ac9046d1a8c95a2f4645b87bf0620a3e6bb8bc"


.PHONY: build test test-ci generate-proto unit-test-ci integration-test vet coverage clean install lint
Expand Down Expand Up @@ -66,4 +66,4 @@ install: ## install required dependencies
go install github.com/bufbuild/buf/cmd/[email protected]
go install google.golang.org/grpc/cmd/[email protected]
go install github.com/grpc-ecosystem/grpc-gateway/v2/[email protected]
go install github.com/grpc-ecosystem/grpc-gateway/v2/[email protected]
go install github.com/grpc-ecosystem/grpc-gateway/v2/[email protected]
2 changes: 2 additions & 0 deletions client/cmd/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/odpf/optimus/client/cmd/playground"
"github.com/odpf/optimus/client/cmd/plugin"
"github.com/odpf/optimus/client/cmd/project"
"github.com/odpf/optimus/client/cmd/replay"
"github.com/odpf/optimus/client/cmd/resource"
"github.com/odpf/optimus/client/cmd/scheduler"
"github.com/odpf/optimus/client/cmd/secret"
Expand Down Expand Up @@ -66,6 +67,7 @@ func New() *cli.Command {
version.NewVersionCommand(),
playground.NewPlaygroundCommand(),
scheduler.NewSchedulerCommand(),
replay.NewReplayCommand(),

// Will decide later, to add it server side or not
plugin.NewPluginCommand(),
Expand Down
159 changes: 159 additions & 0 deletions client/cmd/replay/create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package replay

import (
"errors"
"fmt"
"time"

"github.com/odpf/salt/log"
"github.com/spf13/cobra"
"golang.org/x/net/context"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/odpf/optimus/client/cmd/internal"
"github.com/odpf/optimus/client/cmd/internal/connectivity"
"github.com/odpf/optimus/client/cmd/internal/logger"
"github.com/odpf/optimus/config"
pb "github.com/odpf/optimus/protos/odpf/optimus/core/v1beta1"
)

// Defaults used by the replay create command.
const (
// replayTimeout is the timeout passed to the server connection when creating a replay request.
replayTimeout = time.Minute * 1
// ISOTimeLayout is the layout used to parse the start and end time arguments (RFC 3339).
ISOTimeLayout = time.RFC3339
)

// createCommand holds the logger and the flag values used by the
// "replay create" command.
type createCommand struct {
logger log.Logger
// configFilePath is bound to the --config flag.
configFilePath string

// parallel, description and jobConfig are bound to flags and forwarded
// unchanged in the replay request.
parallel bool
description string
jobConfig string

// projectName and host are bound to flags; PreRunE fills them from the
// client config when a config is loaded and the flag value is empty.
projectName string
// namespaceName is bound to the --namespace-name flag only; PreRunE does not
// fill it from the client config.
namespaceName string
host string
}

// CreateCommand initializes the command for creating a replay request.
//
// The command expects the job name and the replay start time as positional
// arguments, optionally followed by the replay end time. Both times are parsed
// with ISOTimeLayout (RFC 3339) when the command runs.
func CreateCommand() *cobra.Command {
	create := &createCommand{
		logger: logger.NewClientLogger(),
	}

	cmd := &cobra.Command{
		Use:   "create",
		Short: "Run replay operation on a dag based on provided start and end time range",
		Long: "This operation takes three arguments, first is DAG name[required]\nused in optimus specification, " +
			"second is start time[required] of\nreplay, third is end time[optional] of replay. \nDate ranges are inclusive.",
		// The sample timestamps must be valid RFC 3339 values, otherwise a user
		// copying the example gets a parse error from the command itself.
		Example: "optimus replay create <job_name> <2023-01-01T02:30:00Z> [2023-01-02T02:30:00Z]",
		Args: func(cmd *cobra.Command, args []string) error {
			if len(args) < 1 {
				return errors.New("job name is required")
			}
			if len(args) < 2 { //nolint: gomnd
				return errors.New("replay start time is required")
			}
			return nil
		},
		RunE:    create.RunE,
		PreRunE: create.PreRunE,
	}

	create.injectFlags(cmd)
	return cmd
}

// injectFlags registers every flag of the replay create command on cmd and
// binds each one to the matching field of the receiver.
func (r *createCommand) injectFlags(cmd *cobra.Command) {
	flagSet := cmd.Flags()

	// Client configuration file and namespace selection.
	flagSet.StringVarP(&r.configFilePath, "config", "c", config.EmptyPath, "File path for client configuration")
	flagSet.StringVarP(&r.namespaceName, "namespace-name", "n", "", "Name of the optimus namespace")

	// Options forwarded in the replay request.
	flagSet.BoolVarP(&r.parallel, "parallel", "", false, "Backfill job runs in parallel")
	flagSet.StringVarP(&r.description, "description", "d", "", "Description of why backfill is needed")
	flagSet.StringVarP(&r.jobConfig, "job-config", "", "", "additional job configurations")

	// These become mandatory when no client config could be loaded (see PreRunE).
	flagSet.StringVarP(&r.projectName, "project-name", "p", "", "Name of the optimus project")
	flagSet.StringVar(&r.host, "host", "", "Optimus service endpoint url")
}

// PreRunE resolves the project name and host before the command runs.
//
// When no client config is available, the project-name and host flags are
// marked as required. Otherwise any of the two that was left empty on the
// command line is filled from the loaded config.
func (r *createCommand) PreRunE(cmd *cobra.Command, _ []string) error {
	clientConfig, loadErr := internal.LoadOptionalConfig(r.configFilePath)
	switch {
	case loadErr != nil:
		return loadErr
	case clientConfig == nil:
		internal.MarkFlagsRequired(cmd, []string{"project-name", "host"})
		return nil
	}

	// Explicit flag values win over the config file.
	if len(r.projectName) == 0 {
		r.projectName = clientConfig.Project.Name
	}
	if len(r.host) == 0 {
		r.host = clientConfig.Host
	}
	return nil
}

// RunE reads the job name, start time and optional end time from args,
// submits the replay request and logs the ID of the created replay.
// When no end time is given, the start time is used as the end time.
func (r *createCommand) RunE(_ *cobra.Command, args []string) error {
	const endTimeArgIndex = 2

	jobName, startTime := args[0], args[1]
	endTime := startTime
	if len(args) > endTimeArgIndex {
		endTime = args[endTimeArgIndex]
	}

	id, createErr := r.createReplayRequest(jobName, startTime, endTime, r.jobConfig)
	if createErr != nil {
		return createErr
	}

	r.logger.Info("Replay request created with id %s", id)
	return nil
}

// createReplayRequest sends a replay request for jobName to the server at
// r.host and returns the ID of the created replay.
//
// startTimeStr and endTimeStr must be formatted according to ISOTimeLayout.
// Both are validated before any connection is opened, so malformed input
// fails fast and the returned error states which argument was invalid.
// jobConfig is forwarded to the server unchanged.
func (r *createCommand) createReplayRequest(jobName, startTimeStr, endTimeStr, jobConfig string) (string, error) {
	startTime, err := getTimeProto(startTimeStr)
	if err != nil {
		return "", fmt.Errorf("invalid replay start time: %w", err)
	}
	endTime, err := getTimeProto(endTimeStr)
	if err != nil {
		return "", fmt.Errorf("invalid replay end time: %w", err)
	}

	conn, err := connectivity.NewConnectivity(r.host, replayTimeout)
	if err != nil {
		return "", err
	}
	defer conn.Close()

	replayService := pb.NewReplayServiceClient(conn.GetConnection())
	resp, err := replayService.Replay(conn.GetContext(), &pb.ReplayRequest{
		ProjectName:   r.projectName,
		JobName:       jobName,
		NamespaceName: r.namespaceName,
		StartTime:     startTime,
		EndTime:       endTime,
		Parallel:      r.parallel,
		Description:   r.description,
		JobConfig:     jobConfig,
	})
	if err != nil {
		// NOTE(review): a gRPC deadline may surface as a status error that does
		// not wrap context.DeadlineExceeded; confirm this branch is reachable.
		if errors.Is(err, context.DeadlineExceeded) {
			r.logger.Error("Replay creation took too long, timing out")
		}
		return "", fmt.Errorf("replay request failed: %w", err)
	}
	return resp.Id, nil
}

// getTimeProto parses timeStr using ISOTimeLayout and converts the result into
// a protobuf timestamp. The parse error is returned unchanged on failure.
func getTimeProto(timeStr string) (*timestamppb.Timestamp, error) {
	t, parseErr := time.Parse(ISOTimeLayout, timeStr)
	if parseErr == nil {
		return timestamppb.New(t), nil
	}
	return nil, parseErr
}
21 changes: 21 additions & 0 deletions client/cmd/replay/replay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package replay

import (
"github.com/spf13/cobra"
)

// NewReplayCommand builds the parent "replay" command and attaches the
// create subcommand to it.
func NewReplayCommand() *cobra.Command {
	annotations := map[string]string{
		"group:core": "false",
	}

	replayCmd := &cobra.Command{
		Use:         "replay",
		Short:       "replay related functions",
		Annotations: annotations,
	}
	replayCmd.AddCommand(CreateCommand())

	return replayCmd
}
8 changes: 8 additions & 0 deletions config/config_server.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package config

import "time"

type ServerConfig struct {
Version Version `mapstructure:"version"`
Log LogConfig `mapstructure:"log"`
Expand All @@ -8,6 +10,7 @@ type ServerConfig struct {
Telemetry TelemetryConfig `mapstructure:"telemetry"`
ResourceManagers []ResourceManager `mapstructure:"resource_managers"`
Plugin PluginConfig `mapstructure:"plugin"`
Replay ReplayConfig `mapstructure:"replay"`
}

type Serve struct {
Expand Down Expand Up @@ -47,3 +50,8 @@ type ResourceManagerConfigOptimus struct {
type PluginConfig struct {
Artifacts []string `mapstructure:"artifacts"`
}

// ReplayConfig holds the "replay" section of the server configuration.
// TODO: add worker interval
type ReplayConfig struct {
// ReplayTimeout is read from the "replay_timeout" key; its default tag is "3h".
ReplayTimeout time.Duration `mapstructure:"replay_timeout" default:"3h"`
}
3 changes: 3 additions & 0 deletions config/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"path/filepath"
"strings"
"testing"
"time"

saltConfig "github.com/odpf/salt/config"
"github.com/spf13/afero"
Expand Down Expand Up @@ -277,6 +278,8 @@ func (s *ConfigTestSuite) initExpectedServerConfig() {
},
}
s.expectedServerConfig.Plugin = config.PluginConfig{}

s.expectedServerConfig.Replay.ReplayTimeout = time.Hour * 3
}

func (*ConfigTestSuite) initServerConfigEnv() {
Expand Down
2 changes: 1 addition & 1 deletion core/job/handler/v1beta1/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (jh *JobHandler) ListJobSpecification(ctx context.Context, req *pb.ListJobS

func (*JobHandler) GetWindow(_ context.Context, req *pb.GetWindowRequest) (*pb.GetWindowResponse, error) {
// TODO: the default version to be deprecated & made mandatory in future releases
version := 1
version := 2
if err := req.GetScheduledAt().CheckValid(); err != nil {
return nil, fmt.Errorf("%w: failed to parse schedule time %s", err, req.GetScheduledAt())
}
Expand Down
83 changes: 83 additions & 0 deletions core/scheduler/handler/v1beta1/replay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package v1beta1

import (
"fmt"
"strings"

"github.com/google/uuid"
"github.com/odpf/salt/log"
"golang.org/x/net/context"

"github.com/odpf/optimus/core/scheduler"
"github.com/odpf/optimus/core/tenant"
"github.com/odpf/optimus/internal/errors"
pb "github.com/odpf/optimus/protos/odpf/optimus/core/v1beta1"
)

// ReplayService is the service-layer behavior that ReplayHandler depends on.
type ReplayService interface {
// CreateReplay is called by ReplayHandler.Replay with the tenant, job name and
// replay config built from the request; the returned ID is sent back in the response.
CreateReplay(ctx context.Context, tenant tenant.Tenant, jobName scheduler.JobName, config *scheduler.ReplayConfig) (replayID uuid.UUID, err error)
}

// ReplayHandler serves the Replay RPC by delegating to a ReplayService.
type ReplayHandler struct {
l log.Logger
service ReplayService

// Embedded type from the generated pb package; presumably supplies default
// implementations for the ReplayServiceServer methods not defined here — confirm
// against the generated code.
pb.UnimplementedReplayServiceServer
}

// Replay validates the incoming request, builds a replay config from it and
// asks the replay service to create the replay.
//
// The project/namespace pair and the job name must be valid, and start_time
// must be a valid timestamp. end_time is optional: when it is absent the
// replay covers the start time only, which mirrors what the CLI sends. Before
// this default existed, a missing end_time was converted with AsTime on a nil
// timestamp and ended up as the Unix epoch. job_config, when set, is a
// comma-separated list of KEY=VALUE pairs.
//
// Every failure is converted to a gRPC error; on success the response carries
// the ID of the created replay.
func (h ReplayHandler) Replay(ctx context.Context, req *pb.ReplayRequest) (*pb.ReplayResponse, error) {
	errMsg := "unable to start replay for " + req.GetJobName()

	replayTenant, err := tenant.NewTenant(req.GetProjectName(), req.GetNamespaceName())
	if err != nil {
		return nil, errors.GRPCErr(err, errMsg)
	}

	jobName, err := scheduler.JobNameFrom(req.GetJobName())
	if err != nil {
		return nil, errors.GRPCErr(err, errMsg)
	}

	if err = req.GetStartTime().CheckValid(); err != nil {
		return nil, errors.GRPCErr(errors.InvalidArgument(scheduler.EntityJobRun, "invalid start_time"), errMsg)
	}
	startTime := req.GetStartTime().AsTime()

	// Fall back to the start time instead of converting a nil timestamp.
	endTime := startTime
	if req.GetEndTime() != nil {
		if err = req.GetEndTime().CheckValid(); err != nil {
			return nil, errors.GRPCErr(errors.InvalidArgument(scheduler.EntityJobRun, "invalid end_time"), errMsg)
		}
		endTime = req.GetEndTime().AsTime()
	}

	jobConfig := make(map[string]string)
	if req.GetJobConfig() != "" {
		jobConfig, err = parseJobConfig(req.GetJobConfig())
		if err != nil {
			return nil, errors.GRPCErr(err, "unable to parse replay job config for "+req.GetJobName())
		}
	}

	replayConfig := scheduler.NewReplayConfig(startTime, endTime, req.GetParallel(), jobConfig, req.GetDescription())
	replayID, err := h.service.CreateReplay(ctx, replayTenant, jobName, replayConfig)
	if err != nil {
		return nil, errors.GRPCErr(err, errMsg)
	}

	return &pb.ReplayResponse{Id: replayID.String()}, nil
}

// parseJobConfig converts a comma-separated list of KEY=VALUE pairs into a map.
//
// Keys are upper-cased and stripped of surrounding whitespace; values are kept
// verbatim. Each pair is split on its first "=" only, so a value may itself
// contain "=" characters (for example an encoded token or a nested
// expression). An error is returned for a pair that has no "=" or whose key is
// empty after trimming.
func parseJobConfig(jobConfig string) (map[string]string, error) {
	const pairParts = 2

	pairs := strings.Split(jobConfig, ",")
	configs := make(map[string]string, len(pairs))
	for _, pair := range pairs {
		// SplitN keeps everything after the first "=" as the value.
		keyValue := strings.SplitN(pair, "=", pairParts)
		if len(keyValue) != pairParts {
			return nil, fmt.Errorf("error on job config value, %s", pair)
		}

		key := strings.TrimSpace(strings.ToUpper(keyValue[0]))
		if key == "" {
			return nil, fmt.Errorf("error on job config value, %s: key is empty", pair)
		}
		configs[key] = keyValue[1]
	}
	return configs, nil
}

// NewReplayHandler returns a ReplayHandler that logs through l and delegates
// replay creation to service.
func NewReplayHandler(l log.Logger, service ReplayService) *ReplayHandler {
	handler := new(ReplayHandler)
	handler.l = l
	handler.service = service
	return handler
}
Loading

0 comments on commit d2225ee

Please sign in to comment.