diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 12c341640d9..8b4596a3563 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -1675,8 +1675,8 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
 	if !kl.podIsTerminated(pod) {
 		// Wait for volumes to attach/mount
 		if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
-			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to mount volumes for pod %q: %v", format.Pod(pod), err)
-			klog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
+			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
+			klog.Errorf("Unable to attach or mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
 			return err
 		}
 	}
diff --git a/pkg/kubelet/volumemanager/BUILD b/pkg/kubelet/volumemanager/BUILD
index f967754b53f..30666c1d4ca 100644
--- a/pkg/kubelet/volumemanager/BUILD
+++ b/pkg/kubelet/volumemanager/BUILD
@@ -61,6 +61,7 @@ go_test(
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
         "//staging/src/k8s.io/client-go/tools/record:go_default_library",
diff --git a/pkg/kubelet/volumemanager/cache/BUILD b/pkg/kubelet/volumemanager/cache/BUILD
index 9d6a95049d6..688209d6e13 100644
--- a/pkg/kubelet/volumemanager/cache/BUILD
+++ b/pkg/kubelet/volumemanager/cache/BUILD
@@ -23,6 +23,7 @@ go_library(
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//vendor/k8s.io/klog:go_default_library",
     ],
diff --git a/pkg/kubelet/volumemanager/cache/desired_state_of_world.go b/pkg/kubelet/volumemanager/cache/desired_state_of_world.go
index 28b15b5f161..3387a2a93b4 100644
--- a/pkg/kubelet/volumemanager/cache/desired_state_of_world.go
+++ b/pkg/kubelet/volumemanager/cache/desired_state_of_world.go
@@ -24,8 +24,9 @@ import (
 	"fmt"
 	"sync"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
+	"k8s.io/apimachinery/pkg/util/sets"
 	apiv1resource "k8s.io/kubernetes/pkg/api/v1/resource"
 	"k8s.io/kubernetes/pkg/volume"
 	"k8s.io/kubernetes/pkg/volume/util"
@@ -107,6 +108,18 @@ type DesiredStateOfWorld interface {
 	// If a pod with the same name does not exist under the specified
 	// volume, false is returned.
 	VolumeExistsWithSpecName(podName types.UniquePodName, volumeSpecName string) bool
+
+	// AddErrorToPod adds the given error to the given pod in the cache.
+	// It will be returned by subsequent PopPodErrors().
+	// Each error string is stored only once.
+	AddErrorToPod(podName types.UniquePodName, err string)
+
+	// PopPodErrors returns accumulated errors on a given pod and clears
+	// them.
+	PopPodErrors(podName types.UniquePodName) []string
+
+	// GetPodsWithErrors returns names of pods that have stored errors.
+	GetPodsWithErrors() []types.UniquePodName
 }
 
 // VolumeToMount represents a volume that is attached to this node and needs to
@@ -120,6 +133,7 @@ func NewDesiredStateOfWorld(volumePluginMgr *volume.VolumePluginMgr) DesiredStat
 	return &desiredStateOfWorld{
 		volumesToMount:  make(map[v1.UniqueVolumeName]volumeToMount),
 		volumePluginMgr: volumePluginMgr,
+		podErrors:       make(map[types.UniquePodName]sets.String),
 	}
 }
 
@@ -132,6 +146,8 @@ type desiredStateOfWorld struct {
 	// volumePluginMgr is the volume plugin manager used to create volume
 	// plugin objects.
 	volumePluginMgr *volume.VolumePluginMgr
+	// podErrors are errors caught by desiredStateOfWorldPopulator about volumes for a given pod.
+	podErrors map[types.UniquePodName]sets.String
 
 	sync.RWMutex
 }
@@ -190,6 +206,12 @@ type podToMount struct {
 	outerVolumeSpecName string
 }
 
+const (
+	// Maximum errors to be stored per pod in desiredStateOfWorld.podErrors to
+	// prevent unbounded growth.
+	maxPodErrors = 10
+)
+
 func (dsw *desiredStateOfWorld) AddPodToVolume(
 	podName types.UniquePodName,
 	pod *v1.Pod,
@@ -293,6 +315,8 @@ func (dsw *desiredStateOfWorld) DeletePodFromVolume(
 	dsw.Lock()
 	defer dsw.Unlock()
 
+	delete(dsw.podErrors, podName)
+
 	volumeObj, volumeExists := dsw.volumesToMount[volumeName]
 	if !volumeExists {
 		return
@@ -412,3 +436,38 @@ func (dsw *desiredStateOfWorld) isDeviceMountableVolume(volumeSpec *volume.Spec)
 
 	return false
 }
+
+func (dsw *desiredStateOfWorld) AddErrorToPod(podName types.UniquePodName, err string) {
+	dsw.Lock()
+	defer dsw.Unlock()
+
+	if errs, found := dsw.podErrors[podName]; found {
+		if errs.Len() <= maxPodErrors {
+			errs.Insert(err)
+		}
+		return
+	}
+	dsw.podErrors[podName] = sets.NewString(err)
+}
+
+func (dsw *desiredStateOfWorld) PopPodErrors(podName types.UniquePodName) []string {
+	dsw.Lock()
+	defer dsw.Unlock()
+
+	if errs, found := dsw.podErrors[podName]; found {
+		delete(dsw.podErrors, podName)
+		return errs.List()
+	}
+	return []string{}
+}
+
+func (dsw *desiredStateOfWorld) GetPodsWithErrors() []types.UniquePodName {
+	dsw.RLock()
+	defer dsw.RUnlock()
+
+	pods := make([]types.UniquePodName, 0, len(dsw.podErrors))
+	for podName := range dsw.podErrors {
+		pods = append(pods, podName)
+	}
+	return pods
+}
diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
index f5aa5712fb4..31c5de4e8ae 100644
--- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
+++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
@@ -21,13 +21,14 @@ caches in sync with the "ground truth".
 package populator
 
 import (
+	"errors"
 	"fmt"
 	"sync"
 	"time"
 
 	"k8s.io/klog"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
@@ -270,6 +271,13 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
 				volumeToMount.PodName, volumeToMount.VolumeName)
 			dswp.deleteProcessedPod(volumeToMount.PodName)
 		}
 	}
+
+	podsWithError := dswp.desiredStateOfWorld.GetPodsWithErrors()
+	for _, podName := range podsWithError {
+		if _, podExists := dswp.podManager.GetPodByUID(types.UID(podName)); !podExists {
+			dswp.desiredStateOfWorld.PopPodErrors(podName)
+		}
+	}
 }
 
 // processPodVolumes processes the volumes in the given pod and adds them to the
