find and add active pods for dswp

loops through the list of active pods and ensures that each one exists in the desired state of the world cache
This commit is contained in:
NickrenREN 2017-02-23 19:34:41 +08:00 committed by NickrenREN
parent aa43023fd7
commit 5cafb9042b
11 changed files with 626 additions and 243 deletions

View File

@ -177,6 +177,7 @@ pkg/cloudprovider/providers/cloudstack
pkg/controller/volume/attachdetach/cache
pkg/controller/volume/attachdetach/populator
pkg/controller/volume/attachdetach/reconciler
pkg/controller/volume/attachdetach/util
pkg/conversion
pkg/conversion/queryparams
pkg/credentialprovider/aws

View File

@ -24,6 +24,7 @@ go_library(
"//pkg/controller/volume/attachdetach/populator:go_default_library",
"//pkg/controller/volume/attachdetach/reconciler:go_default_library",
"//pkg/controller/volume/attachdetach/statusupdater:go_default_library",
"//pkg/controller/volume/attachdetach/util:go_default_library",
"//pkg/util/io:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/volume:go_default_library",
@ -67,6 +68,7 @@ filegroup(
"//pkg/controller/volume/attachdetach/reconciler:all-srcs",
"//pkg/controller/volume/attachdetach/statusupdater:all-srcs",
"//pkg/controller/volume/attachdetach/testing:all-srcs",
"//pkg/controller/volume/attachdetach/util:all-srcs",
],
tags = ["automanaged"],
)

View File

