mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 15:25:57 +00:00
Add events to dswp
This commit is contained in:
parent
0fbfa755d3
commit
17be780651
@ -24,8 +24,9 @@ import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
apiv1resource "k8s.io/kubernetes/pkg/api/v1/resource"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
"k8s.io/kubernetes/pkg/volume/util"
|
||||
@ -107,6 +108,18 @@ type DesiredStateOfWorld interface {
|
||||
// If a pod with the same name does not exist under the specified
|
||||
// volume, false is returned.
|
||||
VolumeExistsWithSpecName(podName types.UniquePodName, volumeSpecName string) bool
|
||||
|
||||
// AddErrorToPod adds the given error to the given pod in the cache.
|
||||
// It will be returned by subsequent GetPodErrors().
|
||||
// Each error string is stored only once.
|
||||
AddErrorToPod(podName types.UniquePodName, err string)
|
||||
|
||||
// PopPodErrors returns accumulated errors on a given pod and clears
|
||||
// them.
|
||||
PopPodErrors(podName types.UniquePodName) []string
|
||||
|
||||
// GetPodsWithErrors returns names of pods that have stored errors.
|
||||
GetPodsWithErrors() []types.UniquePodName
|
||||
}
|
||||
|
||||
// VolumeToMount represents a volume that is attached to this node and needs to
|
||||
@ -120,6 +133,7 @@ func NewDesiredStateOfWorld(volumePluginMgr *volume.VolumePluginMgr) DesiredStat
|
||||
return &desiredStateOfWorld{
|
||||
volumesToMount: make(map[v1.UniqueVolumeName]volumeToMount),
|
||||
volumePluginMgr: volumePluginMgr,
|
||||
podErrors: make(map[types.UniquePodName]sets.String),
|
||||
}
|
||||
}
|
||||
|
||||
@ -132,6 +146,8 @@ type desiredStateOfWorld struct {
|
||||
// volumePluginMgr is the volume plugin manager used to create volume
|
||||
// plugin objects.
|
||||
volumePluginMgr *volume.VolumePluginMgr
|
||||
// podErrors are errors caught by desiredStateOfWorldPopulator about volumes for a given pod.
|
||||
podErrors map[types.UniquePodName]sets.String
|
||||
|
||||
sync.RWMutex
|
||||
}
|
||||
@ -293,6 +309,8 @@ func (dsw *desiredStateOfWorld) DeletePodFromVolume(
|
||||
dsw.Lock()
|
||||
defer dsw.Unlock()
|
||||
|
||||
delete(dsw.podErrors, podName)
|
||||
|
||||
volumeObj, volumeExists := dsw.volumesToMount[volumeName]
|
||||
if !volumeExists {
|
||||
return
|
||||
@ -412,3 +430,36 @@ func (dsw *desiredStateOfWorld) isDeviceMountableVolume(volumeSpec *volume.Spec)
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (dsw *desiredStateOfWorld) AddErrorToPod(podName types.UniquePodName, err string) {
|
||||
dsw.Lock()
|
||||
defer dsw.Unlock()
|
||||
|
||||
if errs, found := dsw.podErrors[podName]; found {
|
||||
errs.Insert(err)
|
||||
return
|
||||
}
|
||||
dsw.podErrors[podName] = sets.NewString(err)
|
||||
}
|
||||
|
||||
func (dsw *desiredStateOfWorld) PopPodErrors(podName types.UniquePodName) []string {
|
||||
dsw.Lock()
|
||||
defer dsw.Unlock()
|
||||
|
||||
if errs, found := dsw.podErrors[podName]; found {
|
||||
delete(dsw.podErrors, podName)
|
||||
return errs.List()
|
||||
}
|
||||
return []string{}
|
||||
}
|
||||
|
||||
func (dsw *desiredStateOfWorld) GetPodsWithErrors() []types.UniquePodName {
|
||||
dsw.RLock()
|
||||
defer dsw.RUnlock()
|
||||
|
||||
pods := make([]types.UniquePodName, 0, len(dsw.podErrors))
|
||||
for podName := range dsw.podErrors {
|
||||
pods = append(pods, podName)
|
||||
}
|
||||
return pods
|
||||
}
|
||||
|
@ -27,7 +27,7 @@ import (
|
||||
|
||||
"k8s.io/klog"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
@ -270,6 +270,13 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
|
||||
volumeToMount.PodName, volumeToMount.VolumeName)
|
||||
dswp.deleteProcessedPod(volumeToMount.PodName)
|
||||
}
|
||||
|
||||
podsWithError := dswp.desiredStateOfWorld.GetPodsWithErrors()
|
||||
for _, podName := range podsWithError {
|
||||
if _, podExists := dswp.podManager.GetPodByUID(types.UID(podName)); !podExists {
|
||||
dswp.desiredStateOfWorld.PopPodErrors(podName)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// processPodVolumes processes the volumes in the given pod and adds them to the
|
||||
@ -300,6 +307,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
|
||||
podVolume.Name,
|
||||
format.Pod(pod),
|
||||
err)
|
||||
dswp.desiredStateOfWorld.AddErrorToPod(uniquePodName, err.Error())
|
||||
allVolumesAdded = false
|
||||
continue
|
||||
}
|
||||
@ -314,6 +322,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
|
||||
volumeSpec.Name(),
|
||||
uniquePodName,
|
||||
err)
|
||||
dswp.desiredStateOfWorld.AddErrorToPod(uniquePodName, err.Error())
|
||||
allVolumesAdded = false
|
||||
}
|
||||
|
||||
@ -335,6 +344,8 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
|
||||
// New pod has been synced. Re-mount all volumes that need it
|
||||
// (e.g. DownwardAPI)
|
||||
dswp.actualStateOfWorld.MarkRemountRequired(uniquePodName)
|
||||
// Remove any stored errors for the pod, everything went well in this processPodVolumes
|
||||
dswp.desiredStateOfWorld.PopPodErrors(uniquePodName)
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -17,12 +17,14 @@ limitations under the License.
|
||||
package volumemanager
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
k8stypes "k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
@ -363,7 +365,6 @@ func (vm *volumeManager) WaitForAttachAndMount(pod *v1.Pod) error {
|
||||
vm.verifyVolumesMountedFunc(uniquePodName, expectedVolumes))
|
||||
|
||||
if err != nil {
|
||||
// Timeout expired
|
||||
unmountedVolumes :=
|
||||
vm.getUnmountedVolumes(uniquePodName, expectedVolumes)
|
||||
// Also get unattached volumes for error message
|
||||
@ -375,9 +376,10 @@ func (vm *volumeManager) WaitForAttachAndMount(pod *v1.Pod) error {
|
||||
}
|
||||
|
||||
return fmt.Errorf(
|
||||
"timeout expired waiting for volumes to attach or mount for pod %q/%q. list of unmounted volumes=%v. list of unattached volumes=%v",
|
||||
"failed to attach or mount for pod %q/%q: %s. List of unmounted volumes=%v, list of unattached volumes=%v.",
|
||||
pod.Namespace,
|
||||
pod.Name,
|
||||
err,
|
||||
unmountedVolumes,
|
||||
unattachedVolumes)
|
||||
}
|
||||
@ -402,6 +404,9 @@ func (vm *volumeManager) getUnattachedVolumes(expectedVolumes []string) []string
|
||||
// volumes are mounted.
|
||||
func (vm *volumeManager) verifyVolumesMountedFunc(podName types.UniquePodName, expectedVolumes []string) wait.ConditionFunc {
|
||||
return func() (done bool, err error) {
|
||||
if errs := vm.desiredStateOfWorld.PopPodErrors(podName); len(errs) > 0 {
|
||||
return true, errors.New(strings.Join(errs, "; "))
|
||||
}
|
||||
return len(vm.getUnmountedVolumes(podName, expectedVolumes)) == 0, nil
|
||||
}
|
||||
}
|
||||
|
@ -23,9 +23,10 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
"k8s.io/client-go/tools/record"
|
||||
@ -124,9 +125,16 @@ func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) {
|
||||
// delayed claim binding
|
||||
go delayClaimBecomesBound(kubeClient, claim.GetNamespace(), claim.ObjectMeta.Name)
|
||||
|
||||
err = manager.WaitForAttachAndMount(pod)
|
||||
err = wait.Poll(100*time.Millisecond, 1*time.Second, func() (bool, error) {
|
||||
err = manager.WaitForAttachAndMount(pod)
|
||||
if err != nil {
|
||||
// Few "PVC not bound" errors are expected
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Errorf("Expected success: %v", err)
|
||||
t.Errorf("Expected a volume to be mounted, got: %s", err)
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user