sharing a common pod pvc indexer among volume controllers

This commit is contained in:
Yecheng Fu 2020-05-29 20:29:25 +08:00
parent eaf2f54bba
commit 8422044f17
8 changed files with 126 additions and 95 deletions

View File

@ -538,12 +538,16 @@ func startGarbageCollectorController(ctx ControllerContext) (http.Handler, bool,
}
func startPVCProtectionController(ctx ControllerContext) (http.Handler, bool, error) {
go pvcprotection.NewPVCProtectionController(
pvcProtectionController, err := pvcprotection.NewPVCProtectionController(
ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ClientBuilder.ClientOrDie("pvc-protection-controller"),
utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection),
).Run(1, ctx.Stop)
)
if err != nil {
return nil, true, fmt.Errorf("failed to start the pvc protection controller: %v", err)
}
go pvcProtectionController.Run(1, ctx.Stop)
return nil, true, nil
}

View File

@ -53,6 +53,7 @@ import (
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/reconciler"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/util"
"k8s.io/kubernetes/pkg/controller/volume/common"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csimigration"
@ -198,11 +199,8 @@ func NewAttachDetachController(
// This custom indexer will index pods by its PVC keys. Then we don't need
// to iterate all pods every time to find pods which reference given PVC.
err := adc.podIndexer.AddIndexers(kcache.Indexers{
pvcKeyIndex: indexByPVCKey,
})
if err != nil {
klog.Warningf("adding indexer got %v", err)
if err := common.AddIndexerIfNotPresent(adc.podIndexer, common.PodPVCIndex, common.PodPVCIndexFunc); err != nil {
return nil, fmt.Errorf("Could not initialize attach detach controller: %v", err)
}
nodeInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
@ -223,30 +221,6 @@ func NewAttachDetachController(
return adc, nil
}
const (
pvcKeyIndex string = "pvcKey"
)
// indexByPVCKey returns PVC keys for given pod. Note that the index is only
// used for attaching, so we are only interested in active pods with nodeName
// set.
func indexByPVCKey(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return []string{}, nil
}
if len(pod.Spec.NodeName) == 0 || volumeutil.IsPodTerminated(pod, pod.Status) {
return []string{}, nil
}
keys := []string{}
for _, podVolume := range pod.Spec.Volumes {
if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil {
keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, pvcSource.ClaimName))
}
}
return keys, nil
}
type attachDetachController struct {
// kubeClient is the kube API client used by volumehost to communicate with
// the API server.
@ -638,7 +612,7 @@ func (adc *attachDetachController) syncPVCByKey(key string) error {
return nil
}
objs, err := adc.podIndexer.ByIndex(pvcKeyIndex, key)
objs, err := adc.podIndexer.ByIndex(common.PodPVCIndex, key)
if err != nil {
return err
}
@ -647,6 +621,10 @@ func (adc *attachDetachController) syncPVCByKey(key string) error {
if !ok {
continue
}
// we are only interested in active pods with nodeName set
if len(pod.Spec.NodeName) == 0 || volumeutil.IsPodTerminated(pod, pod.Status) {
continue
}
volumeActionFlag := util.DetermineVolumeAction(
pod,
adc.desiredStateOfWorld,

View File

@ -159,6 +159,28 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
podInformer := informerFactory.Core().V1().Pods().Informer()
var podsNum, extraPodsNum, nodesNum, i int
// Create the controller
adcObj, err := NewAttachDetachController(
fakeKubeClient,
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Storage().V1().CSINodes(),
informerFactory.Storage().V1().CSIDrivers(),
nil, /* cloud */
plugins,
prober,
false,
1*time.Second,
DefaultTimerConfig)
if err != nil {
t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
}
adc := adcObj.(*attachDetachController)
stopCh := make(chan struct{})
pods, err := fakeKubeClient.CoreV1().Pods(v1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
@ -240,28 +262,6 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
i++
}
// Create the controller
adcObj, err := NewAttachDetachController(
fakeKubeClient,
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Storage().V1().CSINodes(),
informerFactory.Storage().V1().CSIDrivers(),
nil, /* cloud */
plugins,
prober,
false,
1*time.Second,
DefaultTimerConfig)
if err != nil {
t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
}
adc := adcObj.(*attachDetachController)
// Populate ASW
err = adc.populateActualStateOfWorld()
if err != nil {

View File

@ -0,0 +1,53 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package common
import (
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
const (
// PodPVCIndex is the lookup name for the index function, which is to index by pod pvcs.
PodPVCIndex = "pod-pvc-index"
)
// PodPVCIndexFunc returns PVC keys for given pod
func PodPVCIndexFunc(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return []string{}, nil
}
keys := []string{}
for _, podVolume := range pod.Spec.Volumes {
if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil {
keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, pvcSource.ClaimName))
}
}
return keys, nil
}
// AddIndexerIfNotPresent adds the index function with the name into the cache indexer if not present
func AddIndexerIfNotPresent(indexer cache.Indexer, indexName string, indexFunc cache.IndexFunc) error {
indexers := indexer.GetIndexers()
if _, ok := indexers[indexName]; ok {
return nil
}
return indexer.AddIndexers(cache.Indexers{indexName: indexFunc})
}

View File

@ -40,6 +40,7 @@ import (
cloudprovider "k8s.io/cloud-provider"
volerr "k8s.io/cloud-provider/volume/errors"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/controller/volume/common"
"k8s.io/kubernetes/pkg/controller/volume/events"
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume/metrics"
pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
@ -1323,7 +1324,7 @@ func (ctrl *PersistentVolumeController) isVolumeReleased(volume *v1.PersistentVo
func (ctrl *PersistentVolumeController) findPodsByPVCKey(key string) ([]*v1.Pod, error) {
pods := []*v1.Pod{}
objs, err := ctrl.podIndexer.ByIndex(pvcKeyIndex, key)
objs, err := ctrl.podIndexer.ByIndex(common.PodPVCIndex, key)
if err != nil {
return pods, err
}
@ -1337,7 +1338,7 @@ func (ctrl *PersistentVolumeController) findPodsByPVCKey(key string) ([]*v1.Pod,
return pods, err
}
// isVolumeUsed returns list of pods that use given PV.
// isVolumeUsed returns list of active pods that use given PV.
func (ctrl *PersistentVolumeController) isVolumeUsed(pv *v1.PersistentVolume) ([]string, bool, error) {
if pv.Spec.ClaimRef == nil {
return nil, false, nil
@ -1349,12 +1350,15 @@ func (ctrl *PersistentVolumeController) isVolumeUsed(pv *v1.PersistentVolume) ([
return nil, false, fmt.Errorf("error finding pods by pvc %q: %s", pvcKey, err)
}
for _, pod := range pods {
if util.IsPodTerminated(pod, pod.Status) {
continue
}
podNames.Insert(pod.Namespace + "/" + pod.Name)
}
return podNames.List(), podNames.Len() != 0, nil
}
// findNonScheduledPodsByPVC returns list of non-scheduled pods that reference given PVC.
// findNonScheduledPodsByPVC returns list of non-scheduled active pods that reference given PVC.
func (ctrl *PersistentVolumeController) findNonScheduledPodsByPVC(pvc *v1.PersistentVolumeClaim) ([]string, error) {
pvcKey := fmt.Sprintf("%s/%s", pvc.Namespace, pvc.Name)
pods, err := ctrl.findPodsByPVCKey(pvcKey)
@ -1363,6 +1367,9 @@ func (ctrl *PersistentVolumeController) findNonScheduledPodsByPVC(pvc *v1.Persis
}
podNames := []string{}
for _, pod := range pods {
if util.IsPodTerminated(pod, pod.Status) {
continue
}
if len(pod.Spec.NodeName) == 0 {
podNames = append(podNames, pod.Name)
}

View File

@ -41,12 +41,12 @@ import (
cloudprovider "k8s.io/cloud-provider"
csitrans "k8s.io/csi-translation-lib"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/volume/common"
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume/metrics"
pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
"k8s.io/kubernetes/pkg/util/goroutinemap"
vol "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csimigration"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/klog/v2"
)
@ -134,10 +134,8 @@ func NewController(p ControllerParameters) (*PersistentVolumeController, error)
// This custom indexer will index pods by its PVC keys. Then we don't need
// to iterate all pods every time to find pods which reference given PVC.
if err := controller.podIndexer.AddIndexers(cache.Indexers{
pvcKeyIndex: indexByPVCKey,
}); err != nil {
return nil, fmt.Errorf("Could not initialize PersistentVolume Controller: %v", err)
if err := common.AddIndexerIfNotPresent(controller.podIndexer, common.PodPVCIndex, common.PodPVCIndexFunc); err != nil {
return nil, fmt.Errorf("Could not initialize attach detach controller: %v", err)
}
csiTranslator := csitrans.New()
@ -569,28 +567,6 @@ func (ctrl *PersistentVolumeController) setClaimProvisioner(claim *v1.Persistent
// Stateless functions
const (
pvcKeyIndex string = "pvc-key-index"
)
// indexByPVCKey returns PVC keys for given pod
func indexByPVCKey(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return []string{}, nil
}
if util.IsPodTerminated(pod, pod.Status) {
return []string{}, nil
}
keys := []string{}
for _, podVolume := range pod.Spec.Volumes {
if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil {
keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, pvcSource.ClaimName))
}
}
return keys, nil
}
func getClaimStatusForLogging(claim *v1.PersistentVolumeClaim) string {
bound := metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBindCompleted)
boundByController := metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBoundByController)

View File

@ -24,7 +24,6 @@ import (
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
@ -34,6 +33,7 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/component-base/metrics/prometheus/ratelimiter"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller/volume/common"
"k8s.io/kubernetes/pkg/controller/volume/protectionutil"
"k8s.io/kubernetes/pkg/util/slice"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
@ -49,6 +49,7 @@ type Controller struct {
podLister corelisters.PodLister
podListerSynced cache.InformerSynced
podIndexer cache.Indexer
queue workqueue.RateLimitingInterface
@ -57,7 +58,7 @@ type Controller struct {
}
// NewPVCProtectionController returns a new instance of PVCProtectionController.
func NewPVCProtectionController(pvcInformer coreinformers.PersistentVolumeClaimInformer, podInformer coreinformers.PodInformer, cl clientset.Interface, storageObjectInUseProtectionFeatureEnabled bool) *Controller {
func NewPVCProtectionController(pvcInformer coreinformers.PersistentVolumeClaimInformer, podInformer coreinformers.PodInformer, cl clientset.Interface, storageObjectInUseProtectionFeatureEnabled bool) (*Controller, error) {
e := &Controller{
client: cl,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvcprotection"),
@ -78,6 +79,10 @@ func NewPVCProtectionController(pvcInformer coreinformers.PersistentVolumeClaimI
e.podLister = podInformer.Lister()
e.podListerSynced = podInformer.Informer().HasSynced
e.podIndexer = podInformer.Informer().GetIndexer()
if err := common.AddIndexerIfNotPresent(e.podIndexer, common.PodPVCIndex, common.PodPVCIndexFunc); err != nil {
return nil, fmt.Errorf("Could not initialize pvc protection controller: %v", err)
}
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
e.podAddedDeletedUpdated(nil, obj, false)
@ -90,7 +95,7 @@ func NewPVCProtectionController(pvcInformer coreinformers.PersistentVolumeClaimI
},
})
return e
return e, nil
}
// Run runs the controller goroutines.
@ -231,15 +236,20 @@ func (c *Controller) isBeingUsed(pvc *v1.PersistentVolumeClaim) (bool, error) {
func (c *Controller) askInformer(pvc *v1.PersistentVolumeClaim) (bool, error) {
klog.V(4).Infof("Looking for Pods using PVC %s/%s in the Informer's cache", pvc.Namespace, pvc.Name)
pods, err := c.podLister.Pods(pvc.Namespace).List(labels.Everything())
objs, err := c.podIndexer.ByIndex(common.PodPVCIndex, fmt.Sprintf("%s/%s", pvc.Namespace, pvc.Name))
if err != nil {
return false, fmt.Errorf("cache-based list of pods failed while processing %s/%s: %s", pvc.Namespace, pvc.Name, err.Error())
}
for _, pod := range pods {
if podUsesPVC(pod, pvc.Name) {
return true, nil
for _, obj := range objs {
pod, ok := obj.(*v1.Pod)
if !ok {
continue
}
if pod.Spec.NodeName == "" {
continue
}
// found a pod using this PVC
return true, nil
}
klog.V(4).Infof("No Pod using PVC %s/%s was found in the Informer's cache", pvc.Namespace, pvc.Name)

View File

@ -429,6 +429,12 @@ func TestPVCProtectionController(t *testing.T) {
pvcInformer := informers.Core().V1().PersistentVolumeClaims()
podInformer := informers.Core().V1().Pods()
// Create the controller
ctrl, err := NewPVCProtectionController(pvcInformer, podInformer, client, test.storageObjectInUseProtectionEnabled)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// Populate the informers with initial objects so the controller can
// Get() and List() it.
for _, obj := range informersObjs {
@ -447,9 +453,6 @@ func TestPVCProtectionController(t *testing.T) {
client.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorfn)
}
// Create the controller
ctrl := NewPVCProtectionController(pvcInformer, podInformer, client, test.storageObjectInUseProtectionEnabled)
// Start the test by simulating an event
if test.updatedPVC != nil {
ctrl.pvcAddedUpdated(test.updatedPVC)