From a553bf6ec7f24079ab14fec15ad1c71892437af4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?F=C3=A9lix=20C=2E=20Morency?= <1102868+fmorency@users.noreply.github.com> Date: Fri, 22 Mar 2024 15:50:52 -0400 Subject: [PATCH] feat: multiple claim (#8) --- .circleci/config.yml | 7 +++++-- cmd/claim.go | 28 +++++++++++++++++++-------- cmd/common.go | 9 +++++---- cmd/root.go | 3 ++- go.mod | 2 +- go.work.sum | 2 ++ interchaintest/go.mod | 12 ++++++------ interchaintest/go.sum | 23 +++++++++++----------- interchaintest/setup.go | 2 +- internal/config/config.go | 9 +++++++++ internal/store/claim.go | 37 ++++++++++++++++++++++++++---------- internal/store/claim_test.go | 26 ++++++++++++++++--------- 12 files changed, 107 insertions(+), 53 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 176832a..0958932 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -4,6 +4,7 @@ version: 2.1 GO_VERSION: &go_version '1.22.1' GORELEASER_VERSION: &goreleaser_version 'v1.24.0' +GO_MOD_CACHE_KEY: &go_mod_cache_key 'go-mod-2' orbs: go: circleci/go@1.11.0 @@ -18,9 +19,11 @@ jobs: - checkout - go/install: version: *go_version - - go/load-cache + - go/load-cache: + key: *go_mod_cache_key - go/mod-download - - go/save-cache + - go/save-cache: + key: *go_mod_cache_key - run: make test - run: make coverage - codecov/upload: diff --git a/cmd/claim.go b/cmd/claim.go index d954859..eee3aed 100644 --- a/cmd/claim.go +++ b/cmd/claim.go @@ -9,6 +9,8 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" + "github.com/liftedinit/mfx-migrator/internal/config" + "github.com/liftedinit/mfx-migrator/internal/store" ) @@ -36,6 +38,9 @@ func ClaimCmdRunE(cmd *cobra.Command, args []string) error { claimConfig := LoadClaimConfigFromCLI() slog.Debug("args", "claim-c", claimConfig) + if err := claimConfig.Validate(); err != nil { + return err + } authConfig := LoadAuthConfigFromCLI() slog.Debug("args", "auth-c", authConfig) @@ -48,12 +53,12 @@ func ClaimCmdRunE(cmd *cobra.Command, args []string) error { return err } - item, err := claimWorkItem(r, c.UUID, claimConfig.Force) + items, err := claimWorkItem(r, c.UUID, claimConfig) if err != nil { return err } - if item == nil { + if len(items) == 0 { slog.Info("No work items available") } @@ -71,6 +76,11 @@ func SetupClaimCmdFlags(command *cobra.Command) { slog.Error(ErrorBindingFlag, "error", err) } + command.Flags().UintP("jobs", "j", 1, "Number of parallel jobs to claim") + if err := viper.BindPFlag("jobs", command.Flags().Lookup("jobs")); err != nil { + slog.Error(ErrorBindingFlag, "error", err) + } + command.Flags().String("uuid", "", "UUID of the work item to claim") if err := viper.BindPFlag("claim-uuid", command.Flags().Lookup("uuid")); err != nil { slog.Error(ErrorBindingFlag, "error", err) @@ -78,14 +88,16 @@ func SetupClaimCmdFlags(command *cobra.Command) { } // claimWorkItem claims a work item from the database -func claimWorkItem(r *resty.Client, uuidStr string, force bool) (*store.WorkItem, error) { +func claimWorkItem(r *resty.Client, uuidStr string, config config.ClaimConfig) ([]*store.WorkItem, error) { slog.Info("Claiming work item...") var err error - var item *store.WorkItem + var items []*store.WorkItem if uuidStr != "" { - item, err = store.ClaimWorkItemFromUUID(r, uuid.MustParse(uuidStr), force) + var item *store.WorkItem + item, err = store.ClaimWorkItemFromUUID(r, uuid.MustParse(uuidStr), config.Force) + items = append(items, item) } else { - item, err = store.ClaimWorkItemFromQueue(r) + items, err = store.ClaimWorkItemFromQueue(r, config.Jobs) } // An error occurred during the claim @@ -93,9 +105,9 @@ func claimWorkItem(r *resty.Client, uuidStr string, force bool) (*store.WorkItem return nil, errors.WithMessage(err, "could not claim work item") } - if item != nil { + for _, item := range items { slog.Info("Work item claimed", "uuid", item.UUID) } - return item, nil + return items, nil } diff --git a/cmd/common.go b/cmd/common.go index 53f2dd4..d7bb5a7 100644 --- a/cmd/common.go +++ b/cmd/common.go @@ -31,13 +31,13 @@ func CreateRestClient(ctx context.Context, url string, neighborhood uint64) *res } else { client = resty.New() } - // Retry the claim process 3 times with a 5 seconds wait time between retries and a maximum wait time of 60 seconds. - // Retry uses an exponential backoff algorithm. return client. SetBaseURL(url). SetPathParam("neighborhood", strconv.FormatUint(neighborhood, 10)). - SetRetryCount(3). - SetRetryWaitTime(5 * time.Second).SetRetryMaxWaitTime(60 * time.Second) + SetRetryCount(3). // Retry the request process 3 times. Retry uses an exponential backoff algorithm. + SetRetryWaitTime(5 * time.Second). // With a 5 seconds wait time between retries + SetRetryMaxWaitTime(60 * time.Second). // And a maximum wait time of 60 seconds for the whole process + SetTimeout(10 * time.Second) // Set a timeout of 10 seconds for the request } // AuthenticateRestClient logs in to the remote database @@ -97,6 +97,7 @@ func LoadAuthConfigFromCLI() config.AuthConfig { func LoadClaimConfigFromCLI() config.ClaimConfig { return config.ClaimConfig{ Force: viper.GetBool("force"), + Jobs: viper.GetUint("jobs"), } } diff --git a/cmd/root.go b/cmd/root.go index 5a54ad5..c51a152 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -93,7 +93,8 @@ func init() { SetupRootCmdFlags(rootCmd) viper.AddConfigPath("./") - viper.SetConfigName("config") + viper.AddConfigPath("/mfx-migrator") + viper.SetConfigName("migrator-config") viper.AutomaticEnv() } diff --git a/go.mod b/go.mod index c11076b..52fcfcd 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/spf13/cobra v1.8.0 github.com/spf13/viper v1.18.2 github.com/stretchr/testify v1.9.0 + golang.org/x/sync v0.6.0 ) require ( @@ -139,7 +140,6 @@ require ( golang.org/x/crypto v0.21.0 // indirect golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 // indirect golang.org/x/net v0.22.0 // indirect - golang.org/x/sync v0.6.0 // indirect golang.org/x/sys v0.18.0 // indirect golang.org/x/term v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/go.work.sum b/go.work.sum index c5ffbc2..ace9301 100644 --- a/go.work.sum +++ b/go.work.sum @@ -1558,6 +1558,8 @@ github.com/quic-go/webtransport-go v0.5.3 h1:5XMlzemqB4qmOlgIus5zB45AcZ2kCgCy2Ep github.com/quic-go/webtransport-go v0.5.3/go.mod h1:OhmmgJIzTTqXK5xvtuX0oBpLV2GkLWNDA+UeTGJXErU= github.com/raulk/go-watchdog v1.3.0 h1:oUmdlHxdkXRJlwfG0O9omj8ukerm8MEQavSiDTEtBsk= github.com/raulk/go-watchdog v1.3.0/go.mod h1:fIvOnLbF0b0ZwkB9YU4mOW9Did//4vPZtDqv66NfsMU= +github.com/reecepbcups/tokenfactory v0.50.0-alpha.3 h1:BNB22IzvbB9VDuvbvyr7Cke62uMDQgUXkYWJWKlI2oQ= +github.com/reecepbcups/tokenfactory v0.50.0-alpha.3/go.mod h1:qPchGcgRjxe1b6rnQOl+Rr2ruZmq8T4FcRBuLEuPICo= github.com/remyoudompheng/go-dbus v0.0.0-20121104212943-b7232d34b1d5 h1:CvqZS4QYHBRvx7AeFdimd16HCbLlYsvQMcKDACpJW/c= github.com/remyoudompheng/go-dbus v0.0.0-20121104212943-b7232d34b1d5/go.mod h1:+u151txRmLpwxBmpYn9z3d1sdJdjRPQpsXuYeY9jNls= github.com/remyoudompheng/go-liblzma v0.0.0-20190506200333-81bf2d431b96 h1:J8J/cgLDRuqXJnwIrRDBvtl+LLsdg7De74znW/BRRq4= diff --git a/interchaintest/go.mod b/interchaintest/go.mod index 87353d1..ece53f4 100644 --- a/interchaintest/go.mod +++ b/interchaintest/go.mod @@ -16,13 +16,13 @@ require ( github.com/cosmos/cosmos-sdk v0.50.5 github.com/go-resty/resty/v2 v2.11.0 github.com/jarcoal/httpmock v1.3.1 - github.com/liftedinit/manifest-ledger v0.0.1-alpha.1 - github.com/liftedinit/manifest-ledger/interchaintest v0.0.0-20240302231515-129bae4d6c47 - github.com/liftedinit/mfx-migrator v0.0.0-20240221164912-17e19576ad59 - github.com/reecepbcups/tokenfactory v0.50.0-alpha.3 + github.com/liftedinit/manifest-ledger v0.0.0-20240314224639-e2635faa7017 + github.com/liftedinit/manifest-ledger/interchaintest v0.0.0-20240314224639-e2635faa7017 + github.com/liftedinit/mfx-migrator v0.0.0-00000000000000-000000000000 github.com/spf13/cobra v1.8.0 github.com/strangelove-ventures/interchaintest/v8 v8.1.0 github.com/strangelove-ventures/poa v0.0.1-alpha.3 + github.com/strangelove-ventures/tokenfactory v0.50.0-alpha.4 github.com/stretchr/testify v1.9.0 go.uber.org/zap v1.26.0 ) @@ -32,7 +32,7 @@ require ( cloud.google.com/go/compute v1.24.0 // indirect cloud.google.com/go/compute/metadata v0.2.3 // indirect cloud.google.com/go/iam v1.1.6 // indirect - cloud.google.com/go/storage v1.36.0 // indirect + cloud.google.com/go/storage v1.37.0 // indirect cosmossdk.io/api v0.7.3 // indirect cosmossdk.io/collections v0.4.0 // indirect cosmossdk.io/core v0.12.0 // indirect @@ -82,7 +82,7 @@ require ( github.com/cosmos/gogoproto v1.4.11 // indirect github.com/cosmos/iavl v1.0.1 // indirect github.com/cosmos/ibc-go/modules/capability v1.0.0 // indirect - github.com/cosmos/ibc-go/v8 v8.0.0 // indirect + github.com/cosmos/ibc-go/v8 v8.1.0 // indirect github.com/cosmos/ics23/go v0.10.0 // indirect github.com/cosmos/ledger-cosmos-go v0.13.3 // indirect github.com/danieljoos/wincred v1.1.2 // indirect diff --git a/interchaintest/go.sum b/interchaintest/go.sum index e1c639a..c2b52d5 100644 --- a/interchaintest/go.sum +++ b/interchaintest/go.sum @@ -171,8 +171,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9 cloud.google.com/go/storage v1.22.1/go.mod h1:S8N1cAStu7BOeFfE8KAQzmyyLkK8p/vmRq6kuBTW58Y= cloud.google.com/go/storage v1.23.0/go.mod h1:vOEEDNFnciUMhBeT6hsJIn3ieU5cFRmzeLgDvXzfIXc= cloud.google.com/go/storage v1.27.0/go.mod h1:x9DOL8TK/ygDUMieqwfhdpQryTeEkhGKMi80i/iqR2s= -cloud.google.com/go/storage v1.36.0 h1:P0mOkAcaJxhCTvAkMhxMfrTKiNcub4YmmPBtlhAyTr8= -cloud.google.com/go/storage v1.36.0/go.mod h1:M6M/3V/D3KpzMTJyPOR/HU6n2Si5QdaXYEsng2xgOs8= +cloud.google.com/go/storage v1.37.0 h1:WI8CsaFO8Q9KjPVtsZ5Cmi0dXV25zMoX0FklT7c3Jm4= +cloud.google.com/go/storage v1.37.0/go.mod h1:i34TiT2IhiNDmcj65PqwCjcoUX7Z5pLzS8DEmoiFq1k= cloud.google.com/go/talent v1.1.0/go.mod h1:Vl4pt9jiHKvOgF9KoZo6Kob9oV4lwd/ZD5Cto54zDRw= cloud.google.com/go/talent v1.2.0/go.mod h1:MoNF9bhFQbiJ6eFD3uSsg0uBALw4n4gaCaEjBw9zo8g= cloud.google.com/go/videointelligence v1.6.0/go.mod h1:w0DIDlVRKtwPCn/C4iwZIJdvC69yInhW0cfi+p546uU= @@ -388,8 +388,8 @@ github.com/cosmos/iavl v1.0.1 h1:D+mYbcRO2wptYzOM1Hxl9cpmmHU1ZEt9T2Wv5nZTeUw= github.com/cosmos/iavl v1.0.1/go.mod h1:8xIUkgVvwvVrBu81scdPty+/Dx9GqwHnAvXz4cwF7RY= github.com/cosmos/ibc-go/modules/capability v1.0.0 h1:r/l++byFtn7jHYa09zlAdSeevo8ci1mVZNO9+V0xsLE= github.com/cosmos/ibc-go/modules/capability v1.0.0/go.mod h1:D81ZxzjZAe0ZO5ambnvn1qedsFQ8lOwtqicG6liLBco= -github.com/cosmos/ibc-go/v8 v8.0.0 h1:QKipnr/NGwc+9L7NZipURvmSIu+nw9jOIWTJuDBqOhg= -github.com/cosmos/ibc-go/v8 v8.0.0/go.mod h1:C6IiJom0F3cIQCD5fKwVPDrDK9j/xTu563AWuOmXois= +github.com/cosmos/ibc-go/v8 v8.1.0 h1:pf1106wl0Cf+p1+FjXzV6odlS9DnqVunPVWCH1Uz+lQ= +github.com/cosmos/ibc-go/v8 v8.1.0/go.mod h1:o1ipS95xpdjqNcB8Drq0eI3Sn4FRLigjll42ec1ECuU= github.com/cosmos/ics23/go v0.10.0 h1:iXqLLgp2Lp+EdpIuwXTYIQU+AiHj9mOC2X9ab++bZDM= github.com/cosmos/ics23/go v0.10.0/go.mod h1:ZfJSmng/TBNTBkFemHHHj5YY7VAU/MBU980F4VU1NG0= github.com/cosmos/ledger-cosmos-go v0.13.3 h1:7ehuBGuyIytsXbd4MP43mLeoN2LTOEnk5nvue4rK+yM= @@ -801,10 +801,10 @@ github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6 github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg= github.com/libp2p/go-libp2p v0.31.0 h1:LFShhP8F6xthWiBBq3euxbKjZsoRajVEyBS9snfHxYg= github.com/libp2p/go-libp2p v0.31.0/go.mod h1:W/FEK1c/t04PbRH3fA9i5oucu5YcgrG0JVoBWT1B7Eg= -github.com/liftedinit/manifest-ledger v0.0.1-alpha.1 h1:5fxhOdWoGzhPDvjOhLSCOoHL1pQ8rbfqlv/KAi0Rh68= -github.com/liftedinit/manifest-ledger v0.0.1-alpha.1/go.mod h1:5Cc9h8a0g4tSMXxLQHdnrsXYPD8Qq05tJ4aKDFd+cDo= -github.com/liftedinit/manifest-ledger/interchaintest v0.0.0-20240302231515-129bae4d6c47 h1:zk/iW2W6nYnjZN6Bq9tpIlBAnQbMcDIARLmxMD9gMVg= -github.com/liftedinit/manifest-ledger/interchaintest v0.0.0-20240302231515-129bae4d6c47/go.mod h1:s4i5aiZFOmcqUcBw/BKtvdYBoYNlcmt57n4F2i43CyQ= +github.com/liftedinit/manifest-ledger v0.0.0-20240314224639-e2635faa7017 h1:7JoK+r3BY+Rc+yuk7GiDEoCyoja83udcP/5+k1zRZhg= +github.com/liftedinit/manifest-ledger v0.0.0-20240314224639-e2635faa7017/go.mod h1:VPZ1gL9nsQ27TSHMa026XvZ9BikPnqAKexx2sOwqHGo= +github.com/liftedinit/manifest-ledger/interchaintest v0.0.0-20240314224639-e2635faa7017 h1:johBFb/EFDdDO6+QnidfPr4AVh/2VKuUjA0/rKdbLII= +github.com/liftedinit/manifest-ledger/interchaintest v0.0.0-20240314224639-e2635faa7017/go.mod h1:2XWGqup5MUjgdJKX3YIOwtv4HpOEws2o/+c6P37dVsE= github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= github.com/linxGnu/grocksdb v1.8.12 h1:1/pCztQUOa3BX/1gR3jSZDoaKFpeHFvQ1XrqZpSvZVo= @@ -1002,8 +1002,6 @@ github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3c github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= -github.com/reecepbcups/tokenfactory v0.50.0-alpha.3 h1:BNB22IzvbB9VDuvbvyr7Cke62uMDQgUXkYWJWKlI2oQ= -github.com/reecepbcups/tokenfactory v0.50.0-alpha.3/go.mod h1:qPchGcgRjxe1b6rnQOl+Rr2ruZmq8T4FcRBuLEuPICo= github.com/regen-network/protobuf v1.3.3-alpha.regen.1 h1:OHEc+q5iIAXpqiqFKeLpu5NwTIkVXUs48vFMwzqpqY4= github.com/regen-network/protobuf v1.3.3-alpha.regen.1/go.mod h1:2DjTFR1HhMQhiWC5sZ4OhQ3+NtdbZ6oBDKQwq5Ou+FI= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= @@ -1072,6 +1070,8 @@ github.com/strangelove-ventures/interchaintest/v8 v8.1.0 h1:+VOGGR2sEP2gLvx0ojRO github.com/strangelove-ventures/interchaintest/v8 v8.1.0/go.mod h1:kXw3vLQdEEcvyJ3ZindGPigpHgIdwrywNsQKkARb+qM= github.com/strangelove-ventures/poa v0.0.1-alpha.3 h1:BtmF6nJDBARDNefO0axuMD82+nkZQWa2OkZXQaQTwpI= github.com/strangelove-ventures/poa v0.0.1-alpha.3/go.mod h1:LcmorSGWRyn/M5hch7dAW7l0aYL+VSw28uzdsjdOduc= +github.com/strangelove-ventures/tokenfactory v0.50.0-alpha.4 h1:tUIR4dNh3lhZ2mE0sWIN4NAjH0snTSLIX3AKNUqdNzo= +github.com/strangelove-ventures/tokenfactory v0.50.0-alpha.4/go.mod h1:U3uYGsgdHRdM8sAYL+yHP1zKF2XtUJnv+mBs3dr/ybw= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI= @@ -1548,8 +1548,9 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= -golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= +golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= +golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= google.golang.org/api v0.3.1/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M= diff --git a/interchaintest/setup.go b/interchaintest/setup.go index 22793e0..984c01a 100644 --- a/interchaintest/setup.go +++ b/interchaintest/setup.go @@ -12,10 +12,10 @@ import ( "github.com/cosmos/cosmos-sdk/crypto/keyring" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/liftedinit/manifest-ledger/interchaintest/helpers" - tokenfactorytypes "github.com/reecepbcups/tokenfactory/x/tokenfactory/types" "github.com/strangelove-ventures/interchaintest/v8" "github.com/strangelove-ventures/interchaintest/v8/testreporter" poatypes "github.com/strangelove-ventures/poa" + tokenfactorytypes "github.com/strangelove-ventures/tokenfactory/x/tokenfactory/types" "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest" diff --git a/internal/config/config.go b/internal/config/config.go index 30955af..0388443 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -67,6 +67,15 @@ func (c AuthConfig) Validate() error { type ClaimConfig struct { Force bool // Force re-claiming of a failed work item + Jobs uint // Number of parallel jobs to claim +} + +func (c ClaimConfig) Validate() error { + if c.Jobs == 0 { + return fmt.Errorf("jobs > 0 is required") + } + + return nil } type MigrateConfig struct { diff --git a/internal/store/claim.go b/internal/store/claim.go index 11dd73f..3f06251 100644 --- a/internal/store/claim.go +++ b/internal/store/claim.go @@ -7,11 +7,13 @@ import ( "github.com/go-resty/resty/v2" "github.com/google/uuid" "github.com/pkg/errors" + "golang.org/x/sync/errgroup" ) // ClaimWorkItemFromQueue retrieves a work item from the remote database work queue. -// TODO: Support claiming multiple work items at once -func ClaimWorkItemFromQueue(r *resty.Client) (*WorkItem, error) { +// Items will be claimed in parallel using goroutine. +// The maximum number of items that can be claimed in parallel is defined by the pagination of the remote database. +func ClaimWorkItemFromQueue(r *resty.Client, jobs uint) ([]*WorkItem, error) { // 1. Get all work items from remote status := CREATED items, err := GetAllWorkItems(r, &status) @@ -19,20 +21,35 @@ func ClaimWorkItemFromQueue(r *resty.Client) (*WorkItem, error) { return nil, errors.WithMessage(err, ErrorGettingWorkItems) } + var g errgroup.Group + g.SetLimit(int(jobs)) + claimedItems := make([]*WorkItem, 0) + // 2. Loop over all work items for _, item := range items.Items { - // 2.0 Check if the work item is in the correct state to be claimed - if !itemCanBeClaimed(&item, false) { - slog.Warn("unable to claim work item, invalid state", "uuid", item.UUID, "status", item.Status.String()) - continue - } + item := item + g.Go(func() error { + // 2.0 Check if the work item is in the correct state to be claimed + if !itemCanBeClaimed(&item, false) { + slog.Warn("unable to claim work item, invalid state", "uuid", item.UUID, "status", item.Status.String()) + return nil + } - // 2.1 Try claiming the work item - return claimItem(r, &item) + claimedItem, err := claimItem(r, &item) + if err != nil { + return errors.WithMessage(err, ErrorClaimingWorkItem) + } + claimedItems = append(claimedItems, claimedItem) + return nil + }) + } + + if err := g.Wait(); err != nil { + return nil, errors.WithMessage(err, ErrorClaimingWorkItem) } // No work items available - return nil, nil + return claimedItems, nil } func ClaimWorkItemFromUUID(r *resty.Client, uuid uuid.UUID, force bool) (*WorkItem, error) { diff --git a/internal/store/claim_test.go b/internal/store/claim_test.go index 21090b2..d24e1fe 100644 --- a/internal/store/claim_test.go +++ b/internal/store/claim_test.go @@ -37,10 +37,11 @@ func TestStore_Claim(t *testing.T) { {Method: "GET", Url: "=~^" + testutils.MigrationUrl, Responder: testutils.MustMigrationGetResponder(store.CLAIMED)}, {Method: "PUT", Url: "=~^" + testutils.MigrationUrl, Responder: testutils.MigrationUpdateResponder}, }, func() { - item, err := store.ClaimWorkItemFromQueue(rClient) - require.NotEqual(t, uuid.Nil, item.UUID) + items, err := store.ClaimWorkItemFromQueue(rClient, 1) + require.NotEmpty(t, items) + require.NotEqual(t, uuid.Nil, items[0].UUID) require.NoError(t, err) - require.NotNil(t, item) + require.NotNil(t, items) }}, // Fail to claim a work item from the queue (work item update failure) {"failure_queue", []testutils.HttpResponder{ @@ -48,7 +49,7 @@ func TestStore_Claim(t *testing.T) { {Method: "GET", Url: "=~^" + testutils.MigrationUrl, Responder: testutils.MustMigrationGetResponder(store.CREATED)}, {Method: "PUT", Url: "=~^" + testutils.MigrationUrl, Responder: testutils.NotFoundResponder}, }, func() { - item, err := store.ClaimWorkItemFromQueue(rClient) + item, err := store.ClaimWorkItemFromQueue(rClient, 1) require.Error(t, err) require.ErrorContains(t, err, "could not claim work item") require.ErrorContains(t, err, "status code: 404") @@ -58,9 +59,9 @@ func TestStore_Claim(t *testing.T) { {"no_item_queue", []testutils.HttpResponder{ {Method: "GET", Url: testutils.MigrationsUrl, Responder: testutils.MustAllMigrationsGetResponder(0, store.CREATED)}, }, func() { - item, err := store.ClaimWorkItemFromQueue(rClient) + item, err := store.ClaimWorkItemFromQueue(rClient, 1) require.NoError(t, err) // no work items available - require.Nil(t, item) + require.Empty(t, item) }}, // Successfully claim a work item by UUID {"success_uuid", []testutils.HttpResponder{ @@ -141,7 +142,7 @@ func TestStore_Claim(t *testing.T) { {"invalid_work_items", []testutils.HttpResponder{ {Method: "GET", Url: testutils.MigrationsUrl, Responder: testutils.GarbageResponder}, }, func() { - item, err := store.ClaimWorkItemFromQueue(rClient) + item, err := store.ClaimWorkItemFromQueue(rClient, 1) require.Error(t, err) require.ErrorContains(t, err, "cannot unmarshal") require.Nil(t, item) @@ -150,7 +151,7 @@ func TestStore_Claim(t *testing.T) { {"invalid_all_work_items_url", []testutils.HttpResponder{ {Method: "GET", Url: testutils.MigrationsUrl, Responder: testutils.NotFoundResponder}, }, func() { - _, err := store.ClaimWorkItemFromQueue(rClient) + _, err := store.ClaimWorkItemFromQueue(rClient, 1) require.Error(t, err) // unable to list work items require.ErrorContains(t, err, "could not get all work items") require.ErrorContains(t, err, "status code: 404") @@ -161,13 +162,20 @@ func TestStore_Claim(t *testing.T) { {Method: "GET", Url: "=~^" + testutils.MigrationUrl, Responder: testutils.MustMigrationGetResponder(store.CREATED)}, {Method: "PUT", Url: "=~^" + testutils.MigrationUrl, Responder: testutils.NotFoundResponder}, }, func() { - item, err := store.ClaimWorkItemFromQueue(rClient) + item, err := store.ClaimWorkItemFromQueue(rClient, 1) require.Error(t, err) require.ErrorContains(t, err, "could not claim work item") require.ErrorContains(t, err, "error updating remote work item") require.ErrorContains(t, err, "status code: 404") require.Nil(t, item) }}, + //{"claim multiple", []testutils.HttpResponder{ + // {Method: "GET", Url: testutils.MigrationsUrl, Responder: testutils.MustAllMigrationsGetResponder(2, store.CREATED)}, + // {Method: "GET", Url: "=~^" + testutils.MigrationUrl, Responder: testutils.MustMigrationGetResponder(store.CREATED)}, + // {Method: "PUT", Url: "=~^" + testutils.MigrationUrl, Responder: testutils.MigrationUpdateResponder}, + //}, func() { + // + //}}, } for _, tt := range tests {