@ -41,6 +41,7 @@ import (
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/populator"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/reconciler"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/util"
"k8s.io/kubernetes/pkg/util/io"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
@ -62,6 +63,11 @@ const (
// desiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the
// DesiredStateOfWorldPopulator loop waits between successive executions
desiredStateOfWorldPopulatorLoopSleepPeriod time.Duration = 1 * time.Minute
// desiredStateOfWorldPopulatorListPodsRetryDuration is the amount of
// time the DesiredStateOfWorldPopulator loop waits between list pods
// calls.
desiredStateOfWorldPopulatorListPodsRetryDuration time.Duration = 3 * time.Minute
)
// AttachDetachController defines the operations supported by this controller.
@ -137,8 +143,12 @@ func NewAttachDetachController(
adc.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
desiredStateOfWorldPopulatorLoopSleepPeriod,
desiredStateOfWorldPopulatorListPodsRetryDuration,
podInformer.Lister(),
adc.desiredStateOfWorld)
adc.desiredStateOfWorld,
&adc.volumePluginMgr,
pvcInformer.Lister(),
pvInformer.Lister())
podInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
AddFunc: adc.podAdd,
@ -245,7 +255,8 @@ func (adc *attachDetachController) podAdd(obj interface{}) {
return
}
adc.processPodVolumes(pod, true /* addVolumes */)
util.ProcessPodVolumes(pod, true, /* addVolumes */
adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
}
// GetDesiredStateOfWorld returns desired state of world associated with controller
@ -264,7 +275,8 @@ func (adc *attachDetachController) podDelete(obj interface{}) {
return
}
adc.processPodVolumes(pod, false /* addVolumes */)
util.ProcessPodVolumes(pod, false, /* addVolumes */
adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
}
func (adc *attachDetachController) nodeAdd(obj interface{}) {
@ -313,225 +325,6 @@ func (adc *attachDetachController) nodeDelete(obj interface{}) {
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
}
// processPodVolumes processes the volumes in the given pod and adds them to the
// desired state of the world if addVolumes is true, otherwise it removes them.
func (adc *attachDetachController) processPodVolumes(
pod *v1.Pod, addVolumes bool) {
if pod == nil {
return
}
if len(pod.Spec.Volumes) <= 0 {
return
}
nodeName := types.NodeName(pod.Spec.NodeName)
if !adc.desiredStateOfWorld.NodeExists(nodeName) {
// If the node the pod is scheduled to does not exist in the desired
// state of the world data structure, that indicates the node is not
// yet managed by the controller. Therefore, ignore the pod.
// If the node is added to the list of managed nodes in the future,
// future adds and updates to the pod will be processed.
glog.V(10).Infof(
"Skipping processing of pod %q/%q: it is scheduled to node %q which is not managed by the controller.",
pod.Namespace,
pod.Name,
nodeName)
return
}
// Process volume spec for each volume defined in pod
for _, podVolume := range pod.Spec.Volumes {
volumeSpec, err := adc.createVolumeSpec(podVolume, pod.Namespace)
if err != nil {
glog.V(10).Infof(
"Error processing volume %q for pod %q/%q: %v",
podVolume.Name,
pod.Namespace,
pod.Name,
err)
continue
}
attachableVolumePlugin, err :=
adc.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
if err != nil || attachableVolumePlugin == nil {
glog.V(10).Infof(
"Skipping volume %q for pod %q/%q: it does not implement attacher interface. err=%v",
podVolume.Name,
pod.Namespace,
pod.Name,
err)
continue
}
uniquePodName := volumehelper.GetUniquePodName(pod)
if addVolumes {
// Add volume to desired state of world
_, err := adc.desiredStateOfWorld.AddPod(
uniquePodName, pod, volumeSpec, nodeName)
if err != nil {
glog.V(10).Infof(
"Failed to add volume %q for pod %q/%q to desiredStateOfWorld. %v",
podVolume.Name,
pod.Namespace,
pod.Name,
err)
}
} else {
// Remove volume from desired state of world
uniqueVolumeName, err := volumehelper.GetUniqueVolumeNameFromSpec(
attachableVolumePlugin, volumeSpec)
if err != nil {
glog.V(10).Infof(
"Failed to delete volume %q for pod %q/%q from desiredStateOfWorld. GetUniqueVolumeNameFromSpec failed with %v",
podVolume.Name,
pod.Namespace,
pod.Name,
err)
continue
}
adc.desiredStateOfWorld.DeletePod(
uniquePodName, uniqueVolumeName, nodeName)
}
}
return
}
// createVolumeSpec creates and returns a mutatable volume.Spec object for the
// specified volume. It dereference any PVC to get PV objects, if needed.
func (adc *attachDetachController) createVolumeSpec(
podVolume v1.Volume, podNamespace string) (*volume.Spec, error) {
if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil {
glog.V(10).Infof(
"Found PVC, ClaimName: %q/%q",
podNamespace,
pvcSource.ClaimName)
// If podVolume is a PVC, fetch the real PV behind the claim
pvName, pvcUID, err := adc.getPVCFromCacheExtractPV(
podNamespace, pvcSource.ClaimName)
if err != nil {
return nil, fmt.Errorf(
"error processing PVC %q/%q: %v",
podNamespace,
pvcSource.ClaimName,
err)
}
glog.V(10).Infof(
"Found bound PV for PVC (ClaimName %q/%q pvcUID %v): pvName=%q",
podNamespace,
pvcSource.ClaimName,
pvcUID,
pvName)
// Fetch actual PV object
volumeSpec, err := adc.getPVSpecFromCache(
pvName, pvcSource.ReadOnly, pvcUID)
if err != nil {
return nil, fmt.Errorf(
"error processing PVC %q/%q: %v",
podNamespace,
pvcSource.ClaimName,
err)
}
glog.V(10).Infof(
"Extracted volumeSpec (%v) from bound PV (pvName %q) and PVC (ClaimName %q/%q pvcUID %v)",
volumeSpec.Name,
pvName,
podNamespace,
pvcSource.ClaimName,
pvcUID)
return volumeSpec, nil
}
// Do not return the original volume object, since it's from the shared
// informer it may be mutated by another consumer.
clonedPodVolumeObj, err := api.Scheme.DeepCopy(&podVolume)
if err != nil || clonedPodVolumeObj == nil {
return nil, fmt.Errorf(
"failed to deep copy %q volume object. err=%v", podVolume.Name, err)
}
clonedPodVolume, ok := clonedPodVolumeObj.(*v1.Volume)
if !ok {
return nil, fmt.Errorf("failed to cast clonedPodVolume %#v to v1.Volume", clonedPodVolumeObj)
}
return volume.NewSpecFromVolume(clonedPodVolume), nil
}
// getPVCFromCacheExtractPV fetches the PVC object with the given namespace and
// name from the shared internal PVC store extracts the name of the PV it is
// pointing to and returns it.
// This method returns an error if a PVC object does not exist in the cache
// with the given namespace/name.
// This method returns an error if the PVC object's phase is not "Bound".
func (adc *attachDetachController) getPVCFromCacheExtractPV(namespace string, name string) (string, types.UID, error) {
pvc, err := adc.pvcLister.PersistentVolumeClaims(namespace).Get(name)
if err != nil {
return "", "", fmt.Errorf("failed to find PVC %s/%s in PVCInformer cache: %v", namespace, name, err)
}
if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" {
return "", "", fmt.Errorf(
"PVC %s/%s has non-bound phase (%q) or empty pvc.Spec.VolumeName (%q)",
namespace,
name,
pvc.Status.Phase,
pvc.Spec.VolumeName)
}
return pvc.Spec.VolumeName, pvc.UID, nil
}
// getPVSpecFromCache fetches the PV object with the given name from the shared
// internal PV store and returns a volume.Spec representing it.
// This method returns an error if a PV object does not exist in the cache with
// the given name.
// This method deep copies the PV object so the caller may use the returned
// volume.Spec object without worrying about it mutating unexpectedly.
func (adc *attachDetachController) getPVSpecFromCache(name string, pvcReadOnly bool, expectedClaimUID types.UID) (*volume.Spec, error) {
pv, err := adc.pvLister.Get(name)
if err != nil {
return nil, fmt.Errorf("failed to find PV %q in PVInformer cache: %v", name, err)
}
if pv.Spec.ClaimRef == nil {
return nil, fmt.Errorf(
"found PV object %q but it has a nil pv.Spec.ClaimRef indicating it is not yet bound to the claim",
name)
}
if pv.Spec.ClaimRef.UID != expectedClaimUID {
return nil, fmt.Errorf(
"found PV object %q but its pv.Spec.ClaimRef.UID (%q) does not point to claim.UID (%q)",
name,
pv.Spec.ClaimRef.UID,
expectedClaimUID)
}
// Do not return the object from the informer, since the store is shared it
// may be mutated by another consumer.
clonedPVObj, err := api.Scheme.DeepCopy(pv)
if err != nil || clonedPVObj == nil {
return nil, fmt.Errorf("failed to deep copy %q PV object. err=%v", name, err)
}
clonedPV, ok := clonedPVObj.(*v1.PersistentVolume)
if !ok {
return nil, fmt.Errorf("failed to cast %q clonedPV %#v to PersistentVolume", name, pv)
}
return volume.NewSpecFromPersistentVolume(clonedPV, pvcReadOnly), nil
}
// processVolumesInUse processes the list of volumes marked as "in-use"
// according to the specified Node's Status.VolumesInUse and updates the
// corresponding volume in the actual state of the world to indicate that it is

