Skip to content

Commit

Permalink
switch to use state file for IP allocation pool management (#2110)
Browse files Browse the repository at this point in the history
* switch to use state file for IP allocation pool management

* add soak test suite for state file

* address comments

* add missing go.sum entries
  • Loading branch information
M00nF1sh authored Oct 24, 2022
1 parent 3dacc42 commit 7eeb2a9
Show file tree
Hide file tree
Showing 15 changed files with 1,125 additions and 205 deletions.
19 changes: 4 additions & 15 deletions cmd/routed-eni-cni-plugin/cni.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package main

import (
"crypto/sha1"
"encoding/hex"
"encoding/json"
"fmt"
"net"
Expand Down Expand Up @@ -216,15 +214,15 @@ func add(args *skel.CmdArgs, cniTypes typeswrapper.CNITYPES, grpcClient grpcwrap
// Non-zero value means pods are using branch ENI
if r.PodVlanId != 0 {
hostVethNamePrefix := sgpp.BuildHostVethNamePrefix(conf.VethPrefix, conf.PodSGEnforcingMode)
hostVethName = generateHostVethName(hostVethNamePrefix, string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
hostVethName = networkutils.GeneratePodHostVethName(hostVethNamePrefix, string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
err = driverClient.SetupBranchENIPodNetwork(hostVethName, args.IfName, args.Netns, v4Addr, v6Addr, int(r.PodVlanId), r.PodENIMAC,
r.PodENISubnetGW, int(r.ParentIfIndex), mtu, conf.PodSGEnforcingMode, log)

// This is a dummyVlanInterfaceName generated to identify dummyVlanInterface
// which will be created for PPSG scenario to pass along the vlanId information
// as a part of the ADD cmd Result struct
// The podVlanId is used by DEL cmd, fetched from the prevResult struct to cleanup the pod network
dummyVlanInterfaceName := generateHostVethName(dummyVlanInterfacePrefix, string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
dummyVlanInterfaceName := networkutils.GeneratePodHostVethName(dummyVlanInterfacePrefix, string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))

// The dummyVlanInterface is purely virtual and relevant only for ppsg, so we decided to keep it separate
// and not overload the already available hostVethInterface
Expand All @@ -233,7 +231,7 @@ func add(args *skel.CmdArgs, cniTypes typeswrapper.CNITYPES, grpcClient grpcwrap
} else {
// build hostVethName
// Note: the maximum length for linux interface name is 15
hostVethName = generateHostVethName(conf.VethPrefix, string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
hostVethName = networkutils.GeneratePodHostVethName(conf.VethPrefix, string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
err = driverClient.SetupPodNetwork(hostVethName, args.IfName, args.Netns, v4Addr, v6Addr, int(r.DeviceNumber), mtu, log)
}

Expand Down Expand Up @@ -290,15 +288,6 @@ func add(args *skel.CmdArgs, cniTypes typeswrapper.CNITYPES, grpcClient grpcwrap
return cniTypes.PrintResult(result, conf.CNIVersion)
}

// generateHostVethName builds the name of the host-side veth device for a pod.
//
// The name is prefix followed by the first 11 hex characters of the SHA-1
// digest of "<namespace>.<podname>". Per the original authors, this scheme is
// chosen so the name aligns with the value Calico expects for NetworkPolicy
// enforcement.
func generateHostVethName(prefix, namespace, podname string) string {
	digest := sha1.New()
	// Stream the "<namespace>.<podname>" key straight into the hash; writes to
	// a hash.Hash never return an error, so the result is intentionally unchecked.
	fmt.Fprintf(digest, "%s.%s", namespace, podname)
	hexDigest := hex.EncodeToString(digest.Sum(nil))
	return prefix + hexDigest[:11]
}

// cmdDel forwards args to del, supplying freshly constructed instances of the
// CNI types wrapper, gRPC wrapper, RPC wrapper and network driver, and returns
// whatever error del reports.
func cmdDel(args *skel.CmdArgs) error {
return del(args, typeswrapper.New(), grpcwrapper.New(), rpcwrapper.New(), driver.New())
}
Expand Down Expand Up @@ -425,7 +414,7 @@ func tryDelWithPrevResult(driverClient driver.NetworkAPIs, conf *NetConf, k8sArg
return false, nil
}

dummyIfaceName := generateHostVethName(dummyVlanInterfacePrefix, string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
dummyIfaceName := networkutils.GeneratePodHostVethName(dummyVlanInterfacePrefix, string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
_, dummyIface, found := cniutils.FindInterfaceByName(prevResult.Interfaces, dummyIfaceName)
if !found {
return false, nil
Expand Down
80 changes: 62 additions & 18 deletions pkg/ipamd/datastore/data_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@ import (
"fmt"
"net"
"os"
"strings"
"sync"
"time"

"github.com/aws/amazon-vpc-cni-k8s/pkg/netlinkwrapper"
"github.com/aws/amazon-vpc-cni-k8s/pkg/networkutils"
"github.com/vishvananda/netlink"

"github.com/aws/amazon-vpc-cni-k8s/pkg/cri"
"github.com/aws/amazon-vpc-cni-k8s/pkg/utils/logger"
"github.com/pkg/errors"
Expand Down Expand Up @@ -57,7 +62,7 @@ const (
// ipamd/datastore restarts. In vpc-cni <=1.6, we "stored" the
// allocated IPs by querying kubelet's CRI. Since this requires scary
// access to CRI socket, and may race with CRI's internal logic, we
// are transitioning away from from this to storing allocations
// are transitioning away from this to storing allocations
// ourself in a file (similar to host-ipam CNI plugin).
//
// Because we don't want to require a node restart during CNI
Expand All @@ -72,7 +77,7 @@ const (
// Note phase3 is not necessary since writes to CRI are implicit.
// At/after phase2, we can remove any code protected by
// checkpointMigrationPhase<2.
const checkpointMigrationPhase = 1
const checkpointMigrationPhase = 2

// Placeholders used for unknown values when reading from CRI.
const backfillNetworkName = "_migrated-from-cri"
Expand Down Expand Up @@ -138,7 +143,7 @@ type IPAMKey struct {
IfName string `json:"ifName"`
}

// IsZero returns true iff object is equal to the golang zero/null value.
// IsZero returns true if object is equal to the golang zero/null value.
func (k IPAMKey) IsZero() bool {
	// A declared-but-unassigned IPAMKey is the type's zero value; k is "zero"
	// exactly when it compares equal to it.
	var zero IPAMKey
	return k == zero
}
Expand Down Expand Up @@ -316,6 +321,7 @@ type DataStore struct {
CheckpointMigrationPhase int
backingStore Checkpointer
cri cri.APIs
netLink netlinkwrapper.NetLink
isPDEnabled bool
}

Expand Down Expand Up @@ -350,6 +356,7 @@ func NewDataStore(log logger.Logger, backingStore Checkpointer, isPDEnabled bool
log: log,
backingStore: backingStore,
cri: cri.New(),
netLink: netlinkwrapper.NewNetLink(),
CheckpointMigrationPhase: checkpointMigrationPhase,
isPDEnabled: isPDEnabled,
}
Expand Down Expand Up @@ -432,24 +439,25 @@ func (ds *DataStore) ReadBackingStore(isv6Enabled bool) error {

case 2:
// Phase2: Read from checkpoint file
ds.log.Infof("Reading ipam state from backing store")

err := ds.backingStore.Restore(&data)
ds.log.Debugf("backing store restore returned err %v", err)
if os.IsNotExist(err) {
// Assume that no file == no containers are
// currently in use, eg a fresh reboot just
// cleared everything out. This is ok, and a
// no-op.
return nil
} else if err != nil {
return fmt.Errorf("datastore: error reading backing store: %v", err)
ds.log.Infof("Begin ipam state recovery from backing store")

if err := ds.backingStore.Restore(&data); err != nil {
// Assume that no file == no containers are currently in use, e.g. a fresh reboot just cleared everything out.
// This is ok, and no-op.
if os.IsNotExist(err) {
ds.log.Debugf("backing store doesn't exists, assuming bootstrap on a new node")
return nil
}
return errors.Wrap(err, "failed ipam state recovery from backing store")
}

if data.Version != CheckpointFormatVersion {
return fmt.Errorf("datastore: unknown backing store format (%s != %s) - wrong CNI/ipamd version? (Rebooting this node will restart local pods and probably help)", data.Version, CheckpointFormatVersion)
return errors.Errorf("failed ipam state recovery due to unexpected checkpointVersion: %v/%v", data.Version, CheckpointFormatVersion)
}
if normalizedData, err := ds.normalizeCheckpointDataByPodVethExistence(data); err != nil {
return errors.Wrap(err, "failed normalize checkpoint data with veth check")
} else {
data = normalizedData
}

default:
panic(fmt.Sprintf("Unexpected value of checkpointMigrationPhase: %v", ds.CheckpointMigrationPhase))
}
Expand Down Expand Up @@ -1538,3 +1546,39 @@ func (ds *DataStore) CheckFreeableENIexists() bool {
}
return false
}

// normalizeCheckpointDataByPodVethExistence returns the checkpoint with its
// Allocations reduced to the entries that pass
// validateAllocationByPodVethExistence against the links reported by
// ds.netLink.LinkList. Entries that fail validation are logged at warning
// level and dropped.
//
// If listing the links fails, an empty CheckpointData and that error are
// returned.
//
// Per the original authors, stale allocations shouldn't occur unless a
// container runtime has a bug that prevents cmdDel from being invoked for dead
// pods; this reconciliation is a safety mechanism during the transition from
// CRI to the state file.
func (ds *DataStore) normalizeCheckpointDataByPodVethExistence(checkpoint CheckpointData) (CheckpointData, error) {
	links, err := ds.netLink.LinkList()
	if err != nil {
		return CheckpointData{}, err
	}

	var kept []CheckpointEntry
	for _, entry := range checkpoint.Allocations {
		reason := ds.validateAllocationByPodVethExistence(entry, links)
		if reason != nil {
			ds.log.Warnf("ignore IP allocation for %v:%v,%v due to %v", entry.ContainerID, entry.IPv4, entry.IPv6, reason)
			continue
		}
		kept = append(kept, entry)
	}

	// checkpoint was passed by value, so only the returned copy is modified.
	checkpoint.Allocations = kept
	return checkpoint, nil
}

// validateAllocationByPodVethExistence reports whether the pod recorded in
// allocation still has a link among hostNSLinks.
//
// It returns nil when a link whose name ends with the suffix produced by
// networkutils.GeneratePodHostVethNameSuffix for the pod's namespace/name is
// present, and an error naming the pod otherwise. Allocations whose metadata
// has an empty namespace or name are accepted without any check, for
// backwards compatibility.
func (ds *DataStore) validateAllocationByPodVethExistence(allocation CheckpointEntry, hostNSLinks []netlink.Link) error {
	podNamespace, podName := allocation.Metadata.K8SPodNamespace, allocation.Metadata.K8SPodName

	// For backwards compatibility, entries without pod identity are not validated.
	if podNamespace == "" || podName == "" {
		return nil
	}

	// Only the suffix is compared, so the match is independent of the veth prefix in use.
	wantSuffix := networkutils.GeneratePodHostVethNameSuffix(podNamespace, podName)
	for i := range hostNSLinks {
		if strings.HasSuffix(hostNSLinks[i].Attrs().Name, wantSuffix) {
			return nil
		}
	}
	return errors.Errorf("host-side veth not found for pod %v/%v", podNamespace, podName)
}
Loading

0 comments on commit 7eeb2a9

Please sign in to comment.