mirror of
				https://github.com/k3s-io/kubernetes.git
				synced 2025-11-03 23:40:03 +00:00 
			
		
		
		
	Add list of pods that use a volume to multiattach events
So users knows what pods are blocking a volume and can realize their error.
This commit is contained in:
		@@ -105,6 +105,10 @@ type DesiredStateOfWorld interface {
 | 
			
		||||
	// Mark multiattach error as reported to prevent spamming multiple
 | 
			
		||||
	// events for same error
 | 
			
		||||
	SetMultiAttachError(v1.UniqueVolumeName, k8stypes.NodeName)
 | 
			
		||||
 | 
			
		||||
	// GetPodsOnNodes returns list of pods ("namespace/name") that require
 | 
			
		||||
	// given volume on given nodes.
 | 
			
		||||
	GetVolumePodsOnNodes(nodes []k8stypes.NodeName, volumeName v1.UniqueVolumeName) []*v1.Pod
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// VolumeToAttach represents a volume that should be attached to a node.
 | 
			
		||||
@@ -412,3 +416,24 @@ func (dsw *desiredStateOfWorld) GetPodToAdd() map[types.UniquePodName]PodToAdd {
 | 
			
		||||
	}
 | 
			
		||||
	return pods
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (dsw *desiredStateOfWorld) GetVolumePodsOnNodes(nodes []k8stypes.NodeName, volumeName v1.UniqueVolumeName) []*v1.Pod {
 | 
			
		||||
	dsw.RLock()
 | 
			
		||||
	defer dsw.RUnlock()
 | 
			
		||||
 | 
			
		||||
	pods := []*v1.Pod{}
 | 
			
		||||
	for _, nodeName := range nodes {
 | 
			
		||||
		node, ok := dsw.nodesManaged[nodeName]
 | 
			
		||||
		if !ok {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		volume, ok := node.volumesToAttach[volumeName]
 | 
			
		||||
		if !ok {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		for _, pod := range volume.scheduledPods {
 | 
			
		||||
			pods = append(pods, pod.podObj)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return pods
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -1032,3 +1032,49 @@ func verifyVolumeToAttach(
 | 
			
		||||
 | 
			
		||||
	t.Fatalf("volumesToAttach (%v) should contain %q/%q. It does not.", volumesToAttach, expectedVolumeName, expectedNodeName)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Test_GetPodsOnNodes(t *testing.T) {
 | 
			
		||||
	volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
 | 
			
		||||
	dsw := NewDesiredStateOfWorld(volumePluginMgr)
 | 
			
		||||
 | 
			
		||||
	// 2 nodes, each with one pod with a different volume
 | 
			
		||||
	node1Name := k8stypes.NodeName("node1-name")
 | 
			
		||||
	pod1Name := "pod1-uid"
 | 
			
		||||
	volume1Name := v1.UniqueVolumeName("volume1-name")
 | 
			
		||||
	volume1Spec := controllervolumetesting.GetTestVolumeSpec(string(volume1Name), volume1Name)
 | 
			
		||||
	dsw.AddNode(node1Name, false /*keepTerminatedPodVolumes*/)
 | 
			
		||||
	generatedVolume1Name, podAddErr := dsw.AddPod(types.UniquePodName(pod1Name), controllervolumetesting.NewPod(pod1Name, pod1Name), volume1Spec, node1Name)
 | 
			
		||||
	if podAddErr != nil {
 | 
			
		||||
		t.Fatalf(
 | 
			
		||||
			"AddPod failed for pod %q. Expected: <no error> Actual: <%v>",
 | 
			
		||||
			pod1Name,
 | 
			
		||||
			podAddErr)
 | 
			
		||||
	}
 | 
			
		||||
	node2Name := k8stypes.NodeName("node2-name")
 | 
			
		||||
	pod2Name := "pod2-uid"
 | 
			
		||||
	volume2Name := v1.UniqueVolumeName("volume2-name")
 | 
			
		||||
	volume2Spec := controllervolumetesting.GetTestVolumeSpec(string(volume2Name), volume2Name)
 | 
			
		||||
	dsw.AddNode(node2Name, false /*keepTerminatedPodVolumes*/)
 | 
			
		||||
	_, podAddErr = dsw.AddPod(types.UniquePodName(pod2Name), controllervolumetesting.NewPod(pod2Name, pod2Name), volume2Spec, node2Name)
 | 
			
		||||
	if podAddErr != nil {
 | 
			
		||||
		t.Fatalf(
 | 
			
		||||
			"AddPod failed for pod %q. Expected: <no error> Actual: <%v>",
 | 
			
		||||
			pod2Name,
 | 
			
		||||
			podAddErr)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Third node without any pod
 | 
			
		||||
	node3Name := k8stypes.NodeName("node3-name")
 | 
			
		||||
	dsw.AddNode(node3Name, false /*keepTerminatedPodVolumes*/)
 | 
			
		||||
 | 
			
		||||
	// Act
 | 
			
		||||
	pods := dsw.GetVolumePodsOnNodes([]k8stypes.NodeName{node1Name, node2Name, node3Name, "non-existing-node"}, generatedVolume1Name)
 | 
			
		||||
 | 
			
		||||
	// Assert
 | 
			
		||||
	if len(pods) != 1 {
 | 
			
		||||
		t.Fatalf("Expected 1 pod, got %d", len(pods))
 | 
			
		||||
	}
 | 
			
		||||
	if pods[0].Name != pod1Name {
 | 
			
		||||
		t.Errorf("Expected pod %s/%s, got %s", pod1Name, pod1Name, pods[0].Name)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -19,6 +19,7 @@ go_library(
 | 
			
		||||
        "//pkg/volume/util/operationexecutor:go_default_library",
 | 
			
		||||
        "//vendor/github.com/golang/glog:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/api/core/v1:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/client-go/tools/record:go_default_library",
 | 
			
		||||
    ],
 | 
			
		||||
@@ -34,6 +35,7 @@ go_test(
 | 
			
		||||
        "//pkg/controller/volume/attachdetach/cache:go_default_library",
 | 
			
		||||
        "//pkg/controller/volume/attachdetach/statusupdater:go_default_library",
 | 
			
		||||
        "//pkg/controller/volume/attachdetach/testing:go_default_library",
 | 
			
		||||
        "//pkg/util/strings:go_default_library",
 | 
			
		||||
        "//pkg/volume/testing:go_default_library",
 | 
			
		||||
        "//pkg/volume/util/operationexecutor:go_default_library",
 | 
			
		||||
        "//pkg/volume/util/types:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -21,10 +21,12 @@ package reconciler
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/golang/glog"
 | 
			
		||||
	"k8s.io/api/core/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/types"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	"k8s.io/client-go/tools/record"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
 | 
			
		||||
@@ -269,12 +271,8 @@ func (rc *reconciler) attachDesiredVolumes() {
 | 
			
		||||
			nodes := rc.actualStateOfWorld.GetNodesForVolume(volumeToAttach.VolumeName)
 | 
			
		||||
			if len(nodes) > 0 {
 | 
			
		||||
				if !volumeToAttach.MultiAttachErrorReported {
 | 
			
		||||
					simpleMsg, detailedMsg := volumeToAttach.GenerateMsg("Multi-Attach error", "Volume is already exclusively attached to one node and can't be attached to another")
 | 
			
		||||
					for _, pod := range volumeToAttach.ScheduledPods {
 | 
			
		||||
						rc.recorder.Eventf(pod, v1.EventTypeWarning, kevents.FailedAttachVolume, simpleMsg)
 | 
			
		||||
					}
 | 
			
		||||
					rc.reportMultiAttachError(volumeToAttach, nodes)
 | 
			
		||||
					rc.desiredStateOfWorld.SetMultiAttachError(volumeToAttach.VolumeName, volumeToAttach.NodeName)
 | 
			
		||||
					glog.Warningf(detailedMsg)
 | 
			
		||||
				}
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
@@ -292,5 +290,78 @@ func (rc *reconciler) attachDesiredVolumes() {
 | 
			
		||||
			glog.Errorf(volumeToAttach.GenerateErrorDetailed("attacherDetacher.AttachVolume failed to start", err).Error())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// reportMultiAttachError sends events and logs situation that a volume that
 | 
			
		||||
// should be attached to a node is already attached to different node(s).
 | 
			
		||||
func (rc *reconciler) reportMultiAttachError(volumeToAttach cache.VolumeToAttach, nodes []types.NodeName) {
 | 
			
		||||
	// Filter out the current node from list of nodes where the volume is
 | 
			
		||||
	// attached.
 | 
			
		||||
	// Some methods need []string, some other needs []NodeName, collect both.
 | 
			
		||||
	// In theory, these arrays should have always only one element - the
 | 
			
		||||
	// controller does not allow more than one attachment. But use array just
 | 
			
		||||
	// in case...
 | 
			
		||||
	otherNodes := []types.NodeName{}
 | 
			
		||||
	otherNodesStr := []string{}
 | 
			
		||||
	for _, node := range nodes {
 | 
			
		||||
		if node != volumeToAttach.NodeName {
 | 
			
		||||
			otherNodes = append(otherNodes, node)
 | 
			
		||||
			otherNodesStr = append(otherNodesStr, string(node))
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Get list of pods that use the volume on the other nodes.
 | 
			
		||||
	pods := rc.desiredStateOfWorld.GetVolumePodsOnNodes(otherNodes, volumeToAttach.VolumeName)
 | 
			
		||||
 | 
			
		||||
	if len(pods) == 0 {
 | 
			
		||||
		// We did not find any pods that requests the volume. The pod must have been deleted already.
 | 
			
		||||
		simpleMsg, _ := volumeToAttach.GenerateMsg("Multi-Attach error", "Volume is already exclusively attached to one node and can't be attached to another")
 | 
			
		||||
		for _, pod := range volumeToAttach.ScheduledPods {
 | 
			
		||||
			rc.recorder.Eventf(pod, v1.EventTypeWarning, kevents.FailedAttachVolume, simpleMsg)
 | 
			
		||||
		}
 | 
			
		||||
		// Log detailed message to system admin
 | 
			
		||||
		nodeList := strings.Join(otherNodesStr, ", ")
 | 
			
		||||
		detailedMsg := volumeToAttach.GenerateMsgDetailed("Multi-Attach error", fmt.Sprintf("Volume is already exclusively attached to node %s and can't be attached to another", nodeList))
 | 
			
		||||
		glog.Warningf(detailedMsg)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// There are pods that require the volume and run on another node. Typically
 | 
			
		||||
	// it's user error, e.g. a ReplicaSet uses a PVC and has >1 replicas. Let
 | 
			
		||||
	// the user know what pods are blocking the volume.
 | 
			
		||||
	for _, scheduledPod := range volumeToAttach.ScheduledPods {
 | 
			
		||||
		// Each scheduledPod must get a custom message. They can run in
 | 
			
		||||
		// different namespaces and user of a namespace should not see names of
 | 
			
		||||
		// pods in other namespaces.
 | 
			
		||||
		localPodNames := []string{} // Names of pods in scheduledPods's namespace
 | 
			
		||||
		otherPods := 0              // Count of pods in other namespaces
 | 
			
		||||
		for _, pod := range pods {
 | 
			
		||||
			if pod.Namespace == scheduledPod.Namespace {
 | 
			
		||||
				localPodNames = append(localPodNames, pod.Name)
 | 
			
		||||
			} else {
 | 
			
		||||
				otherPods++
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		var msg string
 | 
			
		||||
		if len(localPodNames) > 0 {
 | 
			
		||||
			msg = fmt.Sprintf("Volume is already used by pod(s) %s", strings.Join(localPodNames, ", "))
 | 
			
		||||
			if otherPods > 0 {
 | 
			
		||||
				msg = fmt.Sprintf("%s and %d pod(s) in different namespaces", msg, otherPods)
 | 
			
		||||
			}
 | 
			
		||||
		} else {
 | 
			
		||||
			// No local pods, there are pods only in different namespaces.
 | 
			
		||||
			msg = fmt.Sprintf("Volume is already used by %d pod(s) in different namespaces", otherPods)
 | 
			
		||||
		}
 | 
			
		||||
		simpleMsg, _ := volumeToAttach.GenerateMsg("Multi-Attach error", msg)
 | 
			
		||||
		rc.recorder.Eventf(scheduledPod, v1.EventTypeWarning, kevents.FailedAttachVolume, simpleMsg)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Log all pods for system admin
 | 
			
		||||
	podNames := []string{}
 | 
			
		||||
	for _, pod := range pods {
 | 
			
		||||
		podNames = append(podNames, pod.Namespace+"/"+pod.Name)
 | 
			
		||||
	}
 | 
			
		||||
	detailedMsg := volumeToAttach.GenerateMsgDetailed("Multi-Attach error", fmt.Sprintf("Volume is already used by pods %s on node %s", strings.Join(podNames, ", "), strings.Join(otherNodesStr, ", ")))
 | 
			
		||||
	glog.Warningf(detailedMsg)
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -29,6 +29,7 @@ import (
 | 
			
		||||
	"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
 | 
			
		||||
	controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
 | 
			
		||||
	stringutil "k8s.io/kubernetes/pkg/util/strings"
 | 
			
		||||
	volumetesting "k8s.io/kubernetes/pkg/volume/testing"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/volume/util/types"
 | 
			
		||||
@@ -531,6 +532,115 @@ func Test_Run_OneVolumeAttachAndDetachMultipleNodesWithReadWriteOnce(t *testing.
 | 
			
		||||
	waitForTotalAttachCallCount(t, 2 /* expectedAttachCallCount */, fakePlugin)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Test_ReportMultiAttachError(t *testing.T) {
 | 
			
		||||
	type nodeWithPods struct {
 | 
			
		||||
		name     k8stypes.NodeName
 | 
			
		||||
		podNames []string
 | 
			
		||||
	}
 | 
			
		||||
	tests := []struct {
 | 
			
		||||
		name           string
 | 
			
		||||
		nodes          []nodeWithPods
 | 
			
		||||
		expectedEvents []string
 | 
			
		||||
	}{
 | 
			
		||||
		{
 | 
			
		||||
			"no pods use the volume",
 | 
			
		||||
			[]nodeWithPods{
 | 
			
		||||
				{"node1", []string{"ns1/pod1"}},
 | 
			
		||||
			},
 | 
			
		||||
			[]string{"Warning FailedAttachVolume Multi-Attach error for volume \"volume-name\" Volume is already exclusively attached to one node and can't be attached to another"},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			"pods in the same namespace use the volume",
 | 
			
		||||
			[]nodeWithPods{
 | 
			
		||||
				{"node1", []string{"ns1/pod1"}},
 | 
			
		||||
				{"node2", []string{"ns1/pod2"}},
 | 
			
		||||
			},
 | 
			
		||||
			[]string{"Warning FailedAttachVolume Multi-Attach error for volume \"volume-name\" Volume is already used by pod(s) pod2"},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			"pods in anotother namespace use the volume",
 | 
			
		||||
			[]nodeWithPods{
 | 
			
		||||
				{"node1", []string{"ns1/pod1"}},
 | 
			
		||||
				{"node2", []string{"ns2/pod2"}},
 | 
			
		||||
			},
 | 
			
		||||
			[]string{"Warning FailedAttachVolume Multi-Attach error for volume \"volume-name\" Volume is already used by 1 pod(s) in different namespaces"},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			"pods both in the same and anotother namespace use the volume",
 | 
			
		||||
			[]nodeWithPods{
 | 
			
		||||
				{"node1", []string{"ns1/pod1"}},
 | 
			
		||||
				{"node2", []string{"ns2/pod2"}},
 | 
			
		||||
				{"node3", []string{"ns1/pod3"}},
 | 
			
		||||
			},
 | 
			
		||||
			[]string{"Warning FailedAttachVolume Multi-Attach error for volume \"volume-name\" Volume is already used by pod(s) pod3 and 1 pod(s) in different namespaces"},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, test := range tests {
 | 
			
		||||
		// Arrange
 | 
			
		||||
		t.Logf("Test %q starting", test.name)
 | 
			
		||||
		volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
 | 
			
		||||
		dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
 | 
			
		||||
		asw := cache.NewActualStateOfWorld(volumePluginMgr)
 | 
			
		||||
		fakeKubeClient := controllervolumetesting.CreateTestClient()
 | 
			
		||||
		fakeRecorder := record.NewFakeRecorder(100)
 | 
			
		||||
		fakeHandler := volumetesting.NewBlockVolumePathHandler()
 | 
			
		||||
		ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
 | 
			
		||||
			fakeKubeClient,
 | 
			
		||||
			volumePluginMgr,
 | 
			
		||||
			fakeRecorder,
 | 
			
		||||
			false, /* checkNodeCapabilitiesBeforeMount */
 | 
			
		||||
			fakeHandler))
 | 
			
		||||
		nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
 | 
			
		||||
		rc := NewReconciler(
 | 
			
		||||
			reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
 | 
			
		||||
 | 
			
		||||
		nodes := []k8stypes.NodeName{}
 | 
			
		||||
		for _, n := range test.nodes {
 | 
			
		||||
			dsw.AddNode(n.name, false /*keepTerminatedPodVolumes*/)
 | 
			
		||||
			nodes = append(nodes, n.name)
 | 
			
		||||
			for _, podName := range n.podNames {
 | 
			
		||||
				volumeName := v1.UniqueVolumeName("volume-name")
 | 
			
		||||
				volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
 | 
			
		||||
				volumeSpec.PersistentVolume.Spec.AccessModes = []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}
 | 
			
		||||
				uid := string(n.name) + "-" + podName // unique UID
 | 
			
		||||
				namespace, name := stringutil.SplitQualifiedName(podName)
 | 
			
		||||
				pod := controllervolumetesting.NewPod(uid, name)
 | 
			
		||||
				pod.Namespace = namespace
 | 
			
		||||
				_, err := dsw.AddPod(types.UniquePodName(uid), pod, volumeSpec, n.name)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					t.Fatalf("Error adding pod %s to DSW: %s", podName, err)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		// Act
 | 
			
		||||
		volumes := dsw.GetVolumesToAttach()
 | 
			
		||||
		for _, vol := range volumes {
 | 
			
		||||
			if vol.NodeName == "node1" {
 | 
			
		||||
				rc.(*reconciler).reportMultiAttachError(vol, nodes)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// Assert
 | 
			
		||||
		close(fakeRecorder.Events)
 | 
			
		||||
		index := 0
 | 
			
		||||
		for event := range fakeRecorder.Events {
 | 
			
		||||
			if len(test.expectedEvents) < index {
 | 
			
		||||
				t.Errorf("Test %q: unexpected event received: %s", test.name, event)
 | 
			
		||||
			} else {
 | 
			
		||||
				expectedEvent := test.expectedEvents[index]
 | 
			
		||||
				if expectedEvent != event {
 | 
			
		||||
					t.Errorf("Test %q: event %d: expected %q, got %q", test.name, index, expectedEvent, event)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
			index++
 | 
			
		||||
		}
 | 
			
		||||
		for i := index; i < len(test.expectedEvents); i++ {
 | 
			
		||||
			t.Errorf("Test %q: event %d: expected %q, got none", test.name, i, test.expectedEvents[i])
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func waitForMultiAttachErrorOnNode(
 | 
			
		||||
	t *testing.T,
 | 
			
		||||
	attachedNode k8stypes.NodeName,
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user