Merge pull request #91574 from cofyc/fix91436

share a common pod indexer among volume controllers
This commit is contained in:
Kubernetes Prow Robot 2020-06-10 22:42:56 -07:00 committed by GitHub
commit 1f299e7b99
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 156 additions and 96 deletions

View File

@ -533,12 +533,16 @@ func startGarbageCollectorController(ctx ControllerContext) (http.Handler, bool,
}
func startPVCProtectionController(ctx ControllerContext) (http.Handler, bool, error) {
go pvcprotection.NewPVCProtectionController(
pvcProtectionController, err := pvcprotection.NewPVCProtectionController(
ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ClientBuilder.ClientOrDie("pvc-protection-controller"),
utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection),
).Run(1, ctx.Stop)
)
if err != nil {
return nil, true, fmt.Errorf("failed to start the pvc protection controller: %v", err)
}
go pvcProtectionController.Run(1, ctx.Stop)
return nil, true, nil
}

View File

@ -135,6 +135,7 @@ filegroup(
"//pkg/controller/util/endpoint:all-srcs",
"//pkg/controller/util/node:all-srcs",
"//pkg/controller/volume/attachdetach:all-srcs",
"//pkg/controller/volume/common:all-srcs",
"//pkg/controller/volume/events:all-srcs",
"//pkg/controller/volume/expand:all-srcs",
"//pkg/controller/volume/persistentvolume:all-srcs",

View File

@ -17,6 +17,7 @@ go_library(
"//pkg/controller/volume/attachdetach/reconciler:go_default_library",
"//pkg/controller/volume/attachdetach/statusupdater:go_default_library",
"//pkg/controller/volume/attachdetach/util:go_default_library",
"//pkg/controller/volume/common:go_default_library",
"//pkg/features:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/csimigration:go_default_library",

View File

@ -53,6 +53,7 @@ import (
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/reconciler"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/util"
"k8s.io/kubernetes/pkg/controller/volume/common"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csimigration"
@ -198,11 +199,8 @@ func NewAttachDetachController(
// This custom indexer will index pods by its PVC keys. Then we don't need
// to iterate all pods every time to find pods which reference given PVC.
err := adc.podIndexer.AddIndexers(kcache.Indexers{
pvcKeyIndex: indexByPVCKey,
})
if err != nil {
klog.Warningf("adding indexer got %v", err)
if err := common.AddIndexerIfNotPresent(adc.podIndexer, common.PodPVCIndex, common.PodPVCIndexFunc); err != nil {
return nil, fmt.Errorf("Could not initialize attach detach controller: %v", err)
}
nodeInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
@ -223,30 +221,6 @@ func NewAttachDetachController(
return adc, nil
}
const (
pvcKeyIndex string = "pvcKey"
)
// indexByPVCKey returns PVC keys for given pod. Note that the index is only
// used for attaching, so we are only interested in active pods with nodeName
// set.
func indexByPVCKey(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return []string{}, nil
}
if len(pod.Spec.NodeName) == 0 || volumeutil.IsPodTerminated(pod, pod.Status) {
return []string{}, nil
}
keys := []string{}
for _, podVolume := range pod.Spec.Volumes {
if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil {
keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, pvcSource.ClaimName))
}
}
return keys, nil
}
type attachDetachController struct {
// kubeClient is the kube API client used by volumehost to communicate with
// the API server.
@ -638,7 +612,7 @@ func (adc *attachDetachController) syncPVCByKey(key string) error {
return nil
}
objs, err := adc.podIndexer.ByIndex(pvcKeyIndex, key)
objs, err := adc.podIndexer.ByIndex(common.PodPVCIndex, key)
if err != nil {
return err
}
@ -647,6 +621,10 @@ func (adc *attachDetachController) syncPVCByKey(key string) error {
if !ok {
continue
}
// we are only interested in active pods with nodeName set
if len(pod.Spec.NodeName) == 0 || volumeutil.IsPodTerminated(pod, pod.Status) {
continue
}
volumeActionFlag := util.DetermineVolumeAction(
pod,
adc.desiredStateOfWorld,

View File

@ -159,6 +159,28 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
podInformer := informerFactory.Core().V1().Pods().Informer()
var podsNum, extraPodsNum, nodesNum, i int
// Create the controller
adcObj, err := NewAttachDetachController(
fakeKubeClient,
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Storage().V1().CSINodes(),
informerFactory.Storage().V1().CSIDrivers(),
nil, /* cloud */
plugins,
prober,
false,
1*time.Second,
DefaultTimerConfig)
if err != nil {
t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
}
adc := adcObj.(*attachDetachController)
stopCh := make(chan struct{})
pods, err := fakeKubeClient.CoreV1().Pods(v1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
@ -240,28 +262,6 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
i++
}
// Create the controller
adcObj, err := NewAttachDetachController(
fakeKubeClient,
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Storage().V1().CSINodes(),
informerFactory.Storage().V1().CSIDrivers(),
nil, /* cloud */
plugins,
prober,
false,
1*time.Second,
DefaultTimerConfig)
if err != nil {
t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
}
adc := adcObj.(*attachDetachController)
// Populate ASW
err = adc.populateActualStateOfWorld()
if err != nil {

View File

@ -0,0 +1,26 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["common.go"],
importpath = "k8s.io/kubernetes/pkg/controller/volume/common",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,53 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package common
import (
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
const (
// PodPVCIndex is the lookup name for the index function, which is to index by pod pvcs.
PodPVCIndex = "pod-pvc-index"
)
// PodPVCIndexFunc returns PVC keys for given pod
func PodPVCIndexFunc(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return []string{}, nil
}
keys := []string{}
for _, podVolume := range pod.Spec.Volumes {
if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil {
keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, pvcSource.ClaimName))
}
}
return keys, nil
}
// AddIndexerIfNotPresent adds the index function with the name into the cache indexer if not present
func AddIndexerIfNotPresent(indexer cache.Indexer, indexName string, indexFunc cache.IndexFunc) error {
indexers := indexer.GetIndexers()
if _, ok := indexers[indexName]; ok {
return nil
}
return indexer.AddIndexers(cache.Indexers{indexName: indexFunc})
}

View File

@ -18,6 +18,7 @@ go_library(
deps = [
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/volume/common:go_default_library",
"//pkg/controller/volume/events:go_default_library",
"//pkg/controller/volume/persistentvolume/metrics:go_default_library",
"//pkg/controller/volume/persistentvolume/util:go_default_library",

View File

@ -40,6 +40,7 @@ import (
cloudprovider "k8s.io/cloud-provider"
volerr "k8s.io/cloud-provider/volume/errors"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/controller/volume/common"
"k8s.io/kubernetes/pkg/controller/volume/events"
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume/metrics"
pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
@ -1323,7 +1324,7 @@ func (ctrl *PersistentVolumeController) isVolumeReleased(volume *v1.PersistentVo
func (ctrl *PersistentVolumeController) findPodsByPVCKey(key string) ([]*v1.Pod, error) {
pods := []*v1.Pod{}
objs, err := ctrl.podIndexer.ByIndex(pvcKeyIndex, key)
objs, err := ctrl.podIndexer.ByIndex(common.PodPVCIndex, key)
if err != nil {
return pods, err
}
@ -1337,7 +1338,7 @@ func (ctrl *PersistentVolumeController) findPodsByPVCKey(key string) ([]*v1.Pod,
return pods, err
}
// isVolumeUsed returns list of pods that use given PV.
// isVolumeUsed returns list of active pods that use given PV.
func (ctrl *PersistentVolumeController) isVolumeUsed(pv *v1.PersistentVolume) ([]string, bool, error) {
if pv.Spec.ClaimRef == nil {
return nil, false, nil
@ -1349,12 +1350,15 @@ func (ctrl *PersistentVolumeController) isVolumeUsed(pv *v1.PersistentVolume) ([
return nil, false, fmt.Errorf("error finding pods by pvc %q: %s", pvcKey, err)
}
for _, pod := range pods {
if util.IsPodTerminated(pod, pod.Status) {
continue
}
podNames.Insert(pod.Namespace + "/" + pod.Name)
}
return podNames.List(), podNames.Len() != 0, nil
}
// findNonScheduledPodsByPVC returns list of non-scheduled pods that reference given PVC.
// findNonScheduledPodsByPVC returns list of non-scheduled active pods that reference given PVC.
func (ctrl *PersistentVolumeController) findNonScheduledPodsByPVC(pvc *v1.PersistentVolumeClaim) ([]string, error) {
pvcKey := fmt.Sprintf("%s/%s", pvc.Namespace, pvc.Name)
pods, err := ctrl.findPodsByPVCKey(pvcKey)
@ -1363,6 +1367,9 @@ func (ctrl *PersistentVolumeController) findNonScheduledPodsByPVC(pvc *v1.Persis
}
podNames := []string{}
for _, pod := range pods {
if util.IsPodTerminated(pod, pod.Status) {
continue
}
if len(pod.Spec.NodeName) == 0 {
podNames = append(podNames, pod.Name)
}

View File

@ -41,12 +41,12 @@ import (
cloudprovider "k8s.io/cloud-provider"
csitrans "k8s.io/csi-translation-lib"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/volume/common"
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume/metrics"
pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
"k8s.io/kubernetes/pkg/util/goroutinemap"
vol "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csimigration"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/klog/v2"
)
@ -134,10 +134,8 @@ func NewController(p ControllerParameters) (*PersistentVolumeController, error)
// This custom indexer will index pods by its PVC keys. Then we don't need
// to iterate all pods every time to find pods which reference given PVC.
if err := controller.podIndexer.AddIndexers(cache.Indexers{
pvcKeyIndex: indexByPVCKey,
}); err != nil {
return nil, fmt.Errorf("Could not initialize PersistentVolume Controller: %v", err)
if err := common.AddIndexerIfNotPresent(controller.podIndexer, common.PodPVCIndex, common.PodPVCIndexFunc); err != nil {
return nil, fmt.Errorf("Could not initialize attach detach controller: %v", err)
}
csiTranslator := csitrans.New()
@ -569,28 +567,6 @@ func (ctrl *PersistentVolumeController) setClaimProvisioner(claim *v1.Persistent
// Stateless functions
const (
pvcKeyIndex string = "pvc-key-index"
)
// indexByPVCKey returns PVC keys for given pod
func indexByPVCKey(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return []string{}, nil
}
if util.IsPodTerminated(pod, pod.Status) {
return []string{}, nil
}
keys := []string{}
for _, podVolume := range pod.Spec.Volumes {
if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil {
keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, pvcSource.ClaimName))
}
}
return keys, nil
}
func getClaimStatusForLogging(claim *v1.PersistentVolumeClaim) string {
bound := metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBindCompleted)
boundByController := metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBoundByController)

View File

@ -6,13 +6,13 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/controller/volume/pvcprotection",
visibility = ["//visibility:public"],
deps = [
"//pkg/controller/volume/common:go_default_library",
"//pkg/controller/volume/protectionutil:go_default_library",
"//pkg/util/slice:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",

View File

@ -24,7 +24,6 @@ import (
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
@ -34,6 +33,7 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/component-base/metrics/prometheus/ratelimiter"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller/volume/common"
"k8s.io/kubernetes/pkg/controller/volume/protectionutil"
"k8s.io/kubernetes/pkg/util/slice"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
@ -49,6 +49,7 @@ type Controller struct {
podLister corelisters.PodLister
podListerSynced cache.InformerSynced
podIndexer cache.Indexer
queue workqueue.RateLimitingInterface
@ -57,7 +58,7 @@ type Controller struct {
}
// NewPVCProtectionController returns a new instance of PVCProtectionController.
func NewPVCProtectionController(pvcInformer coreinformers.PersistentVolumeClaimInformer, podInformer coreinformers.PodInformer, cl clientset.Interface, storageObjectInUseProtectionFeatureEnabled bool) *Controller {
func NewPVCProtectionController(pvcInformer coreinformers.PersistentVolumeClaimInformer, podInformer coreinformers.PodInformer, cl clientset.Interface, storageObjectInUseProtectionFeatureEnabled bool) (*Controller, error) {
e := &Controller{
client: cl,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvcprotection"),
@ -78,6 +79,10 @@ func NewPVCProtectionController(pvcInformer coreinformers.PersistentVolumeClaimI
e.podLister = podInformer.Lister()
e.podListerSynced = podInformer.Informer().HasSynced
e.podIndexer = podInformer.Informer().GetIndexer()
if err := common.AddIndexerIfNotPresent(e.podIndexer, common.PodPVCIndex, common.PodPVCIndexFunc); err != nil {
return nil, fmt.Errorf("Could not initialize pvc protection controller: %v", err)
}
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
e.podAddedDeletedUpdated(nil, obj, false)
@ -90,7 +95,7 @@ func NewPVCProtectionController(pvcInformer coreinformers.PersistentVolumeClaimI
},
})
return e
return e, nil
}
// Run runs the controller goroutines.
@ -231,15 +236,20 @@ func (c *Controller) isBeingUsed(pvc *v1.PersistentVolumeClaim) (bool, error) {
func (c *Controller) askInformer(pvc *v1.PersistentVolumeClaim) (bool, error) {
klog.V(4).Infof("Looking for Pods using PVC %s/%s in the Informer's cache", pvc.Namespace, pvc.Name)
pods, err := c.podLister.Pods(pvc.Namespace).List(labels.Everything())
objs, err := c.podIndexer.ByIndex(common.PodPVCIndex, fmt.Sprintf("%s/%s", pvc.Namespace, pvc.Name))
if err != nil {
return false, fmt.Errorf("cache-based list of pods failed while processing %s/%s: %s", pvc.Namespace, pvc.Name, err.Error())
}
for _, pod := range pods {
if podUsesPVC(pod, pvc.Name) {
return true, nil
for _, obj := range objs {
pod, ok := obj.(*v1.Pod)
if !ok {
continue
}
if pod.Spec.NodeName == "" {
continue
}
// found a pod using this PVC
return true, nil
}
klog.V(4).Infof("No Pod using PVC %s/%s was found in the Informer's cache", pvc.Namespace, pvc.Name)

View File

@ -429,6 +429,12 @@ func TestPVCProtectionController(t *testing.T) {
pvcInformer := informers.Core().V1().PersistentVolumeClaims()
podInformer := informers.Core().V1().Pods()
// Create the controller
ctrl, err := NewPVCProtectionController(pvcInformer, podInformer, client, test.storageObjectInUseProtectionEnabled)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// Populate the informers with initial objects so the controller can
// Get() and List() it.
for _, obj := range informersObjs {
@ -447,9 +453,6 @@ func TestPVCProtectionController(t *testing.T) {
client.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorfn)
}
// Create the controller
ctrl := NewPVCProtectionController(pvcInformer, podInformer, client, test.storageObjectInUseProtectionEnabled)
// Start the test by simulating an event
if test.updatedPVC != nil {
ctrl.pvcAddedUpdated(test.updatedPVC)