mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
Merge pull request #66863 from cofyc/fix64549
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. attachdetach controller: attach volumes immediately when Pod's PVCs are bound **What this PR does / why we need it**: Let attachdetach controller to attach volumes immediately when Pod's PVCs are bound. Current attachdetach controller calls `util.ProcessPodVolume` to add pod volumes into `desiredStateOfWorld` on these events: - podAdd event - podUpdate event - podDelete event - periodical `desiredStateOfWorldPopulator.findAndAddActivePod` But if a pod is created with PVCs not bound, no volumes will be added into `desiredStateOfWorld` [because PVCs not bound](https://github.com/kubernetes/kubernetes/blob/v1.12.0-alpha.0/pkg/controller/volume/attachdetach/util/util.go#L99). When pv controller binds PVCs successfully, attachdetach controller will not add pod volumes immediately because it does not watch on PVC events. It will wait until a pod update event is triggered (normally will not happen because no new status will be reported by kubelet) or `desiredStateOfWorldPopulator.findAndAddActivePod` is called (maybe 0~3 minutes later, see [timer configs](https://github.com/kubernetes/kubernetes/blob/v1.12.0-alpha.0/pkg/controller/volume/attachdetach/attach_detach_controller.go)). In bad case, pod start time will be very long (~3 minutes + ~2 minutes (kubelet max exponential backoff)), for example: https://github.com/kubernetes/kubernetes/issues/64549#issuecomment-409440546. **Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*: Fixes #64549 **Special notes for your reviewer**: **Release note**: ```release-note attachdetach controller attaches volumes immediately when Pod's PVCs are bound ```
This commit is contained in:
commit
cd786bda64
@ -26,9 +26,11 @@ go_library(
|
|||||||
"//pkg/volume/util/volumepathhandler:go_default_library",
|
"//pkg/volume/util/volumepathhandler:go_default_library",
|
||||||
"//staging/src/k8s.io/api/authentication/v1:go_default_library",
|
"//staging/src/k8s.io/api/authentication/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
|
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
|
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
|
||||||
@ -36,6 +38,7 @@ go_library(
|
|||||||
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
|
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
|
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
|
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
|
||||||
|
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
|
||||||
"//vendor/github.com/golang/glog:go_default_library",
|
"//vendor/github.com/golang/glog:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -26,9 +26,11 @@ import (
|
|||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
authenticationv1 "k8s.io/api/authentication/v1"
|
authenticationv1 "k8s.io/api/authentication/v1"
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/apimachinery/pkg/util/runtime"
|
"k8s.io/apimachinery/pkg/util/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
coreinformers "k8s.io/client-go/informers/core/v1"
|
coreinformers "k8s.io/client-go/informers/core/v1"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
"k8s.io/client-go/kubernetes/scheme"
|
"k8s.io/client-go/kubernetes/scheme"
|
||||||
@ -36,6 +38,7 @@ import (
|
|||||||
corelisters "k8s.io/client-go/listers/core/v1"
|
corelisters "k8s.io/client-go/listers/core/v1"
|
||||||
kcache "k8s.io/client-go/tools/cache"
|
kcache "k8s.io/client-go/tools/cache"
|
||||||
"k8s.io/client-go/tools/record"
|
"k8s.io/client-go/tools/record"
|
||||||
|
"k8s.io/client-go/util/workqueue"
|
||||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||||
"k8s.io/kubernetes/pkg/controller"
|
"k8s.io/kubernetes/pkg/controller"
|
||||||
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
|
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
|
||||||
@ -125,9 +128,11 @@ func NewAttachDetachController(
|
|||||||
pvsSynced: pvInformer.Informer().HasSynced,
|
pvsSynced: pvInformer.Informer().HasSynced,
|
||||||
podLister: podInformer.Lister(),
|
podLister: podInformer.Lister(),
|
||||||
podsSynced: podInformer.Informer().HasSynced,
|
podsSynced: podInformer.Informer().HasSynced,
|
||||||
|
podIndexer: podInformer.Informer().GetIndexer(),
|
||||||
nodeLister: nodeInformer.Lister(),
|
nodeLister: nodeInformer.Lister(),
|
||||||
nodesSynced: nodeInformer.Informer().HasSynced,
|
nodesSynced: nodeInformer.Informer().HasSynced,
|
||||||
cloud: cloud,
|
cloud: cloud,
|
||||||
|
pvcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvcs"),
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil {
|
if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil {
|
||||||
@ -179,15 +184,54 @@ func NewAttachDetachController(
|
|||||||
DeleteFunc: adc.podDelete,
|
DeleteFunc: adc.podDelete,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// This custom indexer will index pods by its PVC keys. Then we don't need
|
||||||
|
// to iterate all pods every time to find pods which reference given PVC.
|
||||||
|
adc.podIndexer.AddIndexers(kcache.Indexers{
|
||||||
|
pvcKeyIndex: indexByPVCKey,
|
||||||
|
})
|
||||||
|
|
||||||
nodeInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
|
nodeInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
|
||||||
AddFunc: adc.nodeAdd,
|
AddFunc: adc.nodeAdd,
|
||||||
UpdateFunc: adc.nodeUpdate,
|
UpdateFunc: adc.nodeUpdate,
|
||||||
DeleteFunc: adc.nodeDelete,
|
DeleteFunc: adc.nodeDelete,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
pvcInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
|
||||||
|
AddFunc: func(obj interface{}) {
|
||||||
|
adc.enqueuePVC(obj)
|
||||||
|
},
|
||||||
|
UpdateFunc: func(old, new interface{}) {
|
||||||
|
adc.enqueuePVC(new)
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
return adc, nil
|
return adc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
pvcKeyIndex string = "pvcKey"
|
||||||
|
)
|
||||||
|
|
||||||
|
// indexByPVCKey returns PVC keys for given pod. Note that the index is only
|
||||||
|
// used for attaching, so we are only interested in active pods with nodeName
|
||||||
|
// set.
|
||||||
|
func indexByPVCKey(obj interface{}) ([]string, error) {
|
||||||
|
pod, ok := obj.(*v1.Pod)
|
||||||
|
if !ok {
|
||||||
|
return []string{}, nil
|
||||||
|
}
|
||||||
|
if len(pod.Spec.NodeName) == 0 || volumeutil.IsPodTerminated(pod, pod.Status) {
|
||||||
|
return []string{}, nil
|
||||||
|
}
|
||||||
|
keys := []string{}
|
||||||
|
for _, podVolume := range pod.Spec.Volumes {
|
||||||
|
if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil {
|
||||||
|
keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, pvcSource.ClaimName))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return keys, nil
|
||||||
|
}
|
||||||
|
|
||||||
type attachDetachController struct {
|
type attachDetachController struct {
|
||||||
// kubeClient is the kube API client used by volumehost to communicate with
|
// kubeClient is the kube API client used by volumehost to communicate with
|
||||||
// the API server.
|
// the API server.
|
||||||
@ -207,6 +251,7 @@ type attachDetachController struct {
|
|||||||
|
|
||||||
podLister corelisters.PodLister
|
podLister corelisters.PodLister
|
||||||
podsSynced kcache.InformerSynced
|
podsSynced kcache.InformerSynced
|
||||||
|
podIndexer kcache.Indexer
|
||||||
|
|
||||||
nodeLister corelisters.NodeLister
|
nodeLister corelisters.NodeLister
|
||||||
nodesSynced kcache.InformerSynced
|
nodesSynced kcache.InformerSynced
|
||||||
@ -251,10 +296,14 @@ type attachDetachController struct {
|
|||||||
|
|
||||||
// recorder is used to record events in the API server
|
// recorder is used to record events in the API server
|
||||||
recorder record.EventRecorder
|
recorder record.EventRecorder
|
||||||
|
|
||||||
|
// pvcQueue is used to queue pvc objects
|
||||||
|
pvcQueue workqueue.RateLimitingInterface
|
||||||
}
|
}
|
||||||
|
|
||||||
func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
|
func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
|
||||||
defer runtime.HandleCrash()
|
defer runtime.HandleCrash()
|
||||||
|
defer adc.pvcQueue.ShutDown()
|
||||||
|
|
||||||
glog.Infof("Starting attach detach controller")
|
glog.Infof("Starting attach detach controller")
|
||||||
defer glog.Infof("Shutting down attach detach controller")
|
defer glog.Infof("Shutting down attach detach controller")
|
||||||
@ -273,6 +322,7 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
|
|||||||
}
|
}
|
||||||
go adc.reconciler.Run(stopCh)
|
go adc.reconciler.Run(stopCh)
|
||||||
go adc.desiredStateOfWorldPopulator.Run(stopCh)
|
go adc.desiredStateOfWorldPopulator.Run(stopCh)
|
||||||
|
go wait.Until(adc.pvcWorker, time.Second, stopCh)
|
||||||
metrics.Register(adc.pvcLister, adc.pvLister, adc.podLister, &adc.volumePluginMgr)
|
metrics.Register(adc.pvcLister, adc.pvLister, adc.podLister, &adc.volumePluginMgr)
|
||||||
|
|
||||||
<-stopCh
|
<-stopCh
|
||||||
@ -486,6 +536,83 @@ func (adc *attachDetachController) nodeDelete(obj interface{}) {
|
|||||||
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
|
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (adc *attachDetachController) enqueuePVC(obj interface{}) {
|
||||||
|
key, err := kcache.DeletionHandlingMetaNamespaceKeyFunc(obj)
|
||||||
|
if err != nil {
|
||||||
|
runtime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
adc.pvcQueue.Add(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
// pvcWorker processes items from pvcQueue
|
||||||
|
func (adc *attachDetachController) pvcWorker() {
|
||||||
|
for adc.processNextItem() {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (adc *attachDetachController) processNextItem() bool {
|
||||||
|
keyObj, shutdown := adc.pvcQueue.Get()
|
||||||
|
if shutdown {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
defer adc.pvcQueue.Done(keyObj)
|
||||||
|
|
||||||
|
if err := adc.syncPVCByKey(keyObj.(string)); err != nil {
|
||||||
|
// Rather than wait for a full resync, re-add the key to the
|
||||||
|
// queue to be processed.
|
||||||
|
adc.pvcQueue.AddRateLimited(keyObj)
|
||||||
|
runtime.HandleError(fmt.Errorf("Failed to sync pvc %q, will retry again: %v", keyObj.(string), err))
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finally, if no error occurs we Forget this item so it does not
|
||||||
|
// get queued again until another change happens.
|
||||||
|
adc.pvcQueue.Forget(keyObj)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (adc *attachDetachController) syncPVCByKey(key string) error {
|
||||||
|
glog.V(5).Infof("syncPVCByKey[%s]", key)
|
||||||
|
namespace, name, err := kcache.SplitMetaNamespaceKey(key)
|
||||||
|
if err != nil {
|
||||||
|
glog.V(4).Infof("error getting namespace & name of pvc %q to get pvc from informer: %v", key, err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
pvc, err := adc.pvcLister.PersistentVolumeClaims(namespace).Get(name)
|
||||||
|
if apierrors.IsNotFound(err) {
|
||||||
|
glog.V(4).Infof("error getting pvc %q from informer: %v", key, err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" {
|
||||||
|
// Skip unbound PVCs.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
objs, err := adc.podIndexer.ByIndex(pvcKeyIndex, key)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, obj := range objs {
|
||||||
|
pod, ok := obj.(*v1.Pod)
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
volumeActionFlag := util.DetermineVolumeAction(
|
||||||
|
pod,
|
||||||
|
adc.desiredStateOfWorld,
|
||||||
|
true /* default volume action */)
|
||||||
|
|
||||||
|
util.ProcessPodVolumes(pod, volumeActionFlag, /* addVolumes */
|
||||||
|
adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// processVolumesInUse processes the list of volumes marked as "in-use"
|
// processVolumesInUse processes the list of volumes marked as "in-use"
|
||||||
// according to the specified Node's Status.VolumesInUse and updates the
|
// according to the specified Node's Status.VolumesInUse and updates the
|
||||||
// corresponding volume in the actual state of the world to indicate that it is
|
// corresponding volume in the actual state of the world to indicate that it is
|
||||||
|
@ -20,6 +20,7 @@ go_test(
|
|||||||
"//pkg/controller/volume/attachdetach:go_default_library",
|
"//pkg/controller/volume/attachdetach:go_default_library",
|
||||||
"//pkg/controller/volume/attachdetach/cache:go_default_library",
|
"//pkg/controller/volume/attachdetach/cache:go_default_library",
|
||||||
"//pkg/controller/volume/persistentvolume:go_default_library",
|
"//pkg/controller/volume/persistentvolume:go_default_library",
|
||||||
|
"//pkg/controller/volume/persistentvolume/options:go_default_library",
|
||||||
"//pkg/volume:go_default_library",
|
"//pkg/volume:go_default_library",
|
||||||
"//pkg/volume/testing:go_default_library",
|
"//pkg/volume/testing:go_default_library",
|
||||||
"//pkg/volume/util:go_default_library",
|
"//pkg/volume/util:go_default_library",
|
||||||
|
@ -17,11 +17,13 @@ limitations under the License.
|
|||||||
package volume
|
package volume
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/api/resource"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
@ -32,6 +34,8 @@ import (
|
|||||||
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
|
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
|
||||||
"k8s.io/kubernetes/pkg/controller/volume/attachdetach"
|
"k8s.io/kubernetes/pkg/controller/volume/attachdetach"
|
||||||
volumecache "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
|
volumecache "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
|
||||||
|
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
|
||||||
|
persistentvolumeoptions "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/options"
|
||||||
"k8s.io/kubernetes/pkg/volume"
|
"k8s.io/kubernetes/pkg/volume"
|
||||||
volumetest "k8s.io/kubernetes/pkg/volume/testing"
|
volumetest "k8s.io/kubernetes/pkg/volume/testing"
|
||||||
"k8s.io/kubernetes/pkg/volume/util"
|
"k8s.io/kubernetes/pkg/volume/util"
|
||||||
@ -73,8 +77,68 @@ func fakePodWithVol(namespace string) *v1.Pod {
|
|||||||
return fakePod
|
return fakePod
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func fakePodWithPVC(name, pvcName, namespace string) (*v1.Pod, *v1.PersistentVolumeClaim) {
|
||||||
|
fakePod := &v1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Namespace: namespace,
|
||||||
|
Name: name,
|
||||||
|
},
|
||||||
|
Spec: v1.PodSpec{
|
||||||
|
Containers: []v1.Container{
|
||||||
|
{
|
||||||
|
Name: "fake-container",
|
||||||
|
Image: "nginx",
|
||||||
|
VolumeMounts: []v1.VolumeMount{
|
||||||
|
{
|
||||||
|
Name: "fake-mount",
|
||||||
|
MountPath: "/var/www/html",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Volumes: []v1.Volume{
|
||||||
|
{
|
||||||
|
Name: "fake-mount",
|
||||||
|
VolumeSource: v1.VolumeSource{
|
||||||
|
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
|
||||||
|
ClaimName: pvcName,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
NodeName: "node-sandbox",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
class := "fake-sc"
|
||||||
|
fakePVC := &v1.PersistentVolumeClaim{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Namespace: namespace,
|
||||||
|
Name: pvcName,
|
||||||
|
},
|
||||||
|
Spec: v1.PersistentVolumeClaimSpec{
|
||||||
|
AccessModes: []v1.PersistentVolumeAccessMode{
|
||||||
|
v1.ReadWriteOnce,
|
||||||
|
},
|
||||||
|
Resources: v1.ResourceRequirements{
|
||||||
|
Requests: v1.ResourceList{
|
||||||
|
v1.ResourceName(v1.ResourceStorage): resource.MustParse("5Gi"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
StorageClassName: &class,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return fakePod, fakePVC
|
||||||
|
}
|
||||||
|
|
||||||
type podCountFunc func(int) bool
|
type podCountFunc func(int) bool
|
||||||
|
|
||||||
|
var defaultTimerConfig = attachdetach.TimerConfig{
|
||||||
|
ReconcilerLoopPeriod: 100 * time.Millisecond,
|
||||||
|
ReconcilerMaxWaitForUnmountDuration: 6 * time.Second,
|
||||||
|
DesiredStateOfWorldPopulatorLoopSleepPeriod: 1 * time.Second,
|
||||||
|
DesiredStateOfWorldPopulatorListPodsRetryDuration: 3 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
// Via integration test we can verify that if pod delete
|
// Via integration test we can verify that if pod delete
|
||||||
// event is somehow missed by AttachDetach controller - it still
|
// event is somehow missed by AttachDetach controller - it still
|
||||||
// gets cleaned up by Desired State of World populator.
|
// gets cleaned up by Desired State of World populator.
|
||||||
@ -94,7 +158,7 @@ func TestPodDeletionWithDswp(t *testing.T) {
|
|||||||
ns := framework.CreateTestingNamespace(namespaceName, server, t)
|
ns := framework.CreateTestingNamespace(namespaceName, server, t)
|
||||||
defer framework.DeleteTestingNamespace(ns, server, t)
|
defer framework.DeleteTestingNamespace(ns, server, t)
|
||||||
|
|
||||||
testClient, ctrl, informers := createAdClients(ns, t, server, defaultSyncPeriod)
|
testClient, ctrl, _, informers := createAdClients(ns, t, server, defaultSyncPeriod, defaultTimerConfig)
|
||||||
pod := fakePodWithVol(namespaceName)
|
pod := fakePodWithVol(namespaceName)
|
||||||
podStopCh := make(chan struct{})
|
podStopCh := make(chan struct{})
|
||||||
|
|
||||||
@ -160,7 +224,7 @@ func TestPodUpdateWithWithADC(t *testing.T) {
|
|||||||
ns := framework.CreateTestingNamespace(namespaceName, server, t)
|
ns := framework.CreateTestingNamespace(namespaceName, server, t)
|
||||||
defer framework.DeleteTestingNamespace(ns, server, t)
|
defer framework.DeleteTestingNamespace(ns, server, t)
|
||||||
|
|
||||||
testClient, ctrl, informers := createAdClients(ns, t, server, defaultSyncPeriod)
|
testClient, ctrl, _, informers := createAdClients(ns, t, server, defaultSyncPeriod, defaultTimerConfig)
|
||||||
|
|
||||||
pod := fakePodWithVol(namespaceName)
|
pod := fakePodWithVol(namespaceName)
|
||||||
podStopCh := make(chan struct{})
|
podStopCh := make(chan struct{})
|
||||||
@ -228,7 +292,7 @@ func TestPodUpdateWithKeepTerminatedPodVolumes(t *testing.T) {
|
|||||||
ns := framework.CreateTestingNamespace(namespaceName, server, t)
|
ns := framework.CreateTestingNamespace(namespaceName, server, t)
|
||||||
defer framework.DeleteTestingNamespace(ns, server, t)
|
defer framework.DeleteTestingNamespace(ns, server, t)
|
||||||
|
|
||||||
testClient, ctrl, informers := createAdClients(ns, t, server, defaultSyncPeriod)
|
testClient, ctrl, _, informers := createAdClients(ns, t, server, defaultSyncPeriod, defaultTimerConfig)
|
||||||
|
|
||||||
pod := fakePodWithVol(namespaceName)
|
pod := fakePodWithVol(namespaceName)
|
||||||
podStopCh := make(chan struct{})
|
podStopCh := make(chan struct{})
|
||||||
@ -320,7 +384,7 @@ func waitForPodFuncInDSWP(t *testing.T, dswp volumecache.DesiredStateOfWorld, ch
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, syncPeriod time.Duration) (*clientset.Clientset, attachdetach.AttachDetachController, informers.SharedInformerFactory) {
|
func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, syncPeriod time.Duration, timers attachdetach.TimerConfig) (*clientset.Clientset, attachdetach.AttachDetachController, *persistentvolume.PersistentVolumeController, informers.SharedInformerFactory) {
|
||||||
config := restclient.Config{
|
config := restclient.Config{
|
||||||
Host: server.URL,
|
Host: server.URL,
|
||||||
ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}},
|
ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}},
|
||||||
@ -346,12 +410,6 @@ func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, sy
|
|||||||
plugins := []volume.VolumePlugin{plugin}
|
plugins := []volume.VolumePlugin{plugin}
|
||||||
cloud := &fakecloud.FakeCloud{}
|
cloud := &fakecloud.FakeCloud{}
|
||||||
informers := informers.NewSharedInformerFactory(testClient, resyncPeriod)
|
informers := informers.NewSharedInformerFactory(testClient, resyncPeriod)
|
||||||
timers := attachdetach.TimerConfig{
|
|
||||||
ReconcilerLoopPeriod: 100 * time.Millisecond,
|
|
||||||
ReconcilerMaxWaitForUnmountDuration: 6 * time.Second,
|
|
||||||
DesiredStateOfWorldPopulatorLoopSleepPeriod: 1 * time.Second,
|
|
||||||
DesiredStateOfWorldPopulatorListPodsRetryDuration: 3 * time.Second,
|
|
||||||
}
|
|
||||||
ctrl, err := attachdetach.NewAttachDetachController(
|
ctrl, err := attachdetach.NewAttachDetachController(
|
||||||
testClient,
|
testClient,
|
||||||
informers.Core().V1().Pods(),
|
informers.Core().V1().Pods(),
|
||||||
@ -368,7 +426,27 @@ func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, sy
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error creating AttachDetach : %v", err)
|
t.Fatalf("Error creating AttachDetach : %v", err)
|
||||||
}
|
}
|
||||||
return testClient, ctrl, informers
|
|
||||||
|
// create pv controller
|
||||||
|
controllerOptions := persistentvolumeoptions.NewPersistentVolumeControllerOptions()
|
||||||
|
params := persistentvolume.ControllerParameters{
|
||||||
|
KubeClient: testClient,
|
||||||
|
SyncPeriod: controllerOptions.PVClaimBinderSyncPeriod,
|
||||||
|
VolumePlugins: plugins,
|
||||||
|
Cloud: nil,
|
||||||
|
ClusterName: "volume-test-cluster",
|
||||||
|
VolumeInformer: informers.Core().V1().PersistentVolumes(),
|
||||||
|
ClaimInformer: informers.Core().V1().PersistentVolumeClaims(),
|
||||||
|
ClassInformer: informers.Storage().V1().StorageClasses(),
|
||||||
|
PodInformer: informers.Core().V1().Pods(),
|
||||||
|
NodeInformer: informers.Core().V1().Nodes(),
|
||||||
|
EnableDynamicProvisioning: false,
|
||||||
|
}
|
||||||
|
pvCtrl, err := persistentvolume.NewController(params)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create PV controller: %v", err)
|
||||||
|
}
|
||||||
|
return testClient, ctrl, pvCtrl, informers
|
||||||
}
|
}
|
||||||
|
|
||||||
// Via integration test we can verify that if pod add
|
// Via integration test we can verify that if pod add
|
||||||
@ -391,7 +469,7 @@ func TestPodAddedByDswp(t *testing.T) {
|
|||||||
ns := framework.CreateTestingNamespace(namespaceName, server, t)
|
ns := framework.CreateTestingNamespace(namespaceName, server, t)
|
||||||
defer framework.DeleteTestingNamespace(ns, server, t)
|
defer framework.DeleteTestingNamespace(ns, server, t)
|
||||||
|
|
||||||
testClient, ctrl, informers := createAdClients(ns, t, server, defaultSyncPeriod)
|
testClient, ctrl, _, informers := createAdClients(ns, t, server, defaultSyncPeriod, defaultTimerConfig)
|
||||||
|
|
||||||
pod := fakePodWithVol(namespaceName)
|
pod := fakePodWithVol(namespaceName)
|
||||||
podStopCh := make(chan struct{})
|
podStopCh := make(chan struct{})
|
||||||
@ -446,3 +524,91 @@ func TestPodAddedByDswp(t *testing.T) {
|
|||||||
|
|
||||||
close(stopCh)
|
close(stopCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPVCBoundWithADC(t *testing.T) {
|
||||||
|
_, server, closeFn := framework.RunAMaster(framework.NewIntegrationTestMasterConfig())
|
||||||
|
defer closeFn()
|
||||||
|
namespaceName := "test-pod-deletion"
|
||||||
|
|
||||||
|
ns := framework.CreateTestingNamespace(namespaceName, server, t)
|
||||||
|
defer framework.DeleteTestingNamespace(ns, server, t)
|
||||||
|
|
||||||
|
testClient, ctrl, pvCtrl, informers := createAdClients(ns, t, server, defaultSyncPeriod, attachdetach.TimerConfig{
|
||||||
|
ReconcilerLoopPeriod: 100 * time.Millisecond,
|
||||||
|
ReconcilerMaxWaitForUnmountDuration: 6 * time.Second,
|
||||||
|
DesiredStateOfWorldPopulatorLoopSleepPeriod: 24 * time.Hour,
|
||||||
|
// Use high duration to disable DesiredStateOfWorldPopulator.findAndAddActivePods loop in test.
|
||||||
|
DesiredStateOfWorldPopulatorListPodsRetryDuration: 24 * time.Hour,
|
||||||
|
})
|
||||||
|
|
||||||
|
node := &v1.Node{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "node-sandbox",
|
||||||
|
Annotations: map[string]string{
|
||||||
|
util.ControllerManagedAttachAnnotation: "true",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if _, err := testClient.Core().Nodes().Create(node); err != nil {
|
||||||
|
t.Fatalf("Failed to created node : %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// pods with pvc not bound
|
||||||
|
pvcs := []*v1.PersistentVolumeClaim{}
|
||||||
|
for i := 0; i < 3; i++ {
|
||||||
|
pod, pvc := fakePodWithPVC(fmt.Sprintf("fakepod-pvcnotbound-%d", i), fmt.Sprintf("fakepvc-%d", i), namespaceName)
|
||||||
|
if _, err := testClient.Core().Pods(pod.Namespace).Create(pod); err != nil {
|
||||||
|
t.Errorf("Failed to create pod : %v", err)
|
||||||
|
}
|
||||||
|
if _, err := testClient.Core().PersistentVolumeClaims(pvc.Namespace).Create(pvc); err != nil {
|
||||||
|
t.Errorf("Failed to create pvc : %v", err)
|
||||||
|
}
|
||||||
|
pvcs = append(pvcs, pvc)
|
||||||
|
}
|
||||||
|
// pod with no pvc
|
||||||
|
podNew := fakePodWithVol(namespaceName)
|
||||||
|
podNew.SetName("fakepod")
|
||||||
|
if _, err := testClient.Core().Pods(podNew.Namespace).Create(podNew); err != nil {
|
||||||
|
t.Errorf("Failed to create pod : %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// start controller loop
|
||||||
|
stopCh := make(chan struct{})
|
||||||
|
informers.Start(stopCh)
|
||||||
|
informers.WaitForCacheSync(stopCh)
|
||||||
|
go ctrl.Run(stopCh)
|
||||||
|
go pvCtrl.Run(stopCh)
|
||||||
|
|
||||||
|
waitToObservePods(t, informers.Core().V1().Pods().Informer(), 4)
|
||||||
|
// Give attachdetach controller enough time to populate pods into DSWP.
|
||||||
|
time.Sleep(10 * time.Second)
|
||||||
|
waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 60*time.Second, "expected 1 pod in dsw", 1)
|
||||||
|
for _, pvc := range pvcs {
|
||||||
|
createPVForPVC(t, testClient, pvc)
|
||||||
|
}
|
||||||
|
waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 60*time.Second, "expected 4 pods in dsw after PVCs are bound", 4)
|
||||||
|
close(stopCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create PV for PVC, pv controller will bind them together.
|
||||||
|
func createPVForPVC(t *testing.T, testClient *clientset.Clientset, pvc *v1.PersistentVolumeClaim) {
|
||||||
|
pv := &v1.PersistentVolume{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: fmt.Sprintf("fakepv-%s", pvc.Name),
|
||||||
|
},
|
||||||
|
Spec: v1.PersistentVolumeSpec{
|
||||||
|
Capacity: pvc.Spec.Resources.Requests,
|
||||||
|
AccessModes: pvc.Spec.AccessModes,
|
||||||
|
PersistentVolumeSource: v1.PersistentVolumeSource{
|
||||||
|
HostPath: &v1.HostPathVolumeSource{
|
||||||
|
Path: "/var/www/html",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
ClaimRef: &v1.ObjectReference{Name: pvc.Name, Namespace: pvc.Namespace},
|
||||||
|
StorageClassName: *pvc.Spec.StorageClassName,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if _, err := testClient.Core().PersistentVolumes().Create(pv); err != nil {
|
||||||
|
t.Errorf("Failed to create pv : %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user