Optimise logic to add BDs to a RaidGroup in a CSPC to maximise successful pool provisions (#136)

* Refactor logic to add BDs to a RaidGroup in a CSPC
* Add unit-tests for RaidZ & RaidZ2 pool-type
* Add capacity flag for CSPC generation
* Make RaidZ2 accept minimum 6 disks per node
* LinkedList + sliding-window logic for non-stripe CSPC generation
( _kudos to Sai for coming up with all possible edge cases and reviewing this huge feature_ )
* Implement the LinkedList approach for iteratively selecting BDs

TIL:
* Fixed failing unit test
- Same BD was re-used in a subsequent RAID-group
- Reason: assignment to a value method receiver propagates
  only to callees, not back to callers.
* Remove unused variable `index`
* Allow 0 bytes as minimum disk size in Disk Selection algo
* Make the DeviceList contain a pointer to BD


Co-Authored-by: Sai Chaithanya (@mittachaitu) <[email protected]>
Signed-off-by: Harsh Vardhan <[email protected]>
Harsh Vardhan and Sai Chaithanya (@mittachaitu) authored Dec 28, 2021
1 parent c55d1b6 commit c28dff9
Showing 7 changed files with 608 additions and 54 deletions.
7 changes: 5 additions & 2 deletions cmd/generate/generate.go
@@ -41,24 +41,27 @@ func NewCmdGenerate() *cobra.Command {
// should be renamed appropriately, as of now it made no sense to generically
// state pools when other pools aren't supported.
func NewCmdGenerateCStorStoragePoolCluster() *cobra.Command {
var nodes, raidType string
var nodes, raidType, cap string
var devices int
cmd := &cobra.Command{
Use: "cspc",
Short: "Generates cspc resources YAML/configuration which can be used to provision cStor storage pool clusters",
Run: func(cmd *cobra.Command, args []string) {
node, _ := cmd.Flags().GetString("nodes")
raid, _ := cmd.Flags().GetString("raidtype")
capacity, _ := cmd.Flags().GetString("capacity")
devs := numDevices(cmd)
nodeList := strings.Split(node, ",")
util.CheckErr(generate.CSPC(nodeList, devs, raid), util.Fatal)
util.CheckErr(generate.CSPC(nodeList, devs, raid, capacity), util.Fatal)
},
}
cmd.PersistentFlags().StringVarP(&nodes, "nodes", "", "",
"comma separated set of nodes for pool creation --nodes=node1,node2,node3,node4")
_ = cmd.MarkPersistentFlagRequired("nodes")
cmd.PersistentFlags().StringVarP(&raidType, "raidtype", "", "stripe",
"allowed RAID configuration such as, stripe, mirror, raid, raidz2")
cmd.PersistentFlags().StringVarP(&cap, "capacity", "", "10Gi",
"minimum capacity of the blockdevices to pick up for pool creation")
cmd.PersistentFlags().IntVar(&devices, "number-of-devices", 1, "number of devices per node, selects default based on raid-type")
return cmd
}
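For reference, a possible invocation of the new flag (assuming openebsctl is installed as the `kubectl openebs` plugin; node names and sizes are purely illustrative, and raidz needs devices in multiples of 3 per node):

kubectl openebs generate cspc --nodes=node1,node2,node3 --raidtype=raidz --number-of-devices=3 --capacity=100Gi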
2 changes: 1 addition & 1 deletion pkg/client/k8s.go
@@ -188,7 +188,7 @@ func homeDir() string {
func (k K8sClient) GetOpenEBSNamespace(casType string) (string, error) {
pods, err := k.K8sCS.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{FieldSelector: "status.phase=Running", LabelSelector: fmt.Sprintf("openebs.io/component-name=%s", util.CasTypeAndComponentNameMap[strings.ToLower(casType)])})
if err != nil || len(pods.Items) == 0 {
return "", errors.New("unable to determine openebs namespace")
return "", fmt.Errorf("unable to determine openebs namespace, err: %v", err)
}
return pods.Items[0].Namespace, nil
}
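The switch from errors.New to fmt.Errorf surfaces the underlying list error in the message. A small sketch (not part of this commit) of the alternative %w verb, which would additionally keep the cause inspectable via errors.Is / errors.As:

package main

import (
	"errors"
	"fmt"
	"io/fs"
)

func findNamespace() error {
	underlying := fs.ErrNotExist // stand-in for the client-go list error
	return fmt.Errorf("unable to determine openebs namespace, err: %w", underlying)
}

func main() {
	err := findNamespace()
	fmt.Println(err)                            // message still contains the cause
	fmt.Println(errors.Is(err, fs.ErrNotExist)) // true: the wrapped error remains inspectable
}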
116 changes: 86 additions & 30 deletions pkg/generate/cspc.go
@@ -18,6 +18,7 @@ package generate

import (
"fmt"
"sort"
"strconv"
"strings"

@@ -26,6 +27,7 @@ import (
"github.com/openebs/api/v2/pkg/apis/openebs.io/v1alpha1"
"github.com/openebs/openebsctl/pkg/client"
"github.com/openebs/openebsctl/pkg/util"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

@@ -38,13 +40,20 @@ func isPoolTypeValid(raid string) bool {
}

// CSPC calls the generate routine for different cas-types
func CSPC(nodes []string, devs int, raid string) error {
func CSPC(nodes []string, devs int, raid, capacity string) error {
c := client.NewK8sClient()
if !isPoolTypeValid(strings.ToLower(raid)) {
// TODO: Use the well defined pool constant types from openebs/api when added there
return fmt.Errorf("invalid pool type %s", raid)
}
_, str, err := cspc(c, nodes, devs, strings.ToLower(raid))
// resource.Quantity doesn't like the bits or bytes suffixes
capacity = strings.Replace(capacity, "b", "", 1)
capacity = strings.Replace(capacity, "B", "", 1)
size, err := resource.ParseQuantity(capacity)
if err != nil {
return err
}
_, str, err := cspc(c, nodes, devs, strings.ToLower(raid), size)
if err != nil {
return err
}
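resource.ParseQuantity accepts suffixes such as Gi, Mi, or G, but not GiB/GB, which is why a single trailing b/B is stripped before parsing. A minimal sketch of that behaviour (requires the k8s.io/apimachinery module; the input strings are illustrative):

package main

import (
	"fmt"
	"strings"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	for _, in := range []string{"10Gi", "10GiB", "64MiB", "500M"} {
		// mirror the commit's normalisation: drop one "b"/"B" before parsing
		s := strings.Replace(in, "b", "", 1)
		s = strings.Replace(s, "B", "", 1)
		q, err := resource.ParseQuantity(s)
		fmt.Printf("%-6s -> %-5s parsed=%s err=%v\n", in, s, q.String(), err)
	}
}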
@@ -53,14 +62,24 @@ func CSPC(nodes []string, devs int, raid string) error {
}

// cspc takes eligible nodes, number of devices and poolType to create a pool cluster template
func cspc(c *client.K8sClient, nodes []string, devs int, poolType string) (*cstorv1.CStorPoolCluster, string, error) {
func cspc(c *client.K8sClient, nodes []string, devs int, poolType string, minSize resource.Quantity) (*cstorv1.CStorPoolCluster, string, error) {
// 0. Figure out the OPENEBS_NAMESPACE for CStor
cstorNS, err := c.GetOpenEBSNamespace(util.CstorCasType)
// assume CSTOR's OPENEBS_NAMESPACE has all the relevant blockdevices
c.Ns = cstorNS
if err != nil {
return nil, "", fmt.Errorf("unable to determine the cStor namespace error: %v", err)
}
// 0.1 Validate user input, check if user hasn't entered less than 64Mi
cstorMin := resource.MustParse("64Mi")
if minSize.Cmp(cstorMin) < 0 {
return nil, "", fmt.Errorf("minimum size of supported block-devices in a cspc is 64Mi")
}
// 0.2 Validate user input, check if user has entered >= minimum supported BD-count
if min := minCount()[poolType]; devs < min {
return nil, "", fmt.Errorf("%s pool requires a minimum of %d block device per node",
poolType, min)
}
// 1. Validate nodes & poolType, fetch disks
nodeList, err := c.GetNodes(nodes, "", "")
if err != nil {
@@ -82,7 +101,7 @@ func cspc(c *client.K8sClient, nodes []string, devs int, poolType string) (*csto
if err != nil || len(bds.Items) == 0 {
return nil, "", fmt.Errorf("no blockdevices found in nodes with %v hostnames", hostnames)
}
_, err = filterCStorCompatible(bds)
_, err = filterCStorCompatible(bds, minSize)
if err != nil {
return nil, "", fmt.Errorf("(server error) unable to fetch bds from %v nodes", nodes)
}
@@ -92,7 +111,7 @@ func cspc(c *client.K8sClient, nodes []string, devs int, poolType string) (*csto
hostToBD[bd.Labels["kubernetes.io/hostname"]] = append(hostToBD[bd.Labels["kubernetes.io/hostname"]], bd)
}
// 4. Select disks and create the PoolSpec
p, err := makePools(poolType, devs, hostToBD, nodes, hostnames)
p, err := makePools(poolType, devs, hostToBD, nodes, hostnames, minSize)
if err != nil {
return nil, "", err
}
@@ -145,11 +164,13 @@ func getBDComment(name string, bdList *v1alpha1.BlockDeviceList) string {

// makePools creates a poolSpec based on the poolType, number of devices per
// pool instance and a collection of blockdevices by nodes
func makePools(poolType string, nDevices int, bd map[string][]v1alpha1.BlockDevice, nodes []string, hosts []string) (*[]cstorv1.PoolSpec, error) {
func makePools(poolType string, nDevices int, bd map[string][]v1alpha1.BlockDevice,
nodes []string, hosts []string, minsize resource.Quantity) (*[]cstorv1.PoolSpec, error) {
// IMPORTANT: User is more likely to see the nodeNames, so the errors
// should preferably be shown in terms of nodeNames and not hostNames
var spec []cstorv1.PoolSpec
if poolType == string(cstorv1.PoolStriped) {
switch poolType {
case string(cstorv1.PoolStriped):
// always single RAID-group with nDevices patched together, cannot disk replace,
// no redundancy in a pool, redundancy possible across pool instances

@@ -168,7 +189,6 @@ func makePools(poolType string, nDevices int, bd map[string][]v1alpha1.BlockDevi
return nil, fmt.Errorf("not enough blockdevices found on node %s, want %d, found %d", nodes[i], nDevices, len(bds))
}
var raids []cstorv1.CStorPoolInstanceBlockDevice

for d := 0; d < nDevices; d++ {
raids = append(raids, cstorv1.CStorPoolInstanceBlockDevice{BlockDeviceName: bds[d].Name})
}
@@ -181,48 +201,84 @@ func makePools(poolType string, nDevices int, bd map[string][]v1alpha1.BlockDevi
})
}
return &spec, nil
} else if poolType == string(cstorv1.PoolMirrored) {
if nDevices%2 != 0 {
return nil, fmt.Errorf("mirrored pool requires multiples of two block device")
case string(cstorv1.PoolMirrored), string(cstorv1.PoolRaidz), string(cstorv1.PoolRaidz2):
min := minCount()[poolType]
if nDevices%min != 0 {
// there must be min number of devices per RaidGroup
return nil, fmt.Errorf("number of devices must be a multiple of %d", min)
}
if min > nDevices {
return nil, fmt.Errorf("insufficient blockdevices require minimum %d devices for %s", min, poolType)
}
// 1. Start filling in the devices in their RAID-groups per the hostnames
for i, host := range hosts {
var raidGroups []cstorv1.RaidGroup
// add all BDs to a CSPC's CSPI spec
bds := bd[host]
if len(bds) < nDevices {
return nil, fmt.Errorf("not enough eligible blockdevices found on node %s, want %d, found %d", nodes[i], nDevices, len(bds))
}
index := 0
for d := 0; d < nDevices/2; d++ {
raids := []cstorv1.CStorPoolInstanceBlockDevice{{BlockDeviceName: bds[index].Name},
{BlockDeviceName: bds[index+1].Name}}
// 1. sort the BDs by increasing order
sort.Slice(bds, func(i, j int) bool {
// sort by increasing order
return bds[i].Spec.Capacity.Storage < bds[j].Spec.Capacity.Storage
})
// 2. Check if close to the desired capacity of the pool can be achieved by minimising disk wastage
// 3. Suggest the start and end index for the BDs to be used for the raid group
maxIndex := len(bds)
if maxIndex < nDevices {
return nil, fmt.Errorf("not enough eligible blockdevices found on node %s, want %d, found %d", nodes[i], min, maxIndex)
}
devices := Generate(v1alpha1.BlockDeviceList{Items: bds})
for d := 0; d < nDevices/min; d++ {
var raids []cstorv1.CStorPoolInstanceBlockDevice
d, thisRaidGroup, err := devices.Select(minsize, min)
// re-assign the head node of the linked-list for the next iteration
// by pinning the new head to the variable declared above for later use
devices = d
if err != nil {
return nil, err
}
for j := 0; j < min; j++ {
// each RaidGroup has min number of devices
raids = append(raids, cstorv1.CStorPoolInstanceBlockDevice{BlockDeviceName: thisRaidGroup[j].ObjectMeta.Name})
}
raidGroups = append(raidGroups, cstorv1.RaidGroup{CStorPoolInstanceBlockDevices: raids})
index += 2
}
// add the CSPI BD spec inside cspc to a PoolSpec
spec = append(spec, cstorv1.PoolSpec{
NodeSelector: map[string]string{"kubernetes.io/hostname": host},
DataRaidGroups: raidGroups,
PoolConfig: cstorv1.PoolConfig{
DataRaidGroupType: string(cstorv1.PoolMirrored),
DataRaidGroupType: poolType,
},
})
}
return &spec, nil
// 2ⁿ devices per RaidGroup, (confirm) not more than 2 devices per RaidGroup
// DOUBT: Should this throw an error if nDevices isn't 2ⁿ?
} else if poolType == string(cstorv1.PoolRaidz) {
return nil, fmt.Errorf("%s is not supported yet", poolType)
// 2ⁿ+1 devices per RaidGroup
} else if poolType == string(cstorv1.PoolRaidz2) {
return nil, fmt.Errorf("%s is not supported yet", poolType)
// 2ⁿ+2 devices per RaidGroup
}
return nil, fmt.Errorf("unknown pool-type")
default:
return nil, fmt.Errorf("unknown pool-type %s", poolType)
}
}
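The Generate and Select helpers used above come from the device-selection file that is not expanded in this excerpt. Going by the commit notes (a linked list holding pointers to BDs, a sliding window over capacity-sorted devices to minimise wasted space), a minimal, hypothetical sketch of that idea follows; every type, field and function name here (blockDevice, deviceNode, generate, selectGroup) is a stand-in, not the actual openebsctl implementation:

package main

import (
	"errors"
	"fmt"
	"sort"
)

type blockDevice struct {
	Name     string
	Capacity uint64 // bytes
}

// deviceNode is one node of a singly linked list of candidate blockdevices,
// kept sorted by increasing capacity.
type deviceNode struct {
	BD   *blockDevice
	Next *deviceNode
}

// generate builds the capacity-sorted linked list from a slice of blockdevices.
func generate(bds []blockDevice) *deviceNode {
	sort.Slice(bds, func(i, j int) bool { return bds[i].Capacity < bds[j].Capacity })
	var head *deviceNode
	for i := len(bds) - 1; i >= 0; i-- {
		head = &deviceNode{BD: &bds[i], Next: head}
	}
	return head
}

// selectGroup slides a window of `count` consecutive devices over the sorted
// list, skips devices below minSize, and picks the window that wastes the
// least capacity (a raid group is limited by its smallest member). It returns
// the new head with the chosen devices removed, plus the chosen group.
func selectGroup(head *deviceNode, minSize uint64, count int) (*deviceNode, []*blockDevice, error) {
	// collect eligible nodes into a slice for easy windowing
	var nodes []*deviceNode
	for n := head; n != nil; n = n.Next {
		if n.BD.Capacity >= minSize {
			nodes = append(nodes, n)
		}
	}
	if len(nodes) < count {
		return head, nil, errors.New("not enough eligible blockdevices for a raid group")
	}
	best, bestWaste := -1, uint64(0)
	for i := 0; i+count <= len(nodes); i++ {
		waste := uint64(0)
		for j := i; j < i+count; j++ {
			// window is sorted ascending, so nodes[i] is the smallest member
			waste += nodes[j].BD.Capacity - nodes[i].BD.Capacity
		}
		if best == -1 || waste < bestWaste {
			best, bestWaste = i, waste
		}
	}
	chosen := map[*deviceNode]bool{}
	var group []*blockDevice
	for j := best; j < best+count; j++ {
		chosen[nodes[j]] = true
		group = append(group, nodes[j].BD)
	}
	// rebuild the list without the chosen nodes
	var newHead, tail *deviceNode
	for n := head; n != nil; n = n.Next {
		if chosen[n] {
			continue
		}
		nn := &deviceNode{BD: n.BD}
		if newHead == nil {
			newHead = nn
		} else {
			tail.Next = nn
		}
		tail = nn
	}
	return newHead, group, nil
}

func main() {
	list := generate([]blockDevice{
		{"bd-a", 10e9}, {"bd-b", 100e9}, {"bd-c", 11e9}, {"bd-d", 98e9}, {"bd-e", 12e9},
	})
	_, group, _ := selectGroup(list, 1e9, 3)
	for _, bd := range group {
		fmt.Println(bd.Name, bd.Capacity) // expect the three similarly sized ~10-12 GB disks grouped together
	}
}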

// minCount states the minimum number of BDs for a pool type in a RAID-group
// this is an example of an immutable map
func minCount() map[string]int {
return map[string]int{
string(cstorv1.PoolStriped): 1,
// mirror: data is mirrored across even no of disks
string(cstorv1.PoolMirrored): 2,
// raidz: data is spread across even no of disks and one disk is for parity^
// ^recovery information, metadata, etc
// can handle one device failing
string(cstorv1.PoolRaidz): 3,
// raidz2: data is spread across even no of disks and two disks are for parity
// can handle two devices failing
string(cstorv1.PoolRaidz2): 6,
}
}

// filterCStorCompatible takes a list of BDs and gives out a list of BDs which can be used to provision a pool
func filterCStorCompatible(bds *v1alpha1.BlockDeviceList) (*v1alpha1.BlockDeviceList, error) {
func filterCStorCompatible(bds *v1alpha1.BlockDeviceList, minLimit resource.Quantity) (*v1alpha1.BlockDeviceList, error) {
// TODO: Optionally reject sparse-disks depending on configs
var final []v1alpha1.BlockDevice
for _, bd := range bds.Items {
@@ -231,13 +287,13 @@ func filterCStorCompatible(bds *v1alpha1.BlockDeviceList) (*v1alpha1.BlockDevice
bd.Status.ClaimState == v1alpha1.BlockDeviceUnclaimed &&
bd.Spec.FileSystem.Type == "" &&
// BD's capacity >=64 MiB
bd.Spec.Capacity.Storage >= 67110000 {
bd.Spec.Capacity.Storage >= uint64(minLimit.Value()) {
final = append(final, bd)
}
}
bds.Items = final
if len(final) == 0 {
return nil, fmt.Errorf("found no eligble blockdevices")
return nil, fmt.Errorf("found no eligble blockdevices of size %s", minLimit.String())
}
return bds, nil
}
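For context, resource.MustParse("64Mi").Value() is 67108864 bytes; the previous hard-coded 67110000 only approximated 64 MiB, and comparing against the parsed minimum makes the check exact. A one-line confirmation (requires k8s.io/apimachinery):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	fmt.Println(resource.MustParse("64Mi").Value()) // 67108864 = 64 * 1024 * 1024
}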
(diffs for the remaining 4 changed files are not shown in this excerpt)
