Wire contexts to Core controllers

Mike Dame
2021-04-22 14:27:59 -04:00
parent 657412713b
commit 4960d0976a
61 changed files with 842 additions and 780 deletions

View File

@@ -45,7 +45,7 @@ import (
 // Controller creates PVCs for ephemeral inline volumes in a pod spec.
 type Controller interface {
-	Run(workers int, stopCh <-chan struct{})
+	Run(ctx context.Context, workers int)
 }
 type ephemeralController struct {
@@ -163,37 +163,37 @@ func (ec *ephemeralController) onPVCDelete(obj interface{}) {
 	}
 }
-func (ec *ephemeralController) Run(workers int, stopCh <-chan struct{}) {
+func (ec *ephemeralController) Run(ctx context.Context, workers int) {
 	defer runtime.HandleCrash()
 	defer ec.queue.ShutDown()
 	klog.Infof("Starting ephemeral volume controller")
 	defer klog.Infof("Shutting down ephemeral volume controller")
-	if !cache.WaitForNamedCacheSync("ephemeral", stopCh, ec.podSynced, ec.pvcsSynced) {
+	if !cache.WaitForNamedCacheSync("ephemeral", ctx.Done(), ec.podSynced, ec.pvcsSynced) {
 		return
 	}
 	for i := 0; i < workers; i++ {
-		go wait.Until(ec.runWorker, time.Second, stopCh)
+		go wait.UntilWithContext(ctx, ec.runWorker, time.Second)
 	}
-	<-stopCh
+	<-ctx.Done()
 }
-func (ec *ephemeralController) runWorker() {
-	for ec.processNextWorkItem() {
+func (ec *ephemeralController) runWorker(ctx context.Context) {
+	for ec.processNextWorkItem(ctx) {
 	}
 }
-func (ec *ephemeralController) processNextWorkItem() bool {
+func (ec *ephemeralController) processNextWorkItem(ctx context.Context) bool {
 	key, shutdown := ec.queue.Get()
 	if shutdown {
 		return false
 	}
 	defer ec.queue.Done(key)
-	err := ec.syncHandler(key.(string))
+	err := ec.syncHandler(ctx, key.(string))
 	if err == nil {
 		ec.queue.Forget(key)
 		return true
@@ -207,7 +207,7 @@ func (ec *ephemeralController) processNextWorkItem() bool {
 // syncHandler is invoked for each pod which might need to be processed.
 // If an error is returned from this function, the pod will be requeued.
-func (ec *ephemeralController) syncHandler(key string) error {
+func (ec *ephemeralController) syncHandler(ctx context.Context, key string) error {
 	namespace, name, err := kcache.SplitMetaNamespaceKey(key)
 	if err != nil {
 		return err
@@ -229,7 +229,7 @@ func (ec *ephemeralController) syncHandler(key string) error {
 	}
 	for _, vol := range pod.Spec.Volumes {
-		if err := ec.handleVolume(pod, vol); err != nil {
+		if err := ec.handleVolume(ctx, pod, vol); err != nil {
 			ec.recorder.Event(pod, v1.EventTypeWarning, events.FailedBinding, fmt.Sprintf("ephemeral volume %s: %v", vol.Name, err))
 			return fmt.Errorf("pod %s, ephemeral volume %s: %v", key, vol.Name, err)
 		}
@@ -239,7 +239,7 @@ func (ec *ephemeralController) syncHandler(key string) error {
 }
 // handleEphemeralVolume is invoked for each volume of a pod.
-func (ec *ephemeralController) handleVolume(pod *v1.Pod, vol v1.Volume) error {
+func (ec *ephemeralController) handleVolume(ctx context.Context, pod *v1.Pod, vol v1.Volume) error {
 	klog.V(5).Infof("ephemeral: checking volume %s", vol.Name)
 	if vol.Ephemeral == nil {
 		return nil
@@ -280,7 +280,7 @@ func (ec *ephemeralController) handleVolume(pod *v1.Pod, vol v1.Volume) error {
 		Spec: vol.Ephemeral.VolumeClaimTemplate.Spec,
 	}
 	ephemeralvolumemetrics.EphemeralVolumeCreateAttempts.Inc()
-	_, err = ec.kubeClient.CoreV1().PersistentVolumeClaims(pod.Namespace).Create(context.TODO(), pvc, metav1.CreateOptions{})
+	_, err = ec.kubeClient.CoreV1().PersistentVolumeClaims(pod.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
 	if err != nil {
 		ephemeralvolumemetrics.EphemeralVolumeCreateFailures.Inc()
 		return fmt.Errorf("create PVC %s: %v", pvcName, err)

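The hunk above is the template this commit applies to every controller: Run drops its stopCh parameter, takes a context first, and recovers the stop channel where needed via ctx.Done(). A minimal sketch of how a caller might drive the new signature; the controller stub and worker count here are illustrative, not code from this commit:

package main

import (
	"context"
	"os/signal"
	"syscall"
)

// controller stands in for any of the converted controllers; only the
// shape of Run matters for this sketch.
type controller struct{}

func (c *controller) Run(ctx context.Context, workers int) {
	<-ctx.Done() // real controllers start workers and block until cancellation
}

func main() {
	// Canceling the root context replaces closing the old stop channel.
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer cancel()

	c := &controller{}
	go c.Run(ctx, 2) // context first, then the worker count

	<-ctx.Done()
}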
View File

@@ -160,7 +160,7 @@ func TestSyncHandler(t *testing.T) {
 	informerFactory.WaitForCacheSync(ctx.Done())
 	cache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced, pvcInformer.Informer().HasSynced)
-	err = ec.syncHandler(tc.podKey)
+	err = ec.syncHandler(context.TODO(), tc.podKey)
 	if err != nil && !tc.expectedError {
 		t.Fatalf("unexpected error while running handler: %v", err)
 	}

View File

@@ -60,7 +60,7 @@ const (
 // ExpandController expands the pvs
 type ExpandController interface {
-	Run(stopCh <-chan struct{})
+	Run(ctx context.Context)
 }
 // CSINameTranslator can get the CSI Driver name based on the in-tree plugin name
@@ -188,14 +188,14 @@ func (expc *expandController) enqueuePVC(obj interface{}) {
 	}
 }
-func (expc *expandController) processNextWorkItem() bool {
+func (expc *expandController) processNextWorkItem(ctx context.Context) bool {
 	key, shutdown := expc.queue.Get()
 	if shutdown {
 		return false
 	}
 	defer expc.queue.Done(key)
-	err := expc.syncHandler(key.(string))
+	err := expc.syncHandler(ctx, key.(string))
 	if err == nil {
 		expc.queue.Forget(key)
 		return true
@@ -209,7 +209,7 @@ func (expc *expandController) processNextWorkItem() bool {
 // syncHandler performs actual expansion of volume. If an error is returned
 // from this function - PVC will be requeued for resizing.
-func (expc *expandController) syncHandler(key string) error {
+func (expc *expandController) syncHandler(ctx context.Context, key string) error {
 	namespace, name, err := kcache.SplitMetaNamespaceKey(key)
 	if err != nil {
 		return err
@@ -223,7 +223,7 @@ func (expc *expandController) syncHandler(key string) error {
 		return err
 	}
-	pv, err := expc.getPersistentVolume(pvc)
+	pv, err := expc.getPersistentVolume(ctx, pvc)
 	if err != nil {
 		klog.V(5).Infof("Error getting Persistent Volume for PVC %q (uid: %q) from informer : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, err)
 		return err
@@ -320,32 +320,32 @@ func (expc *expandController) expand(pvc *v1.PersistentVolumeClaim, pv *v1.Persi
 }
 // TODO make concurrency configurable (workers argument). previously, nestedpendingoperations spawned unlimited goroutines
-func (expc *expandController) Run(stopCh <-chan struct{}) {
+func (expc *expandController) Run(ctx context.Context) {
 	defer runtime.HandleCrash()
 	defer expc.queue.ShutDown()
 	klog.Infof("Starting expand controller")
 	defer klog.Infof("Shutting down expand controller")
-	if !cache.WaitForNamedCacheSync("expand", stopCh, expc.pvcsSynced, expc.pvSynced) {
+	if !cache.WaitForNamedCacheSync("expand", ctx.Done(), expc.pvcsSynced, expc.pvSynced) {
 		return
 	}
 	for i := 0; i < defaultWorkerCount; i++ {
-		go wait.Until(expc.runWorker, time.Second, stopCh)
+		go wait.UntilWithContext(ctx, expc.runWorker, time.Second)
 	}
-	<-stopCh
+	<-ctx.Done()
 }
-func (expc *expandController) runWorker() {
-	for expc.processNextWorkItem() {
+func (expc *expandController) runWorker(ctx context.Context) {
+	for expc.processNextWorkItem(ctx) {
 	}
 }
-func (expc *expandController) getPersistentVolume(pvc *v1.PersistentVolumeClaim) (*v1.PersistentVolume, error) {
+func (expc *expandController) getPersistentVolume(ctx context.Context, pvc *v1.PersistentVolumeClaim) (*v1.PersistentVolume, error) {
 	volumeName := pvc.Spec.VolumeName
-	pv, err := expc.kubeClient.CoreV1().PersistentVolumes().Get(context.TODO(), volumeName, metav1.GetOptions{})
+	pv, err := expc.kubeClient.CoreV1().PersistentVolumes().Get(ctx, volumeName, metav1.GetOptions{})
 	if err != nil {
 		return nil, fmt.Errorf("failed to get PV %q: %v", volumeName, err)

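The getPersistentVolume change above shows the other half of the conversion: once a method receives ctx, the context.TODO() placeholders in its body become ctx, so cancellation actually reaches the underlying API request. A self-contained sketch of that pattern; getPV is an assumed helper, not code from this commit:

package sketch

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// getPV threads the caller's context into the client call. If the
// controller shuts down and ctx is canceled, an in-flight Get returns
// promptly with a context error instead of blocking shutdown.
func getPV(ctx context.Context, client kubernetes.Interface, name string) (*v1.PersistentVolume, error) {
	pv, err := client.CoreV1().PersistentVolumes().Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to get PV %q: %v", name, err)
	}
	return pv, nil
}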
View File

@@ -17,6 +17,7 @@ limitations under the License.
 package expand
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"reflect"
@@ -157,7 +158,7 @@ func TestSyncHandler(t *testing.T) {
 			return true, pvc, nil
 		})
-		err = expController.syncHandler(test.pvcKey)
+		err = expController.syncHandler(context.TODO(), test.pvcKey)
 		if err != nil && !test.hasError {
 			t.Fatalf("for: %s; unexpected error while running handler : %v", test.name, err)
 		}

View File

@@ -17,6 +17,7 @@ limitations under the License.
 package persistentvolume
 import (
+	"context"
 	"fmt"
 	"reflect"
 	"strings"
@@ -490,11 +491,11 @@ func claimWithAccessMode(modes []v1.PersistentVolumeAccessMode, claims []*v1.Per
 }
 func testSyncClaim(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
-	return ctrl.syncClaim(test.initialClaims[0])
+	return ctrl.syncClaim(context.TODO(), test.initialClaims[0])
 }
 func testSyncClaimError(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
-	err := ctrl.syncClaim(test.initialClaims[0])
+	err := ctrl.syncClaim(context.TODO(), test.initialClaims[0])
 	if err != nil {
 		return nil
@@ -503,7 +504,7 @@ func testSyncClaimError(ctrl *PersistentVolumeController, reactor *pvtesting.Vol
 }
 func testSyncVolume(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
-	return ctrl.syncVolume(test.initialVolumes[0])
+	return ctrl.syncVolume(context.TODO(), test.initialVolumes[0])
 }
 type operationType string
@@ -797,7 +798,7 @@ func runMultisyncTests(t *testing.T, tests []controllerTest, storageClasses []*s
 			claim := obj.(*v1.PersistentVolumeClaim)
 			// Simulate "claim updated" event
 			ctrl.claims.Update(claim)
-			err = ctrl.syncClaim(claim)
+			err = ctrl.syncClaim(context.TODO(), claim)
 			if err != nil {
 				if err == pvtesting.ErrVersionConflict {
 					// Ignore version errors
@@ -814,7 +815,7 @@ func runMultisyncTests(t *testing.T, tests []controllerTest, storageClasses []*s
 			volume := obj.(*v1.PersistentVolume)
 			// Simulate "volume updated" event
 			ctrl.volumes.store.Update(volume)
-			err = ctrl.syncVolume(volume)
+			err = ctrl.syncVolume(context.TODO(), volume)
 			if err != nil {
 				if err == pvtesting.ErrVersionConflict {
 					// Ignore version errors

View File

@@ -17,6 +17,7 @@ limitations under the License.
 package persistentvolume
 import (
+	"context"
 	"errors"
 	"testing"
@@ -652,7 +653,7 @@ func TestDisablingDynamicProvisioner(t *testing.T) {
 	if err != nil {
 		t.Fatalf("Construct PersistentVolume controller failed: %v", err)
 	}
-	retVal := ctrl.provisionClaim(nil)
+	retVal := ctrl.provisionClaim(context.TODO(), nil)
 	if retVal != nil {
 		t.Errorf("Expected nil return but got %v", retVal)
 	}

View File

@@ -249,12 +249,12 @@ type PersistentVolumeController struct {
 // these events.
 // For easier readability, it was split into syncUnboundClaim and syncBoundClaim
 // methods.
-func (ctrl *PersistentVolumeController) syncClaim(claim *v1.PersistentVolumeClaim) error {
+func (ctrl *PersistentVolumeController) syncClaim(ctx context.Context, claim *v1.PersistentVolumeClaim) error {
 	klog.V(4).Infof("synchronizing PersistentVolumeClaim[%s]: %s", claimToClaimKey(claim), getClaimStatusForLogging(claim))
 	// Set correct "migrated-to" annotations on PVC and update in API server if
 	// necessary
-	newClaim, err := ctrl.updateClaimMigrationAnnotations(claim)
+	newClaim, err := ctrl.updateClaimMigrationAnnotations(ctx, claim)
 	if err != nil {
 		// Nothing was saved; we will fall back into the same
 		// condition in the next call to this method
@@ -263,7 +263,7 @@ func (ctrl *PersistentVolumeController) syncClaim(claim *v1.PersistentVolumeClai
 	claim = newClaim
 	if !metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBindCompleted) {
-		return ctrl.syncUnboundClaim(claim)
+		return ctrl.syncUnboundClaim(ctx, claim)
 	} else {
 		return ctrl.syncBoundClaim(claim)
 	}
@@ -330,7 +330,7 @@ func (ctrl *PersistentVolumeController) emitEventForUnboundDelayBindingClaim(cla
 // syncUnboundClaim is the main controller method to decide what to do with an
 // unbound claim.
-func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVolumeClaim) error {
+func (ctrl *PersistentVolumeController) syncUnboundClaim(ctx context.Context, claim *v1.PersistentVolumeClaim) error {
 	// This is a new PVC that has not completed binding
 	// OBSERVATION: pvc is "Pending"
 	if claim.Spec.VolumeName == "" {
@@ -356,7 +356,7 @@ func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVol
 			return err
 		}
 		case storagehelpers.GetPersistentVolumeClaimClass(claim) != "":
-			if err = ctrl.provisionClaim(claim); err != nil {
+			if err = ctrl.provisionClaim(ctx, claim); err != nil {
 				return err
 			}
 			return nil
@@ -539,12 +539,12 @@ func (ctrl *PersistentVolumeController) syncBoundClaim(claim *v1.PersistentVolum
 // It's invoked by appropriate cache.Controller callbacks when a volume is
 // created, updated or periodically synced. We do not differentiate between
 // these events.
-func (ctrl *PersistentVolumeController) syncVolume(volume *v1.PersistentVolume) error {
+func (ctrl *PersistentVolumeController) syncVolume(ctx context.Context, volume *v1.PersistentVolume) error {
 	klog.V(4).Infof("synchronizing PersistentVolume[%s]: %s", volume.Name, getVolumeStatusForLogging(volume))
 	// Set correct "migrated-to" annotations on PV and update in API server if
 	// necessary
-	newVolume, err := ctrl.updateVolumeMigrationAnnotations(volume)
+	newVolume, err := ctrl.updateVolumeMigrationAnnotations(ctx, volume)
 	if err != nil {
 		// Nothing was saved; we will fall back into the same
 		// condition in the next call to this method
@@ -1451,7 +1451,7 @@ func (ctrl *PersistentVolumeController) doDeleteVolume(volume *v1.PersistentVolu
 // provisionClaim starts new asynchronous operation to provision a claim if
 // provisioning is enabled.
-func (ctrl *PersistentVolumeController) provisionClaim(claim *v1.PersistentVolumeClaim) error {
+func (ctrl *PersistentVolumeController) provisionClaim(ctx context.Context, claim *v1.PersistentVolumeClaim) error {
 	if !ctrl.enableDynamicProvisioning {
 		return nil
 	}
@@ -1474,9 +1474,9 @@ func (ctrl *PersistentVolumeController) provisionClaim(claim *v1.PersistentVolum
 	ctrl.operationTimestamps.AddIfNotExist(claimKey, ctrl.getProvisionerName(plugin, storageClass), "provision")
 	var err error
 	if plugin == nil {
-		_, err = ctrl.provisionClaimOperationExternal(claim, storageClass)
+		_, err = ctrl.provisionClaimOperationExternal(ctx, claim, storageClass)
 	} else {
-		_, err = ctrl.provisionClaimOperation(claim, plugin, storageClass)
+		_, err = ctrl.provisionClaimOperation(ctx, claim, plugin, storageClass)
 	}
 	// if error happened, record an error count metric
 	// timestamp entry will remain in cache until a success binding has happened
@@ -1491,6 +1491,7 @@ func (ctrl *PersistentVolumeController) provisionClaim(claim *v1.PersistentVolum
 // provisionClaimOperation provisions a volume. This method is running in
 // standalone goroutine and already has all necessary locks.
 func (ctrl *PersistentVolumeController) provisionClaimOperation(
+	ctx context.Context,
 	claim *v1.PersistentVolumeClaim,
 	plugin vol.ProvisionableVolumePlugin,
 	storageClass *storage.StorageClass) (string, error) {
@@ -1513,7 +1514,7 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(
 	klog.V(4).Infof("provisionClaimOperation [%s]: plugin name: %s, provisioner name: %s", claimToClaimKey(claim), pluginName, provisionerName)
 	// Add provisioner annotation to be consistent with external provisioner workflow
-	newClaim, err := ctrl.setClaimProvisioner(claim, provisionerName)
+	newClaim, err := ctrl.setClaimProvisioner(ctx, claim, provisionerName)
 	if err != nil {
 		// Save failed, the controller will retry in the next sync
 		klog.V(2).Infof("error saving claim %s: %v", claimToClaimKey(claim), err)
@@ -1696,6 +1697,7 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(
 // provisionClaimOperationExternal provisions a volume using external provisioner async-ly
 // This method will be running in a standalone go-routine scheduled in "provisionClaim"
 func (ctrl *PersistentVolumeController) provisionClaimOperationExternal(
+	ctx context.Context,
 	claim *v1.PersistentVolumeClaim,
 	storageClass *storage.StorageClass) (string, error) {
 	claimClass := storagehelpers.GetPersistentVolumeClaimClass(claim)
@@ -1714,7 +1716,7 @@ func (ctrl *PersistentVolumeController) provisionClaimOperationExternal(
 		}
 	}
 	// Add provisioner annotation so external provisioners know when to start
-	newClaim, err := ctrl.setClaimProvisioner(claim, provisionerName)
+	newClaim, err := ctrl.setClaimProvisioner(ctx, claim, provisionerName)
 	if err != nil {
 		// Save failed, the controller will retry in the next sync
 		klog.V(2).Infof("error saving claim %s: %v", claimToClaimKey(claim), err)

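Note the convention running through this file: ctx is always the first parameter and is handed down the call chain (syncClaim to syncUnboundClaim to provisionClaim to the provisionClaimOperation variants) rather than being stored on the controller struct. Boiled down to its shape; the types and fields below are simplified stand-ins, not the real ones:

package sketch

import (
	"context"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

type controller struct {
	client kubernetes.Interface
}

// Each layer accepts ctx first and passes it along unchanged; only the
// leaf call that performs I/O actually consumes it.
func (c *controller) syncClaim(ctx context.Context, claim *v1.PersistentVolumeClaim) error {
	return c.provisionClaim(ctx, claim)
}

func (c *controller) provisionClaim(ctx context.Context, claim *v1.PersistentVolumeClaim) error {
	_, err := c.client.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{})
	return err
}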
View File

@@ -204,7 +204,7 @@ func (ctrl *PersistentVolumeController) storeClaimUpdate(claim interface{}) (boo
 // updateVolume runs in worker thread and handles "volume added",
 // "volume updated" and "periodic sync" events.
-func (ctrl *PersistentVolumeController) updateVolume(volume *v1.PersistentVolume) {
+func (ctrl *PersistentVolumeController) updateVolume(ctx context.Context, volume *v1.PersistentVolume) {
 	// Store the new volume version in the cache and do not process it if this
 	// is an old version.
 	new, err := ctrl.storeVolumeUpdate(volume)
@@ -215,7 +215,7 @@ func (ctrl *PersistentVolumeController) updateVolume(volume *v1.PersistentVolume
 		return
 	}
-	err = ctrl.syncVolume(volume)
+	err = ctrl.syncVolume(ctx, volume)
 	if err != nil {
 		if errors.IsConflict(err) {
 			// Version conflict error happens quite often and the controller
@@ -252,7 +252,7 @@ func (ctrl *PersistentVolumeController) deleteVolume(volume *v1.PersistentVolume
 // updateClaim runs in worker thread and handles "claim added",
 // "claim updated" and "periodic sync" events.
-func (ctrl *PersistentVolumeController) updateClaim(claim *v1.PersistentVolumeClaim) {
+func (ctrl *PersistentVolumeController) updateClaim(ctx context.Context, claim *v1.PersistentVolumeClaim) {
 	// Store the new claim version in the cache and do not process it if this is
 	// an old version.
 	new, err := ctrl.storeClaimUpdate(claim)
@@ -262,7 +262,7 @@ func (ctrl *PersistentVolumeController) updateClaim(claim *v1.PersistentVolumeCl
 	if !new {
 		return
 	}
-	err = ctrl.syncClaim(claim)
+	err = ctrl.syncClaim(ctx, claim)
 	if err != nil {
 		if errors.IsConflict(err) {
 			// Version conflict error happens quite often and the controller
@@ -300,7 +300,7 @@ func (ctrl *PersistentVolumeController) deleteClaim(claim *v1.PersistentVolumeCl
 }
 // Run starts all of this controller's control loops
-func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
+func (ctrl *PersistentVolumeController) Run(ctx context.Context) {
 	defer utilruntime.HandleCrash()
 	defer ctrl.claimQueue.ShutDown()
 	defer ctrl.volumeQueue.ShutDown()
@@ -308,22 +308,22 @@ func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
 	klog.Infof("Starting persistent volume controller")
 	defer klog.Infof("Shutting down persistent volume controller")
-	if !cache.WaitForNamedCacheSync("persistent volume", stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced, ctrl.podListerSynced, ctrl.NodeListerSynced) {
+	if !cache.WaitForNamedCacheSync("persistent volume", ctx.Done(), ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced, ctrl.podListerSynced, ctrl.NodeListerSynced) {
 		return
 	}
 	ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister)
-	go wait.Until(ctrl.resync, ctrl.resyncPeriod, stopCh)
-	go wait.Until(ctrl.volumeWorker, time.Second, stopCh)
-	go wait.Until(ctrl.claimWorker, time.Second, stopCh)
+	go wait.Until(ctrl.resync, ctrl.resyncPeriod, ctx.Done())
+	go wait.UntilWithContext(ctx, ctrl.volumeWorker, time.Second)
+	go wait.UntilWithContext(ctx, ctrl.claimWorker, time.Second)
 	metrics.Register(ctrl.volumes.store, ctrl.claims, &ctrl.volumePluginMgr)
-	<-stopCh
+	<-ctx.Done()
 }
-func (ctrl *PersistentVolumeController) updateClaimMigrationAnnotations(claim *v1.PersistentVolumeClaim) (*v1.PersistentVolumeClaim, error) {
+func (ctrl *PersistentVolumeController) updateClaimMigrationAnnotations(ctx context.Context, claim *v1.PersistentVolumeClaim) (*v1.PersistentVolumeClaim, error) {
 	// TODO: update[Claim|Volume]MigrationAnnotations can be optimized to not
 	// copy the claim/volume if no modifications are required. Though this
 	// requires some refactoring as well as an interesting change in the
@@ -335,7 +335,7 @@ func (ctrl *PersistentVolumeController) updateClaimMigrationAnnotations(claim *v
 	if !modified {
 		return claimClone, nil
 	}
-	newClaim, err := ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claimClone.Namespace).Update(context.TODO(), claimClone, metav1.UpdateOptions{})
+	newClaim, err := ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claimClone.Namespace).Update(ctx, claimClone, metav1.UpdateOptions{})
 	if err != nil {
 		return nil, fmt.Errorf("persistent Volume Controller can't anneal migration annotations: %v", err)
 	}
@@ -346,13 +346,13 @@ func (ctrl *PersistentVolumeController) updateClaimMigrationAnnotations(claim *v
 	return newClaim, nil
 }
-func (ctrl *PersistentVolumeController) updateVolumeMigrationAnnotations(volume *v1.PersistentVolume) (*v1.PersistentVolume, error) {
+func (ctrl *PersistentVolumeController) updateVolumeMigrationAnnotations(ctx context.Context, volume *v1.PersistentVolume) (*v1.PersistentVolume, error) {
 	volumeClone := volume.DeepCopy()
 	modified := updateMigrationAnnotations(ctrl.csiMigratedPluginManager, ctrl.translator, volumeClone.Annotations, false)
 	if !modified {
 		return volumeClone, nil
 	}
-	newVol, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Update(context.TODO(), volumeClone, metav1.UpdateOptions{})
+	newVol, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Update(ctx, volumeClone, metav1.UpdateOptions{})
 	if err != nil {
 		return nil, fmt.Errorf("persistent Volume Controller can't anneal migration annotations: %v", err)
 	}
@@ -424,8 +424,8 @@ func updateMigrationAnnotations(cmpm CSIMigratedPluginManager, translator CSINam
 // volumeWorker processes items from volumeQueue. It must run only once,
 // syncVolume is not assured to be reentrant.
-func (ctrl *PersistentVolumeController) volumeWorker() {
-	workFunc := func() bool {
+func (ctrl *PersistentVolumeController) volumeWorker(ctx context.Context) {
+	workFunc := func(ctx context.Context) bool {
 		keyObj, quit := ctrl.volumeQueue.Get()
 		if quit {
 			return true
@@ -443,7 +443,7 @@ func (ctrl *PersistentVolumeController) volumeWorker() {
 		if err == nil {
 			// The volume still exists in informer cache, the event must have
 			// been add/update/sync
-			ctrl.updateVolume(volume)
+			ctrl.updateVolume(ctx, volume)
 			return false
 		}
 		if !errors.IsNotFound(err) {
@@ -473,7 +473,7 @@ func (ctrl *PersistentVolumeController) volumeWorker() {
 		return false
 	}
 	for {
-		if quit := workFunc(); quit {
+		if quit := workFunc(ctx); quit {
 			klog.Infof("volume worker queue shutting down")
 			return
 		}
@@ -482,7 +482,7 @@ func (ctrl *PersistentVolumeController) volumeWorker() {
 // claimWorker processes items from claimQueue. It must run only once,
 // syncClaim is not reentrant.
-func (ctrl *PersistentVolumeController) claimWorker() {
+func (ctrl *PersistentVolumeController) claimWorker(ctx context.Context) {
 	workFunc := func() bool {
 		keyObj, quit := ctrl.claimQueue.Get()
 		if quit {
@@ -501,7 +501,7 @@ func (ctrl *PersistentVolumeController) claimWorker() {
 		if err == nil {
 			// The claim still exists in informer cache, the event must have
 			// been add/update/sync
-			ctrl.updateClaim(claim)
+			ctrl.updateClaim(ctx, claim)
 			return false
 		}
 		if !errors.IsNotFound(err) {
@@ -564,7 +564,7 @@ func (ctrl *PersistentVolumeController) resync() {
 // setClaimProvisioner saves
 // claim.Annotations["volume.kubernetes.io/storage-provisioner"] = class.Provisioner
-func (ctrl *PersistentVolumeController) setClaimProvisioner(claim *v1.PersistentVolumeClaim, provisionerName string) (*v1.PersistentVolumeClaim, error) {
+func (ctrl *PersistentVolumeController) setClaimProvisioner(ctx context.Context, claim *v1.PersistentVolumeClaim, provisionerName string) (*v1.PersistentVolumeClaim, error) {
 	if val, ok := claim.Annotations[pvutil.AnnStorageProvisioner]; ok && val == provisionerName {
 		// annotation is already set, nothing to do
 		return claim, nil
@@ -577,7 +577,7 @@ func (ctrl *PersistentVolumeController) setClaimProvisioner(claim *v1.Persistent
 	metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, pvutil.AnnBetaStorageProvisioner, provisionerName)
 	metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, pvutil.AnnStorageProvisioner, provisionerName)
 	updateMigrationAnnotations(ctrl.csiMigratedPluginManager, ctrl.translator, claimClone.Annotations, true)
-	newClaim, err := ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(context.TODO(), claimClone, metav1.UpdateOptions{})
+	newClaim, err := ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(ctx, claimClone, metav1.UpdateOptions{})
 	if err != nil {
 		return newClaim, err
 	}

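One detail worth noting above: volumeWorker's closure now takes ctx as an explicit argument, while claimWorker's workFunc stays func() bool and simply captures ctx from the enclosing scope; either way the context reaches the loop body. The worker shape used throughout these controllers, distilled into a sketch with assumed queue and sync fields:

package sketch

import (
	"context"

	"k8s.io/client-go/util/workqueue"
)

type controller struct {
	queue workqueue.RateLimitingInterface
	sync  func(ctx context.Context, key string) error
}

func (c *controller) worker(ctx context.Context) {
	workFunc := func(ctx context.Context) bool {
		key, quit := c.queue.Get()
		if quit {
			return true // the queue was shut down, stop the worker
		}
		defer c.queue.Done(key)
		if err := c.sync(ctx, key.(string)); err != nil {
			c.queue.AddRateLimited(key) // requeue with backoff
			return false
		}
		c.queue.Forget(key)
		return false
	}
	for {
		if quit := workFunc(ctx); quit {
			return
		}
	}
}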
View File

@@ -17,6 +17,7 @@ limitations under the License.
 package persistentvolume
 import (
+	"context"
 	"errors"
 	"reflect"
 	"testing"
@@ -341,10 +342,10 @@ func TestControllerSync(t *testing.T) {
 		}
 		// Start the controller
-		stopCh := make(chan struct{})
-		informers.Start(stopCh)
-		informers.WaitForCacheSync(stopCh)
-		go ctrl.Run(stopCh)
+		ctx, cancel := context.WithCancel(context.TODO())
+		informers.Start(ctx.Done())
+		informers.WaitForCacheSync(ctx.Done())
+		go ctrl.Run(ctx)
 		// Wait for the controller to pass initial sync and fill its caches.
 		err = wait.Poll(10*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
@@ -369,7 +370,7 @@ func TestControllerSync(t *testing.T) {
 		if err != nil {
 			t.Errorf("Failed to run test %s: %v", test.name, err)
 		}
-		close(stopCh)
+		cancel()
 		evaluateTestResults(ctrl, reactor.VolumeReactor, test, t)
 	}

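The test conversion above relies on a simple bridge: ctx.Done() is a plain <-chan struct{}, so context-driven tests can keep feeding channel-based APIs such as informers.Start. The general recipe, as a fragment with assumed names:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

informers.Start(ctx.Done())            // channel-based API, fed from the context
informers.WaitForCacheSync(ctx.Done()) // likewise
go ctrl.Run(ctx)                       // context-based API
// ... exercise the controller ...
cancel()                               // replaces close(stopCh)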
View File

@@ -100,31 +100,31 @@ func NewPVCProtectionController(pvcInformer coreinformers.PersistentVolumeClaimI
 }
 // Run runs the controller goroutines.
-func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
+func (c *Controller) Run(ctx context.Context, workers int) {
 	defer utilruntime.HandleCrash()
 	defer c.queue.ShutDown()
 	klog.InfoS("Starting PVC protection controller")
 	defer klog.InfoS("Shutting down PVC protection controller")
-	if !cache.WaitForNamedCacheSync("PVC protection", stopCh, c.pvcListerSynced, c.podListerSynced) {
+	if !cache.WaitForNamedCacheSync("PVC protection", ctx.Done(), c.pvcListerSynced, c.podListerSynced) {
 		return
 	}
 	for i := 0; i < workers; i++ {
-		go wait.Until(c.runWorker, time.Second, stopCh)
+		go wait.UntilWithContext(ctx, c.runWorker, time.Second)
 	}
-	<-stopCh
+	<-ctx.Done()
 }
-func (c *Controller) runWorker() {
-	for c.processNextWorkItem() {
+func (c *Controller) runWorker(ctx context.Context) {
+	for c.processNextWorkItem(ctx) {
 	}
 }
 // processNextWorkItem deals with one pvcKey off the queue. It returns false when it's time to quit.
-func (c *Controller) processNextWorkItem() bool {
+func (c *Controller) processNextWorkItem(ctx context.Context) bool {
 	pvcKey, quit := c.queue.Get()
 	if quit {
 		return false
@@ -137,7 +137,7 @@ func (c *Controller) processNextWorkItem() bool {
 		return true
 	}
-	err = c.processPVC(pvcNamespace, pvcName)
+	err = c.processPVC(ctx, pvcNamespace, pvcName)
 	if err == nil {
 		c.queue.Forget(pvcKey)
 		return true
@@ -149,7 +149,7 @@ func (c *Controller) processNextWorkItem() bool {
 	return true
 }
-func (c *Controller) processPVC(pvcNamespace, pvcName string) error {
+func (c *Controller) processPVC(ctx context.Context, pvcNamespace, pvcName string) error {
 	klog.V(4).InfoS("Processing PVC", "PVC", klog.KRef(pvcNamespace, pvcName))
 	startTime := time.Now()
 	defer func() {
@@ -168,12 +168,12 @@ func (c *Controller) processPVC(pvcNamespace, pvcName string) error {
 	if protectionutil.IsDeletionCandidate(pvc, volumeutil.PVCProtectionFinalizer) {
 		// PVC should be deleted. Check if it's used and remove finalizer if
 		// it's not.
-		isUsed, err := c.isBeingUsed(pvc)
+		isUsed, err := c.isBeingUsed(ctx, pvc)
 		if err != nil {
 			return err
 		}
 		if !isUsed {
-			return c.removeFinalizer(pvc)
+			return c.removeFinalizer(ctx, pvc)
 		}
 		klog.V(2).InfoS("Keeping PVC because it is being used", "PVC", klog.KObj(pvc))
 	}
@@ -183,19 +183,19 @@ func (c *Controller) processPVC(pvcNamespace, pvcName string) error {
 		// finalizer should be added by admission plugin, this is just to add
 		// the finalizer to old PVCs that were created before the admission
 		// plugin was enabled.
-		return c.addFinalizer(pvc)
+		return c.addFinalizer(ctx, pvc)
 	}
 	return nil
 }
-func (c *Controller) addFinalizer(pvc *v1.PersistentVolumeClaim) error {
+func (c *Controller) addFinalizer(ctx context.Context, pvc *v1.PersistentVolumeClaim) error {
 	// Skip adding Finalizer in case the StorageObjectInUseProtection feature is not enabled
 	if !c.storageObjectInUseProtectionEnabled {
 		return nil
 	}
 	claimClone := pvc.DeepCopy()
 	claimClone.ObjectMeta.Finalizers = append(claimClone.ObjectMeta.Finalizers, volumeutil.PVCProtectionFinalizer)
-	_, err := c.client.CoreV1().PersistentVolumeClaims(claimClone.Namespace).Update(context.TODO(), claimClone, metav1.UpdateOptions{})
+	_, err := c.client.CoreV1().PersistentVolumeClaims(claimClone.Namespace).Update(ctx, claimClone, metav1.UpdateOptions{})
 	if err != nil {
 		klog.ErrorS(err, "Error adding protection finalizer to PVC", "PVC", klog.KObj(pvc))
 		return err
@@ -204,10 +204,10 @@ func (c *Controller) addFinalizer(pvc *v1.PersistentVolumeClaim) error {
 	return nil
 }
-func (c *Controller) removeFinalizer(pvc *v1.PersistentVolumeClaim) error {
+func (c *Controller) removeFinalizer(ctx context.Context, pvc *v1.PersistentVolumeClaim) error {
 	claimClone := pvc.DeepCopy()
 	claimClone.ObjectMeta.Finalizers = slice.RemoveString(claimClone.ObjectMeta.Finalizers, volumeutil.PVCProtectionFinalizer, nil)
-	_, err := c.client.CoreV1().PersistentVolumeClaims(claimClone.Namespace).Update(context.TODO(), claimClone, metav1.UpdateOptions{})
+	_, err := c.client.CoreV1().PersistentVolumeClaims(claimClone.Namespace).Update(ctx, claimClone, metav1.UpdateOptions{})
 	if err != nil {
 		klog.ErrorS(err, "Error removing protection finalizer from PVC", "PVC", klog.KObj(pvc))
 		return err
@@ -216,7 +216,7 @@ func (c *Controller) removeFinalizer(pvc *v1.PersistentVolumeClaim) error {
 	return nil
 }
-func (c *Controller) isBeingUsed(pvc *v1.PersistentVolumeClaim) (bool, error) {
+func (c *Controller) isBeingUsed(ctx context.Context, pvc *v1.PersistentVolumeClaim) (bool, error) {
 	// Look for a Pod using pvc in the Informer's cache. If one is found the
 	// correct decision to keep pvc is taken without doing an expensive live
 	// list.
@@ -231,7 +231,7 @@ func (c *Controller) isBeingUsed(pvc *v1.PersistentVolumeClaim) (bool, error) {
 	// mean such a Pod doesn't exist: it might just not be in the cache yet. To
 	// be 100% confident that it is safe to delete pvc make sure no Pod is using
 	// it among those returned by a live list.
-	return c.askAPIServer(pvc)
+	return c.askAPIServer(ctx, pvc)
 }
 func (c *Controller) askInformer(pvc *v1.PersistentVolumeClaim) (bool, error) {
@@ -260,10 +260,10 @@ func (c *Controller) askInformer(pvc *v1.PersistentVolumeClaim) (bool, error) {
 	return false, nil
 }
-func (c *Controller) askAPIServer(pvc *v1.PersistentVolumeClaim) (bool, error) {
+func (c *Controller) askAPIServer(ctx context.Context, pvc *v1.PersistentVolumeClaim) (bool, error) {
 	klog.V(4).InfoS("Looking for Pods using PVC with a live list", "PVC", klog.KObj(pvc))
-	podsList, err := c.client.CoreV1().Pods(pvc.Namespace).List(context.TODO(), metav1.ListOptions{})
+	podsList, err := c.client.CoreV1().Pods(pvc.Namespace).List(ctx, metav1.ListOptions{})
 	if err != nil {
 		return false, fmt.Errorf("live list of pods failed: %s", err.Error())
 	}

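The Run conversions in both protection controllers hinge on one helper swap in k8s.io/apimachinery/pkg/util/wait: wait.Until repeatedly runs a no-argument function until a channel closes, while wait.UntilWithContext passes the context into each invocation so workers can forward it to API calls. Side by side, as a fragment with placeholder names:

// before: the worker cannot see why it is being stopped
go wait.Until(c.runWorker, time.Second, stopCh)

// after: each invocation receives ctx and can hand it to client calls;
// the loop exits once ctx is canceled
go wait.UntilWithContext(ctx, c.runWorker, time.Second)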
View File

@@ -17,6 +17,7 @@ limitations under the License.
 package pvcprotection
 import (
+	"context"
 	"errors"
 	"reflect"
 	"testing"
@@ -476,7 +477,7 @@ func TestPVCProtectionController(t *testing.T) {
 		}
 		if ctrl.queue.Len() > 0 {
 			klog.V(5).Infof("Test %q: %d events queue, processing one", test.name, ctrl.queue.Len())
-			ctrl.processNextWorkItem()
+			ctrl.processNextWorkItem(context.TODO())
 		}
 		if ctrl.queue.Len() > 0 {
 			// There is still some work in the queue, process it now

View File

@@ -76,31 +76,31 @@ func NewPVProtectionController(pvInformer coreinformers.PersistentVolumeInformer
 }
 // Run runs the controller goroutines.
-func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
+func (c *Controller) Run(ctx context.Context, workers int) {
 	defer utilruntime.HandleCrash()
 	defer c.queue.ShutDown()
 	klog.Infof("Starting PV protection controller")
 	defer klog.Infof("Shutting down PV protection controller")
-	if !cache.WaitForNamedCacheSync("PV protection", stopCh, c.pvListerSynced) {
+	if !cache.WaitForNamedCacheSync("PV protection", ctx.Done(), c.pvListerSynced) {
 		return
 	}
 	for i := 0; i < workers; i++ {
-		go wait.Until(c.runWorker, time.Second, stopCh)
+		go wait.UntilWithContext(ctx, c.runWorker, time.Second)
 	}
-	<-stopCh
+	<-ctx.Done()
 }
-func (c *Controller) runWorker() {
-	for c.processNextWorkItem() {
+func (c *Controller) runWorker(ctx context.Context) {
+	for c.processNextWorkItem(ctx) {
 	}
 }
 // processNextWorkItem deals with one pvcKey off the queue. It returns false when it's time to quit.
-func (c *Controller) processNextWorkItem() bool {
+func (c *Controller) processNextWorkItem(ctx context.Context) bool {
 	pvKey, quit := c.queue.Get()
 	if quit {
 		return false
@@ -109,7 +109,7 @@ func (c *Controller) processNextWorkItem() bool {
 	pvName := pvKey.(string)
-	err := c.processPV(pvName)
+	err := c.processPV(ctx, pvName)
 	if err == nil {
 		c.queue.Forget(pvKey)
 		return true
@@ -121,7 +121,7 @@ func (c *Controller) processNextWorkItem() bool {
 	return true
 }
-func (c *Controller) processPV(pvName string) error {
+func (c *Controller) processPV(ctx context.Context, pvName string) error {
 	klog.V(4).Infof("Processing PV %s", pvName)
 	startTime := time.Now()
 	defer func() {
@@ -142,7 +142,7 @@ func (c *Controller) processPV(pvName string) error {
 		// it's not.
 		isUsed := c.isBeingUsed(pv)
 		if !isUsed {
-			return c.removeFinalizer(pv)
+			return c.removeFinalizer(ctx, pv)
 		}
 		klog.V(4).Infof("Keeping PV %s because it is being used", pvName)
 	}
@@ -152,19 +152,19 @@ func (c *Controller) processPV(pvName string) error {
 		// finalizer should be added by admission plugin, this is just to add
 		// the finalizer to old PVs that were created before the admission
 		// plugin was enabled.
-		return c.addFinalizer(pv)
+		return c.addFinalizer(ctx, pv)
 	}
 	return nil
 }
-func (c *Controller) addFinalizer(pv *v1.PersistentVolume) error {
+func (c *Controller) addFinalizer(ctx context.Context, pv *v1.PersistentVolume) error {
 	// Skip adding Finalizer in case the StorageObjectInUseProtection feature is not enabled
 	if !c.storageObjectInUseProtectionEnabled {
 		return nil
 	}
 	pvClone := pv.DeepCopy()
 	pvClone.ObjectMeta.Finalizers = append(pvClone.ObjectMeta.Finalizers, volumeutil.PVProtectionFinalizer)
-	_, err := c.client.CoreV1().PersistentVolumes().Update(context.TODO(), pvClone, metav1.UpdateOptions{})
+	_, err := c.client.CoreV1().PersistentVolumes().Update(ctx, pvClone, metav1.UpdateOptions{})
 	if err != nil {
 		klog.V(3).Infof("Error adding protection finalizer to PV %s: %v", pv.Name, err)
 		return err
@@ -173,10 +173,10 @@ func (c *Controller) addFinalizer(pv *v1.PersistentVolume) error {
 	return nil
 }
-func (c *Controller) removeFinalizer(pv *v1.PersistentVolume) error {
+func (c *Controller) removeFinalizer(ctx context.Context, pv *v1.PersistentVolume) error {
 	pvClone := pv.DeepCopy()
 	pvClone.ObjectMeta.Finalizers = slice.RemoveString(pvClone.ObjectMeta.Finalizers, volumeutil.PVProtectionFinalizer, nil)
-	_, err := c.client.CoreV1().PersistentVolumes().Update(context.TODO(), pvClone, metav1.UpdateOptions{})
+	_, err := c.client.CoreV1().PersistentVolumes().Update(ctx, pvClone, metav1.UpdateOptions{})
 	if err != nil {
 		klog.V(3).Infof("Error removing protection finalizer from PV %s: %v", pv.Name, err)
 		return err

View File

@@ -17,6 +17,7 @@ limitations under the License.
 package pvprotection
 import (
+	"context"
 	"errors"
 	"reflect"
 	"testing"
@@ -247,7 +248,7 @@ func TestPVProtectionController(t *testing.T) {
 		}
 		if ctrl.queue.Len() > 0 {
 			klog.V(5).Infof("Test %q: %d events queue, processing one", test.name, ctrl.queue.Len())
-			ctrl.processNextWorkItem()
+			ctrl.processNextWorkItem(context.TODO())
 		}
 		if ctrl.queue.Len() > 0 {
 			// There is still some work in the queue, process it now