Skip to content

Commit

Permalink
keep track of storage units as bytes to allow counting part of a unit (
Browse files Browse the repository at this point in the history
…#710)

* keep track of storage units as bytes to allow counting part of a unit

fixes 710

* make linter happy

* update tests
  • Loading branch information
zaibon authored Apr 13, 2020
1 parent 3524c49 commit 2f3b5a0
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 72 deletions.
8 changes: 4 additions & 4 deletions cmds/capacityd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ func cap(ctx context.Context, client zbus.Client) {
}

ru := directory.ResourceAmount{
Cru: int64(resources.CRU),
Mru: int64(resources.MRU),
Hru: int64(resources.HRU),
Sru: int64(resources.SRU),
Cru: resources.CRU,
Mru: float64(resources.MRU),
Hru: float64(resources.HRU),
Sru: float64(resources.SRU),
}

setCapacity := func() error {
Expand Down
34 changes: 20 additions & 14 deletions pkg/provision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,22 +108,28 @@ func (e *defaultEngine) Run(ctx context.Context) error {
}
}

func (e *defaultEngine) updateReservedCapacity() error {
func (e defaultEngine) capacityUsed() (directory.ResourceAmount, directory.WorkloadAmount) {
counters := e.store.Counters()

resources := directory.ResourceAmount{
Sru: counters.SRU.Current(),
Hru: counters.HRU.Current(),
Cru: counters.CRU.Current(),
Mru: counters.MRU.Current(),
Mru: float64(counters.MRU.Current()) / float64(gib),
Sru: float64(counters.SRU.Current()) / float64(gib),
Hru: float64(counters.HRU.Current()) / float64(gib),
}

workloads := directory.WorkloadAmount{
Volume: counters.volumes.Current(),
Container: counters.containers.Current(),
ZDBNamespace: counters.zdbs.Current(),
K8sVM: counters.vms.Current(),
Network: counters.networks.Current(),
Volume: uint16(counters.volumes.Current()),
Container: uint16(counters.containers.Current()),
ZDBNamespace: uint16(counters.zdbs.Current()),
K8sVM: uint16(counters.vms.Current()),
Network: uint16(counters.networks.Current()),
}
return resources, workloads
}

func (e *defaultEngine) updateReservedCapacity() error {
resources, workloads := e.capacityUsed()
log.Info().Msgf("reserved resource %+v", resources)
log.Info().Msgf("provisionned workloads %+v", workloads)

Expand Down Expand Up @@ -254,11 +260,11 @@ func (e *defaultEngine) Counters(ctx context.Context) <-chan pkg.ProvisionCounte

c := e.store.Counters()
pc := pkg.ProvisionCounters{
Container: int64(c.containers),
Network: int64(c.networks),
ZDB: int64(c.zdbs),
Volume: int64(c.volumes),
VM: int64(c.vms),
Container: int64(c.containers.Current()),
Network: int64(c.networks.Current()),
ZDB: int64(c.zdbs.Current()),
Volume: int64(c.volumes.Current()),
VM: int64(c.vms.Current()),
}

select {
Expand Down
82 changes: 82 additions & 0 deletions pkg/provision/engine_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package provision

import (
"encoding/json"
"io/ioutil"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/threefoldtech/zos/pkg"

"os"
"testing"
)

func TestEngine(t *testing.T) {
td, err := ioutil.TempDir("", "")
require.NoError(t, err)
defer t.Cleanup(func() {
os.RemoveAll(td)
})

nodeID := "BhPhHVhfU8qerzzh1BGBgcQ7SQxQtND3JwuxSPoRzqkY"
store := &FSStore{
root: td,
}

engine := &defaultEngine{
nodeID: nodeID,
store: store,
}

mustJSONMarshal := func(v interface{}) []byte {
b, err := json.Marshal(v)
require.NoError(t, err)
return b
}

err = engine.store.Add(&Reservation{
ID: "1-1",
Type: VolumeReservation,
Data: mustJSONMarshal(Volume{
Size: 1,
Type: HDDDiskType,
}),
})
require.NoError(t, err)

err = engine.store.Add(&Reservation{
ID: "3-1",
Type: ZDBReservation,
Data: mustJSONMarshal(ZDB{
Size: 15,
Mode: pkg.ZDBModeSeq,
DiskType: pkg.SSDDevice,
}),
})
require.NoError(t, err)

err = engine.store.Add(&Reservation{
ID: "4-1",
Type: ContainerReservation,
Data: mustJSONMarshal(Container{
Capacity: ContainerCapacity{
CPU: 2,
Memory: 4096,
},
}),
})
require.NoError(t, err)

resources, workloads := engine.capacityUsed()
assert.Equal(t, uint64(2), resources.Cru)
assert.Equal(t, float64(4), resources.Mru)
assert.Equal(t, float64(15.25), resources.Sru)
assert.Equal(t, float64(1), resources.Hru)

assert.EqualValues(t, 1, workloads.Container)
assert.EqualValues(t, 0, workloads.Network)
assert.EqualValues(t, 1, workloads.Volume)
assert.EqualValues(t, 1, workloads.ZDBNamespace)
assert.EqualValues(t, 0, workloads.K8sVM)
}
34 changes: 17 additions & 17 deletions pkg/provision/local_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,43 +19,43 @@ import (
// Counter interface
type Counter interface {
// Increment counter atomically by v
Increment(v int64) int64
Increment(v uint64) uint64
// Decrement counter atomically by v
Decrement(v int64) int64
Decrement(v uint64) uint64
// Current returns the current value
Current() int64
Current() uint64
}

type counterNop struct{}

func (c *counterNop) Increment(v int64) int64 {
func (c *counterNop) Increment(v uint64) uint64 {
return 0
}

func (c *counterNop) Decrement(v int64) int64 {
func (c *counterNop) Decrement(v uint64) uint64 {
return 0
}

func (c *counterNop) Current() int64 {
func (c *counterNop) Current() uint64 {
return 0
}

// counterImpl value for safe increment/decrement
type counterImpl int64
type counterImpl uint64

// Increment counter atomically by one
func (c *counterImpl) Increment(v int64) int64 {
return atomic.AddInt64((*int64)(c), v)
func (c *counterImpl) Increment(v uint64) uint64 {
return atomic.AddUint64((*uint64)(c), v)
}

// Decrement counter atomically by one
func (c *counterImpl) Decrement(v int64) int64 {
return atomic.AddInt64((*int64)(c), -v)
func (c *counterImpl) Decrement(v uint64) uint64 {
return atomic.AddUint64((*uint64)(c), -v)
}

// Current returns the current value
func (c *counterImpl) Current() int64 {
return atomic.LoadInt64((*int64)(c))
func (c *counterImpl) Current() uint64 {
return atomic.LoadUint64((*uint64)(c))
}

type (
Expand All @@ -77,10 +77,10 @@ type (
vms counterImpl
debugs counterImpl

SRU counterImpl
HRU counterImpl
MRU counterImpl
CRU counterImpl
SRU counterImpl // SSD storage in bytes
HRU counterImpl // HDD storage in bytes
MRU counterImpl // Memory storage in bytes
CRU counterImpl // CPU count absolute
}
)

Expand Down
39 changes: 22 additions & 17 deletions pkg/provision/resource_units.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ package provision
import (
"encoding/json"
"fmt"
"math"
)

const (
mib = uint64(1024 * 1024)
gib = uint64(mib * 1024)
)

func (s *FSStore) processResourceUnits(r *Reservation, addOrRemoveBool bool) error {
Expand Down Expand Up @@ -43,10 +47,10 @@ func (s *FSStore) processResourceUnits(r *Reservation, addOrRemoveBool bool) err
}

type resourceUnits struct {
SRU int64 `json:"sru,omitempty"`
HRU int64 `json:"hru,omitempty"`
MRU int64 `json:"mru,omitempty"`
CRU int64 `json:"cru,omitempty"`
SRU uint64 `json:"sru,omitempty"`
HRU uint64 `json:"hru,omitempty"`
MRU uint64 `json:"mru,omitempty"`
CRU uint64 `json:"cru,omitempty"`
}

func processVolume(r *Reservation) (u resourceUnits, err error) {
Expand All @@ -55,12 +59,12 @@ func processVolume(r *Reservation) (u resourceUnits, err error) {
return u, err
}

// volume.size and SRU is in GiB, not conversion needed
// volume.size and SRU is in GiB
switch volume.Type {
case SSDDiskType:
u.SRU = int64(volume.Size)
u.SRU = volume.Size * gib
case HDDDiskType:
u.HRU = int64(volume.Size)
u.HRU = volume.Size * gib
}

return u, nil
Expand All @@ -71,9 +75,10 @@ func processContainer(r *Reservation) (u resourceUnits, err error) {
if err = json.Unmarshal(r.Data, &cont); err != nil {
return u, err
}
u.CRU = int64(cont.Capacity.CPU)
// memory is in MiB, but MRU is in GiB
u.MRU = int64(math.Ceil(float64(cont.Capacity.Memory) / 1024.0))
u.CRU = uint64(cont.Capacity.CPU)
// memory is in MiB
u.MRU = cont.Capacity.Memory * mib
u.SRU = 256 * mib // 250MiB are allocated on SSD for the root filesystem used by the flist

return u, nil
}
Expand All @@ -89,9 +94,9 @@ func processZdb(r *Reservation) (u resourceUnits, err error) {

switch zdbVolume.DiskType {
case "SSD":
u.SRU = int64(zdbVolume.Size)
u.SRU = zdbVolume.Size * gib
case "HDD":
u.HRU = int64(zdbVolume.Size)
u.HRU = zdbVolume.Size * gib
}

return u, nil
Expand All @@ -107,12 +112,12 @@ func processKubernetes(r *Reservation) (u resourceUnits, err error) {
switch k8s.Size {
case 1:
u.CRU = 1
u.MRU = 2
u.SRU = 50
u.MRU = 2 * gib
u.SRU = 50 * gib
case 2:
u.CRU = 2
u.MRU = 4
u.SRU = 100
u.MRU = 4 * gib
u.SRU = 100 * gib
}

return u, nil
Expand Down
24 changes: 13 additions & 11 deletions pkg/provision/resource_units_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func Test_processZDB(t *testing.T) {
},
},
want: resourceUnits{
SRU: 1,
SRU: 1 * gib,
},
wantErr: false,
},
Expand All @@ -49,7 +49,7 @@ func Test_processZDB(t *testing.T) {
},
},
want: resourceUnits{
HRU: 1,
HRU: 1 * gib,
},
wantErr: false,
},
Expand Down Expand Up @@ -89,7 +89,7 @@ func Test_processVolume(t *testing.T) {
},
},
wantU: resourceUnits{
SRU: 1,
SRU: 1 * gib,
},
},
{
Expand All @@ -104,7 +104,7 @@ func Test_processVolume(t *testing.T) {
},
},
wantU: resourceUnits{
HRU: 1,
HRU: 1 * gib,
},
},
}
Expand Down Expand Up @@ -146,7 +146,8 @@ func Test_processContainer(t *testing.T) {
},
wantU: resourceUnits{
CRU: 2,
MRU: 1,
MRU: 1 * gib,
SRU: 256 * mib,
},
},
{
Expand All @@ -157,14 +158,15 @@ func Test_processContainer(t *testing.T) {
Data: mustMarshalJSON(t, Container{
Capacity: ContainerCapacity{
CPU: 2,
Memory: 2000,
Memory: 2048,
},
}),
},
},
wantU: resourceUnits{
CRU: 2,
MRU: 2,
MRU: 2 * gib,
SRU: 256 * mib,
},
},
}
Expand Down Expand Up @@ -203,8 +205,8 @@ func Test_processKubernetes(t *testing.T) {
},
wantU: resourceUnits{
CRU: 1,
MRU: 2,
SRU: 50,
MRU: 2 * gib,
SRU: 50 * gib,
},
},
{
Expand All @@ -219,8 +221,8 @@ func Test_processKubernetes(t *testing.T) {
},
wantU: resourceUnits{
CRU: 2,
MRU: 4,
SRU: 100,
MRU: 4 * gib,
SRU: 100 * gib,
},
},
}
Expand Down
Loading

0 comments on commit 2f3b5a0

Please sign in to comment.