View File

@ -5,6 +5,7 @@ licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
@ -14,9 +15,12 @@ go_library(
deps = [
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/controller/volume/attachdetach/util:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util/volumehelper:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
@ -35,3 +39,21 @@ filegroup(
srcs = [":package-srcs"],
tags = ["automanaged"],
)
go_test(
name = "go_default_test",
srcs = ["desired_state_of_world_populator_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/volume/testing:go_default_library",
"//pkg/volume/util/volumehelper:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
],
)

View File

@ -25,17 +25,20 @@ import (
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
kcache "k8s.io/client-go/tools/cache"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/util"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
)
// DesiredStateOfWorldPopulator periodically verifies that the pods in the
// desired state of th world still exist, if not, it removes them.
// TODO: it also loops through the list of active pods and ensures that
// desired state of the world still exist, if not, it removes them.
// It also loops through the list of active pods and ensures that
// each one exists in the desired state of the world cache
// if it has volumes.
type DesiredStateOfWorldPopulator interface {
@ -50,19 +53,32 @@ type DesiredStateOfWorldPopulator interface {
// desiredStateOfWorld - the cache to populate
func NewDesiredStateOfWorldPopulator(
loopSleepDuration time.Duration,
listPodsRetryDuration time.Duration,
podLister corelisters.PodLister,
desiredStateOfWorld cache.DesiredStateOfWorld) DesiredStateOfWorldPopulator {
desiredStateOfWorld cache.DesiredStateOfWorld,
volumePluginMgr *volume.VolumePluginMgr,
pvcLister corelisters.PersistentVolumeClaimLister,
pvLister corelisters.PersistentVolumeLister) DesiredStateOfWorldPopulator {
return &desiredStateOfWorldPopulator{
loopSleepDuration: loopSleepDuration,
podLister: podLister,
desiredStateOfWorld: desiredStateOfWorld,
loopSleepDuration: loopSleepDuration,
listPodsRetryDuration: listPodsRetryDuration,
podLister: podLister,
desiredStateOfWorld: desiredStateOfWorld,
volumePluginMgr: volumePluginMgr,
pvcLister: pvcLister,
pvLister: pvLister,
}
}
type desiredStateOfWorldPopulator struct {
loopSleepDuration time.Duration
podLister corelisters.PodLister
desiredStateOfWorld cache.DesiredStateOfWorld
loopSleepDuration time.Duration
podLister corelisters.PodLister
desiredStateOfWorld cache.DesiredStateOfWorld
volumePluginMgr *volume.VolumePluginMgr
pvcLister corelisters.PersistentVolumeClaimLister
pvLister corelisters.PersistentVolumeLister
listPodsRetryDuration time.Duration
timeOfLastListPods time.Time
}
func (dswp *desiredStateOfWorldPopulator) Run(stopCh <-chan struct{}) {
@ -72,6 +88,18 @@ func (dswp *desiredStateOfWorldPopulator) Run(stopCh <-chan struct{}) {
func (dswp *desiredStateOfWorldPopulator) populatorLoopFunc() func() {
return func() {
dswp.findAndRemoveDeletedPods()
// findAndAddActivePods is called periodically, independently of the main
// populator loop.
if time.Since(dswp.timeOfLastListPods) < dswp.listPodsRetryDuration {
glog.V(5).Infof(
"Skipping findAndAddActivePods(). Not permitted until %v (listPodsRetryDuration %v).",
dswp.timeOfLastListPods.Add(dswp.listPodsRetryDuration),
dswp.listPodsRetryDuration)
return
}
dswp.findAndAddActivePods()
}
}
@ -113,3 +141,23 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
dswp.desiredStateOfWorld.DeletePod(dswPodUID, dswPodToAdd.VolumeName, dswPodToAdd.NodeName)
}
}
func (dswp *desiredStateOfWorldPopulator) findAndAddActivePods() {
pods, err := dswp.podLister.List(labels.Everything())
if err != nil {
glog.Errorf("podLister List failed: %v", err)
return
}
dswp.timeOfLastListPods = time.Now()
for _, pod := range pods {
if volumehelper.IsPodTerminated(pod, pod.Status) {
// Do not add volumes for terminated pods
continue
}
util.ProcessPodVolumes(pod, true,
dswp.desiredStateOfWorld, dswp.volumePluginMgr, dswp.pvcLister, dswp.pvLister)
}
}

View File

@ -0,0 +1,137 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package populator
import (
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
)
func TestFindAndAddActivePods_FindAndRemoveDeletedPods(t *testing.T) {
fakeVolumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
fakeClient := &fake.Clientset{}
fakeInformerFactory := informers.NewSharedInformerFactory(fakeClient, controller.NoResyncPeriodFunc())
fakePodInformer := fakeInformerFactory.Core().V1().Pods()
fakesDSW := cache.NewDesiredStateOfWorld(fakeVolumePluginMgr)
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "dswp-test-pod",
UID: "dswp-test-pod-uid",
Namespace: "dswp-test",
},
Spec: v1.PodSpec{
NodeName: "dswp-test-host",
Volumes: []v1.Volume{
{
Name: "dswp-test-volume-name",
VolumeSource: v1.VolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
PDName: "dswp-test-fake-device",
},
},
},
},
},
Status: v1.PodStatus{
Phase: v1.PodPhase("Running"),
},
}
fakePodInformer.Informer().GetStore().Add(pod)
podName := volumehelper.GetUniquePodName(pod)
generatedVolumeName := "fake-plugin/" + pod.Spec.Volumes[0].Name
pvcLister := fakeInformerFactory.Core().V1().PersistentVolumeClaims().Lister()
pvLister := fakeInformerFactory.Core().V1().PersistentVolumes().Lister()
dswp := &desiredStateOfWorldPopulator{
loopSleepDuration: 100 * time.Millisecond,
listPodsRetryDuration: 3 * time.Second,
desiredStateOfWorld: fakesDSW,
volumePluginMgr: fakeVolumePluginMgr,
podLister: fakePodInformer.Lister(),
pvcLister: pvcLister,
pvLister: pvLister,
}
//add the given node to the list of nodes managed by dsw
dswp.desiredStateOfWorld.AddNode(k8stypes.NodeName(pod.Spec.NodeName))
dswp.findAndAddActivePods()
expectedVolumeName := v1.UniqueVolumeName(generatedVolumeName)
//check if the given volume referenced by the pod is added to dsw
volumeExists := dswp.desiredStateOfWorld.VolumeExists(expectedVolumeName, k8stypes.NodeName(pod.Spec.NodeName))
if !volumeExists {
t.Fatalf(
"VolumeExists(%q) failed. Expected: <true> Actual: <%v>",
expectedVolumeName,
volumeExists)
}
//delete the pod and volume manually
dswp.desiredStateOfWorld.DeletePod(podName, expectedVolumeName, k8stypes.NodeName(pod.Spec.NodeName))
//check if the given volume referenced by the pod still exists in dsw
volumeExists = dswp.desiredStateOfWorld.VolumeExists(expectedVolumeName, k8stypes.NodeName(pod.Spec.NodeName))
if volumeExists {
t.Fatalf(
"VolumeExists(%q) failed. Expected: <false> Actual: <%v>",
expectedVolumeName,
volumeExists)
}
//add pod and volume again
dswp.findAndAddActivePods()
//check if the given volume referenced by the pod is added to dsw for the second time
volumeExists = dswp.desiredStateOfWorld.VolumeExists(expectedVolumeName, k8stypes.NodeName(pod.Spec.NodeName))
if !volumeExists {
t.Fatalf(
"VolumeExists(%q) failed. Expected: <true> Actual: <%v>",
expectedVolumeName,
volumeExists)
}
fakePodInformer.Informer().GetStore().Delete(pod)
dswp.findAndRemoveDeletedPods()
//check if the given volume referenced by the pod still exists in dsw
volumeExists = dswp.desiredStateOfWorld.VolumeExists(expectedVolumeName, k8stypes.NodeName(pod.Spec.NodeName))
if volumeExists {
t.Fatalf(
"VolumeExists(%q) failed. Expected: <false> Actual: <%v>",
expectedVolumeName,
volumeExists)
}
}

