Skip to content

Commit

Permalink
fix(inventory): readjust inventory on startup for existing leases (#222)
Browse files Browse the repository at this point in the history
Signed-off-by: Artur Troian <[email protected]>
  • Loading branch information
troian authored Mar 28, 2024
1 parent de51c95 commit 442dec2
Show file tree
Hide file tree
Showing 8 changed files with 755 additions and 345 deletions.
20 changes: 19 additions & 1 deletion cluster/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,10 @@ func (is *inventoryService) run(ctx context.Context, reservationsArg []*reservat
}

var runch <-chan runner.Result
var currinv ctypes.Inventory

invupch := make(chan ctypes.Inventory, 1)

invch := is.clients.inventory.ResultChan()
var reserveChLocal <-chan inventoryRequest

Expand Down Expand Up @@ -543,6 +547,13 @@ loop:
"order", res.OrderID(),
"resource-group", res.Resources().GetName(),
"allocated", res.allocated)

if currinv != nil {
select {
case invupch <- currinv:
default:
}
}
}

break
Expand Down Expand Up @@ -606,12 +617,19 @@ loop:
res: resp,
err: err,
}
inventoryRequestsCounter.WithLabelValues("status", "success").Inc()
if err == nil {
inventoryRequestsCounter.WithLabelValues("status", "success").Inc()
} else {
inventoryRequestsCounter.WithLabelValues("status", "error").Inc()
}
case inv, open := <-invch:
if !open {
continue
}

invupch <- inv
case inv := <-invupch:
currinv = inv.Dup()
state.inventory = inv
updateIPs()

Expand Down
19 changes: 12 additions & 7 deletions cluster/inventory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestInventory_reservationAllocatable(t *testing.T) {
}
}

inv := <-cinventory.NewNull("a", "b").ResultChan()
inv := <-cinventory.NewNull(context.Background(), "a", "b").ResultChan()

reservations := []*reservation{
mkres(true, mkrg(750, 0, 3*unit.Gi, 1*unit.Gi, 0, 1)),
Expand Down Expand Up @@ -109,7 +109,7 @@ func TestInventory_ClusterDeploymentNotDeployed(t *testing.T) {

ctx = context.WithValue(ctx, fromctx.CtxKeyKubeClientSet, kubernetes.Interface(kc))
ctx = context.WithValue(ctx, fromctx.CtxKeyAkashClientSet, aclient.Interface(ac))
ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull("nodeA"))
ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull(ctx, "nodeA"))

inv, err := newInventoryService(
ctx,
Expand Down Expand Up @@ -205,7 +205,7 @@ func TestInventory_ClusterDeploymentDeployed(t *testing.T) {
ctx = context.WithValue(ctx, fromctx.CtxKeyPubSub, tpubsub.New(ctx, 1000))
ctx = context.WithValue(ctx, fromctx.CtxKeyKubeClientSet, kubernetes.Interface(kc))
ctx = context.WithValue(ctx, fromctx.CtxKeyAkashClientSet, aclient.Interface(ac))
ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull("nodeA"))
ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull(ctx, "nodeA"))

inv, err := newInventoryService(
ctx,
Expand Down Expand Up @@ -419,7 +419,7 @@ func TestInventory_ReserveIPNoIPOperator(t *testing.T) {
ctx = context.WithValue(ctx, fromctx.CtxKeyPubSub, tpubsub.New(ctx, 1000))
ctx = context.WithValue(ctx, fromctx.CtxKeyKubeClientSet, kubernetes.Interface(kc))
ctx = context.WithValue(ctx, fromctx.CtxKeyAkashClientSet, aclient.Interface(ac))
ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull("nodeA"))
ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull(ctx, "nodeA"))

