From 17be78065135ebb67b620217834d8e48b3d27194 Mon Sep 17 00:00:00 2001 From: Jan Safranek Date: Fri, 19 Jul 2019 16:38:54 +0200 Subject: [PATCH] Add events to dswp --- .../cache/desired_state_of_world.go | 53 ++++++++++++++++++- .../desired_state_of_world_populator.go | 13 ++++- pkg/kubelet/volumemanager/volume_manager.go | 11 ++-- .../volumemanager/volume_manager_test.go | 14 +++-- 4 files changed, 83 insertions(+), 8 deletions(-) diff --git a/pkg/kubelet/volumemanager/cache/desired_state_of_world.go b/pkg/kubelet/volumemanager/cache/desired_state_of_world.go index 28b15b5f161..b4c746e07a9 100644 --- a/pkg/kubelet/volumemanager/cache/desired_state_of_world.go +++ b/pkg/kubelet/volumemanager/cache/desired_state_of_world.go @@ -24,8 +24,9 @@ import ( "fmt" "sync" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/util/sets" apiv1resource "k8s.io/kubernetes/pkg/api/v1/resource" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/util" @@ -107,6 +108,18 @@ type DesiredStateOfWorld interface { // If a pod with the same name does not exist under the specified // volume, false is returned. VolumeExistsWithSpecName(podName types.UniquePodName, volumeSpecName string) bool + + // AddErrorToPod adds the given error to the given pod in the cache. + // It will be returned by subsequent PopPodErrors(). + // Each error string is stored only once. + AddErrorToPod(podName types.UniquePodName, err string) + + // PopPodErrors returns accumulated errors on a given pod and clears + // them. + PopPodErrors(podName types.UniquePodName) []string + + // GetPodsWithErrors returns names of pods that have stored errors. 
+ GetPodsWithErrors() []types.UniquePodName } // VolumeToMount represents a volume that is attached to this node and needs to @@ -120,6 +133,7 @@ func NewDesiredStateOfWorld(volumePluginMgr *volume.VolumePluginMgr) DesiredStat return &desiredStateOfWorld{ volumesToMount: make(map[v1.UniqueVolumeName]volumeToMount), volumePluginMgr: volumePluginMgr, + podErrors: make(map[types.UniquePodName]sets.String), } } @@ -132,6 +146,8 @@ type desiredStateOfWorld struct { // volumePluginMgr is the volume plugin manager used to create volume // plugin objects. volumePluginMgr *volume.VolumePluginMgr + // podErrors are errors caught by desiredStateOfWorldPopulator about volumes for a given pod. + podErrors map[types.UniquePodName]sets.String sync.RWMutex } @@ -293,6 +309,8 @@ func (dsw *desiredStateOfWorld) DeletePodFromVolume( dsw.Lock() defer dsw.Unlock() + delete(dsw.podErrors, podName) + volumeObj, volumeExists := dsw.volumesToMount[volumeName] if !volumeExists { return @@ -412,3 +430,36 @@ func (dsw *desiredStateOfWorld) isDeviceMountableVolume(volumeSpec *volume.Spec) return false } + +func (dsw *desiredStateOfWorld) AddErrorToPod(podName types.UniquePodName, err string) { + dsw.Lock() + defer dsw.Unlock() + + if errs, found := dsw.podErrors[podName]; found { + errs.Insert(err) + return + } + dsw.podErrors[podName] = sets.NewString(err) +} + +func (dsw *desiredStateOfWorld) PopPodErrors(podName types.UniquePodName) []string { + dsw.Lock() + defer dsw.Unlock() + + if errs, found := dsw.podErrors[podName]; found { + delete(dsw.podErrors, podName) + return errs.List() + } + return []string{} +} + +func (dsw *desiredStateOfWorld) GetPodsWithErrors() []types.UniquePodName { + dsw.RLock() + defer dsw.RUnlock() + + pods := make([]types.UniquePodName, 0, len(dsw.podErrors)) + for podName := range dsw.podErrors { + pods = append(pods, podName) + } + return pods +} diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go 
b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go index f5aa5712fb4..88a26e5b2cc 100644 --- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go +++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go @@ -27,7 +27,7 @@ import ( "k8s.io/klog" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" @@ -270,6 +270,13 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() { volumeToMount.PodName, volumeToMount.VolumeName) dswp.deleteProcessedPod(volumeToMount.PodName) } + + podsWithError := dswp.desiredStateOfWorld.GetPodsWithErrors() + for _, podName := range podsWithError { + if _, podExists := dswp.podManager.GetPodByUID(types.UID(podName)); !podExists { + dswp.desiredStateOfWorld.PopPodErrors(podName) + } + } } // processPodVolumes processes the volumes in the given pod and adds them to the @@ -300,6 +307,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes( podVolume.Name, format.Pod(pod), err) + dswp.desiredStateOfWorld.AddErrorToPod(uniquePodName, err.Error()) allVolumesAdded = false continue } @@ -314,6 +322,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes( volumeSpec.Name(), uniquePodName, err) + dswp.desiredStateOfWorld.AddErrorToPod(uniquePodName, err.Error()) allVolumesAdded = false } @@ -335,6 +344,8 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes( // New pod has been synced. Re-mount all volumes that need it // (e.g. 
DownwardAPI) dswp.actualStateOfWorld.MarkRemountRequired(uniquePodName) + // Remove any stored errors for the pod, everything went well in this processPodVolumes + dswp.desiredStateOfWorld.PopPodErrors(uniquePodName) } } diff --git a/pkg/kubelet/volumemanager/volume_manager.go b/pkg/kubelet/volumemanager/volume_manager.go index 2464bd0e473..7d3eb8b0b78 100644 --- a/pkg/kubelet/volumemanager/volume_manager.go +++ b/pkg/kubelet/volumemanager/volume_manager.go @@ -17,12 +17,14 @@ limitations under the License. package volumemanager import ( + "errors" "fmt" "sort" "strconv" + "strings" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" k8stypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" @@ -363,7 +365,6 @@ func (vm *volumeManager) WaitForAttachAndMount(pod *v1.Pod) error { vm.verifyVolumesMountedFunc(uniquePodName, expectedVolumes)) if err != nil { - // Timeout expired unmountedVolumes := vm.getUnmountedVolumes(uniquePodName, expectedVolumes) // Also get unattached volumes for error message @@ -375,9 +376,10 @@ func (vm *volumeManager) WaitForAttachAndMount(pod *v1.Pod) error { } return fmt.Errorf( - "timeout expired waiting for volumes to attach or mount for pod %q/%q. list of unmounted volumes=%v. list of unattached volumes=%v", + "failed to attach or mount for pod %q/%q: %s. List of unmounted volumes=%v, list of unattached volumes=%v.", pod.Namespace, pod.Name, + err, unmountedVolumes, unattachedVolumes) } @@ -402,6 +404,9 @@ func (vm *volumeManager) getUnattachedVolumes(expectedVolumes []string) []string // volumes are mounted. 
func (vm *volumeManager) verifyVolumesMountedFunc(podName types.UniquePodName, expectedVolumes []string) wait.ConditionFunc { return func() (done bool, err error) { + if errs := vm.desiredStateOfWorld.PopPodErrors(podName); len(errs) > 0 { + return true, errors.New(strings.Join(errs, "; ")) + } return len(vm.getUnmountedVolumes(podName, expectedVolumes)) == 0, nil } } diff --git a/pkg/kubelet/volumemanager/volume_manager_test.go b/pkg/kubelet/volumemanager/volume_manager_test.go index 23ea0b5bfa0..cce11810c84 100644 --- a/pkg/kubelet/volumemanager/volume_manager_test.go +++ b/pkg/kubelet/volumemanager/volume_manager_test.go @@ -23,9 +23,10 @@ import ( "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/record" @@ -124,9 +125,16 @@ func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) { // delayed claim binding go delayClaimBecomesBound(kubeClient, claim.GetNamespace(), claim.ObjectMeta.Name) - err = manager.WaitForAttachAndMount(pod) + err = wait.Poll(100*time.Millisecond, 1*time.Second, func() (bool, error) { + err = manager.WaitForAttachAndMount(pod) + if err != nil { + // Few "PVC not bound" errors are expected + return false, nil + } + return true, nil + }) if err != nil { - t.Errorf("Expected success: %v", err) + t.Errorf("Expected a volume to be mounted, got: %s", err) } }