View File

@ -0,0 +1,37 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = ["util.go"],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util/volumehelper:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -0,0 +1,252 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
)
// CreateVolumeSpec creates and returns a mutatable volume.Spec object for the
// specified volume. It dereference any PVC to get PV objects, if needed.
func CreateVolumeSpec(podVolume v1.Volume, podNamespace string, pvcLister corelisters.PersistentVolumeClaimLister, pvLister corelisters.PersistentVolumeLister) (*volume.Spec, error) {
if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil {
glog.V(10).Infof(
"Found PVC, ClaimName: %q/%q",
podNamespace,
pvcSource.ClaimName)
// If podVolume is a PVC, fetch the real PV behind the claim
pvName, pvcUID, err := getPVCFromCacheExtractPV(
podNamespace, pvcSource.ClaimName, pvcLister)
if err != nil {
return nil, fmt.Errorf(
"error processing PVC %q/%q: %v",
podNamespace,
pvcSource.ClaimName,
err)
}
glog.V(10).Infof(
"Found bound PV for PVC (ClaimName %q/%q pvcUID %v): pvName=%q",
podNamespace,
pvcSource.ClaimName,
pvcUID,
pvName)
// Fetch actual PV object
volumeSpec, err := getPVSpecFromCache(
pvName, pvcSource.ReadOnly, pvcUID, pvLister)
if err != nil {
return nil, fmt.Errorf(
"error processing PVC %q/%q: %v",
podNamespace,
pvcSource.ClaimName,
err)
}
glog.V(10).Infof(
"Extracted volumeSpec (%v) from bound PV (pvName %q) and PVC (ClaimName %q/%q pvcUID %v)",
volumeSpec.Name,
pvName,
podNamespace,
pvcSource.ClaimName,
pvcUID)
return volumeSpec, nil
}
// Do not return the original volume object, since it's from the shared
// informer it may be mutated by another consumer.
clonedPodVolumeObj, err := api.Scheme.DeepCopy(&podVolume)
if err != nil || clonedPodVolumeObj == nil {
return nil, fmt.Errorf(
"failed to deep copy %q volume object. err=%v", podVolume.Name, err)
}
clonedPodVolume, ok := clonedPodVolumeObj.(*v1.Volume)
if !ok {
return nil, fmt.Errorf("failed to cast clonedPodVolume %#v to v1.Volume", clonedPodVolumeObj)
}
return volume.NewSpecFromVolume(clonedPodVolume), nil
}
// getPVCFromCacheExtractPV fetches the PVC object with the given namespace and
// name from the shared internal PVC store extracts the name of the PV it is
// pointing to and returns it.
// This method returns an error if a PVC object does not exist in the cache
// with the given namespace/name.
// This method returns an error if the PVC object's phase is not "Bound".
func getPVCFromCacheExtractPV(namespace string, name string, pvcLister corelisters.PersistentVolumeClaimLister) (string, types.UID, error) {
pvc, err := pvcLister.PersistentVolumeClaims(namespace).Get(name)
if err != nil {
return "", "", fmt.Errorf("failed to find PVC %s/%s in PVCInformer cache: %v", namespace, name, err)
}
if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" {
return "", "", fmt.Errorf(
"PVC %s/%s has non-bound phase (%q) or empty pvc.Spec.VolumeName (%q)",
namespace,
name,
pvc.Status.Phase,
pvc.Spec.VolumeName)
}
return pvc.Spec.VolumeName, pvc.UID, nil
}
// getPVSpecFromCache fetches the PV object with the given name from the shared
// internal PV store and returns a volume.Spec representing it.
// This method returns an error if a PV object does not exist in the cache with
// the given name.
// This method deep copies the PV object so the caller may use the returned
// volume.Spec object without worrying about it mutating unexpectedly.
func getPVSpecFromCache(name string, pvcReadOnly bool, expectedClaimUID types.UID, pvLister corelisters.PersistentVolumeLister) (*volume.Spec, error) {
pv, err := pvLister.Get(name)
if err != nil {
return nil, fmt.Errorf("failed to find PV %q in PVInformer cache: %v", name, err)
}
if pv.Spec.ClaimRef == nil {
return nil, fmt.Errorf(
"found PV object %q but it has a nil pv.Spec.ClaimRef indicating it is not yet bound to the claim",
name)
}
if pv.Spec.ClaimRef.UID != expectedClaimUID {
return nil, fmt.Errorf(
"found PV object %q but its pv.Spec.ClaimRef.UID (%q) does not point to claim.UID (%q)",
name,
pv.Spec.ClaimRef.UID,
expectedClaimUID)
}
// Do not return the object from the informer, since the store is shared it
// may be mutated by another consumer.
clonedPVObj, err := api.Scheme.DeepCopy(pv)
if err != nil || clonedPVObj == nil {
return nil, fmt.Errorf("failed to deep copy %q PV object. err=%v", name, err)
}
clonedPV, ok := clonedPVObj.(*v1.PersistentVolume)
if !ok {
return nil, fmt.Errorf("failed to cast %q clonedPV %#v to PersistentVolume", name, pv)
}
return volume.NewSpecFromPersistentVolume(clonedPV, pvcReadOnly), nil
}
// ProcessPodVolumes processes the volumes in the given pod and adds them to the
// desired state of the world if addVolumes is true, otherwise it removes them.
func ProcessPodVolumes(pod *v1.Pod, addVolumes bool, desiredStateOfWorld cache.DesiredStateOfWorld, volumePluginMgr *volume.VolumePluginMgr, pvcLister corelisters.PersistentVolumeClaimLister, pvLister corelisters.PersistentVolumeLister) {
if pod == nil {
return
}
if len(pod.Spec.Volumes) <= 0 {
glog.V(10).Infof("Skipping processing of pod %q/%q: it has no volumes.",
pod.Namespace,
pod.Name)
return
}
nodeName := types.NodeName(pod.Spec.NodeName)
if nodeName == "" {
glog.V(10).Infof(
"Skipping processing of pod %q/%q: it is not scheduled to a node.",
pod.Namespace,
pod.Name)
return
} else if !desiredStateOfWorld.NodeExists(nodeName) {
// If the node the pod is scheduled to does not exist in the desired
// state of the world data structure, that indicates the node is not
// yet managed by the controller. Therefore, ignore the pod.
glog.V(10).Infof(
"Skipping processing of pod %q/%q: it is scheduled to node %q which is not managed by the controller.",
pod.Namespace,
pod.Name,
nodeName)
return
}
// Process volume spec for each volume defined in pod
for _, podVolume := range pod.Spec.Volumes {
volumeSpec, err := CreateVolumeSpec(podVolume, pod.Namespace, pvcLister, pvLister)
if err != nil {
glog.V(10).Infof(
"Error processing volume %q for pod %q/%q: %v",
podVolume.Name,
pod.Namespace,
pod.Name,
err)
continue
}
attachableVolumePlugin, err :=
volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
if err != nil || attachableVolumePlugin == nil {
glog.V(10).Infof(
"Skipping volume %q for pod %q/%q: it does not implement attacher interface. err=%v",
podVolume.Name,
pod.Namespace,
pod.Name,
err)
continue
}
uniquePodName := volumehelper.GetUniquePodName(pod)
if addVolumes {
// Add volume to desired state of world
_, err := desiredStateOfWorld.AddPod(
uniquePodName, pod, volumeSpec, nodeName)
if err != nil {
glog.V(10).Infof(
"Failed to add volume %q for pod %q/%q to desiredStateOfWorld. %v",
podVolume.Name,
pod.Namespace,
pod.Name,
err)
}
} else {
// Remove volume from desired state of world
uniqueVolumeName, err := volumehelper.GetUniqueVolumeNameFromSpec(
attachableVolumePlugin, volumeSpec)
if err != nil {
glog.V(10).Infof(
"Failed to delete volume %q for pod %q/%q from desiredStateOfWorld. GetUniqueVolumeNameFromSpec failed with %v",
podVolume.Name,
pod.Namespace,
pod.Name,
err)
continue
}
desiredStateOfWorld.DeletePod(
uniquePodName, uniqueVolumeName, nodeName)
}
}
return
}

