mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 20:53:33 +00:00
Merge pull request #42033 from NickrenREN/dswp-findAndAddActivePods
Automatic merge from submit-queue (batch tested with PRs 41849, 42033) fix TODO: find and add active pods for dswp loops through the list of active pods and ensures that each one exists in the desired state of the world cache **Release note**: ```release-note NONE ```
This commit is contained in:
commit
e91bd12b99
@ -177,6 +177,7 @@ pkg/cloudprovider/providers/cloudstack
|
|||||||
pkg/controller/volume/attachdetach/cache
|
pkg/controller/volume/attachdetach/cache
|
||||||
pkg/controller/volume/attachdetach/populator
|
pkg/controller/volume/attachdetach/populator
|
||||||
pkg/controller/volume/attachdetach/reconciler
|
pkg/controller/volume/attachdetach/reconciler
|
||||||
|
pkg/controller/volume/attachdetach/util
|
||||||
pkg/conversion
|
pkg/conversion
|
||||||
pkg/conversion/queryparams
|
pkg/conversion/queryparams
|
||||||
pkg/credentialprovider/aws
|
pkg/credentialprovider/aws
|
||||||
|
@ -24,6 +24,7 @@ go_library(
|
|||||||
"//pkg/controller/volume/attachdetach/populator:go_default_library",
|
"//pkg/controller/volume/attachdetach/populator:go_default_library",
|
||||||
"//pkg/controller/volume/attachdetach/reconciler:go_default_library",
|
"//pkg/controller/volume/attachdetach/reconciler:go_default_library",
|
||||||
"//pkg/controller/volume/attachdetach/statusupdater:go_default_library",
|
"//pkg/controller/volume/attachdetach/statusupdater:go_default_library",
|
||||||
|
"//pkg/controller/volume/attachdetach/util:go_default_library",
|
||||||
"//pkg/util/io:go_default_library",
|
"//pkg/util/io:go_default_library",
|
||||||
"//pkg/util/mount:go_default_library",
|
"//pkg/util/mount:go_default_library",
|
||||||
"//pkg/volume:go_default_library",
|
"//pkg/volume:go_default_library",
|
||||||
@ -67,6 +68,7 @@ filegroup(
|
|||||||
"//pkg/controller/volume/attachdetach/reconciler:all-srcs",
|
"//pkg/controller/volume/attachdetach/reconciler:all-srcs",
|
||||||
"//pkg/controller/volume/attachdetach/statusupdater:all-srcs",
|
"//pkg/controller/volume/attachdetach/statusupdater:all-srcs",
|
||||||
"//pkg/controller/volume/attachdetach/testing:all-srcs",
|
"//pkg/controller/volume/attachdetach/testing:all-srcs",
|
||||||
|
"//pkg/controller/volume/attachdetach/util:all-srcs",
|
||||||
],
|
],
|
||||||
tags = ["automanaged"],
|
tags = ["automanaged"],
|
||||||
)
|
)
|
||||||
|
@ -41,6 +41,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/populator"
|
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/populator"
|
||||||
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/reconciler"
|
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/reconciler"
|
||||||
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
|
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
|
||||||
|
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/util"
|
||||||
"k8s.io/kubernetes/pkg/util/io"
|
"k8s.io/kubernetes/pkg/util/io"
|
||||||
"k8s.io/kubernetes/pkg/util/mount"
|
"k8s.io/kubernetes/pkg/util/mount"
|
||||||
"k8s.io/kubernetes/pkg/volume"
|
"k8s.io/kubernetes/pkg/volume"
|
||||||
@ -62,6 +63,11 @@ const (
|
|||||||
// desiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the
|
// desiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the
|
||||||
// DesiredStateOfWorldPopulator loop waits between successive executions
|
// DesiredStateOfWorldPopulator loop waits between successive executions
|
||||||
desiredStateOfWorldPopulatorLoopSleepPeriod time.Duration = 1 * time.Minute
|
desiredStateOfWorldPopulatorLoopSleepPeriod time.Duration = 1 * time.Minute
|
||||||
|
|
||||||
|
// desiredStateOfWorldPopulatorListPodsRetryDuration is the amount of
|
||||||
|
// time the DesiredStateOfWorldPopulator loop waits between list pods
|
||||||
|
// calls.
|
||||||
|
desiredStateOfWorldPopulatorListPodsRetryDuration time.Duration = 3 * time.Minute
|
||||||
)
|
)
|
||||||
|
|
||||||
// AttachDetachController defines the operations supported by this controller.
|
// AttachDetachController defines the operations supported by this controller.
|
||||||
@ -137,8 +143,12 @@ func NewAttachDetachController(
|
|||||||
|
|
||||||
adc.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
|
adc.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
|
||||||
desiredStateOfWorldPopulatorLoopSleepPeriod,
|
desiredStateOfWorldPopulatorLoopSleepPeriod,
|
||||||
|
desiredStateOfWorldPopulatorListPodsRetryDuration,
|
||||||
podInformer.Lister(),
|
podInformer.Lister(),
|
||||||
adc.desiredStateOfWorld)
|
adc.desiredStateOfWorld,
|
||||||
|
&adc.volumePluginMgr,
|
||||||
|
pvcInformer.Lister(),
|
||||||
|
pvInformer.Lister())
|
||||||
|
|
||||||
podInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
|
podInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
|
||||||
AddFunc: adc.podAdd,
|
AddFunc: adc.podAdd,
|
||||||
@ -245,7 +255,8 @@ func (adc *attachDetachController) podAdd(obj interface{}) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
adc.processPodVolumes(pod, true /* addVolumes */)
|
util.ProcessPodVolumes(pod, true, /* addVolumes */
|
||||||
|
adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetDesiredStateOfWorld returns desired state of world associated with controller
|
// GetDesiredStateOfWorld returns desired state of world associated with controller
|
||||||
@ -264,7 +275,8 @@ func (adc *attachDetachController) podDelete(obj interface{}) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
adc.processPodVolumes(pod, false /* addVolumes */)
|
util.ProcessPodVolumes(pod, false, /* addVolumes */
|
||||||
|
adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (adc *attachDetachController) nodeAdd(obj interface{}) {
|
func (adc *attachDetachController) nodeAdd(obj interface{}) {
|
||||||
@ -313,225 +325,6 @@ func (adc *attachDetachController) nodeDelete(obj interface{}) {
|
|||||||
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
|
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
|
||||||
}
|
}
|
||||||
|
|
||||||
// processPodVolumes processes the volumes in the given pod and adds them to the
|
|
||||||
// desired state of the world if addVolumes is true, otherwise it removes them.
|
|
||||||
func (adc *attachDetachController) processPodVolumes(
|
|
||||||
pod *v1.Pod, addVolumes bool) {
|
|
||||||
if pod == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(pod.Spec.Volumes) <= 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
nodeName := types.NodeName(pod.Spec.NodeName)
|
|
||||||
|
|
||||||
if !adc.desiredStateOfWorld.NodeExists(nodeName) {
|
|
||||||
// If the node the pod is scheduled to does not exist in the desired
|
|
||||||
// state of the world data structure, that indicates the node is not
|
|
||||||
// yet managed by the controller. Therefore, ignore the pod.
|
|
||||||
// If the node is added to the list of managed nodes in the future,
|
|
||||||
// future adds and updates to the pod will be processed.
|
|
||||||
glog.V(10).Infof(
|
|
||||||
"Skipping processing of pod %q/%q: it is scheduled to node %q which is not managed by the controller.",
|
|
||||||
pod.Namespace,
|
|
||||||
pod.Name,
|
|
||||||
nodeName)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Process volume spec for each volume defined in pod
|
|
||||||
for _, podVolume := range pod.Spec.Volumes {
|
|
||||||
volumeSpec, err := adc.createVolumeSpec(podVolume, pod.Namespace)
|
|
||||||
if err != nil {
|
|
||||||
glog.V(10).Infof(
|
|
||||||
"Error processing volume %q for pod %q/%q: %v",
|
|
||||||
podVolume.Name,
|
|
||||||
pod.Namespace,
|
|
||||||
pod.Name,
|
|
||||||
err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
attachableVolumePlugin, err :=
|
|
||||||
adc.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
|
|
||||||
if err != nil || attachableVolumePlugin == nil {
|
|
||||||
glog.V(10).Infof(
|
|
||||||
"Skipping volume %q for pod %q/%q: it does not implement attacher interface. err=%v",
|
|
||||||
podVolume.Name,
|
|
||||||
pod.Namespace,
|
|
||||||
pod.Name,
|
|
||||||
err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
uniquePodName := volumehelper.GetUniquePodName(pod)
|
|
||||||
if addVolumes {
|
|
||||||
// Add volume to desired state of world
|
|
||||||
_, err := adc.desiredStateOfWorld.AddPod(
|
|
||||||
uniquePodName, pod, volumeSpec, nodeName)
|
|
||||||
if err != nil {
|
|
||||||
glog.V(10).Infof(
|
|
||||||
"Failed to add volume %q for pod %q/%q to desiredStateOfWorld. %v",
|
|
||||||
podVolume.Name,
|
|
||||||
pod.Namespace,
|
|
||||||
pod.Name,
|
|
||||||
err)
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
|
||||||
// Remove volume from desired state of world
|
|
||||||
uniqueVolumeName, err := volumehelper.GetUniqueVolumeNameFromSpec(
|
|
||||||
attachableVolumePlugin, volumeSpec)
|
|
||||||
if err != nil {
|
|
||||||
glog.V(10).Infof(
|
|
||||||
"Failed to delete volume %q for pod %q/%q from desiredStateOfWorld. GetUniqueVolumeNameFromSpec failed with %v",
|
|
||||||
podVolume.Name,
|
|
||||||
pod.Namespace,
|
|
||||||
pod.Name,
|
|
||||||
err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
adc.desiredStateOfWorld.DeletePod(
|
|
||||||
uniquePodName, uniqueVolumeName, nodeName)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// createVolumeSpec creates and returns a mutatable volume.Spec object for the
|
|
||||||
// specified volume. It dereference any PVC to get PV objects, if needed.
|
|
||||||
func (adc *attachDetachController) createVolumeSpec(
|
|
||||||
podVolume v1.Volume, podNamespace string) (*volume.Spec, error) {
|
|
||||||
if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil {
|
|
||||||
glog.V(10).Infof(
|
|
||||||
"Found PVC, ClaimName: %q/%q",
|
|
||||||
podNamespace,
|
|
||||||
pvcSource.ClaimName)
|
|
||||||
|
|
||||||
// If podVolume is a PVC, fetch the real PV behind the claim
|
|
||||||
pvName, pvcUID, err := adc.getPVCFromCacheExtractPV(
|
|
||||||
podNamespace, pvcSource.ClaimName)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf(
|
|
||||||
"error processing PVC %q/%q: %v",
|
|
||||||
podNamespace,
|
|
||||||
pvcSource.ClaimName,
|
|
||||||
err)
|
|
||||||
}
|
|
||||||
|
|
||||||
glog.V(10).Infof(
|
|
||||||
"Found bound PV for PVC (ClaimName %q/%q pvcUID %v): pvName=%q",
|
|
||||||
podNamespace,
|
|
||||||
pvcSource.ClaimName,
|
|
||||||
pvcUID,
|
|
||||||
pvName)
|
|
||||||
|
|
||||||
// Fetch actual PV object
|
|
||||||
volumeSpec, err := adc.getPVSpecFromCache(
|
|
||||||
pvName, pvcSource.ReadOnly, pvcUID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf(
|
|
||||||
"error processing PVC %q/%q: %v",
|
|
||||||
podNamespace,
|
|
||||||
pvcSource.ClaimName,
|
|
||||||
err)
|
|
||||||
}
|
|
||||||
|
|
||||||
glog.V(10).Infof(
|
|
||||||
"Extracted volumeSpec (%v) from bound PV (pvName %q) and PVC (ClaimName %q/%q pvcUID %v)",
|
|
||||||
volumeSpec.Name,
|
|
||||||
pvName,
|
|
||||||
podNamespace,
|
|
||||||
pvcSource.ClaimName,
|
|
||||||
pvcUID)
|
|
||||||
|
|
||||||
return volumeSpec, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Do not return the original volume object, since it's from the shared
|
|
||||||
// informer it may be mutated by another consumer.
|
|
||||||
clonedPodVolumeObj, err := api.Scheme.DeepCopy(&podVolume)
|
|
||||||
if err != nil || clonedPodVolumeObj == nil {
|
|
||||||
return nil, fmt.Errorf(
|
|
||||||
"failed to deep copy %q volume object. err=%v", podVolume.Name, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
clonedPodVolume, ok := clonedPodVolumeObj.(*v1.Volume)
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("failed to cast clonedPodVolume %#v to v1.Volume", clonedPodVolumeObj)
|
|
||||||
}
|
|
||||||
|
|
||||||
return volume.NewSpecFromVolume(clonedPodVolume), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// getPVCFromCacheExtractPV fetches the PVC object with the given namespace and
|
|
||||||
// name from the shared internal PVC store extracts the name of the PV it is
|
|
||||||
// pointing to and returns it.
|
|
||||||
// This method returns an error if a PVC object does not exist in the cache
|
|
||||||
// with the given namespace/name.
|
|
||||||
// This method returns an error if the PVC object's phase is not "Bound".
|
|
||||||
func (adc *attachDetachController) getPVCFromCacheExtractPV(namespace string, name string) (string, types.UID, error) {
|
|
||||||
pvc, err := adc.pvcLister.PersistentVolumeClaims(namespace).Get(name)
|
|
||||||
if err != nil {
|
|
||||||
return "", "", fmt.Errorf("failed to find PVC %s/%s in PVCInformer cache: %v", namespace, name, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" {
|
|
||||||
return "", "", fmt.Errorf(
|
|
||||||
"PVC %s/%s has non-bound phase (%q) or empty pvc.Spec.VolumeName (%q)",
|
|
||||||
namespace,
|
|
||||||
name,
|
|
||||||
pvc.Status.Phase,
|
|
||||||
pvc.Spec.VolumeName)
|
|
||||||
}
|
|
||||||
|
|
||||||
return pvc.Spec.VolumeName, pvc.UID, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// getPVSpecFromCache fetches the PV object with the given name from the shared
|
|
||||||
// internal PV store and returns a volume.Spec representing it.
|
|
||||||
// This method returns an error if a PV object does not exist in the cache with
|
|
||||||
// the given name.
|
|
||||||
// This method deep copies the PV object so the caller may use the returned
|
|
||||||
// volume.Spec object without worrying about it mutating unexpectedly.
|
|
||||||
func (adc *attachDetachController) getPVSpecFromCache(name string, pvcReadOnly bool, expectedClaimUID types.UID) (*volume.Spec, error) {
|
|
||||||
pv, err := adc.pvLister.Get(name)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to find PV %q in PVInformer cache: %v", name, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if pv.Spec.ClaimRef == nil {
|
|
||||||
return nil, fmt.Errorf(
|
|
||||||
"found PV object %q but it has a nil pv.Spec.ClaimRef indicating it is not yet bound to the claim",
|
|
||||||
name)
|
|
||||||
}
|
|
||||||
|
|
||||||
if pv.Spec.ClaimRef.UID != expectedClaimUID {
|
|
||||||
return nil, fmt.Errorf(
|
|
||||||
"found PV object %q but its pv.Spec.ClaimRef.UID (%q) does not point to claim.UID (%q)",
|
|
||||||
name,
|
|
||||||
pv.Spec.ClaimRef.UID,
|
|
||||||
expectedClaimUID)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Do not return the object from the informer, since the store is shared it
|
|
||||||
// may be mutated by another consumer.
|
|
||||||
clonedPVObj, err := api.Scheme.DeepCopy(pv)
|
|
||||||
if err != nil || clonedPVObj == nil {
|
|
||||||
return nil, fmt.Errorf("failed to deep copy %q PV object. err=%v", name, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
clonedPV, ok := clonedPVObj.(*v1.PersistentVolume)
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("failed to cast %q clonedPV %#v to PersistentVolume", name, pv)
|
|
||||||
}
|
|
||||||
|
|
||||||
return volume.NewSpecFromPersistentVolume(clonedPV, pvcReadOnly), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// processVolumesInUse processes the list of volumes marked as "in-use"
|
// processVolumesInUse processes the list of volumes marked as "in-use"
|
||||||
// according to the specified Node's Status.VolumesInUse and updates the
|
// according to the specified Node's Status.VolumesInUse and updates the
|
||||||
// corresponding volume in the actual state of the world to indicate that it is
|
// corresponding volume in the actual state of the world to indicate that it is
|
||||||
|
@ -5,6 +5,7 @@ licenses(["notice"])
|
|||||||
load(
|
load(
|
||||||
"@io_bazel_rules_go//go:def.bzl",
|
"@io_bazel_rules_go//go:def.bzl",
|
||||||
"go_library",
|
"go_library",
|
||||||
|
"go_test",
|
||||||
)
|
)
|
||||||
|
|
||||||
go_library(
|
go_library(
|
||||||
@ -14,9 +15,12 @@ go_library(
|
|||||||
deps = [
|
deps = [
|
||||||
"//pkg/client/listers/core/v1:go_default_library",
|
"//pkg/client/listers/core/v1:go_default_library",
|
||||||
"//pkg/controller/volume/attachdetach/cache:go_default_library",
|
"//pkg/controller/volume/attachdetach/cache:go_default_library",
|
||||||
|
"//pkg/controller/volume/attachdetach/util:go_default_library",
|
||||||
|
"//pkg/volume:go_default_library",
|
||||||
"//pkg/volume/util/volumehelper:go_default_library",
|
"//pkg/volume/util/volumehelper:go_default_library",
|
||||||
"//vendor/github.com/golang/glog:go_default_library",
|
"//vendor/github.com/golang/glog:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
|
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
|
||||||
@ -35,3 +39,21 @@ filegroup(
|
|||||||
srcs = [":package-srcs"],
|
srcs = [":package-srcs"],
|
||||||
tags = ["automanaged"],
|
tags = ["automanaged"],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
go_test(
|
||||||
|
name = "go_default_test",
|
||||||
|
srcs = ["desired_state_of_world_populator_test.go"],
|
||||||
|
library = ":go_default_library",
|
||||||
|
tags = ["automanaged"],
|
||||||
|
deps = [
|
||||||
|
"//pkg/api/v1:go_default_library",
|
||||||
|
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
|
||||||
|
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
|
||||||
|
"//pkg/controller:go_default_library",
|
||||||
|
"//pkg/controller/volume/attachdetach/cache:go_default_library",
|
||||||
|
"//pkg/volume/testing:go_default_library",
|
||||||
|
"//pkg/volume/util/volumehelper:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
@ -25,17 +25,20 @@ import (
|
|||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
kcache "k8s.io/client-go/tools/cache"
|
kcache "k8s.io/client-go/tools/cache"
|
||||||
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
|
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
|
||||||
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
|
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
|
||||||
|
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/util"
|
||||||
|
"k8s.io/kubernetes/pkg/volume"
|
||||||
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
|
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DesiredStateOfWorldPopulator periodically verifies that the pods in the
|
// DesiredStateOfWorldPopulator periodically verifies that the pods in the
|
||||||
// desired state of th world still exist, if not, it removes them.
|
// desired state of the world still exist, if not, it removes them.
|
||||||
// TODO: it also loops through the list of active pods and ensures that
|
// It also loops through the list of active pods and ensures that
|
||||||
// each one exists in the desired state of the world cache
|
// each one exists in the desired state of the world cache
|
||||||
// if it has volumes.
|
// if it has volumes.
|
||||||
type DesiredStateOfWorldPopulator interface {
|
type DesiredStateOfWorldPopulator interface {
|
||||||
@ -50,19 +53,32 @@ type DesiredStateOfWorldPopulator interface {
|
|||||||
// desiredStateOfWorld - the cache to populate
|
// desiredStateOfWorld - the cache to populate
|
||||||
func NewDesiredStateOfWorldPopulator(
|
func NewDesiredStateOfWorldPopulator(
|
||||||
loopSleepDuration time.Duration,
|
loopSleepDuration time.Duration,
|
||||||
|
listPodsRetryDuration time.Duration,
|
||||||
podLister corelisters.PodLister,
|
podLister corelisters.PodLister,
|
||||||
desiredStateOfWorld cache.DesiredStateOfWorld) DesiredStateOfWorldPopulator {
|
desiredStateOfWorld cache.DesiredStateOfWorld,
|
||||||
|
volumePluginMgr *volume.VolumePluginMgr,
|
||||||
|
pvcLister corelisters.PersistentVolumeClaimLister,
|
||||||
|
pvLister corelisters.PersistentVolumeLister) DesiredStateOfWorldPopulator {
|
||||||
return &desiredStateOfWorldPopulator{
|
return &desiredStateOfWorldPopulator{
|
||||||
loopSleepDuration: loopSleepDuration,
|
loopSleepDuration: loopSleepDuration,
|
||||||
podLister: podLister,
|
listPodsRetryDuration: listPodsRetryDuration,
|
||||||
desiredStateOfWorld: desiredStateOfWorld,
|
podLister: podLister,
|
||||||
|
desiredStateOfWorld: desiredStateOfWorld,
|
||||||
|
volumePluginMgr: volumePluginMgr,
|
||||||
|
pvcLister: pvcLister,
|
||||||
|
pvLister: pvLister,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type desiredStateOfWorldPopulator struct {
|
type desiredStateOfWorldPopulator struct {
|
||||||
loopSleepDuration time.Duration
|
loopSleepDuration time.Duration
|
||||||
podLister corelisters.PodLister
|
podLister corelisters.PodLister
|
||||||
desiredStateOfWorld cache.DesiredStateOfWorld
|
desiredStateOfWorld cache.DesiredStateOfWorld
|
||||||
|
volumePluginMgr *volume.VolumePluginMgr
|
||||||
|
pvcLister corelisters.PersistentVolumeClaimLister
|
||||||
|
pvLister corelisters.PersistentVolumeLister
|
||||||
|
listPodsRetryDuration time.Duration
|
||||||
|
timeOfLastListPods time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dswp *desiredStateOfWorldPopulator) Run(stopCh <-chan struct{}) {
|
func (dswp *desiredStateOfWorldPopulator) Run(stopCh <-chan struct{}) {
|
||||||
@ -72,6 +88,18 @@ func (dswp *desiredStateOfWorldPopulator) Run(stopCh <-chan struct{}) {
|
|||||||
func (dswp *desiredStateOfWorldPopulator) populatorLoopFunc() func() {
|
func (dswp *desiredStateOfWorldPopulator) populatorLoopFunc() func() {
|
||||||
return func() {
|
return func() {
|
||||||
dswp.findAndRemoveDeletedPods()
|
dswp.findAndRemoveDeletedPods()
|
||||||
|
|
||||||
|
// findAndAddActivePods is called periodically, independently of the main
|
||||||
|
// populator loop.
|
||||||
|
if time.Since(dswp.timeOfLastListPods) < dswp.listPodsRetryDuration {
|
||||||
|
glog.V(5).Infof(
|
||||||
|
"Skipping findAndAddActivePods(). Not permitted until %v (listPodsRetryDuration %v).",
|
||||||
|
dswp.timeOfLastListPods.Add(dswp.listPodsRetryDuration),
|
||||||
|
dswp.listPodsRetryDuration)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
dswp.findAndAddActivePods()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -113,3 +141,23 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
|
|||||||
dswp.desiredStateOfWorld.DeletePod(dswPodUID, dswPodToAdd.VolumeName, dswPodToAdd.NodeName)
|
dswp.desiredStateOfWorld.DeletePod(dswPodUID, dswPodToAdd.VolumeName, dswPodToAdd.NodeName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (dswp *desiredStateOfWorldPopulator) findAndAddActivePods() {
|
||||||
|
pods, err := dswp.podLister.List(labels.Everything())
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("podLister List failed: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
dswp.timeOfLastListPods = time.Now()
|
||||||
|
|
||||||
|
for _, pod := range pods {
|
||||||
|
if volumehelper.IsPodTerminated(pod, pod.Status) {
|
||||||
|
// Do not add volumes for terminated pods
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
util.ProcessPodVolumes(pod, true,
|
||||||
|
dswp.desiredStateOfWorld, dswp.volumePluginMgr, dswp.pvcLister, dswp.pvLister)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
@ -0,0 +1,137 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2017 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package populator
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
k8stypes "k8s.io/apimachinery/pkg/types"
|
||||||
|
"k8s.io/kubernetes/pkg/api/v1"
|
||||||
|
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
|
||||||
|
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
|
||||||
|
"k8s.io/kubernetes/pkg/controller"
|
||||||
|
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
|
||||||
|
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
|
||||||
|
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestFindAndAddActivePods_FindAndRemoveDeletedPods(t *testing.T) {
|
||||||
|
fakeVolumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
|
||||||
|
fakeClient := &fake.Clientset{}
|
||||||
|
|
||||||
|
fakeInformerFactory := informers.NewSharedInformerFactory(fakeClient, controller.NoResyncPeriodFunc())
|
||||||
|
fakePodInformer := fakeInformerFactory.Core().V1().Pods()
|
||||||
|
|
||||||
|
fakesDSW := cache.NewDesiredStateOfWorld(fakeVolumePluginMgr)
|
||||||
|
|
||||||
|
pod := &v1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "dswp-test-pod",
|
||||||
|
UID: "dswp-test-pod-uid",
|
||||||
|
Namespace: "dswp-test",
|
||||||
|
},
|
||||||
|
Spec: v1.PodSpec{
|
||||||
|
NodeName: "dswp-test-host",
|
||||||
|
Volumes: []v1.Volume{
|
||||||
|
{
|
||||||
|
Name: "dswp-test-volume-name",
|
||||||
|
VolumeSource: v1.VolumeSource{
|
||||||
|
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
|
||||||
|
PDName: "dswp-test-fake-device",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Status: v1.PodStatus{
|
||||||
|
Phase: v1.PodPhase("Running"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
fakePodInformer.Informer().GetStore().Add(pod)
|
||||||
|
|
||||||
|
podName := volumehelper.GetUniquePodName(pod)
|
||||||
|
|
||||||
|
generatedVolumeName := "fake-plugin/" + pod.Spec.Volumes[0].Name
|
||||||
|
|
||||||
|
pvcLister := fakeInformerFactory.Core().V1().PersistentVolumeClaims().Lister()
|
||||||
|
pvLister := fakeInformerFactory.Core().V1().PersistentVolumes().Lister()
|
||||||
|
|
||||||
|
dswp := &desiredStateOfWorldPopulator{
|
||||||
|
loopSleepDuration: 100 * time.Millisecond,
|
||||||
|
listPodsRetryDuration: 3 * time.Second,
|
||||||
|
desiredStateOfWorld: fakesDSW,
|
||||||
|
volumePluginMgr: fakeVolumePluginMgr,
|
||||||
|
podLister: fakePodInformer.Lister(),
|
||||||
|
pvcLister: pvcLister,
|
||||||
|
pvLister: pvLister,
|
||||||
|
}
|
||||||
|
|
||||||
|
//add the given node to the list of nodes managed by dsw
|
||||||
|
dswp.desiredStateOfWorld.AddNode(k8stypes.NodeName(pod.Spec.NodeName))
|
||||||
|
|
||||||
|
dswp.findAndAddActivePods()
|
||||||
|
|
||||||
|
expectedVolumeName := v1.UniqueVolumeName(generatedVolumeName)
|
||||||
|
|
||||||
|
//check if the given volume referenced by the pod is added to dsw
|
||||||
|
volumeExists := dswp.desiredStateOfWorld.VolumeExists(expectedVolumeName, k8stypes.NodeName(pod.Spec.NodeName))
|
||||||
|
if !volumeExists {
|
||||||
|
t.Fatalf(
|
||||||
|
"VolumeExists(%q) failed. Expected: <true> Actual: <%v>",
|
||||||
|
expectedVolumeName,
|
||||||
|
volumeExists)
|
||||||
|
}
|
||||||
|
|
||||||
|
//delete the pod and volume manually
|
||||||
|
dswp.desiredStateOfWorld.DeletePod(podName, expectedVolumeName, k8stypes.NodeName(pod.Spec.NodeName))
|
||||||
|
|
||||||
|
//check if the given volume referenced by the pod still exists in dsw
|
||||||
|
volumeExists = dswp.desiredStateOfWorld.VolumeExists(expectedVolumeName, k8stypes.NodeName(pod.Spec.NodeName))
|
||||||
|
if volumeExists {
|
||||||
|
t.Fatalf(
|
||||||
|
"VolumeExists(%q) failed. Expected: <false> Actual: <%v>",
|
||||||
|
expectedVolumeName,
|
||||||
|
volumeExists)
|
||||||
|
}
|
||||||
|
|
||||||
|
//add pod and volume again
|
||||||
|
dswp.findAndAddActivePods()
|
||||||
|
|
||||||
|
//check if the given volume referenced by the pod is added to dsw for the second time
|
||||||
|
volumeExists = dswp.desiredStateOfWorld.VolumeExists(expectedVolumeName, k8stypes.NodeName(pod.Spec.NodeName))
|
||||||
|
if !volumeExists {
|
||||||
|
t.Fatalf(
|
||||||
|
"VolumeExists(%q) failed. Expected: <true> Actual: <%v>",
|
||||||
|
expectedVolumeName,
|
||||||
|
volumeExists)
|
||||||
|
}
|
||||||
|
|
||||||
|
fakePodInformer.Informer().GetStore().Delete(pod)
|
||||||
|
dswp.findAndRemoveDeletedPods()
|
||||||
|
//check if the given volume referenced by the pod still exists in dsw
|
||||||
|
volumeExists = dswp.desiredStateOfWorld.VolumeExists(expectedVolumeName, k8stypes.NodeName(pod.Spec.NodeName))
|
||||||
|
if volumeExists {
|
||||||
|
t.Fatalf(
|
||||||
|
"VolumeExists(%q) failed. Expected: <false> Actual: <%v>",
|
||||||
|
expectedVolumeName,
|
||||||
|
volumeExists)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
37
pkg/controller/volume/attachdetach/util/BUILD
Normal file
37
pkg/controller/volume/attachdetach/util/BUILD
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
package(default_visibility = ["//visibility:public"])
|
||||||
|
|
||||||
|
licenses(["notice"])
|
||||||
|
|
||||||
|
load(
|
||||||
|
"@io_bazel_rules_go//go:def.bzl",
|
||||||
|
"go_library",
|
||||||
|
)
|
||||||
|
|
||||||
|
go_library(
|
||||||
|
name = "go_default_library",
|
||||||
|
srcs = ["util.go"],
|
||||||
|
tags = ["automanaged"],
|
||||||
|
deps = [
|
||||||
|
"//pkg/api:go_default_library",
|
||||||
|
"//pkg/api/v1:go_default_library",
|
||||||
|
"//pkg/client/listers/core/v1:go_default_library",
|
||||||
|
"//pkg/controller/volume/attachdetach/cache:go_default_library",
|
||||||
|
"//pkg/volume:go_default_library",
|
||||||
|
"//pkg/volume/util/volumehelper:go_default_library",
|
||||||
|
"//vendor/github.com/golang/glog:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "package-srcs",
|
||||||
|
srcs = glob(["**"]),
|
||||||
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:private"],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "all-srcs",
|
||||||
|
srcs = [":package-srcs"],
|
||||||
|
tags = ["automanaged"],
|
||||||
|
)
|
252
pkg/controller/volume/attachdetach/util/util.go
Normal file
252
pkg/controller/volume/attachdetach/util/util.go
Normal file
@ -0,0 +1,252 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2017 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package util
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
|
"k8s.io/apimachinery/pkg/types"
|
||||||
|
"k8s.io/kubernetes/pkg/api"
|
||||||
|
"k8s.io/kubernetes/pkg/api/v1"
|
||||||
|
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
|
||||||
|
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
|
||||||
|
"k8s.io/kubernetes/pkg/volume"
|
||||||
|
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
|
||||||
|
)
|
||||||
|
|
||||||
|
// CreateVolumeSpec creates and returns a mutatable volume.Spec object for the
|
||||||
|
// specified volume. It dereference any PVC to get PV objects, if needed.
|
||||||
|
func CreateVolumeSpec(podVolume v1.Volume, podNamespace string, pvcLister corelisters.PersistentVolumeClaimLister, pvLister corelisters.PersistentVolumeLister) (*volume.Spec, error) {
|
||||||
|
if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil {
|
||||||
|
glog.V(10).Infof(
|
||||||
|
"Found PVC, ClaimName: %q/%q",
|
||||||
|
podNamespace,
|
||||||
|
pvcSource.ClaimName)
|
||||||
|
|
||||||
|
// If podVolume is a PVC, fetch the real PV behind the claim
|
||||||
|
pvName, pvcUID, err := getPVCFromCacheExtractPV(
|
||||||
|
podNamespace, pvcSource.ClaimName, pvcLister)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf(
|
||||||
|
"error processing PVC %q/%q: %v",
|
||||||
|
podNamespace,
|
||||||
|
pvcSource.ClaimName,
|
||||||
|
err)
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.V(10).Infof(
|
||||||
|
"Found bound PV for PVC (ClaimName %q/%q pvcUID %v): pvName=%q",
|
||||||
|
podNamespace,
|
||||||
|
pvcSource.ClaimName,
|
||||||
|
pvcUID,
|
||||||
|
pvName)
|
||||||
|
|
||||||
|
// Fetch actual PV object
|
||||||
|
volumeSpec, err := getPVSpecFromCache(
|
||||||
|
pvName, pvcSource.ReadOnly, pvcUID, pvLister)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf(
|
||||||
|
"error processing PVC %q/%q: %v",
|
||||||
|
podNamespace,
|
||||||
|
pvcSource.ClaimName,
|
||||||
|
err)
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.V(10).Infof(
|
||||||
|
"Extracted volumeSpec (%v) from bound PV (pvName %q) and PVC (ClaimName %q/%q pvcUID %v)",
|
||||||
|
volumeSpec.Name,
|
||||||
|
pvName,
|
||||||
|
podNamespace,
|
||||||
|
pvcSource.ClaimName,
|
||||||
|
pvcUID)
|
||||||
|
|
||||||
|
return volumeSpec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do not return the original volume object, since it's from the shared
|
||||||
|
// informer it may be mutated by another consumer.
|
||||||
|
clonedPodVolumeObj, err := api.Scheme.DeepCopy(&podVolume)
|
||||||
|
if err != nil || clonedPodVolumeObj == nil {
|
||||||
|
return nil, fmt.Errorf(
|
||||||
|
"failed to deep copy %q volume object. err=%v", podVolume.Name, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
clonedPodVolume, ok := clonedPodVolumeObj.(*v1.Volume)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("failed to cast clonedPodVolume %#v to v1.Volume", clonedPodVolumeObj)
|
||||||
|
}
|
||||||
|
|
||||||
|
return volume.NewSpecFromVolume(clonedPodVolume), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// getPVCFromCacheExtractPV fetches the PVC object with the given namespace and
|
||||||
|
// name from the shared internal PVC store extracts the name of the PV it is
|
||||||
|
// pointing to and returns it.
|
||||||
|
// This method returns an error if a PVC object does not exist in the cache
|
||||||
|
// with the given namespace/name.
|
||||||
|
// This method returns an error if the PVC object's phase is not "Bound".
|
||||||
|
func getPVCFromCacheExtractPV(namespace string, name string, pvcLister corelisters.PersistentVolumeClaimLister) (string, types.UID, error) {
|
||||||
|
pvc, err := pvcLister.PersistentVolumeClaims(namespace).Get(name)
|
||||||
|
if err != nil {
|
||||||
|
return "", "", fmt.Errorf("failed to find PVC %s/%s in PVCInformer cache: %v", namespace, name, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" {
|
||||||
|
return "", "", fmt.Errorf(
|
||||||
|
"PVC %s/%s has non-bound phase (%q) or empty pvc.Spec.VolumeName (%q)",
|
||||||
|
namespace,
|
||||||
|
name,
|
||||||
|
pvc.Status.Phase,
|
||||||
|
pvc.Spec.VolumeName)
|
||||||
|
}
|
||||||
|
|
||||||
|
return pvc.Spec.VolumeName, pvc.UID, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// getPVSpecFromCache fetches the PV object with the given name from the shared
|
||||||
|
// internal PV store and returns a volume.Spec representing it.
|
||||||
|
// This method returns an error if a PV object does not exist in the cache with
|
||||||
|
// the given name.
|
||||||
|
// This method deep copies the PV object so the caller may use the returned
|
||||||
|
// volume.Spec object without worrying about it mutating unexpectedly.
|
||||||
|
func getPVSpecFromCache(name string, pvcReadOnly bool, expectedClaimUID types.UID, pvLister corelisters.PersistentVolumeLister) (*volume.Spec, error) {
|
||||||
|
pv, err := pvLister.Get(name)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to find PV %q in PVInformer cache: %v", name, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if pv.Spec.ClaimRef == nil {
|
||||||
|
return nil, fmt.Errorf(
|
||||||
|
"found PV object %q but it has a nil pv.Spec.ClaimRef indicating it is not yet bound to the claim",
|
||||||
|
name)
|
||||||
|
}
|
||||||
|
|
||||||
|
if pv.Spec.ClaimRef.UID != expectedClaimUID {
|
||||||
|
return nil, fmt.Errorf(
|
||||||
|
"found PV object %q but its pv.Spec.ClaimRef.UID (%q) does not point to claim.UID (%q)",
|
||||||
|
name,
|
||||||
|
pv.Spec.ClaimRef.UID,
|
||||||
|
expectedClaimUID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do not return the object from the informer, since the store is shared it
|
||||||
|
// may be mutated by another consumer.
|
||||||
|
clonedPVObj, err := api.Scheme.DeepCopy(pv)
|
||||||
|
if err != nil || clonedPVObj == nil {
|
||||||
|
return nil, fmt.Errorf("failed to deep copy %q PV object. err=%v", name, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
clonedPV, ok := clonedPVObj.(*v1.PersistentVolume)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("failed to cast %q clonedPV %#v to PersistentVolume", name, pv)
|
||||||
|
}
|
||||||
|
|
||||||
|
return volume.NewSpecFromPersistentVolume(clonedPV, pvcReadOnly), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ProcessPodVolumes processes the volumes in the given pod and adds them to the
|
||||||
|
// desired state of the world if addVolumes is true, otherwise it removes them.
|
||||||
|
func ProcessPodVolumes(pod *v1.Pod, addVolumes bool, desiredStateOfWorld cache.DesiredStateOfWorld, volumePluginMgr *volume.VolumePluginMgr, pvcLister corelisters.PersistentVolumeClaimLister, pvLister corelisters.PersistentVolumeLister) {
|
||||||
|
if pod == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(pod.Spec.Volumes) <= 0 {
|
||||||
|
glog.V(10).Infof("Skipping processing of pod %q/%q: it has no volumes.",
|
||||||
|
pod.Namespace,
|
||||||
|
pod.Name)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
nodeName := types.NodeName(pod.Spec.NodeName)
|
||||||
|
if nodeName == "" {
|
||||||
|
glog.V(10).Infof(
|
||||||
|
"Skipping processing of pod %q/%q: it is not scheduled to a node.",
|
||||||
|
pod.Namespace,
|
||||||
|
pod.Name)
|
||||||
|
return
|
||||||
|
} else if !desiredStateOfWorld.NodeExists(nodeName) {
|
||||||
|
// If the node the pod is scheduled to does not exist in the desired
|
||||||
|
// state of the world data structure, that indicates the node is not
|
||||||
|
// yet managed by the controller. Therefore, ignore the pod.
|
||||||
|
glog.V(10).Infof(
|
||||||
|
"Skipping processing of pod %q/%q: it is scheduled to node %q which is not managed by the controller.",
|
||||||
|
pod.Namespace,
|
||||||
|
pod.Name,
|
||||||
|
nodeName)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process volume spec for each volume defined in pod
|
||||||
|
for _, podVolume := range pod.Spec.Volumes {
|
||||||
|
volumeSpec, err := CreateVolumeSpec(podVolume, pod.Namespace, pvcLister, pvLister)
|
||||||
|
if err != nil {
|
||||||
|
glog.V(10).Infof(
|
||||||
|
"Error processing volume %q for pod %q/%q: %v",
|
||||||
|
podVolume.Name,
|
||||||
|
pod.Namespace,
|
||||||
|
pod.Name,
|
||||||
|
err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
attachableVolumePlugin, err :=
|
||||||
|
volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
|
||||||
|
if err != nil || attachableVolumePlugin == nil {
|
||||||
|
glog.V(10).Infof(
|
||||||
|
"Skipping volume %q for pod %q/%q: it does not implement attacher interface. err=%v",
|
||||||
|
podVolume.Name,
|
||||||
|
pod.Namespace,
|
||||||
|
pod.Name,
|
||||||
|
err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
uniquePodName := volumehelper.GetUniquePodName(pod)
|
||||||
|
if addVolumes {
|
||||||
|
// Add volume to desired state of world
|
||||||
|
_, err := desiredStateOfWorld.AddPod(
|
||||||
|
uniquePodName, pod, volumeSpec, nodeName)
|
||||||
|
if err != nil {
|
||||||
|
glog.V(10).Infof(
|
||||||
|
"Failed to add volume %q for pod %q/%q to desiredStateOfWorld. %v",
|
||||||
|
podVolume.Name,
|
||||||
|
pod.Namespace,
|
||||||
|
pod.Name,
|
||||||
|
err)
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
// Remove volume from desired state of world
|
||||||
|
uniqueVolumeName, err := volumehelper.GetUniqueVolumeNameFromSpec(
|
||||||
|
attachableVolumePlugin, volumeSpec)
|
||||||
|
if err != nil {
|
||||||
|
glog.V(10).Infof(
|
||||||
|
"Failed to delete volume %q for pod %q/%q from desiredStateOfWorld. GetUniqueVolumeNameFromSpec failed with %v",
|
||||||
|
podVolume.Name,
|
||||||
|
pod.Namespace,
|
||||||
|
pod.Name,
|
||||||
|
err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
desiredStateOfWorld.DeletePod(
|
||||||
|
uniquePodName, uniqueVolumeName, nodeName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
@ -143,18 +143,7 @@ func (dswp *desiredStateOfWorldPopulator) isPodTerminated(pod *v1.Pod) bool {
|
|||||||
if !found {
|
if !found {
|
||||||
podStatus = pod.Status
|
podStatus = pod.Status
|
||||||
}
|
}
|
||||||
return podStatus.Phase == v1.PodFailed || podStatus.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(podStatus.ContainerStatuses))
|
return volumehelper.IsPodTerminated(pod, podStatus)
|
||||||
}
|
|
||||||
|
|
||||||
// notRunning returns true if every status is terminated or waiting, or the status list
|
|
||||||
// is empty.
|
|
||||||
func notRunning(statuses []v1.ContainerStatus) bool {
|
|
||||||
for _, status := range statuses {
|
|
||||||
if status.State.Terminated == nil && status.State.Waiting == nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Iterate through all pods and add to desired state of world if they don't
|
// Iterate through all pods and add to desired state of world if they don't
|
||||||
|
@ -87,3 +87,19 @@ func GetUniqueVolumeNameFromSpec(
|
|||||||
volumeName),
|
volumeName),
|
||||||
nil
|
nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsPodTerminated checks if pod is terminated
|
||||||
|
func IsPodTerminated(pod *v1.Pod, podStatus v1.PodStatus) bool {
|
||||||
|
return podStatus.Phase == v1.PodFailed || podStatus.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(podStatus.ContainerStatuses))
|
||||||
|
}
|
||||||
|
|
||||||
|
// notRunning returns true if every status is terminated or waiting, or the status list
|
||||||
|
// is empty.
|
||||||
|
func notRunning(statuses []v1.ContainerStatus) bool {
|
||||||
|
for _, status := range statuses {
|
||||||
|
if status.State.Terminated == nil && status.State.Waiting == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
@ -222,3 +222,89 @@ func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, sy
|
|||||||
}
|
}
|
||||||
return testClient, ctrl, informers
|
return testClient, ctrl, informers
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Via integration test we can verify that if pod add
|
||||||
|
// event is somehow missed by AttachDetach controller - it still
|
||||||
|
// gets added by Desired State of World populator.
|
||||||
|
func TestPodAddedByDswp(t *testing.T) {
|
||||||
|
_, server := framework.RunAMaster(nil)
|
||||||
|
defer server.Close()
|
||||||
|
namespaceName := "test-pod-deletion"
|
||||||
|
|
||||||
|
node := &v1.Node{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "node-sandbox",
|
||||||
|
Annotations: map[string]string{
|
||||||
|
volumehelper.ControllerManagedAttachAnnotation: "true",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
ns := framework.CreateTestingNamespace(namespaceName, server, t)
|
||||||
|
defer framework.DeleteTestingNamespace(ns, server, t)
|
||||||
|
|
||||||
|
testClient, ctrl, informers := createAdClients(ns, t, server, defaultSyncPeriod)
|
||||||
|
|
||||||
|
pod := fakePodWithVol(namespaceName)
|
||||||
|
podStopCh := make(chan struct{})
|
||||||
|
|
||||||
|
if _, err := testClient.Core().Nodes().Create(node); err != nil {
|
||||||
|
t.Fatalf("Failed to created node : %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
go informers.Core().V1().Nodes().Informer().Run(podStopCh)
|
||||||
|
|
||||||
|
if _, err := testClient.Core().Pods(ns.Name).Create(pod); err != nil {
|
||||||
|
t.Errorf("Failed to create pod : %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
podInformer := informers.Core().V1().Pods().Informer()
|
||||||
|
go podInformer.Run(podStopCh)
|
||||||
|
|
||||||
|
// start controller loop
|
||||||
|
stopCh := make(chan struct{})
|
||||||
|
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh)
|
||||||
|
go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
|
||||||
|
go ctrl.Run(stopCh)
|
||||||
|
|
||||||
|
waitToObservePods(t, podInformer, 1)
|
||||||
|
podKey, err := cache.MetaNamespaceKeyFunc(pod)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("MetaNamespaceKeyFunc failed with : %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, _, err = podInformer.GetStore().GetByKey(podKey)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Pod not found in Pod Informer cache : %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
waitForPodsInDSWP(t, ctrl.GetDesiredStateOfWorld())
|
||||||
|
|
||||||
|
// let's stop pod events from getting triggered
|
||||||
|
close(podStopCh)
|
||||||
|
podObj, err := api.Scheme.DeepCopy(pod)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error copying pod : %v", err)
|
||||||
|
}
|
||||||
|
podNew, ok := podObj.(*v1.Pod)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("Error converting pod : %v", err)
|
||||||
|
}
|
||||||
|
newPodName := "newFakepod"
|
||||||
|
podNew.SetName(newPodName)
|
||||||
|
err = podInformer.GetStore().Add(podNew)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error adding pod : %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
waitToObservePods(t, podInformer, 2)
|
||||||
|
// the findAndAddActivePods loop turns every 3 minute
|
||||||
|
time.Sleep(200 * time.Second)
|
||||||
|
podsToAdd := ctrl.GetDesiredStateOfWorld().GetPodToAdd()
|
||||||
|
if len(podsToAdd) != 2 {
|
||||||
|
t.Fatalf("DSW should have two pods")
|
||||||
|
}
|
||||||
|
|
||||||
|
close(stopCh)
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user