inv, err := newInventoryService(
ctx,
Expand Down Expand Up @@ -473,7 +473,7 @@ func TestInventory_ReserveIPUnavailableWithIPOperator(t *testing.T) {
ctx = context.WithValue(ctx, fromctx.CtxKeyPubSub, tpubsub.New(ctx, 1000))
ctx = context.WithValue(ctx, fromctx.CtxKeyKubeClientSet, kubernetes.Interface(kc))
ctx = context.WithValue(ctx, fromctx.CtxKeyAkashClientSet, aclient.Interface(ac))
ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull("nodeA"))
ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull(ctx, "nodeA"))
ctx = context.WithValue(ctx, cfromctx.CtxKeyClientIP, cip.Client(mockIP))

inv, err := newInventoryService(
Expand Down Expand Up @@ -548,7 +548,7 @@ func TestInventory_ReserveIPAvailableWithIPOperator(t *testing.T) {
ctx = context.WithValue(ctx, fromctx.CtxKeyPubSub, tpubsub.New(ctx, 1000))
ctx = context.WithValue(ctx, fromctx.CtxKeyKubeClientSet, kubernetes.Interface(kc))
ctx = context.WithValue(ctx, fromctx.CtxKeyAkashClientSet, aclient.Interface(ac))
ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull("nodeA", "nodeB"))
ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull(ctx, "nodeA", "nodeB"))
ctx = context.WithValue(ctx, cfromctx.CtxKeyClientIP, cip.Client(mockIP))