@@ -300,6 +308,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
 				podVolume.Name,
 				format.Pod(pod),
 				err)
+			dswp.desiredStateOfWorld.AddErrorToPod(uniquePodName, err.Error())
 			allVolumesAdded = false
 			continue
 		}
@@ -309,11 +318,12 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
 			uniquePodName, pod, volumeSpec, podVolume.Name, volumeGidValue)
 		if err != nil {
 			klog.Errorf(
-				"Failed to add volume %q (specName: %q) for pod %q to desiredStateOfWorld. err=%v",
+				"Failed to add volume %s (specName: %s) for pod %q to desiredStateOfWorld: %v",
 				podVolume.Name,
 				volumeSpec.Name(),
 				uniquePodName,
 				err)
+			dswp.desiredStateOfWorld.AddErrorToPod(uniquePodName, err.Error())
 			allVolumesAdded = false
 		}
 
@@ -335,6 +345,8 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
 		// New pod has been synced. Re-mount all volumes that need it
 		// (e.g. DownwardAPI)
 		dswp.actualStateOfWorld.MarkRemountRequired(uniquePodName)
+		// Remove any stored errors for the pod; everything went well in this processPodVolumes.
+		dswp.desiredStateOfWorld.PopPodErrors(uniquePodName)
 	}
 }
 
