diff --git a/cmd/kube-scheduler/app/server_test.go b/cmd/kube-scheduler/app/server_test.go
index dd670d75793..cc82fc7f510 100644
--- a/cmd/kube-scheduler/app/server_test.go
+++ b/cmd/kube-scheduler/app/server_test.go
@@ -156,6 +156,7 @@ profiles:
 				{Name: "NodePorts"},
 				{Name: "PodTopologySpread"},
 				{Name: "InterPodAffinity"},
+				{Name: "VolumeBinding"},
 			},
 			"FilterPlugin": {
 				{Name: "NodeUnschedulable"},
@@ -287,6 +288,7 @@ profiles:
 				{Name: "NodePorts"},
 				{Name: "PodTopologySpread"},
 				{Name: "InterPodAffinity"},
+				{Name: "VolumeBinding"},
 			},
 			"FilterPlugin": {
 				{Name: "NodeUnschedulable"},
diff --git a/pkg/controller/volume/scheduling/scheduler_binder.go b/pkg/controller/volume/scheduling/scheduler_binder.go
index ca91364e38e..0c9c9516148 100644
--- a/pkg/controller/volume/scheduling/scheduler_binder.go
+++ b/pkg/controller/volume/scheduling/scheduler_binder.go
@@ -78,17 +78,19 @@ type InTreeToCSITranslator interface {
 //
 // This integrates into the existing scheduler workflow as follows:
 // 1. The scheduler takes a Pod off the scheduler queue and processes it serially:
-//    a. Invokes all filter plugins, parallelized across nodes. FindPodVolumes() is invoked here.
-//    b. Invokes all score plugins. Future/TBD
-//    c. Selects the best node for the Pod.
-//    d. Invokes all reserve plugins. AssumePodVolumes() is invoked here.
+//    a. Invokes all pre-filter plugins for the pod. GetPodVolumes() is invoked
+//       here; pod volume information is saved in the current scheduling cycle state for later use.
+//    b. Invokes all filter plugins, parallelized across nodes. FindPodVolumes() is invoked here.
+//    c. Invokes all score plugins. Future/TBD
+//    d. Selects the best node for the Pod.
+//    e. Invokes all reserve plugins. AssumePodVolumes() is invoked here.
 //    i. If PVC binding is required, cache in-memory only:
 //       * For manual binding: update PV objects for prebinding to the corresponding PVCs.
 //       * For dynamic provisioning: update PVC object with a selected node from c)
 //       * For the pod, which PVCs and PVs need API updates.
 //    ii. Afterwards, the main scheduler caches the Pod->Node binding in the scheduler's pod cache,
 //        This is handled in the scheduler and not here.
-//    f. Asynchronously bind volumes and pod in a separate goroutine
+//    f. Asynchronously bind volumes and pod in a separate goroutine
 //    i. BindPodVolumes() is called first in PreBind phase. It makes all the necessary API updates and waits for
 //       PV controller to fully bind and provision the PVCs. If binding fails, the Pod is sent
 //       back through the scheduler.
@@ -96,6 +98,10 @@ type InTreeToCSITranslator interface {
 // 2. Once all the assume operations are done in d), the scheduler processes the next Pod in the scheduler queue
 //    while the actual binding operation occurs in the background.
 type SchedulerVolumeBinder interface {
+	// GetPodVolumes returns a pod's PVCs separated into bound, unbound with delayed binding (including provisioning)
+	// and unbound with immediate binding (including prebound)
+	GetPodVolumes(pod *v1.Pod) (boundClaims, unboundClaimsDelayBinding, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error)
+
 	// FindPodVolumes checks if all of a Pod's PVCs can be satisfied by the node.
 	//
 	// If a PVC is bound, it checks if the PV's NodeAffinity matches the Node.
@@ -105,7 +111,7 @@ type SchedulerVolumeBinder interface {
 	// (currently) not usable for the pod.
 	//
 	// This function is called by the volume binding scheduler predicate and can be called in parallel
-	FindPodVolumes(pod *v1.Pod, node *v1.Node) (reasons ConflictReasons, err error)
+	FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*v1.PersistentVolumeClaim, node *v1.Node) (reasons ConflictReasons, err error)
 
 	// AssumePodVolumes will:
 	// 1. Take the PV matches for unbound PVCs and update the PV cache assuming
@@ -194,7 +200,7 @@ func (b *volumeBinder) DeletePodBindings(pod *v1.Pod) {
 // FindPodVolumes caches the matching PVs and PVCs to provision per node in podBindingCache.
 // This method intentionally takes in a *v1.Node object instead of using volumebinder.nodeInformer.
 // That's necessary because some operations will need to pass in to the predicate fake node objects.
-func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (reasons ConflictReasons, err error) {
+func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*v1.PersistentVolumeClaim, node *v1.Node) (reasons ConflictReasons, err error) {
 	podName := getPodName(pod)
 
 	// Warning: Below log needs high verbosity as it can be printed several times (#60933).
@@ -248,18 +254,6 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (reasons Confl
 		b.podBindingCache.UpdateBindings(pod, node.Name, matchedBindings, provisionedClaims)
 	}()
 
-	// The pod's volumes need to be processed in one call to avoid the race condition where
-	// volumes can get bound/provisioned in between calls.
-	boundClaims, claimsToBind, unboundClaimsImmediate, err := b.getPodVolumes(pod)
-	if err != nil {
-		return nil, err
-	}
-
-	// Immediate claims should be bound
-	if len(unboundClaimsImmediate) > 0 {
-		return nil, fmt.Errorf("pod has unbound immediate PersistentVolumeClaims")
-	}
-
 	// Check PV node affinity on bound volumes
 	if len(boundClaims) > 0 {
 		boundVolumesSatisfied, err = b.checkBoundClaims(boundClaims, node, podName)
@@ -684,9 +678,9 @@ func (b *volumeBinder) arePodVolumesBound(pod *v1.Pod) bool {
 	return true
 }
 
-// getPodVolumes returns a pod's PVCs separated into bound, unbound with delayed binding (including provisioning)
+// GetPodVolumes returns a pod's PVCs separated into bound, unbound with delayed binding (including provisioning)
 // and unbound with immediate binding (including prebound)
-func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentVolumeClaim, unboundClaimsDelayBinding []*v1.PersistentVolumeClaim, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error) {
+func (b *volumeBinder) GetPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentVolumeClaim, unboundClaimsDelayBinding []*v1.PersistentVolumeClaim, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error) {
 	boundClaims = []*v1.PersistentVolumeClaim{}
 	unboundClaimsImmediate = []*v1.PersistentVolumeClaim{}
 	unboundClaimsDelayBinding = []*v1.PersistentVolumeClaim{}
diff --git a/pkg/controller/volume/scheduling/scheduler_binder_fake.go b/pkg/controller/volume/scheduling/scheduler_binder_fake.go
index ddf1cedf31e..5ebe0a93502 100644
--- a/pkg/controller/volume/scheduling/scheduler_binder_fake.go
+++ b/pkg/controller/volume/scheduling/scheduler_binder_fake.go
@@ -42,8 +42,13 @@ type FakeVolumeBinder struct {
 	BindCalled bool
 }
 
+// GetPodVolumes implements SchedulerVolumeBinder.GetPodVolumes.
+func (b *FakeVolumeBinder) GetPodVolumes(pod *v1.Pod) (boundClaims, unboundClaimsDelayBinding, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error) {
+	return nil, nil, nil, nil
+}
+
 // FindPodVolumes implements SchedulerVolumeBinder.FindPodVolumes.
-func (b *FakeVolumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (reasons ConflictReasons, err error) {
+func (b *FakeVolumeBinder) FindPodVolumes(pod *v1.Pod, _, _ []*v1.PersistentVolumeClaim, node *v1.Node) (reasons ConflictReasons, err error) {
 	return b.config.FindReasons, b.config.FindErr
 }
 
diff --git a/pkg/controller/volume/scheduling/scheduler_binder_test.go b/pkg/controller/volume/scheduling/scheduler_binder_test.go
index 75a84958a68..fdfd8abaa03 100644
--- a/pkg/controller/volume/scheduling/scheduler_binder_test.go
+++ b/pkg/controller/volume/scheduling/scheduler_binder_test.go
@@ -771,6 +771,18 @@ func checkReasons(t *testing.T, actual, expected ConflictReasons) {
 	}
 }
 
+// findPodVolumes gets and finds volumes for the given pod and node
+func findPodVolumes(binder SchedulerVolumeBinder, pod *v1.Pod, node *v1.Node) (ConflictReasons, error) {
+	boundClaims, claimsToBind, unboundClaimsImmediate, err := binder.GetPodVolumes(pod)
+	if err != nil {
+		return nil, err
+	}
+	if len(unboundClaimsImmediate) > 0 {
+		return nil, fmt.Errorf("pod has unbound immediate PersistentVolumeClaims")
+	}
+	return binder.FindPodVolumes(pod, boundClaims, claimsToBind, node)
+}
+
 func TestFindPodVolumesWithoutProvisioning(t *testing.T) {
 	type scenarioType struct {
 		// Inputs
@@ -907,7 +919,7 @@ func TestFindPodVolumesWithoutProvisioning(t *testing.T) {
 		}
 
 		// Execute
-		reasons, err := testEnv.binder.FindPodVolumes(scenario.pod, testNode)
+		reasons, err := findPodVolumes(testEnv.binder, scenario.pod, testNode)
 
 		// Validate
 		if !scenario.shouldFail && err != nil {
@@ -1012,7 +1024,7 @@ func TestFindPodVolumesWithProvisioning(t *testing.T) {
 		}
 
 		// Execute
-		reasons, err := testEnv.binder.FindPodVolumes(scenario.pod, testNode)
+		reasons, err := findPodVolumes(testEnv.binder, scenario.pod, testNode)
 
 		// Validate
 		if !scenario.shouldFail && err != nil {
@@ -1112,7 +1124,7 @@ func TestFindPodVolumesWithCSIMigration(t *testing.T) {
 		}
 
 		// Execute
-		reasons, err := testEnv.binder.FindPodVolumes(scenario.pod, node)
+		reasons, err := findPodVolumes(testEnv.binder, scenario.pod, node)
 
 		// Validate
 		if !scenario.shouldFail && err != nil {
@@ -1933,7 +1945,7 @@ func TestFindAssumeVolumes(t *testing.T) {
 
 	// Execute
 	// 1. Find matching PVs
-	reasons, err := testEnv.binder.FindPodVolumes(pod, testNode)
+	reasons, err := findPodVolumes(testEnv.binder, pod, testNode)
 	if err != nil {
 		t.Errorf("Test failed: FindPodVolumes returned error: %v", err)
 	}
@@ -1959,7 +1971,7 @@ func TestFindAssumeVolumes(t *testing.T) {
 	// This should always return the original chosen pv
 	// Run this many times in case sorting returns different orders for the two PVs.
 	for i := 0; i < 50; i++ {
-		reasons, err := testEnv.binder.FindPodVolumes(pod, testNode)
+		reasons, err := findPodVolumes(testEnv.binder, pod, testNode)
 		if err != nil {
 			t.Errorf("Test failed: FindPodVolumes returned error: %v", err)
 		}
diff --git a/pkg/scheduler/algorithmprovider/registry.go b/pkg/scheduler/algorithmprovider/registry.go
index 76cc60b47f9..a7f831c3e3f 100644
--- a/pkg/scheduler/algorithmprovider/registry.go
+++ b/pkg/scheduler/algorithmprovider/registry.go
@@ -87,6 +87,7 @@ func getDefaultConfig() *schedulerapi.Plugins {
 				{Name: nodeports.Name},
 				{Name: podtopologyspread.Name},
 				{Name: interpodaffinity.Name},
+				{Name: volumebinding.Name},
 			},
 		},
 		Filter: &schedulerapi.PluginSet{
diff --git a/pkg/scheduler/algorithmprovider/registry_test.go b/pkg/scheduler/algorithmprovider/registry_test.go
index 4e0ff2ca8ed..7d9a1ad69e5 100644
--- a/pkg/scheduler/algorithmprovider/registry_test.go
+++ b/pkg/scheduler/algorithmprovider/registry_test.go
@@ -58,6 +58,7 @@ func TestClusterAutoscalerProvider(t *testing.T) {
 				{Name: nodeports.Name},
 				{Name: podtopologyspread.Name},
 				{Name: interpodaffinity.Name},
+				{Name: volumebinding.Name},
 			},
 		},
 		Filter: &schedulerapi.PluginSet{
@@ -154,6 +155,7 @@ func TestApplyFeatureGates(t *testing.T) {
 				{Name: nodeports.Name},
 				{Name: podtopologyspread.Name},
 				{Name: interpodaffinity.Name},
+				{Name: volumebinding.Name},
 			},
 		},
 		Filter: &schedulerapi.PluginSet{
@@ -238,6 +240,7 @@ func TestApplyFeatureGates(t *testing.T) {
 				{Name: nodeports.Name},
 				{Name: podtopologyspread.Name},
 				{Name: interpodaffinity.Name},
+				{Name: volumebinding.Name},
 			},
 		},
 		Filter: &schedulerapi.PluginSet{
diff --git a/pkg/scheduler/apis/config/testing/compatibility_test.go b/pkg/scheduler/apis/config/testing/compatibility_test.go
index 1caab74aec2..559562c2718 100644
--- a/pkg/scheduler/apis/config/testing/compatibility_test.go
+++ b/pkg/scheduler/apis/config/testing/compatibility_test.go
@@ -666,6 +666,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
 				{Name: "NodePorts"},
 				{Name: "NodeResourcesFit"},
 				{Name: "ServiceAffinity"},
+				{Name: "VolumeBinding"},
 				{Name: "InterPodAffinity"},
 			},
 			"FilterPlugin": {
@@ -773,6 +774,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
 				{Name: "NodePorts"},
 				{Name: "NodeResourcesFit"},
 				{Name: "ServiceAffinity"},
+				{Name: "VolumeBinding"},
 				{Name: "InterPodAffinity"},
 			},
 			"FilterPlugin": {
@@ -892,6 +894,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
 				{Name: "NodePorts"},
 				{Name: "NodeResourcesFit"},
 				{Name: "ServiceAffinity"},
+				{Name: "VolumeBinding"},
 				{Name: "InterPodAffinity"},
 			},
 			"FilterPlugin": {
@@ -1013,6 +1016,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
 				{Name: "NodePorts"},
 				{Name: "NodeResourcesFit"},
 				{Name: "ServiceAffinity"},
+				{Name: "VolumeBinding"},
 				{Name: "InterPodAffinity"},
 			},
 			"FilterPlugin": {
@@ -1134,6 +1138,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
 				{Name: "NodePorts"},
 				{Name: "NodeResourcesFit"},
 				{Name: "ServiceAffinity"},
+				{Name: "VolumeBinding"},
 				{Name: "InterPodAffinity"},
 			},
 			"FilterPlugin": {
@@ -1260,6 +1265,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
 				{Name: "NodePorts"},
 				{Name: "NodeResourcesFit"},
 				{Name: "ServiceAffinity"},
+				{Name: "VolumeBinding"},
 				{Name: "InterPodAffinity"},
 			},
 			"FilterPlugin": {
@@ -1389,6 +1395,7 @@ func TestAlgorithmProviderCompatibility(t *testing.T) {
 				{Name: "NodePorts"},
 				{Name: "PodTopologySpread"},
 				{Name: "InterPodAffinity"},
+				{Name: "VolumeBinding"},
 			},
 			"FilterPlugin": {
 				{Name: "NodeUnschedulable"},
@@ -1457,6 +1464,7 @@ func TestAlgorithmProviderCompatibility(t *testing.T) {
 				{Name: "NodePorts"},
 				{Name: "PodTopologySpread"},
 				{Name: "InterPodAffinity"},
+				{Name: "VolumeBinding"},
 			},
 			"FilterPlugin": {
 				{Name: "NodeUnschedulable"},
@@ -1545,6 +1553,7 @@ func TestPluginsConfigurationCompatibility(t *testing.T) {
 				{Name: "NodePorts"},
 				{Name: "PodTopologySpread"},
 				{Name: "InterPodAffinity"},
+				{Name: "VolumeBinding"},
 			},
 			"FilterPlugin": {
 				{Name: "NodeUnschedulable"},
@@ -1740,6 +1749,7 @@ func TestPluginsConfigurationCompatibility(t *testing.T) {
 					{Name: "NodePorts"},
 					{Name: "InterPodAffinity"},
 					{Name: "PodTopologySpread"},
+					{Name: "VolumeBinding"},
 				},
 			},
 			Filter: &config.PluginSet{
diff --git a/pkg/scheduler/framework/plugins/legacy_registry.go b/pkg/scheduler/framework/plugins/legacy_registry.go
index 644908f8f95..4cca85d99ea 100644
--- a/pkg/scheduler/framework/plugins/legacy_registry.go
+++ b/pkg/scheduler/framework/plugins/legacy_registry.go
@@ -269,6 +269,7 @@ func NewLegacyRegistry() *LegacyRegistry {
 	})
 	registry.registerPredicateConfigProducer(CheckVolumeBindingPred,
 		func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
+			plugins.PreFilter = appendToPluginSet(plugins.PreFilter, volumebinding.Name, nil)
 			plugins.Filter = appendToPluginSet(plugins.Filter, volumebinding.Name, nil)
 			plugins.Reserve = appendToPluginSet(plugins.Reserve, volumebinding.Name, nil)
 			plugins.PreBind = appendToPluginSet(plugins.PreBind, volumebinding.Name, nil)
diff --git a/pkg/scheduler/framework/plugins/volumebinding/BUILD b/pkg/scheduler/framework/plugins/volumebinding/BUILD
index fc37262aca4..7119da3810a 100644
--- a/pkg/scheduler/framework/plugins/volumebinding/BUILD
+++ b/pkg/scheduler/framework/plugins/volumebinding/BUILD
@@ -36,8 +36,15 @@ go_test(
     srcs = ["volume_binding_test.go"],
     embed = [":go_default_library"],
     deps = [
+        "//pkg/controller/volume/persistentvolume/util:go_default_library",
         "//pkg/controller/volume/scheduling:go_default_library",
+        "//pkg/scheduler/apis/config:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
+        "//staging/src/k8s.io/api/storage/v1:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//staging/src/k8s.io/client-go/informers:go_default_library",
+        "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
+        "//vendor/k8s.io/utils/pointer:go_default_library",
     ],
 )
diff --git a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go
index 5c9ad28cec3..c8a8f5e1656 100644
--- a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go
+++ b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go
@@ -18,6 +18,7 @@ package volumebinding
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"time"
 
@@ -35,14 +36,17 @@ const (
 	// DefaultBindTimeoutSeconds defines the default bind timeout in seconds
 	DefaultBindTimeoutSeconds = 600
 
-	allBoundStateKey framework.StateKey = "volumebinding:all-bound"
+	stateKey framework.StateKey = Name
 )
 
 type stateData struct {
-	allBound bool
+	skip         bool // set true if pod does not have PVCs
+	boundClaims  []*v1.PersistentVolumeClaim
+	claimsToBind []*v1.PersistentVolumeClaim
+	allBound     bool
 }
 
-func (d stateData) Clone() framework.StateData {
+func (d *stateData) Clone() framework.StateData {
 	return d
 }
 
@@ -58,6 +62,7 @@ type VolumeBinding struct {
 	Binder scheduling.SchedulerVolumeBinder
 }
 
+var _ framework.PreFilterPlugin = &VolumeBinding{}
 var _ framework.FilterPlugin = &VolumeBinding{}
 var _ framework.ReservePlugin = &VolumeBinding{}
 var _ framework.PreBindPlugin = &VolumeBinding{}
@@ -81,6 +86,48 @@ func podHasPVCs(pod *v1.Pod) bool {
 	return false
 }
 
+// PreFilter invoked at the prefilter extension point to check if pod has all
+// immediate PVCs bound. If not all immediate PVCs are bound, an
+// UnschedulableAndUnresolvable status is returned.
+func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status {
+	// If pod does not request any PVC, we don't need to do anything.
+	if !podHasPVCs(pod) {
+		state.Write(stateKey, &stateData{skip: true})
+		return nil
+	}
+	boundClaims, claimsToBind, unboundClaimsImmediate, err := pl.Binder.GetPodVolumes(pod)
+	if err != nil {
+		return framework.NewStatus(framework.Error, err.Error())
+	}
+	if len(unboundClaimsImmediate) > 0 {
+		// Return UnschedulableAndUnresolvable error if immediate claims are
+		// not bound. Pod will be moved to active/backoff queues once these
+		// claims are bound by PV controller.
+		status := framework.NewStatus(framework.UnschedulableAndUnresolvable)
+		status.AppendReason("pod has unbound immediate PersistentVolumeClaims")
+		return status
+	}
+	state.Write(stateKey, &stateData{boundClaims: boundClaims, claimsToBind: claimsToBind})
+	return nil
+}
+
+// PreFilterExtensions returns prefilter extensions, pod add and remove.
+func (pl *VolumeBinding) PreFilterExtensions() framework.PreFilterExtensions {
+	return nil
+}
+
+func getStateData(cs *framework.CycleState) (*stateData, error) {
+	state, err := cs.Read(stateKey)
+	if err != nil {
+		return nil, err
+	}
+	s, ok := state.(*stateData)
+	if !ok {
+		return nil, errors.New("unable to convert state into stateData")
+	}
+	return s, nil
+}
+
 // Filter invoked at the filter extension point.
 // It evaluates if a pod can fit due to the volumes it requests,
 // for both bound and unbound PVCs.
@@ -98,12 +145,17 @@ func (pl *VolumeBinding) Filter(ctx context.Context, cs *framework.CycleState, p
 	if node == nil {
 		return framework.NewStatus(framework.Error, "node not found")
 	}
-	// If pod does not request any PVC, we don't need to do anything.
-	if !podHasPVCs(pod) {
+
+	state, err := getStateData(cs)
+	if err != nil {
+		return framework.NewStatus(framework.Error, err.Error())
+	}
+
+	if state.skip {
 		return nil
 	}
 
-	reasons, err := pl.Binder.FindPodVolumes(pod, node)
+	reasons, err := pl.Binder.FindPodVolumes(pod, state.boundClaims, state.claimsToBind, node)
 
 	if err != nil {
 		return framework.NewStatus(framework.Error, err.Error())
@@ -125,7 +177,7 @@ func (pl *VolumeBinding) Reserve(ctx context.Context, cs *framework.CycleState,
 	if err != nil {
 		return framework.NewStatus(framework.Error, err.Error())
 	}
-	cs.Write(allBoundStateKey, stateData{allBound: allBound})
+	cs.Write(stateKey, &stateData{allBound: allBound})
 	return nil
 }
 
@@ -135,14 +187,10 @@ func (pl *VolumeBinding) Reserve(ctx context.Context, cs *framework.CycleState,
 // If binding errors, times out or gets undone, then an error will be returned to
 // retry scheduling.
 func (pl *VolumeBinding) PreBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
-	state, err := cs.Read(allBoundStateKey)
+	s, err := getStateData(cs)
 	if err != nil {
 		return framework.NewStatus(framework.Error, err.Error())
 	}
-	s, ok := state.(stateData)
-	if !ok {
-		return framework.NewStatus(framework.Error, "unable to convert state into stateData")
-	}
 	if s.allBound {
 		// no need to bind volumes
 		return nil
diff --git a/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go b/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go
index 5756b1e00fc..f2df794c023 100644
--- a/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go
+++ b/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go
@@ -18,97 +18,287 @@ package volumebinding
 
 import (
 	"context"
-	"fmt"
 	"reflect"
 	"testing"
 
 	v1 "k8s.io/api/core/v1"
+	storagev1 "k8s.io/api/storage/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes/fake"
+	pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
 	"k8s.io/kubernetes/pkg/controller/volume/scheduling"
+	"k8s.io/kubernetes/pkg/scheduler/apis/config"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
+	"k8s.io/utils/pointer"
 )
 
-func TestVolumeBinding(t *testing.T) {
-	findErr := fmt.Errorf("find err")
-	volState := v1.PodSpec{
-		Volumes: []v1.Volume{
-			{
-				VolumeSource: v1.VolumeSource{
-					PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{},
-				},
-			},
+var (
+	immediate            = storagev1.VolumeBindingImmediate
+	waitForFirstConsumer = storagev1.VolumeBindingWaitForFirstConsumer
+	immediateSC          = &storagev1.StorageClass{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "immediate-sc",
+		},
+		VolumeBindingMode: &immediate,
+	}
+	waitSC = &storagev1.StorageClass{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "wait-sc",
+		},
+		VolumeBindingMode: &waitForFirstConsumer,
+	}
+)
+
+func makePV(name string) *v1.PersistentVolume {
+	return &v1.PersistentVolume{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: name,
 		},
 	}
+}
+
+func addPVNodeAffinity(pv *v1.PersistentVolume, volumeNodeAffinity *v1.VolumeNodeAffinity) *v1.PersistentVolume {
+	pv.Spec.NodeAffinity = volumeNodeAffinity
+	return pv
+}
+
+func makePVC(name string, boundPVName string, storageClassName string) *v1.PersistentVolumeClaim {
+	pvc := &v1.PersistentVolumeClaim{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      name,
+			Namespace: v1.NamespaceDefault,
+		},
+		Spec: v1.PersistentVolumeClaimSpec{
+			StorageClassName: pointer.StringPtr(storageClassName),
+		},
+	}
+	if boundPVName != "" {
+		pvc.Spec.VolumeName = boundPVName
+		metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, pvutil.AnnBindCompleted, "true")
+	}
+	return pvc
+}
+
+func makePod(name string, pvcNames []string) *v1.Pod {
+	p := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      name,
+			Namespace: v1.NamespaceDefault,
+		},
+	}
+	p.Spec.Volumes = make([]v1.Volume, 0)
+	for _, pvcName := range pvcNames {
+		p.Spec.Volumes = append(p.Spec.Volumes, v1.Volume{
+			VolumeSource: v1.VolumeSource{
+				PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
+					ClaimName: pvcName,
+				},
+			},
+		})
+	}
+	return p
+}
+
+func TestVolumeBinding(t *testing.T) {
 	table := []struct {
-		name               string
-		pod                *v1.Pod
-		node               *v1.Node
-		volumeBinderConfig *scheduling.FakeVolumeBinderConfig
-		wantStatus         *framework.Status
+		name                    string
+		pod                     *v1.Pod
+		node                    *v1.Node
+		pvcs                    []*v1.PersistentVolumeClaim
+		pvs                     []*v1.PersistentVolume
+		wantPreFilterStatus     *framework.Status
+		wantStateAfterPreFilter *stateData
+		wantFilterStatus        *framework.Status
 	}{
 		{
-			name:       "nothing",
-			pod:        &v1.Pod{},
-			node:       &v1.Node{},
-			wantStatus: nil,
+			name: "pod has no pvcs",
+			pod:  makePod("pod-a", nil),
+			node: &v1.Node{},
+			wantStateAfterPreFilter: &stateData{
+				skip: true,
+			},
 		},
 		{
 			name: "all bound",
-			pod:  &v1.Pod{Spec: volState},
+			pod:  makePod("pod-a", []string{"pvc-a"}),
 			node: &v1.Node{},
-			volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{
-				AllBound: true,
+			pvcs: []*v1.PersistentVolumeClaim{
+				makePVC("pvc-a", "pv-a", waitSC.Name),
+			},
+			pvs: []*v1.PersistentVolume{
+				makePV("pv-a"),
+			},
+			wantStateAfterPreFilter: &stateData{
+				boundClaims: []*v1.PersistentVolumeClaim{
+					makePVC("pvc-a", "pv-a", waitSC.Name),
+				},
+				claimsToBind: []*v1.PersistentVolumeClaim{},
 			},
-			wantStatus: nil,
 		},
 		{
-			name: "unbound/no matches",
-			pod:  &v1.Pod{Spec: volState},
+			name: "immediate claims not bound",
+			pod:  makePod("pod-a", []string{"pvc-a"}),
 			node: &v1.Node{},
-			volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{
-				FindReasons: []scheduling.ConflictReason{scheduling.ErrReasonBindConflict},
+			pvcs: []*v1.PersistentVolumeClaim{
+				makePVC("pvc-a", "", immediateSC.Name),
 			},
-			wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, string(scheduling.ErrReasonBindConflict)),
+			wantPreFilterStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, "pod has unbound immediate PersistentVolumeClaims"),
+		},
+		{
+			name: "unbound claims no matches",
+			pod:  makePod("pod-a", []string{"pvc-a"}),
+			node: &v1.Node{},
+			pvcs: []*v1.PersistentVolumeClaim{
+				makePVC("pvc-a", "", waitSC.Name),
+			},
+			wantStateAfterPreFilter: &stateData{
+				boundClaims: []*v1.PersistentVolumeClaim{},
+				claimsToBind: []*v1.PersistentVolumeClaim{
+					makePVC("pvc-a", "", waitSC.Name),
+				},
+			},
+			wantFilterStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, string(scheduling.ErrReasonBindConflict)),
 		},
 		{
 			name: "bound and unbound unsatisfied",
-			pod:  &v1.Pod{Spec: volState},
-			node: &v1.Node{},
-			volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{
-				FindReasons: []scheduling.ConflictReason{scheduling.ErrReasonBindConflict, scheduling.ErrReasonNodeConflict},
+			pod:  makePod("pod-a", []string{"pvc-a", "pvc-b"}),
+			node: &v1.Node{
+				ObjectMeta: metav1.ObjectMeta{
+					Labels: map[string]string{
+						"foo": "barbar",
+					},
+				},
 			},
-			wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, string(scheduling.ErrReasonBindConflict), string(scheduling.ErrReasonNodeConflict)),
+			pvcs: []*v1.PersistentVolumeClaim{
+				makePVC("pvc-a", "pv-a", waitSC.Name),
+				makePVC("pvc-b", "", waitSC.Name),
+			},
+			pvs: []*v1.PersistentVolume{
+				addPVNodeAffinity(makePV("pv-a"), &v1.VolumeNodeAffinity{
+					Required: &v1.NodeSelector{
+						NodeSelectorTerms: []v1.NodeSelectorTerm{
+							{
+								MatchExpressions: []v1.NodeSelectorRequirement{
+									{
+										Key:      "foo",
+										Operator: v1.NodeSelectorOpIn,
+										Values:   []string{"bar"},
+									},
+								},
+							},
+						},
+					},
+				}),
+			},
+			wantStateAfterPreFilter: &stateData{
+				boundClaims: []*v1.PersistentVolumeClaim{
+					makePVC("pvc-a", "pv-a", waitSC.Name),
+				},
+				claimsToBind: []*v1.PersistentVolumeClaim{
+					makePVC("pvc-b", "", waitSC.Name),
+				},
+			},
+			wantFilterStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, string(scheduling.ErrReasonNodeConflict), string(scheduling.ErrReasonBindConflict)),
 		},
 		{
-			name:               "unbound/found matches/bind succeeds",
-			pod:                &v1.Pod{Spec: volState},
-			node:               &v1.Node{},
-			volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{},
-			wantStatus:         nil,
+			name:                "pvc not found",
+			pod:                 makePod("pod-a", []string{"pvc-a"}),
+			node:                &v1.Node{},
+			wantPreFilterStatus: framework.NewStatus(framework.Error, `error getting PVC "default/pvc-a": could not find v1.PersistentVolumeClaim "default/pvc-a"`),
+			wantFilterStatus:    nil,
 		},
 		{
-			name: "predicate error",
-			pod:  &v1.Pod{Spec: volState},
+			name: "pv not found",
+			pod:  makePod("pod-a", []string{"pvc-a"}),
 			node: &v1.Node{},
-			volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{
-				FindErr: findErr,
+			pvcs: []*v1.PersistentVolumeClaim{
+				makePVC("pvc-a", "pv-a", waitSC.Name),
 			},
-			wantStatus: framework.NewStatus(framework.Error, findErr.Error()),
+			wantPreFilterStatus: nil,
+			wantStateAfterPreFilter: &stateData{
+				boundClaims: []*v1.PersistentVolumeClaim{
+					makePVC("pvc-a", "pv-a", waitSC.Name),
+				},
+				claimsToBind: []*v1.PersistentVolumeClaim{},
+			},
+			wantFilterStatus: framework.NewStatus(framework.Error, `could not find v1.PersistentVolume "pv-a"`),
 		},
 	}
 
 	for _, item := range table {
 		t.Run(item.name, func(t *testing.T) {
-			nodeInfo := framework.NewNodeInfo()
-			nodeInfo.SetNode(item.node)
-			fakeVolumeBinder := scheduling.NewFakeVolumeBinder(item.volumeBinderConfig)
-			p := &VolumeBinding{
-				Binder: fakeVolumeBinder,
+			ctx := context.Background()
+			client := fake.NewSimpleClientset()
+			informerFactory := informers.NewSharedInformerFactory(client, 0)
+			opts := []framework.Option{
+				framework.WithClientSet(client),
+				framework.WithInformerFactory(informerFactory),
 			}
-			gotStatus := p.Filter(context.Background(), nil, item.pod, nodeInfo)
-			if !reflect.DeepEqual(gotStatus, item.wantStatus) {
-				t.Errorf("status does not match: %v, want: %v", gotStatus, item.wantStatus)
+			fh, err := framework.NewFramework(nil, nil, nil, opts...)
+			if err != nil {
+				t.Fatal(err)
+			}
+			pl, err := New(&config.VolumeBindingArgs{
+				BindTimeoutSeconds: 300,
+			}, fh)
+			if err != nil {
+				t.Fatal(err)
 			}
+			// Start informer factory after initialization
+			informerFactory.Start(ctx.Done())
+
+			// Feed testing data and wait for them to be synced
+			client.StorageV1().StorageClasses().Create(ctx, immediateSC, metav1.CreateOptions{})
+			client.StorageV1().StorageClasses().Create(ctx, waitSC, metav1.CreateOptions{})
+			if item.node != nil {
+				client.CoreV1().Nodes().Create(ctx, item.node, metav1.CreateOptions{})
+			}
+			if len(item.pvcs) > 0 {
+				for _, pvc := range item.pvcs {
+					client.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
+				}
+			}
+			if len(item.pvs) > 0 {
+				for _, pv := range item.pvs {
+					client.CoreV1().PersistentVolumes().Create(ctx, pv, metav1.CreateOptions{})
+				}
+			}
+			caches := informerFactory.WaitForCacheSync(ctx.Done())
+			for _, synced := range caches {
+				if !synced {
+					t.Errorf("error waiting for informer cache sync")
+				}
+			}
+
+			// Verify
+			p := pl.(*VolumeBinding)
+			nodeInfo := framework.NewNodeInfo()
+			nodeInfo.SetNode(item.node)
+			state := framework.NewCycleState()
+			t.Logf("call PreFilter and check status")
+			gotPreFilterStatus := p.PreFilter(ctx, state, item.pod)
+			if !reflect.DeepEqual(gotPreFilterStatus, item.wantPreFilterStatus) {
+				t.Errorf("prefilter status does not match: %v, want: %v", gotPreFilterStatus, item.wantPreFilterStatus)
+			}
+			if !gotPreFilterStatus.IsSuccess() {
+				// scheduler framework will skip Filter if PreFilter fails
+				return
+			}
+			t.Logf("check state after prefilter phase")
+			stateData, err := getStateData(state)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if !reflect.DeepEqual(stateData, item.wantStateAfterPreFilter) {
+				t.Errorf("state after prefilter does not match: %v, want: %v", stateData, item.wantStateAfterPreFilter)
+			}
+			t.Logf("call Filter and check status")
+			gotStatus := p.Filter(ctx, state, item.pod, nodeInfo)
+			if !reflect.DeepEqual(gotStatus, item.wantFilterStatus) {
+				t.Errorf("filter status does not match: %v, want: %v", gotStatus, item.wantFilterStatus)
+			}
 		})
 	}
 }
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index 41b1b7c3cb6..4152bf7d313 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -843,7 +843,7 @@ func setupTestSchedulerWithVolumeBinding(volumeBinder scheduling.SchedulerVolume
 		st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 		st.RegisterPluginAsExtensions(volumebinding.Name, func(plArgs runtime.Object, handle framework.FrameworkHandle) (framework.Plugin, error) {
 			return &volumebinding.VolumeBinding{Binder: volumeBinder}, nil
-		}, "Filter", "Reserve", "Unreserve", "PreBind", "PostBind"),
+		}, "PreFilter", "Filter", "Reserve", "Unreserve", "PreBind", "PostBind"),
 	}
 	s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, broadcaster, fns...)
 	informerFactory.Start(stop)
diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go
index 586dd5c4d10..b2f101c4473 100644
--- a/test/integration/scheduler/scheduler_test.go
+++ b/test/integration/scheduler/scheduler_test.go
@@ -106,6 +106,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
 				"PreFilterPlugin": {
 					{Name: "NodeResourcesFit"},
 					{Name: "NodePorts"},
+					{Name: "VolumeBinding"},
 					{Name: "PodTopologySpread"},
 					{Name: "InterPodAffinity"},
 				},
@@ -200,6 +201,7 @@ kind: Policy
 				"PreFilterPlugin": {
 					{Name: "NodeResourcesFit"},
 					{Name: "NodePorts"},
+					{Name: "VolumeBinding"},
 					{Name: "PodTopologySpread"},
 					{Name: "InterPodAffinity"},
 				},
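The essence of the change above is that PVC classification now happens once per pod in PreFilter and is handed to the per-node Filter calls through the scheduling cycle state, instead of being recomputed inside FindPodVolumes for every node. The following self-contained Go sketch (not part of the patch) illustrates only that hand-off pattern; cycleState, stateData, preFilter and filter here are simplified stand-ins for illustration and are not the real scheduler framework API.

package main

import (
	"errors"
	"fmt"
)

// stateData mirrors the kind of per-cycle data the plugin stores: a skip flag
// and the pod's PVCs split into already-bound and still-to-bind sets.
type stateData struct {
	skip         bool
	boundClaims  []string
	claimsToBind []string
}

// cycleState is a simplified stand-in for the framework's per-pod cycle state.
type cycleState map[string]*stateData

const stateKey = "VolumeBinding"

// preFilter runs once per pod: it classifies the pod's PVCs and caches the
// result so the per-node filter calls do not have to repeat the work.
func preFilter(cs cycleState, pvcs map[string]bool /* claim name -> bound? */) {
	if len(pvcs) == 0 {
		cs[stateKey] = &stateData{skip: true}
		return
	}
	s := &stateData{}
	for name, bound := range pvcs {
		if bound {
			s.boundClaims = append(s.boundClaims, name)
		} else {
			s.claimsToBind = append(s.claimsToBind, name)
		}
	}
	cs[stateKey] = s
}

// filter runs once per node and only reads the cached classification.
func filter(cs cycleState, node string) error {
	s, ok := cs[stateKey]
	if !ok {
		return errors.New("no prefilter state found")
	}
	if s.skip {
		return nil // pod has no PVCs, nothing to check on this node
	}
	fmt.Printf("node %s: checking %d bound and %d unbound claims\n",
		node, len(s.boundClaims), len(s.claimsToBind))
	return nil
}

func main() {
	cs := cycleState{}
	preFilter(cs, map[string]bool{"pvc-a": true, "pvc-b": false})
	for _, node := range []string{"node-1", "node-2"} {
		if err := filter(cs, node); err != nil {
			panic(err)
		}
	}
}

In the real plugin the same idea appears as state.Write(stateKey, &stateData{...}) in PreFilter and getStateData(cs) in Filter, which is why Filter no longer calls GetPodVolumes for every node and unbound immediate claims are rejected once, up front, with UnschedulableAndUnresolvable.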