inv, err := newInventoryService(
Expand Down Expand Up @@ -596,6 +596,7 @@ func TestInventory_ReserveIPAvailableWithIPOperator(t *testing.T) {
mockIP.AssertNumberOfCalls(t, "GetIPAddressStatus", 2)
}

// TODO: the following test needs refactoring; it currently reports incorrect inventory
func TestInventory_OverReservations(t *testing.T) {
scaffold := makeInventoryScaffold(t, 10)
defer scaffold.bus.Close()
Expand Down Expand Up @@ -659,10 +660,12 @@ func TestInventory_OverReservations(t *testing.T) {
ac := afake.NewSimpleClientset()

ctx, cancel := context.WithCancel(context.Background())
nullInv := cinventory.NewNull(ctx, "nodeA")

ctx = context.WithValue(ctx, fromctx.CtxKeyPubSub, tpubsub.New(ctx, 1000))
ctx = context.WithValue(ctx, fromctx.CtxKeyKubeClientSet, kubernetes.Interface(kc))
ctx = context.WithValue(ctx, fromctx.CtxKeyAkashClientSet, aclient.Interface(ac))
ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull("nodeA"))
ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, nullInv)

inv, err := newInventoryService(
ctx,
Expand All @@ -685,6 +688,8 @@ func TestInventory_OverReservations(t *testing.T) {
require.Error(t, err)
require.ErrorIs(t, err, ctypes.ErrInsufficientCapacity)

nullInv.Commit(reservation.Resources())

// Send the event immediately to indicate it was deployed
err = scaffold.bus.Publish(event.ClusterDeployment{
LeaseID: lid0,
Expand Down
9 changes: 2 additions & 7 deletions cluster/kube/operators/clients/inventory/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"

"github.com/tendermint/tendermint/libs/log"

inventoryV1 "github.com/akash-network/akash-api/go/inventory/v1"

kutil "github.com/akash-network/provider/cluster/kube/util"
Expand Down Expand Up @@ -48,7 +46,6 @@ type client struct {

type inventory struct {
inventoryV1.Cluster
log log.Logger
}

type inventoryState struct {
Expand Down Expand Up @@ -209,22 +206,20 @@ func (cl *client) subscriber(in <-chan inventoryV1.Cluster, out chan<- ctypes.In
var msg ctypes.Inventory
var och chan<- ctypes.Inventory

ilog := fromctx.LogcFromCtx(cl.ctx).With("inventory.adjuster")

for {
select {
case <-cl.ctx.Done():
return cl.ctx.Err()
case inv := <-in:
pending = append(pending, inv)
if och == nil {
msg = newInventory(ilog, pending[0])
msg = newInventory(pending[0])
och = out
}
case och <- msg:
pending = pending[1:]
if len(pending) > 0 {
msg = newInventory(ilog, pending[0])
msg = newInventory(pending[0])
} else {
och = nil
msg = nil
Expand Down
32 changes: 7 additions & 25 deletions cluster/kube/operators/clients/inventory/inventory.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package inventory

import (
"encoding/json"
"fmt"
"reflect"
"strings"

"github.com/tendermint/tendermint/libs/log"

inventoryV1 "github.com/akash-network/akash-api/go/inventory/v1"
dtypes "github.com/akash-network/akash-api/go/node/deployment/v1beta3"
types "github.com/akash-network/akash-api/go/node/types/v1beta3"
Expand All @@ -20,10 +17,9 @@ import (

var _ ctypes.Inventory = (*inventory)(nil)

func newInventory(log log.Logger, clState inventoryV1.Cluster) *inventory {
func newInventory(clState inventoryV1.Cluster) *inventory {
inv := &inventory{
Cluster: clState,
log: log,
}

return inv
Expand All @@ -32,12 +28,17 @@ func newInventory(log log.Logger, clState inventoryV1.Cluster) *inventory {
// dup returns a value copy of the inventory, deep-copying the embedded
// Cluster state via its Dup method so the copy shares no mutable state
// with the receiver. Unexported value-returning variant backing the
// exported Dup method.
func (inv *inventory) dup() inventory {
	dup := inventory{
		Cluster: *inv.Cluster.Dup(),
	log: inv.log, // NOTE(review): diff artifact — this commit removes the log field from the inventory struct; verify this line is absent in the post-change source
	}

	return dup
}

// Dup returns a deep copy of the inventory as a ctypes.Inventory,
// sharing no mutable state with the receiver.
func (inv *inventory) Dup() ctypes.Inventory {
	cp := inv.dup()
	return &cp
}

// tryAdjust cluster inventory
// It returns two boolean values. First indicates if node-wide resources satisfy (true) requirements
// Seconds indicates if cluster-wide resources satisfy (true) requirements
Expand Down Expand Up @@ -84,11 +85,6 @@ func (inv *inventory) tryAdjust(node int, res *types.Resources) (*crd.SchedulerP
return nil, false, true
}

// if !nd.tryAdjustVolumesAttached(types.NewResourceValue(1)) {
// return nil, false, true

// }

storageAdjusted := false

for idx := range storageClasses {
Expand Down Expand Up @@ -272,27 +268,13 @@ nodes:
// same adjusted resource units as well as cluster params
if adjustedGroup {
if !reflect.DeepEqual(adjusted, &adjustedResources[i].Resources) {
jFirstAdjusted, _ := json.Marshal(&adjustedResources[i].Resources)
jCurrAdjusted, _ := json.Marshal(adjusted)

inv.log.Error(fmt.Sprintf("resource mismatch between replicas within group:\n"+
"\tfirst adjusted replica: %s\n"+
"\tcurr adjusted replica: %s", string(jFirstAdjusted), string(jCurrAdjusted)))

err = ctypes.ErrGroupResourceMismatch
break nodes
}

// all replicas of the same service are expected to have same node selectors and runtimes
// if they don't match then provider cannot bid
if !reflect.DeepEqual(sparams, cparams[adjusted.ID]) {
jFirstSparams, _ := json.Marshal(cparams[adjusted.ID])
jCurrSparams, _ := json.Marshal(sparams)

inv.log.Error(fmt.Sprintf("scheduler params mismatch between replicas within group:\n"+
"\tfirst replica: %s\n"+
"\tcurr replica: %s", string(jFirstSparams), string(jCurrSparams)))

err = ctypes.ErrGroupResourceMismatch
break nodes
}
Expand Down
Loading

0 comments on commit 442dec2

Please sign in to comment.