@@ -486,7 +498,7 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
 			podNamespace, pvcSource.ClaimName)
 		if err != nil {
 			return nil, nil, "", fmt.Errorf(
-				"error processing PVC %q/%q: %v",
+				"error processing PVC %s/%s: %v",
 				podNamespace,
 				pvcSource.ClaimName,
 				err)
@@ -505,7 +517,7 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
 			dswp.getPVSpec(pvName, pvcSource.ReadOnly, pvcUID)
 		if err != nil {
 			return nil, nil, "", fmt.Errorf(
-				"error processing PVC %q/%q: %v",
+				"error processing PVC %s/%s: %v",
 				podNamespace,
 				pvcSource.ClaimName,
 				err)
@@ -528,20 +540,16 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
 			// Error if a container has volumeMounts but the volumeMode of PVC isn't Filesystem
 			if mountsMap[podVolume.Name] && volumeMode != v1.PersistentVolumeFilesystem {
 				return nil, nil, "", fmt.Errorf(
-					"Volume %q has volumeMode %q, but is specified in volumeMounts for pod %q/%q",
+					"volume %s has volumeMode %s, but is specified in volumeMounts",
 					podVolume.Name,
-					volumeMode,
-					podNamespace,
-					podName)
+					volumeMode)
 			}
 			// Error if a container has volumeDevices but the volumeMode of PVC isn't Block
 			if devicesMap[podVolume.Name] && volumeMode != v1.PersistentVolumeBlock {
 				return nil, nil, "", fmt.Errorf(
-					"Volume %q has volumeMode %q, but is specified in volumeDevices for pod %q/%q",
+					"volume %s has volumeMode %s, but is specified in volumeDevices",
 					podVolume.Name,
-					volumeMode,
-					podNamespace,
-					podName)
+					volumeMode)
 			}
 		}
 		return pvc, volumeSpec, volumeGidValue, nil