View File

@ -143,18 +143,7 @@ func (dswp *desiredStateOfWorldPopulator) isPodTerminated(pod *v1.Pod) bool {
if !found {
podStatus = pod.Status
}
return podStatus.Phase == v1.PodFailed || podStatus.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(podStatus.ContainerStatuses))
}
// notRunning returns true if every status is terminated or waiting, or the status list
// is empty.
func notRunning(statuses []v1.ContainerStatus) bool {
for _, status := range statuses {
if status.State.Terminated == nil && status.State.Waiting == nil {
return false
}
}
return true
return volumehelper.IsPodTerminated(pod, podStatus)
}
// Iterate through all pods and add to desired state of world if they don't

View File

@ -87,3 +87,19 @@ func GetUniqueVolumeNameFromSpec(
volumeName),
nil
}
// IsPodTerminated checks if pod is terminated
func IsPodTerminated(pod *v1.Pod, podStatus v1.PodStatus) bool {
return podStatus.Phase == v1.PodFailed || podStatus.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(podStatus.ContainerStatuses))
}
// notRunning returns true if every status is terminated or waiting, or the status list
// is empty.
func notRunning(statuses []v1.ContainerStatus) bool {
for _, status := range statuses {
if status.State.Terminated == nil && status.State.Waiting == nil {
return false
}
}
return true
}

