Don't track reattachment of provider-unrelated PVs (#937)
* Don't track reattachment of provider-unrelated PVs

* Add unit tests for `getPodVolumeInfos`

* Review: simplify `addAll`, drop `toListOfAny`
timebertt authored Oct 18, 2024
1 parent 97cd752 commit 67e4579
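For context, here is a minimal sketch (not code from this commit) of the driver-side contract the change relies on: `GetVolumeIDs` returns volume IDs only for PVs the provider owns, so provider-unrelated PVs yield no ID and are no longer tracked during drain. The `fakeDriver` type, the CSI driver name, and the response type name `driver.GetVolumeIDsResponse` are assumptions for illustration; the request type and the `VolumeIDs` field appear in the diff below.

```go
package drain_test

import (
	"context"

	"github.com/gardener/machine-controller-manager/pkg/util/provider/driver"
)

// fakeDriver is a hypothetical provider driver, used only for illustration.
type fakeDriver struct {
	driver.Driver // embedded so the sketch satisfies the rest of the interface
}

// GetVolumeIDs maps PV specs to provider volume IDs. PVs this provider does
// not own (foreign CSI drivers, NFS volumes, ...) produce no ID and are
// therefore skipped by the drain logic.
func (d *fakeDriver) GetVolumeIDs(_ context.Context, req *driver.GetVolumeIDsRequest) (*driver.GetVolumeIDsResponse, error) {
	resp := &driver.GetVolumeIDsResponse{}
	for _, spec := range req.PVSpecs {
		// Hypothetical provider check: only this provider's CSI volumes count.
		if spec.CSI != nil && spec.CSI.Driver == "my-provider.csi.example.com" {
			resp.VolumeIDs = append(resp.VolumeIDs, spec.CSI.VolumeHandle)
		}
	}
	return resp, nil
}
```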
Showing 2 changed files with 287 additions and 76 deletions.
187 changes: 116 additions & 71 deletions pkg/util/provider/drain/drain.go
@@ -33,8 +33,6 @@ import (
"time"

"github.com/Masterminds/semver/v3"
"github.com/gardener/machine-controller-manager/pkg/util/k8sutils"
"github.com/gardener/machine-controller-manager/pkg/util/provider/driver"
corev1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
@@ -49,6 +47,9 @@ import (
policyv1listers "k8s.io/client-go/listers/policy/v1"
policyv1beta1listers "k8s.io/client-go/listers/policy/v1beta1"
"k8s.io/klog/v2"

"github.com/gardener/machine-controller-manager/pkg/util/k8sutils"
"github.com/gardener/machine-controller-manager/pkg/util/provider/driver"
)

// Options are configurable options while draining a node before deletion
@@ -90,11 +91,36 @@ type fatal struct {
string
}

// PodVolumeInfo is the struct used to hold the PersistentVolumeID and volumeID
// for all the PVs attached to the pod
// PodVolumeInfo holds information about all PersistentVolumes referenced by a single pod.
type PodVolumeInfo struct {
persistentVolumeList []string
volumeList []string
// volumes holds information about all PersistentVolumes referenced by the pod via PersistentVolumeClaims.
volumes []VolumeInfo
}

// PersistentVolumeNames returns the names of all PersistentVolumes used by the pod.
func (p PodVolumeInfo) PersistentVolumeNames() []string {
out := make([]string, 0, len(p.volumes))
for _, volume := range p.volumes {
out = append(out, volume.persistentVolumeName)
}
return out
}

// VolumeIDs returns the volume IDs/handles of all PersistentVolumes used by the pod.
func (p PodVolumeInfo) VolumeIDs() []string {
out := make([]string, 0, len(p.volumes))
for _, volume := range p.volumes {
out = append(out, volume.volumeID)
}
return out
}

// VolumeInfo holds relevant information about a PersistentVolume for tracking attachments.
type VolumeInfo struct {
// The name of the PersistentVolume referenced by a PersistentVolumeClaim used in a Pod.
persistentVolumeName string
// The volume ID/handle corresponding to the PersistentVolume name, as returned by Driver.GetVolumeIDs.
volumeID string
}

const (
@@ -505,40 +531,51 @@ func sortPodsByPriority(pods []*corev1.Pod) {
})
}

// doAccountingOfPvs returns a map with the key as a hash of
// pod-namespace/pod-name and value as a PodVolumeInfo object
func (o *Options) doAccountingOfPvs(ctx context.Context, pods []*corev1.Pod) map[string]PodVolumeInfo {
// getPodVolumeInfos returns information about all PersistentVolumes whose attachments the machine controller needs
// to track when draining a node.
// It filters out shared PVs (PVs used by multiple pods).
// Also, when mapping PersistentVolume names to volume IDs, the driver filters out volumes of types that don't belong
// to the respective cloud provider, e.g., CSI volumes of unrelated drivers, NFS volumes, etc.
func (o *Options) getPodVolumeInfos(ctx context.Context, pods []*corev1.Pod) map[string]PodVolumeInfo {
var (
pvMap = make(map[string][]string)
podVolumeInfoMap = make(map[string]PodVolumeInfo)
persistentVolumeNamesByPod = make(map[string][]string)
podVolumeInfos = make(map[string]PodVolumeInfo)
)

for _, pod := range pods {
podPVs, _ := o.getPVList(pod)
pvMap[getPodKey(pod)] = podPVs
persistentVolumeNamesByPod[getPodKey(pod)] = o.getPersistentVolumeNamesForPod(pod)
}

// Filter out PVs shared by multiple pods: as long as another pod still uses such a PV, it won't
// detach after a single pod's eviction, so shared PVs are not tracked.
filterSharedPVs(pvMap)
filterSharedPVs(persistentVolumeNamesByPod)

for podKey, persistentVolumeList := range pvMap {
persistentVolumeListDeepCopy := persistentVolumeList
volumeList, err := o.getVolIDsFromDriver(ctx, persistentVolumeList)
if err != nil {
// In case of error, log and skip this set of volumes
klog.Errorf("error getting volume ID from cloud provider. Skipping volumes for pod: %v. Err: %v", podKey, err)
continue
}
for podKey, persistentVolumeNames := range persistentVolumeNamesByPod {
podVolumeInfo := PodVolumeInfo{}

podVolumeInfo := PodVolumeInfo{
persistentVolumeList: persistentVolumeListDeepCopy,
volumeList: volumeList,
for _, persistentVolumeName := range persistentVolumeNames {
volumeID, err := o.getVolumeIDFromDriver(ctx, persistentVolumeName)
if err != nil {
// In case of error, log and skip this volume
klog.Errorf("error getting volume ID from cloud provider. Skipping volume %s for pod: %v. Err: %v", persistentVolumeName, podKey, err)
continue
}

// Track this PV's attachment during drain operations only if the driver returns a volume ID for it.
if volumeID != "" {
podVolumeInfo.volumes = append(podVolumeInfo.volumes, VolumeInfo{
persistentVolumeName: persistentVolumeName,
volumeID: volumeID,
})
} else {
klog.V(4).Infof("Driver did not return a volume ID for volume %s. Skipping provider-unrelated volume for pod %s", persistentVolumeName, podKey)
}
}
podVolumeInfoMap[podKey] = podVolumeInfo

podVolumeInfos[podKey] = podVolumeInfo
}
klog.V(4).Infof("PodVolumeInfoMap = %v", podVolumeInfoMap)
klog.V(4).Infof("PodVolumeInfos: %v", podVolumeInfos)

return podVolumeInfoMap
return podVolumeInfos
}

// filterSharedPVs filters out the PVs that are shared among pods.
@@ -585,7 +622,7 @@ func (o *Options) evictPodsWithPv(ctx context.Context, attemptEvict bool, pods [
) {
sortPodsByPriority(pods)

podVolumeInfoMap := o.doAccountingOfPvs(ctx, pods)
podVolumeInfoMap := o.getPodVolumeInfos(ctx, pods)

var (
remainingPods []*corev1.Pod
Expand All @@ -597,7 +634,7 @@ func (o *Options) evictPodsWithPv(ctx context.Context, attemptEvict bool, pods [
for i := 0; i < nretries; i++ {
remainingPods, fastTrack = o.evictPodsWithPVInternal(ctx, attemptEvict, pods, podVolumeInfoMap, policyGroupVersion, getPodFn, returnCh)
if fastTrack || len(remainingPods) == 0 {
//Either all pods got evicted or we need to fast track the return (node deletion detected)
// Either all pods got evicted or we need to fast track the return (node deletion detected)
break
}

@@ -768,7 +805,7 @@ func (o *Options) evictPodsWithPVInternal(
o.checkAndDeleteWorker(volumeAttachmentEventCh)
continue
}
klog.Warningf("Timeout occurred for following volumes to reattach: %v", podVolumeInfo.persistentVolumeList)
klog.Warningf("Timeout occurred for following volumes to reattach: %v", podVolumeInfo.volumes)
}

o.checkAndDeleteWorker(volumeAttachmentEventCh)
@@ -786,8 +823,9 @@ func (o *Options) evictPodsWithPVInternal(
return retryPods, false
}

func (o *Options) getPVList(pod *corev1.Pod) ([]string, error) {
pvs := []string{}
func (o *Options) getPersistentVolumeNamesForPod(pod *corev1.Pod) []string {
var pvs []string

for i := range pod.Spec.Volumes {
vol := &pod.Spec.Volumes[i]

@@ -819,17 +857,18 @@ func (o *Options) getPVList(pod *corev1.Pod) ([]string, error) {
}
}
}
return pvs, nil

return pvs
}

func (o *Options) waitForDetach(ctx context.Context, podVolumeInfo PodVolumeInfo, nodeName string) error {
if len(podVolumeInfo.volumeList) == 0 || nodeName == "" {
if len(podVolumeInfo.volumes) == 0 || nodeName == "" {
// If volume or node name is not available, there is nothing to do. Just log this as a warning
klog.Warningf("Node name: %q, list of pod PVs to wait for detach: %v", nodeName, podVolumeInfo.volumeList)
klog.Warningf("Node name: %q, list of pod PVs to wait for detach: %v", nodeName, podVolumeInfo.volumes)
return nil
}

klog.V(4).Info("Waiting for following volumes to detach: ", podVolumeInfo.volumeList)
klog.V(3).Infof("Waiting for following volumes to detach: %v", podVolumeInfo.volumes)

found := true

@@ -852,15 +891,15 @@ func (o *Options) waitForDetach(ctx context.Context, podVolumeInfo PodVolumeInfo
return err
}

klog.V(4).Infof("No of attached volumes for node %q is %s", nodeName, node.Status.VolumesAttached)
klog.V(4).Infof("Volumes attached to node %q: %s", nodeName, node.Status.VolumesAttached)
attachedVols := node.Status.VolumesAttached
if len(attachedVols) == 0 {
klog.V(4).Infof("No volumes attached to the node %q", nodeName)
return nil
}

LookUpVolume:
for _, volumeID := range podVolumeInfo.volumeList {
for _, volumeID := range podVolumeInfo.VolumeIDs() {

for j := range attachedVols {
attachedVol := &attachedVols[j]
@@ -869,7 +908,7 @@ func (o *Options) waitForDetach(ctx context.Context, podVolumeInfo PodVolumeInfo

if found {
klog.V(4).Infof(
"Found volume:%s still attached to node %q. Will re-check in %s",
"Found volume %q still attached to node %q. Will re-check in %s",
volumeID,
nodeName,
VolumeDetachPollInterval,
@@ -881,7 +920,7 @@ func (o *Options) waitForDetach(ctx context.Context, podVolumeInfo PodVolumeInfo
}
}

klog.V(4).Infof("Detached volumes:%s from node %q", podVolumeInfo.volumeList, nodeName)
klog.V(3).Infof("Detached volumes %v from node %q", podVolumeInfo.volumes, nodeName)
return nil
}

@@ -899,18 +938,18 @@ func isDesiredReattachment(volumeAttachment *storagev1.VolumeAttachment, previou
// 1. If CSI is enabled, use VolumeAttachment events to determine reattachment
// 2. If all else fails, fall back to a static timeout
func (o *Options) waitForReattach(ctx context.Context, podVolumeInfo PodVolumeInfo, previousNodeName string, volumeAttachmentEventCh chan *storagev1.VolumeAttachment) error {
if len(podVolumeInfo.persistentVolumeList) == 0 || previousNodeName == "" {
if len(podVolumeInfo.volumes) == 0 || previousNodeName == "" {
// If volume or node name is not available, there is nothing to do. Just log this as a warning
klog.Warningf("List of pod PVs waiting for reattachment is 0: %v", podVolumeInfo.persistentVolumeList)
klog.Warningf("List of pod PVs waiting for reattachment is 0: %v", podVolumeInfo.volumes)
return nil
}

klog.V(3).Infof("Waiting for following volumes to reattach: %v", podVolumeInfo.persistentVolumeList)
klog.V(3).Infof("Waiting for following volumes to reattach: %v", podVolumeInfo.volumes)

var pvsWaitingForReattachments map[string]bool
if volumeAttachmentEventCh != nil {
pvsWaitingForReattachments = make(map[string]bool)
for _, persistentVolumeName := range podVolumeInfo.persistentVolumeList {
for _, persistentVolumeName := range podVolumeInfo.PersistentVolumeNames() {
pvsWaitingForReattachments[persistentVolumeName] = true
}
}
@@ -923,7 +962,7 @@ func (o *Options) waitForReattach(ctx context.Context, podVolumeInfo PodVolumeIn

case <-ctx.Done():
// Timeout occurred waiting for reattachment, exit function with error
klog.Warningf("Timeout occurred while waiting for PVs %v to reattach to a different node", podVolumeInfo.persistentVolumeList)
klog.Warningf("Timeout occurred while waiting for PVs %v to reattach to a different node", podVolumeInfo.volumes)
return fmt.Errorf("%s", reattachTimeoutErr)

case incomingEvent := <-volumeAttachmentEventCh:
@@ -945,39 +984,45 @@ func (o *Options) waitForReattach(ctx context.Context, podVolumeInfo PodVolumeIn
}
}

klog.V(3).Infof("Successfully reattached volumes: %s", podVolumeInfo.persistentVolumeList)
klog.V(3).Infof("Successfully reattached volumes: %s", podVolumeInfo.volumes)
return nil
}

func (o *Options) getVolIDsFromDriver(ctx context.Context, pvNames []string) ([]string, error) {
pvSpecs := []*corev1.PersistentVolumeSpec{}
func (o *Options) getVolumeIDFromDriver(ctx context.Context, pvName string) (string, error) {
var pvSpec *corev1.PersistentVolumeSpec

for _, pvName := range pvNames {
try := 0
try := 0
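// Retry transient lister errors up to GetPvDetailsMaxRetries; a NotFound PV yields no volume ID
// and is skipped by the caller like any other provider-unrelated volume.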

for {
pv, err := o.pvLister.Get(pvName)
for {
pv, err := o.pvLister.Get(pvName)

if apierrors.IsNotFound(err) {
break
} else if err != nil {
try++
if try == GetPvDetailsMaxRetries {
break
}
// In case of error, try again after a few seconds
time.Sleep(GetPvDetailsRetryInterval)
continue
if apierrors.IsNotFound(err) {
return "", nil
} else if err != nil {
try++
if try == GetPvDetailsMaxRetries {
return "", err
}

// Found PV; append and exit
pvSpecs = append(pvSpecs, &pv.Spec)
break
// In case of error, try again after a few seconds
time.Sleep(GetPvDetailsRetryInterval)
continue
}

// found PV
pvSpec = &pv.Spec
break
}

response, err := o.Driver.GetVolumeIDs(ctx, &driver.GetVolumeIDsRequest{PVSpecs: []*corev1.PersistentVolumeSpec{pvSpec}})
if err != nil {
return "", err
}

response, err := o.Driver.GetVolumeIDs(ctx, &driver.GetVolumeIDsRequest{PVSpecs: pvSpecs})
return response.VolumeIDs, err
if len(response.VolumeIDs) > 0 {
return response.VolumeIDs[0], nil
}

return "", nil
}

func (o *Options) evictPodWithoutPVInternal(ctx context.Context, attemptEvict bool, pod *corev1.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*corev1.Pod, error), returnCh chan error) {
@@ -1076,8 +1121,8 @@ func (o *Options) waitForDelete(pods []*corev1.Pod, interval, timeout time.Durat
for i, pod := range pods {
p, err := getPodFn(pod.Namespace, pod.Name)
if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) {
//cmdutil.PrintSuccess(o.mapper, false, o.Out, "pod", pod.Name, false, verbStr)
//klog.Info("pod deleted successfully found")
// cmdutil.PrintSuccess(o.mapper, false, o.Out, "pod", pod.Name, false, verbStr)
// klog.Info("pod deleted successfully found")
continue
} else if err != nil {
return false, err
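In the spirit of the unit tests this commit adds, here is a minimal test-style sketch of the new accessors (assuming it lives in package `drain`, since `PodVolumeInfo`'s fields are unexported; the test name and values are illustrative, not the commit's actual tests):

```go
package drain

import (
	"reflect"
	"testing"
)

// TestPodVolumeInfoAccessors checks that PersistentVolumeNames and VolumeIDs
// project the corresponding fields from the shared volumes list, in order.
func TestPodVolumeInfoAccessors(t *testing.T) {
	info := PodVolumeInfo{
		volumes: []VolumeInfo{
			{persistentVolumeName: "pv-1", volumeID: "vol-1"},
			{persistentVolumeName: "pv-2", volumeID: "vol-2"},
		},
	}

	if got, want := info.PersistentVolumeNames(), []string{"pv-1", "pv-2"}; !reflect.DeepEqual(got, want) {
		t.Errorf("PersistentVolumeNames() = %v, want %v", got, want)
	}
	if got, want := info.VolumeIDs(), []string{"vol-1", "vol-2"}; !reflect.DeepEqual(got, want) {
		t.Errorf("VolumeIDs() = %v, want %v", got, want)
	}
}
```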