@@ -562,11 +570,7 @@ func (dswp *desiredStateOfWorldPopulator) getPVCExtractPV(
 	pvc, err :=
 		dswp.kubeClient.CoreV1().PersistentVolumeClaims(namespace).Get(claimName, metav1.GetOptions{})
 	if err != nil || pvc == nil {
-		return nil, fmt.Errorf(
-			"failed to fetch PVC %s/%s from API server. err=%v",
-			namespace,
-			claimName,
-			err)
+		return nil, fmt.Errorf("failed to fetch PVC from API server: %v", err)
 	}
 
 	if utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection) {
@@ -579,21 +583,15 @@ func (dswp *desiredStateOfWorldPopulator) getPVCExtractPV(
 		// It should happen only in very rare case when scheduler schedules
 		// a pod and user deletes a PVC that's used by it at the same time.
 		if pvc.ObjectMeta.DeletionTimestamp != nil {
-			return nil, fmt.Errorf(
-				"can't start pod because PVC %s/%s is being deleted",
-				namespace,
-				claimName)
+			return nil, errors.New("PVC is being deleted")
 		}
 	}
 
-	if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" {
-
-		return nil, fmt.Errorf(
-			"PVC %s/%s has non-bound phase (%q) or empty pvc.Spec.VolumeName (%q)",
-			namespace,
-			claimName,
-			pvc.Status.Phase,
-			pvc.Spec.VolumeName)
+	if pvc.Status.Phase != v1.ClaimBound {
+		return nil, errors.New("PVC is not bound")
+	}
+	if pvc.Spec.VolumeName == "" {
+		return nil, errors.New("PVC has empty pvc.Spec.VolumeName")
 	}
 
 	return pvc, nil
@@ -609,18 +607,18 @@ func (dswp *desiredStateOfWorldPopulator) getPVSpec(
 	pv, err := dswp.kubeClient.CoreV1().PersistentVolumes().Get(name, metav1.GetOptions{})
 	if err != nil || pv == nil {
 		return nil, "", fmt.Errorf(
-			"failed to fetch PV %q from API server. err=%v", name, err)
+			"failed to fetch PV %s from API server: %v", name, err)
 	}
 
 	if pv.Spec.ClaimRef == nil {
 		return nil, "", fmt.Errorf(
-			"found PV object %q but it has a nil pv.Spec.ClaimRef indicating it is not yet bound to the claim",
+			"found PV object %s but it has a nil pv.Spec.ClaimRef indicating it is not yet bound to the claim",
 			name)
 	}
 
 	if pv.Spec.ClaimRef.UID != expectedClaimUID {
 		return nil, "", fmt.Errorf(
-			"found PV object %q but its pv.Spec.ClaimRef.UID (%q) does not point to claim.UID (%q)",
+			"found PV object %s but its pv.Spec.ClaimRef.UID %s does not point to claim.UID %s",
 			name,
 			pv.Spec.ClaimRef.UID,
 			expectedClaimUID)
diff --git a/pkg/kubelet/volumemanager/volume_manager.go b/pkg/kubelet/volumemanager/volume_manager.go
index 2464bd0e473..301916a3d13 100644
--- a/pkg/kubelet/volumemanager/volume_manager.go
+++ b/pkg/kubelet/volumemanager/volume_manager.go
@@ -17,12 +17,14 @@ limitations under the License.
 package volumemanager
 
 import (
+	"errors"
 	"fmt"
 	"sort"
 	"strconv"
+	"strings"
 	"time"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	k8stypes "k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/sets"
@@ -363,7 +365,6 @@ func (vm *volumeManager) WaitForAttachAndMount(pod *v1.Pod) error {
 		vm.verifyVolumesMountedFunc(uniquePodName, expectedVolumes))
 
 	if err != nil {
-		// Timeout expired
 		unmountedVolumes :=
 			vm.getUnmountedVolumes(uniquePodName, expectedVolumes)
 		// Also get unattached volumes for error message
@@ -375,11 +376,10 @@ func (vm *volumeManager) WaitForAttachAndMount(pod *v1.Pod) error {
 		}
 
 		return fmt.Errorf(
-			"timeout expired waiting for volumes to attach or mount for pod %q/%q. list of unmounted volumes=%v. list of unattached volumes=%v",
-			pod.Namespace,
-			pod.Name,
+			"unmounted volumes=%v, unattached volumes=%v: %s",
 			unmountedVolumes,
-			unattachedVolumes)
+			unattachedVolumes,
+			err)
 	}
 
 	klog.V(3).Infof("All volumes are attached and mounted for pod %q", format.Pod(pod))
@@ -402,6 +402,9 @@ func (vm *volumeManager) getUnattachedVolumes(expectedVolumes []string) []string
 // volumes are mounted.
 func (vm *volumeManager) verifyVolumesMountedFunc(podName types.UniquePodName, expectedVolumes []string) wait.ConditionFunc {
 	return func() (done bool, err error) {
+		if errs := vm.desiredStateOfWorld.PopPodErrors(podName); len(errs) > 0 {
+			return true, errors.New(strings.Join(errs, "; "))
+		}
 		return len(vm.getUnmountedVolumes(podName, expectedVolumes)) == 0, nil
 	}
 }
diff --git a/pkg/kubelet/volumemanager/volume_manager_test.go b/pkg/kubelet/volumemanager/volume_manager_test.go
index 23ea0b5bfa0..cce11810c84 100644
--- a/pkg/kubelet/volumemanager/volume_manager_test.go
+++ b/pkg/kubelet/volumemanager/volume_manager_test.go
@@ -23,9 +23,10 @@ import (
 	"testing"
 	"time"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/wait"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/fake"
 	"k8s.io/client-go/tools/record"
@@ -124,9 +125,16 @@ func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) {
 	// delayed claim binding
 	go delayClaimBecomesBound(kubeClient, claim.GetNamespace(), claim.ObjectMeta.Name)
 
-	err = manager.WaitForAttachAndMount(pod)
+	err = wait.Poll(100*time.Millisecond, 1*time.Second, func() (bool, error) {
+		err = manager.WaitForAttachAndMount(pod)
+		if err != nil {
+			// A few "PVC not bound" errors are expected
+			return false, nil
+		}
+		return true, nil
+	})
 	if err != nil {
-		t.Errorf("Expected success: %v", err)
+		t.Errorf("Expected a volume to be mounted, got: %s", err)
 	}
 }