View File

@ -222,3 +222,89 @@ func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, sy
}
return testClient, ctrl, informers
}
// Via integration test we can verify that if pod add
// event is somehow missed by AttachDetach controller - it still
// gets added by Desired State of World populator.
func TestPodAddedByDswp(t *testing.T) {
_, server := framework.RunAMaster(nil)
defer server.Close()
namespaceName := "test-pod-deletion"
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node-sandbox",
Annotations: map[string]string{
volumehelper.ControllerManagedAttachAnnotation: "true",
},
},
}
ns := framework.CreateTestingNamespace(namespaceName, server, t)
defer framework.DeleteTestingNamespace(ns, server, t)
testClient, ctrl, informers := createAdClients(ns, t, server, defaultSyncPeriod)
pod := fakePodWithVol(namespaceName)
podStopCh := make(chan struct{})
if _, err := testClient.Core().Nodes().Create(node); err != nil {
t.Fatalf("Failed to created node : %v", err)
}
go informers.Core().V1().Nodes().Informer().Run(podStopCh)
if _, err := testClient.Core().Pods(ns.Name).Create(pod); err != nil {
t.Errorf("Failed to create pod : %v", err)
}
podInformer := informers.Core().V1().Pods().Informer()
go podInformer.Run(podStopCh)
// start controller loop
stopCh := make(chan struct{})
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh)
go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
go ctrl.Run(stopCh)
waitToObservePods(t, podInformer, 1)
podKey, err := cache.MetaNamespaceKeyFunc(pod)
if err != nil {
t.Fatalf("MetaNamespaceKeyFunc failed with : %v", err)
}
_, _, err = podInformer.GetStore().GetByKey(podKey)
if err != nil {
t.Fatalf("Pod not found in Pod Informer cache : %v", err)
}
waitForPodsInDSWP(t, ctrl.GetDesiredStateOfWorld())
// let's stop pod events from getting triggered
close(podStopCh)
podObj, err := api.Scheme.DeepCopy(pod)
if err != nil {
t.Fatalf("Error copying pod : %v", err)
}
podNew, ok := podObj.(*v1.Pod)
if !ok {
t.Fatalf("Error converting pod : %v", err)
}
newPodName := "newFakepod"
podNew.SetName(newPodName)
err = podInformer.GetStore().Add(podNew)
if err != nil {
t.Fatalf("Error adding pod : %v", err)
}
waitToObservePods(t, podInformer, 2)
// the findAndAddActivePods loop turns every 3 minute
time.Sleep(200 * time.Second)
podsToAdd := ctrl.GetDesiredStateOfWorld().GetPodToAdd()
if len(podsToAdd) != 2 {
t.Fatalf("DSW should have two pods")
}
close(stopCh)
}