Skip to content

Commit

Permalink
feat: multiple claim (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
fmorency authored Mar 22, 2024
1 parent 192a08f commit a553bf6
Show file tree
Hide file tree
Showing 12 changed files with 107 additions and 53 deletions.
7 changes: 5 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ version: 2.1

GO_VERSION: &go_version '1.22.1'
GORELEASER_VERSION: &goreleaser_version 'v1.24.0'
GO_MOD_CACHE_KEY: &go_mod_cache_key 'go-mod-2'

orbs:
go: circleci/[email protected]
Expand All @@ -18,9 +19,11 @@ jobs:
- checkout
- go/install:
version: *go_version
- go/load-cache
- go/load-cache:
key: *go_mod_cache_key
- go/mod-download
- go/save-cache
- go/save-cache:
key: *go_mod_cache_key
- run: make test
- run: make coverage
- codecov/upload:
Expand Down
28 changes: 20 additions & 8 deletions cmd/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/viper"

"github.com/liftedinit/mfx-migrator/internal/config"

"github.com/liftedinit/mfx-migrator/internal/store"
)

Expand Down Expand Up @@ -36,6 +38,9 @@ func ClaimCmdRunE(cmd *cobra.Command, args []string) error {

claimConfig := LoadClaimConfigFromCLI()
slog.Debug("args", "claim-c", claimConfig)
if err := claimConfig.Validate(); err != nil {
return err
}

authConfig := LoadAuthConfigFromCLI()
slog.Debug("args", "auth-c", authConfig)
Expand All @@ -48,12 +53,12 @@ func ClaimCmdRunE(cmd *cobra.Command, args []string) error {
return err
}

item, err := claimWorkItem(r, c.UUID, claimConfig.Force)
items, err := claimWorkItem(r, c.UUID, claimConfig)
if err != nil {
return err
}

if item == nil {
if len(items) == 0 {
slog.Info("No work items available")
}

Expand All @@ -71,31 +76,38 @@ func SetupClaimCmdFlags(command *cobra.Command) {
slog.Error(ErrorBindingFlag, "error", err)
}

command.Flags().UintP("jobs", "j", 1, "Number of parallel jobs to claim")
if err := viper.BindPFlag("jobs", command.Flags().Lookup("jobs")); err != nil {
slog.Error(ErrorBindingFlag, "error", err)
}

command.Flags().String("uuid", "", "UUID of the work item to claim")
if err := viper.BindPFlag("claim-uuid", command.Flags().Lookup("uuid")); err != nil {
slog.Error(ErrorBindingFlag, "error", err)
}
}

// claimWorkItem claims a work item from the database
func claimWorkItem(r *resty.Client, uuidStr string, force bool) (*store.WorkItem, error) {
func claimWorkItem(r *resty.Client, uuidStr string, config config.ClaimConfig) ([]*store.WorkItem, error) {
slog.Info("Claiming work item...")
var err error
var item *store.WorkItem
var items []*store.WorkItem
if uuidStr != "" {
item, err = store.ClaimWorkItemFromUUID(r, uuid.MustParse(uuidStr), force)
var item *store.WorkItem
item, err = store.ClaimWorkItemFromUUID(r, uuid.MustParse(uuidStr), config.Force)
items = append(items, item)
} else {
item, err = store.ClaimWorkItemFromQueue(r)
items, err = store.ClaimWorkItemFromQueue(r, config.Jobs)
}

// An error occurred during the claim
if err != nil {
return nil, errors.WithMessage(err, "could not claim work item")
}

if item != nil {
for _, item := range items {
slog.Info("Work item claimed", "uuid", item.UUID)
}

return item, nil
return items, nil
}
9 changes: 5 additions & 4 deletions cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ func CreateRestClient(ctx context.Context, url string, neighborhood uint64) *res
} else {
client = resty.New()
}
// Retry the claim process 3 times with a 5 seconds wait time between retries and a maximum wait time of 60 seconds.
// Retry uses an exponential backoff algorithm.
return client.
SetBaseURL(url).
SetPathParam("neighborhood", strconv.FormatUint(neighborhood, 10)).
SetRetryCount(3).
SetRetryWaitTime(5 * time.Second).SetRetryMaxWaitTime(60 * time.Second)
SetRetryCount(3). // Retry the request process 3 times. Retry uses an exponential backoff algorithm.
SetRetryWaitTime(5 * time.Second). // With a 5 seconds wait time between retries
SetRetryMaxWaitTime(60 * time.Second). // And a maximum wait time of 60 seconds for the whole process
SetTimeout(10 * time.Second) // Set a timeout of 10 seconds for the request
}

// AuthenticateRestClient logs in to the remote database
Expand Down Expand Up @@ -97,6 +97,7 @@ func LoadAuthConfigFromCLI() config.AuthConfig {
func LoadClaimConfigFromCLI() config.ClaimConfig {
return config.ClaimConfig{
Force: viper.GetBool("force"),
Jobs: viper.GetUint("jobs"),
}
}

Expand Down
3 changes: 2 additions & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ func init() {
SetupRootCmdFlags(rootCmd)

viper.AddConfigPath("./")
viper.SetConfigName("config")
viper.AddConfigPath("/mfx-migrator")
viper.SetConfigName("migrator-config")

viper.AutomaticEnv()
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/spf13/cobra v1.8.0
github.com/spf13/viper v1.18.2
github.com/stretchr/testify v1.9.0
golang.org/x/sync v0.6.0
)

require (
Expand Down Expand Up @@ -139,7 +140,6 @@ require (
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/term v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1558,6 +1558,8 @@ github.com/quic-go/webtransport-go v0.5.3 h1:5XMlzemqB4qmOlgIus5zB45AcZ2kCgCy2Ep
github.com/quic-go/webtransport-go v0.5.3/go.mod h1:OhmmgJIzTTqXK5xvtuX0oBpLV2GkLWNDA+UeTGJXErU=
github.com/raulk/go-watchdog v1.3.0 h1:oUmdlHxdkXRJlwfG0O9omj8ukerm8MEQavSiDTEtBsk=
github.com/raulk/go-watchdog v1.3.0/go.mod h1:fIvOnLbF0b0ZwkB9YU4mOW9Did//4vPZtDqv66NfsMU=
github.com/reecepbcups/tokenfactory v0.50.0-alpha.3 h1:BNB22IzvbB9VDuvbvyr7Cke62uMDQgUXkYWJWKlI2oQ=
github.com/reecepbcups/tokenfactory v0.50.0-alpha.3/go.mod h1:qPchGcgRjxe1b6rnQOl+Rr2ruZmq8T4FcRBuLEuPICo=
github.com/remyoudompheng/go-dbus v0.0.0-20121104212943-b7232d34b1d5 h1:CvqZS4QYHBRvx7AeFdimd16HCbLlYsvQMcKDACpJW/c=
github.com/remyoudompheng/go-dbus v0.0.0-20121104212943-b7232d34b1d5/go.mod h1:+u151txRmLpwxBmpYn9z3d1sdJdjRPQpsXuYeY9jNls=
github.com/remyoudompheng/go-liblzma v0.0.0-20190506200333-81bf2d431b96 h1:J8J/cgLDRuqXJnwIrRDBvtl+LLsdg7De74znW/BRRq4=
Expand Down
12 changes: 6 additions & 6 deletions interchaintest/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ require (
github.com/cosmos/cosmos-sdk v0.50.5
github.com/go-resty/resty/v2 v2.11.0
github.com/jarcoal/httpmock v1.3.1
github.com/liftedinit/manifest-ledger v0.0.1-alpha.1
github.com/liftedinit/manifest-ledger/interchaintest v0.0.0-20240302231515-129bae4d6c47
github.com/liftedinit/mfx-migrator v0.0.0-20240221164912-17e19576ad59
github.com/reecepbcups/tokenfactory v0.50.0-alpha.3
github.com/liftedinit/manifest-ledger v0.0.0-20240314224639-e2635faa7017
github.com/liftedinit/manifest-ledger/interchaintest v0.0.0-20240314224639-e2635faa7017
github.com/liftedinit/mfx-migrator v0.0.0-00000000000000-000000000000
github.com/spf13/cobra v1.8.0
github.com/strangelove-ventures/interchaintest/v8 v8.1.0
github.com/strangelove-ventures/poa v0.0.1-alpha.3
github.com/strangelove-ventures/tokenfactory v0.50.0-alpha.4
github.com/stretchr/testify v1.9.0
go.uber.org/zap v1.26.0
)
Expand All @@ -32,7 +32,7 @@ require (
cloud.google.com/go/compute v1.24.0 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.6 // indirect
cloud.google.com/go/storage v1.36.0 // indirect
cloud.google.com/go/storage v1.37.0 // indirect
cosmossdk.io/api v0.7.3 // indirect
cosmossdk.io/collections v0.4.0 // indirect
cosmossdk.io/core v0.12.0 // indirect
Expand Down Expand Up @@ -82,7 +82,7 @@ require (
github.com/cosmos/gogoproto v1.4.11 // indirect
github.com/cosmos/iavl v1.0.1 // indirect
github.com/cosmos/ibc-go/modules/capability v1.0.0 // indirect
github.com/cosmos/ibc-go/v8 v8.0.0 // indirect
github.com/cosmos/ibc-go/v8 v8.1.0 // indirect
github.com/cosmos/ics23/go v0.10.0 // indirect
github.com/cosmos/ledger-cosmos-go v0.13.3 // indirect
github.com/danieljoos/wincred v1.1.2 // indirect
Expand Down
23 changes: 12 additions & 11 deletions interchaintest/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
cloud.google.com/go/storage v1.22.1/go.mod h1:S8N1cAStu7BOeFfE8KAQzmyyLkK8p/vmRq6kuBTW58Y=
cloud.google.com/go/storage v1.23.0/go.mod h1:vOEEDNFnciUMhBeT6hsJIn3ieU5cFRmzeLgDvXzfIXc=
cloud.google.com/go/storage v1.27.0/go.mod h1:x9DOL8TK/ygDUMieqwfhdpQryTeEkhGKMi80i/iqR2s=
cloud.google.com/go/storage v1.36.0 h1:P0mOkAcaJxhCTvAkMhxMfrTKiNcub4YmmPBtlhAyTr8=
cloud.google.com/go/storage v1.36.0/go.mod h1:M6M/3V/D3KpzMTJyPOR/HU6n2Si5QdaXYEsng2xgOs8=
cloud.google.com/go/storage v1.37.0 h1:WI8CsaFO8Q9KjPVtsZ5Cmi0dXV25zMoX0FklT7c3Jm4=
cloud.google.com/go/storage v1.37.0/go.mod h1:i34TiT2IhiNDmcj65PqwCjcoUX7Z5pLzS8DEmoiFq1k=
cloud.google.com/go/talent v1.1.0/go.mod h1:Vl4pt9jiHKvOgF9KoZo6Kob9oV4lwd/ZD5Cto54zDRw=
cloud.google.com/go/talent v1.2.0/go.mod h1:MoNF9bhFQbiJ6eFD3uSsg0uBALw4n4gaCaEjBw9zo8g=
cloud.google.com/go/videointelligence v1.6.0/go.mod h1:w0DIDlVRKtwPCn/C4iwZIJdvC69yInhW0cfi+p546uU=
Expand Down Expand Up @@ -388,8 +388,8 @@ github.com/cosmos/iavl v1.0.1 h1:D+mYbcRO2wptYzOM1Hxl9cpmmHU1ZEt9T2Wv5nZTeUw=
github.com/cosmos/iavl v1.0.1/go.mod h1:8xIUkgVvwvVrBu81scdPty+/Dx9GqwHnAvXz4cwF7RY=
github.com/cosmos/ibc-go/modules/capability v1.0.0 h1:r/l++byFtn7jHYa09zlAdSeevo8ci1mVZNO9+V0xsLE=
github.com/cosmos/ibc-go/modules/capability v1.0.0/go.mod h1:D81ZxzjZAe0ZO5ambnvn1qedsFQ8lOwtqicG6liLBco=
github.com/cosmos/ibc-go/v8 v8.0.0 h1:QKipnr/NGwc+9L7NZipURvmSIu+nw9jOIWTJuDBqOhg=
github.com/cosmos/ibc-go/v8 v8.0.0/go.mod h1:C6IiJom0F3cIQCD5fKwVPDrDK9j/xTu563AWuOmXois=
github.com/cosmos/ibc-go/v8 v8.1.0 h1:pf1106wl0Cf+p1+FjXzV6odlS9DnqVunPVWCH1Uz+lQ=
github.com/cosmos/ibc-go/v8 v8.1.0/go.mod h1:o1ipS95xpdjqNcB8Drq0eI3Sn4FRLigjll42ec1ECuU=
github.com/cosmos/ics23/go v0.10.0 h1:iXqLLgp2Lp+EdpIuwXTYIQU+AiHj9mOC2X9ab++bZDM=
github.com/cosmos/ics23/go v0.10.0/go.mod h1:ZfJSmng/TBNTBkFemHHHj5YY7VAU/MBU980F4VU1NG0=
github.com/cosmos/ledger-cosmos-go v0.13.3 h1:7ehuBGuyIytsXbd4MP43mLeoN2LTOEnk5nvue4rK+yM=
Expand Down Expand Up @@ -801,10 +801,10 @@ github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6
github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg=
github.com/libp2p/go-libp2p v0.31.0 h1:LFShhP8F6xthWiBBq3euxbKjZsoRajVEyBS9snfHxYg=
github.com/libp2p/go-libp2p v0.31.0/go.mod h1:W/FEK1c/t04PbRH3fA9i5oucu5YcgrG0JVoBWT1B7Eg=
github.com/liftedinit/manifest-ledger v0.0.1-alpha.1 h1:5fxhOdWoGzhPDvjOhLSCOoHL1pQ8rbfqlv/KAi0Rh68=
github.com/liftedinit/manifest-ledger v0.0.1-alpha.1/go.mod h1:5Cc9h8a0g4tSMXxLQHdnrsXYPD8Qq05tJ4aKDFd+cDo=
github.com/liftedinit/manifest-ledger/interchaintest v0.0.0-20240302231515-129bae4d6c47 h1:zk/iW2W6nYnjZN6Bq9tpIlBAnQbMcDIARLmxMD9gMVg=
github.com/liftedinit/manifest-ledger/interchaintest v0.0.0-20240302231515-129bae4d6c47/go.mod h1:s4i5aiZFOmcqUcBw/BKtvdYBoYNlcmt57n4F2i43CyQ=
github.com/liftedinit/manifest-ledger v0.0.0-20240314224639-e2635faa7017 h1:7JoK+r3BY+Rc+yuk7GiDEoCyoja83udcP/5+k1zRZhg=
github.com/liftedinit/manifest-ledger v0.0.0-20240314224639-e2635faa7017/go.mod h1:VPZ1gL9nsQ27TSHMa026XvZ9BikPnqAKexx2sOwqHGo=
github.com/liftedinit/manifest-ledger/interchaintest v0.0.0-20240314224639-e2635faa7017 h1:johBFb/EFDdDO6+QnidfPr4AVh/2VKuUjA0/rKdbLII=
github.com/liftedinit/manifest-ledger/interchaintest v0.0.0-20240314224639-e2635faa7017/go.mod h1:2XWGqup5MUjgdJKX3YIOwtv4HpOEws2o/+c6P37dVsE=
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
github.com/linxGnu/grocksdb v1.8.12 h1:1/pCztQUOa3BX/1gR3jSZDoaKFpeHFvQ1XrqZpSvZVo=
Expand Down Expand Up @@ -1002,8 +1002,6 @@ github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3c
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/reecepbcups/tokenfactory v0.50.0-alpha.3 h1:BNB22IzvbB9VDuvbvyr7Cke62uMDQgUXkYWJWKlI2oQ=
github.com/reecepbcups/tokenfactory v0.50.0-alpha.3/go.mod h1:qPchGcgRjxe1b6rnQOl+Rr2ruZmq8T4FcRBuLEuPICo=
github.com/regen-network/protobuf v1.3.3-alpha.regen.1 h1:OHEc+q5iIAXpqiqFKeLpu5NwTIkVXUs48vFMwzqpqY4=
github.com/regen-network/protobuf v1.3.3-alpha.regen.1/go.mod h1:2DjTFR1HhMQhiWC5sZ4OhQ3+NtdbZ6oBDKQwq5Ou+FI=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
Expand Down Expand Up @@ -1072,6 +1070,8 @@ github.com/strangelove-ventures/interchaintest/v8 v8.1.0 h1:+VOGGR2sEP2gLvx0ojRO
github.com/strangelove-ventures/interchaintest/v8 v8.1.0/go.mod h1:kXw3vLQdEEcvyJ3ZindGPigpHgIdwrywNsQKkARb+qM=
github.com/strangelove-ventures/poa v0.0.1-alpha.3 h1:BtmF6nJDBARDNefO0axuMD82+nkZQWa2OkZXQaQTwpI=
github.com/strangelove-ventures/poa v0.0.1-alpha.3/go.mod h1:LcmorSGWRyn/M5hch7dAW7l0aYL+VSw28uzdsjdOduc=
github.com/strangelove-ventures/tokenfactory v0.50.0-alpha.4 h1:tUIR4dNh3lhZ2mE0sWIN4NAjH0snTSLIX3AKNUqdNzo=
github.com/strangelove-ventures/tokenfactory v0.50.0-alpha.4/go.mod h1:U3uYGsgdHRdM8sAYL+yHP1zKF2XtUJnv+mBs3dr/ybw=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
Expand Down Expand Up @@ -1548,8 +1548,9 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU=
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90=
google.golang.org/api v0.3.1/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
Expand Down
2 changes: 1 addition & 1 deletion interchaintest/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import (
"github.com/cosmos/cosmos-sdk/crypto/keyring"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/liftedinit/manifest-ledger/interchaintest/helpers"
tokenfactorytypes "github.com/reecepbcups/tokenfactory/x/tokenfactory/types"
"github.com/strangelove-ventures/interchaintest/v8"
"github.com/strangelove-ventures/interchaintest/v8/testreporter"
poatypes "github.com/strangelove-ventures/poa"
tokenfactorytypes "github.com/strangelove-ventures/tokenfactory/x/tokenfactory/types"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest"
Expand Down
9 changes: 9 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ func (c AuthConfig) Validate() error {

type ClaimConfig struct {
Force bool // Force re-claiming of a failed work item
Jobs uint // Number of parallel jobs to claim
}

func (c ClaimConfig) Validate() error {
if c.Jobs == 0 {
return fmt.Errorf("jobs > 0 is required")
}

return nil
}

type MigrateConfig struct {
Expand Down
37 changes: 27 additions & 10 deletions internal/store/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,49 @@ import (
"github.com/go-resty/resty/v2"
"github.com/google/uuid"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)

// ClaimWorkItemFromQueue retrieves a work item from the remote database work queue.
// TODO: Support claiming multiple work items at once
func ClaimWorkItemFromQueue(r *resty.Client) (*WorkItem, error) {
// Items will be claimed in parallel using goroutine.
// The maximum number of items that can be claimed in parallel is defined by the pagination of the remote database.
func ClaimWorkItemFromQueue(r *resty.Client, jobs uint) ([]*WorkItem, error) {
// 1. Get all work items from remote
status := CREATED
items, err := GetAllWorkItems(r, &status)
if err != nil {
return nil, errors.WithMessage(err, ErrorGettingWorkItems)
}

var g errgroup.Group
g.SetLimit(int(jobs))
claimedItems := make([]*WorkItem, 0)

// 2. Loop over all work items
for _, item := range items.Items {
// 2.0 Check if the work item is in the correct state to be claimed
if !itemCanBeClaimed(&item, false) {
slog.Warn("unable to claim work item, invalid state", "uuid", item.UUID, "status", item.Status.String())
continue
}
item := item
g.Go(func() error {
// 2.0 Check if the work item is in the correct state to be claimed
if !itemCanBeClaimed(&item, false) {
slog.Warn("unable to claim work item, invalid state", "uuid", item.UUID, "status", item.Status.String())
return nil
}

// 2.1 Try claiming the work item
return claimItem(r, &item)
claimedItem, err := claimItem(r, &item)
if err != nil {
return errors.WithMessage(err, ErrorClaimingWorkItem)
}
claimedItems = append(claimedItems, claimedItem)
return nil
})
}

if err := g.Wait(); err != nil {
return nil, errors.WithMessage(err, ErrorClaimingWorkItem)
}

// No work items available
return nil, nil
return claimedItems, nil
}

func ClaimWorkItemFromUUID(r *resty.Client, uuid uuid.UUID, force bool) (*WorkItem, error) {
Expand Down
Loading

0 comments on commit a553bf6

Please sign in to comment.