Merge pull request #110958 from kidddddddddddddddddddddd/cleanup/remove-potential-goroutine-leak-in-binder

Pass context to pkg/scheduler/framework/plugins/volumebinding.
This commit is contained in:
Kubernetes Prow Robot 2022-07-13 22:50:56 -07:00 committed by GitHub
commit b70f340209
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 15 additions and 11 deletions

View File

@ -185,7 +185,7 @@ type SchedulerVolumeBinder interface {
// 3. Wait for PVCs to be completely bound by the PV controller // 3. Wait for PVCs to be completely bound by the PV controller
// //
// This function can be called in parallel. // This function can be called in parallel.
BindPodVolumes(assumedPod *v1.Pod, podVolumes *PodVolumes) error BindPodVolumes(ctx context.Context, assumedPod *v1.Pod, podVolumes *PodVolumes) error
} }
type volumeBinder struct { type volumeBinder struct {
@ -432,7 +432,7 @@ func (b *volumeBinder) RevertAssumedPodVolumes(podVolumes *PodVolumes) {
// BindPodVolumes gets the cached bindings and PVCs to provision in pod's volumes information, // BindPodVolumes gets the cached bindings and PVCs to provision in pod's volumes information,
// makes the API update for those PVs/PVCs, and waits for the PVCs to be completely bound // makes the API update for those PVs/PVCs, and waits for the PVCs to be completely bound
// by the PV controller. // by the PV controller.
func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod, podVolumes *PodVolumes) (err error) { func (b *volumeBinder) BindPodVolumes(ctx context.Context, assumedPod *v1.Pod, podVolumes *PodVolumes) (err error) {
klog.V(4).InfoS("BindPodVolumes", "pod", klog.KObj(assumedPod), "node", klog.KRef("", assumedPod.Spec.NodeName)) klog.V(4).InfoS("BindPodVolumes", "pod", klog.KObj(assumedPod), "node", klog.KRef("", assumedPod.Spec.NodeName))
defer func() { defer func() {
@ -445,7 +445,7 @@ func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod, podVolumes *PodVolumes
claimsToProvision := podVolumes.DynamicProvisions claimsToProvision := podVolumes.DynamicProvisions
// Start API operations // Start API operations
err = b.bindAPIUpdate(assumedPod, bindings, claimsToProvision) err = b.bindAPIUpdate(ctx, assumedPod, bindings, claimsToProvision)
if err != nil { if err != nil {
return err return err
} }
@ -469,7 +469,7 @@ func getPVCName(pvc *v1.PersistentVolumeClaim) string {
} }
// bindAPIUpdate makes the API update for those PVs/PVCs. // bindAPIUpdate makes the API update for those PVs/PVCs.
func (b *volumeBinder) bindAPIUpdate(pod *v1.Pod, bindings []*BindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) error { func (b *volumeBinder) bindAPIUpdate(ctx context.Context, pod *v1.Pod, bindings []*BindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) error {
podName := getPodName(pod) podName := getPodName(pod)
if bindings == nil { if bindings == nil {
return fmt.Errorf("failed to get cached bindings for pod %q", podName) return fmt.Errorf("failed to get cached bindings for pod %q", podName)
@ -503,7 +503,7 @@ func (b *volumeBinder) bindAPIUpdate(pod *v1.Pod, bindings []*BindingInfo, claim
klog.V(5).InfoS("bindAPIUpdate: binding PV to PVC", "pod", klog.KObj(pod), "PV", klog.KObj(binding.pv), "PVC", klog.KObj(binding.pvc)) klog.V(5).InfoS("bindAPIUpdate: binding PV to PVC", "pod", klog.KObj(pod), "PV", klog.KObj(binding.pv), "PVC", klog.KObj(binding.pvc))
// TODO: does it hurt if we make an api call and nothing needs to be updated? // TODO: does it hurt if we make an api call and nothing needs to be updated?
klog.V(2).InfoS("Claim bound to volume", "PVC", klog.KObj(binding.pvc), "PV", klog.KObj(binding.pv)) klog.V(2).InfoS("Claim bound to volume", "PVC", klog.KObj(binding.pvc), "PV", klog.KObj(binding.pv))
newPV, err := b.kubeClient.CoreV1().PersistentVolumes().Update(context.TODO(), binding.pv, metav1.UpdateOptions{}) newPV, err := b.kubeClient.CoreV1().PersistentVolumes().Update(ctx, binding.pv, metav1.UpdateOptions{})
if err != nil { if err != nil {
klog.V(4).InfoS("Updating PersistentVolume: binding to claim failed", "PV", klog.KObj(binding.pv), "PVC", klog.KObj(binding.pvc), "err", err) klog.V(4).InfoS("Updating PersistentVolume: binding to claim failed", "PV", klog.KObj(binding.pv), "PVC", klog.KObj(binding.pvc), "err", err)
return err return err
@ -518,7 +518,7 @@ func (b *volumeBinder) bindAPIUpdate(pod *v1.Pod, bindings []*BindingInfo, claim
// PV controller is expected to signal back by removing related annotations if actual provisioning fails // PV controller is expected to signal back by removing related annotations if actual provisioning fails
for i, claim = range claimsToProvision { for i, claim = range claimsToProvision {
klog.V(5).InfoS("Updating claims objects to trigger volume provisioning", "pod", klog.KObj(pod), "PVC", klog.KObj(claim)) klog.V(5).InfoS("Updating claims objects to trigger volume provisioning", "pod", klog.KObj(pod), "PVC", klog.KObj(claim))
newClaim, err := b.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(context.TODO(), claim, metav1.UpdateOptions{}) newClaim, err := b.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{})
if err != nil { if err != nil {
return err return err
} }

View File

@ -1529,7 +1529,7 @@ func TestBindAPIUpdate(t *testing.T) {
testEnv.assumeVolumes(t, "node1", pod, scenario.bindings, scenario.provisionedPVCs) testEnv.assumeVolumes(t, "node1", pod, scenario.bindings, scenario.provisionedPVCs)
// Execute // Execute
err := testEnv.internalBinder.bindAPIUpdate(pod, scenario.bindings, scenario.provisionedPVCs) err := testEnv.internalBinder.bindAPIUpdate(ctx, pod, scenario.bindings, scenario.provisionedPVCs)
// Validate // Validate
if !scenario.shouldFail && err != nil { if !scenario.shouldFail && err != nil {
@ -2085,7 +2085,7 @@ func TestBindPodVolumes(t *testing.T) {
StaticBindings: bindings, StaticBindings: bindings,
DynamicProvisions: claimsToProvision, DynamicProvisions: claimsToProvision,
} }
err := testEnv.binder.BindPodVolumes(pod, podVolumes) err := testEnv.binder.BindPodVolumes(ctx, pod, podVolumes)
// Validate // Validate
if !scenario.shouldFail && err != nil { if !scenario.shouldFail && err != nil {

View File

@ -16,7 +16,11 @@ limitations under the License.
package volumebinding package volumebinding
import v1 "k8s.io/api/core/v1" import (
"context"
v1 "k8s.io/api/core/v1"
)
// FakeVolumeBinderConfig holds configurations for fake volume binder. // FakeVolumeBinderConfig holds configurations for fake volume binder.
type FakeVolumeBinderConfig struct { type FakeVolumeBinderConfig struct {
@ -62,7 +66,7 @@ func (b *FakeVolumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string,
func (b *FakeVolumeBinder) RevertAssumedPodVolumes(_ *PodVolumes) {} func (b *FakeVolumeBinder) RevertAssumedPodVolumes(_ *PodVolumes) {}
// BindPodVolumes implements SchedulerVolumeBinder.BindPodVolumes. // BindPodVolumes implements SchedulerVolumeBinder.BindPodVolumes.
func (b *FakeVolumeBinder) BindPodVolumes(assumedPod *v1.Pod, podVolumes *PodVolumes) error { func (b *FakeVolumeBinder) BindPodVolumes(ctx context.Context, assumedPod *v1.Pod, podVolumes *PodVolumes) error {
b.BindCalled = true b.BindCalled = true
return b.config.BindErr return b.config.BindErr
} }

View File

@ -333,7 +333,7 @@ func (pl *VolumeBinding) PreBind(ctx context.Context, cs *framework.CycleState,
return framework.AsStatus(fmt.Errorf("no pod volumes found for node %q", nodeName)) return framework.AsStatus(fmt.Errorf("no pod volumes found for node %q", nodeName))
} }
klog.V(5).InfoS("Trying to bind volumes for pod", "pod", klog.KObj(pod)) klog.V(5).InfoS("Trying to bind volumes for pod", "pod", klog.KObj(pod))
err = pl.Binder.BindPodVolumes(pod, podVolumes) err = pl.Binder.BindPodVolumes(ctx, pod, podVolumes)
if err != nil { if err != nil {
klog.V(1).InfoS("Failed to bind volumes for pod", "pod", klog.KObj(pod), "err", err) klog.V(1).InfoS("Failed to bind volumes for pod", "pod", klog.KObj(pod), "err", err)
return framework.AsStatus(err) return framework.AsStatus(err)