Merge pull request #91775 from cofyc/fix91755

VolumeBinding: Skip/fail fast in PreFilter phase and improve error reporting
This commit is contained in:
Kubernetes Prow Robot 2020-06-12 10:47:56 -07:00 committed by GitHub
commit 1385280afc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 366 additions and 91 deletions

View File

@ -156,6 +156,7 @@ profiles:
{Name: "NodePorts"},
{Name: "PodTopologySpread"},
{Name: "InterPodAffinity"},
{Name: "VolumeBinding"},
},
"FilterPlugin": {
{Name: "NodeUnschedulable"},
@ -287,6 +288,7 @@ profiles:
{Name: "NodePorts"},
{Name: "PodTopologySpread"},
{Name: "InterPodAffinity"},
{Name: "VolumeBinding"},
},
"FilterPlugin": {
{Name: "NodeUnschedulable"},

View File

@ -78,17 +78,19 @@ type InTreeToCSITranslator interface {
//
// This integrates into the existing scheduler workflow as follows:
// 1. The scheduler takes a Pod off the scheduler queue and processes it serially:
// a. Invokes all filter plugins, parallelized across nodes. FindPodVolumes() is invoked here.
// b. Invokes all score plugins. Future/TBD
// c. Selects the best node for the Pod.
// d. Invokes all reserve plugins. AssumePodVolumes() is invoked here.
// a. Invokes all pre-filter plugins for the pod. GetPodVolumes() is invoked
// here, pod volume information will be saved in current scheduling cycle state for later use.
// b. Invokes all filter plugins, parallelized across nodes. FindPodVolumes() is invoked here.
// c. Invokes all score plugins. Future/TBD
// d. Selects the best node for the Pod.
// e. Invokes all reserve plugins. AssumePodVolumes() is invoked here.
// i. If PVC binding is required, cache in-memory only:
// * For manual binding: update PV objects for prebinding to the corresponding PVCs.
// * For dynamic provisioning: update PVC object with a selected node from c)
// * For the pod, which PVCs and PVs need API updates.
// ii. Afterwards, the main scheduler caches the Pod->Node binding in the scheduler's pod cache,
// This is handled in the scheduler and not here.
// e. Asynchronously bind volumes and pod in a separate goroutine
// f. Asynchronously bind volumes and pod in a separate goroutine
// i. BindPodVolumes() is called first in PreBind phase. It makes all the necessary API updates and waits for
// PV controller to fully bind and provision the PVCs. If binding fails, the Pod is sent
// back through the scheduler.
@ -96,6 +98,10 @@ type InTreeToCSITranslator interface {
// 2. Once all the assume operations are done in d), the scheduler processes the next Pod in the scheduler queue
// while the actual binding operation occurs in the background.
type SchedulerVolumeBinder interface {
// GetPodVolumes returns a pod's PVCs separated into bound, unbound with delayed binding (including provisioning)
// and unbound with immediate binding (including prebound)
GetPodVolumes(pod *v1.Pod) (boundClaims, unboundClaimsDelayBinding, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error)
// FindPodVolumes checks if all of a Pod's PVCs can be satisfied by the node.
//
// If a PVC is bound, it checks if the PV's NodeAffinity matches the Node.
@ -105,7 +111,7 @@ type SchedulerVolumeBinder interface {
// (currently) not usable for the pod.
//
// This function is called by the volume binding scheduler predicate and can be called in parallel
FindPodVolumes(pod *v1.Pod, node *v1.Node) (reasons ConflictReasons, err error)
FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*v1.PersistentVolumeClaim, node *v1.Node) (reasons ConflictReasons, err error)
// AssumePodVolumes will:
// 1. Take the PV matches for unbound PVCs and update the PV cache assuming
@ -194,7 +200,7 @@ func (b *volumeBinder) DeletePodBindings(pod *v1.Pod) {
// FindPodVolumes caches the matching PVs and PVCs to provision per node in podBindingCache.
// This method intentionally takes in a *v1.Node object instead of using volumebinder.nodeInformer.
// That's necessary because some operations will need to pass in to the predicate fake node objects.
func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (reasons ConflictReasons, err error) {
func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*v1.PersistentVolumeClaim, node *v1.Node) (reasons ConflictReasons, err error) {
podName := getPodName(pod)
// Warning: Below log needs high verbosity as it can be printed several times (#60933).
@ -248,18 +254,6 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (reasons Confl
b.podBindingCache.UpdateBindings(pod, node.Name, matchedBindings, provisionedClaims)
}()
// The pod's volumes need to be processed in one call to avoid the race condition where
// volumes can get bound/provisioned in between calls.
boundClaims, claimsToBind, unboundClaimsImmediate, err := b.getPodVolumes(pod)
if err != nil {
return nil, err
}
// Immediate claims should be bound
if len(unboundClaimsImmediate) > 0 {
return nil, fmt.Errorf("pod has unbound immediate PersistentVolumeClaims")
}
// Check PV node affinity on bound volumes
if len(boundClaims) > 0 {
boundVolumesSatisfied, err = b.checkBoundClaims(boundClaims, node, podName)
@ -684,9 +678,9 @@ func (b *volumeBinder) arePodVolumesBound(pod *v1.Pod) bool {
return true
}
// getPodVolumes returns a pod's PVCs separated into bound, unbound with delayed binding (including provisioning)
// GetPodVolumes returns a pod's PVCs separated into bound, unbound with delayed binding (including provisioning)
// and unbound with immediate binding (including prebound)
func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentVolumeClaim, unboundClaimsDelayBinding []*v1.PersistentVolumeClaim, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error) {
func (b *volumeBinder) GetPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentVolumeClaim, unboundClaimsDelayBinding []*v1.PersistentVolumeClaim, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error) {
boundClaims = []*v1.PersistentVolumeClaim{}
unboundClaimsImmediate = []*v1.PersistentVolumeClaim{}
unboundClaimsDelayBinding = []*v1.PersistentVolumeClaim{}

View File

@ -42,8 +42,13 @@ type FakeVolumeBinder struct {
BindCalled bool
}
// GetPodVolumes implements SchedulerVolumeBinder.GetPodVolumes.
func (b *FakeVolumeBinder) GetPodVolumes(pod *v1.Pod) (boundClaims, unboundClaimsDelayBinding, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error) {
return nil, nil, nil, nil
}
// FindPodVolumes implements SchedulerVolumeBinder.FindPodVolumes.
func (b *FakeVolumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (reasons ConflictReasons, err error) {
func (b *FakeVolumeBinder) FindPodVolumes(pod *v1.Pod, _, _ []*v1.PersistentVolumeClaim, node *v1.Node) (reasons ConflictReasons, err error) {
return b.config.FindReasons, b.config.FindErr
}

View File

@ -771,6 +771,18 @@ func checkReasons(t *testing.T, actual, expected ConflictReasons) {
}
}
// findPodVolumes gets and finds volumes for given pod and node
func findPodVolumes(binder SchedulerVolumeBinder, pod *v1.Pod, node *v1.Node) (ConflictReasons, error) {
boundClaims, claimsToBind, unboundClaimsImmediate, err := binder.GetPodVolumes(pod)
if err != nil {
return nil, err
}
if len(unboundClaimsImmediate) > 0 {
return nil, fmt.Errorf("pod has unbound immediate PersistentVolumeClaims")
}
return binder.FindPodVolumes(pod, boundClaims, claimsToBind, node)
}
func TestFindPodVolumesWithoutProvisioning(t *testing.T) {
type scenarioType struct {
// Inputs
@ -907,7 +919,7 @@ func TestFindPodVolumesWithoutProvisioning(t *testing.T) {
}
// Execute
reasons, err := testEnv.binder.FindPodVolumes(scenario.pod, testNode)
reasons, err := findPodVolumes(testEnv.binder, scenario.pod, testNode)
// Validate
if !scenario.shouldFail && err != nil {
@ -1012,7 +1024,7 @@ func TestFindPodVolumesWithProvisioning(t *testing.T) {
}
// Execute
reasons, err := testEnv.binder.FindPodVolumes(scenario.pod, testNode)
reasons, err := findPodVolumes(testEnv.binder, scenario.pod, testNode)
// Validate
if !scenario.shouldFail && err != nil {
@ -1112,7 +1124,7 @@ func TestFindPodVolumesWithCSIMigration(t *testing.T) {
}
// Execute
reasons, err := testEnv.binder.FindPodVolumes(scenario.pod, node)
reasons, err := findPodVolumes(testEnv.binder, scenario.pod, node)
// Validate
if !scenario.shouldFail && err != nil {
@ -1933,7 +1945,7 @@ func TestFindAssumeVolumes(t *testing.T) {
// Execute
// 1. Find matching PVs
reasons, err := testEnv.binder.FindPodVolumes(pod, testNode)
reasons, err := findPodVolumes(testEnv.binder, pod, testNode)
if err != nil {
t.Errorf("Test failed: FindPodVolumes returned error: %v", err)
}
@ -1959,7 +1971,7 @@ func TestFindAssumeVolumes(t *testing.T) {
// This should always return the original chosen pv
// Run this many times in case sorting returns different orders for the two PVs.
for i := 0; i < 50; i++ {
reasons, err := testEnv.binder.FindPodVolumes(pod, testNode)
reasons, err := findPodVolumes(testEnv.binder, pod, testNode)
if err != nil {
t.Errorf("Test failed: FindPodVolumes returned error: %v", err)
}

View File

@ -87,6 +87,7 @@ func getDefaultConfig() *schedulerapi.Plugins {
{Name: nodeports.Name},
{Name: podtopologyspread.Name},
{Name: interpodaffinity.Name},
{Name: volumebinding.Name},
},
},
Filter: &schedulerapi.PluginSet{

View File

@ -58,6 +58,7 @@ func TestClusterAutoscalerProvider(t *testing.T) {
{Name: nodeports.Name},
{Name: podtopologyspread.Name},
{Name: interpodaffinity.Name},
{Name: volumebinding.Name},
},
},
Filter: &schedulerapi.PluginSet{
@ -154,6 +155,7 @@ func TestApplyFeatureGates(t *testing.T) {
{Name: nodeports.Name},
{Name: podtopologyspread.Name},
{Name: interpodaffinity.Name},
{Name: volumebinding.Name},
},
},
Filter: &schedulerapi.PluginSet{
@ -238,6 +240,7 @@ func TestApplyFeatureGates(t *testing.T) {
{Name: nodeports.Name},
{Name: podtopologyspread.Name},
{Name: interpodaffinity.Name},
{Name: volumebinding.Name},
},
},
Filter: &schedulerapi.PluginSet{

View File

@ -666,6 +666,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodePorts"},
{Name: "NodeResourcesFit"},
{Name: "ServiceAffinity"},
{Name: "VolumeBinding"},
{Name: "InterPodAffinity"},
},
"FilterPlugin": {
@ -773,6 +774,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodePorts"},
{Name: "NodeResourcesFit"},
{Name: "ServiceAffinity"},
{Name: "VolumeBinding"},
{Name: "InterPodAffinity"},
},
"FilterPlugin": {
@ -892,6 +894,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodePorts"},
{Name: "NodeResourcesFit"},
{Name: "ServiceAffinity"},
{Name: "VolumeBinding"},
{Name: "InterPodAffinity"},
},
"FilterPlugin": {
@ -1013,6 +1016,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodePorts"},
{Name: "NodeResourcesFit"},
{Name: "ServiceAffinity"},
{Name: "VolumeBinding"},
{Name: "InterPodAffinity"},
},
"FilterPlugin": {
@ -1134,6 +1138,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodePorts"},
{Name: "NodeResourcesFit"},
{Name: "ServiceAffinity"},
{Name: "VolumeBinding"},
{Name: "InterPodAffinity"},
},
"FilterPlugin": {
@ -1260,6 +1265,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodePorts"},
{Name: "NodeResourcesFit"},
{Name: "ServiceAffinity"},
{Name: "VolumeBinding"},
{Name: "InterPodAffinity"},
},
"FilterPlugin": {
@ -1389,6 +1395,7 @@ func TestAlgorithmProviderCompatibility(t *testing.T) {
{Name: "NodePorts"},
{Name: "PodTopologySpread"},
{Name: "InterPodAffinity"},
{Name: "VolumeBinding"},
},
"FilterPlugin": {
{Name: "NodeUnschedulable"},
@ -1457,6 +1464,7 @@ func TestAlgorithmProviderCompatibility(t *testing.T) {
{Name: "NodePorts"},
{Name: "PodTopologySpread"},
{Name: "InterPodAffinity"},
{Name: "VolumeBinding"},
},
"FilterPlugin": {
{Name: "NodeUnschedulable"},
@ -1545,6 +1553,7 @@ func TestPluginsConfigurationCompatibility(t *testing.T) {
{Name: "NodePorts"},
{Name: "PodTopologySpread"},
{Name: "InterPodAffinity"},
{Name: "VolumeBinding"},
},
"FilterPlugin": {
{Name: "NodeUnschedulable"},
@ -1740,6 +1749,7 @@ func TestPluginsConfigurationCompatibility(t *testing.T) {
{Name: "NodePorts"},
{Name: "InterPodAffinity"},
{Name: "PodTopologySpread"},
{Name: "VolumeBinding"},
},
},
Filter: &config.PluginSet{

View File

@ -269,6 +269,7 @@ func NewLegacyRegistry() *LegacyRegistry {
})
registry.registerPredicateConfigProducer(CheckVolumeBindingPred,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, volumebinding.Name, nil)
plugins.Filter = appendToPluginSet(plugins.Filter, volumebinding.Name, nil)
plugins.Reserve = appendToPluginSet(plugins.Reserve, volumebinding.Name, nil)
plugins.PreBind = appendToPluginSet(plugins.PreBind, volumebinding.Name, nil)

View File

@ -36,8 +36,15 @@ go_test(
srcs = ["volume_binding_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/controller/volume/persistentvolume/util:go_default_library",
"//pkg/controller/volume/scheduling:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
],
)

View File

@ -18,6 +18,7 @@ package volumebinding
import (
"context"
"errors"
"fmt"
"time"
@ -35,14 +36,17 @@ const (
// DefaultBindTimeoutSeconds defines the default bind timeout in seconds
DefaultBindTimeoutSeconds = 600
allBoundStateKey framework.StateKey = "volumebinding:all-bound"
stateKey framework.StateKey = Name
)
type stateData struct {
allBound bool
skip bool // set true if pod does not have PVCs
boundClaims []*v1.PersistentVolumeClaim
claimsToBind []*v1.PersistentVolumeClaim
allBound bool
}
func (d stateData) Clone() framework.StateData {
func (d *stateData) Clone() framework.StateData {
return d
}
@ -58,6 +62,7 @@ type VolumeBinding struct {
Binder scheduling.SchedulerVolumeBinder
}
var _ framework.PreFilterPlugin = &VolumeBinding{}
var _ framework.FilterPlugin = &VolumeBinding{}
var _ framework.ReservePlugin = &VolumeBinding{}
var _ framework.PreBindPlugin = &VolumeBinding{}
@ -81,6 +86,48 @@ func podHasPVCs(pod *v1.Pod) bool {
return false
}
// PreFilter invoked at the prefilter extension point to check if pod has all
// immediate PVCs bound. If not all immediate PVCs are bound, an
// UnschedulableAndUnresolvable is returned.
func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status {
// If pod does not request any PVC, we don't need to do anything.
if !podHasPVCs(pod) {
state.Write(stateKey, &stateData{skip: true})
return nil
}
boundClaims, claimsToBind, unboundClaimsImmediate, err := pl.Binder.GetPodVolumes(pod)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
if len(unboundClaimsImmediate) > 0 {
// Return UnschedulableAndUnresolvable error if immediate claims are
// not bound. Pod will be moved to active/backoff queues once these
// claims are bound by PV controller.
status := framework.NewStatus(framework.UnschedulableAndUnresolvable)
status.AppendReason("pod has unbound immediate PersistentVolumeClaims")
return status
}
state.Write(stateKey, &stateData{boundClaims: boundClaims, claimsToBind: claimsToBind})
return nil
}
// PreFilterExtensions returns prefilter extensions, pod add and remove.
func (pl *VolumeBinding) PreFilterExtensions() framework.PreFilterExtensions {
return nil
}
func getStateData(cs *framework.CycleState) (*stateData, error) {
state, err := cs.Read(stateKey)
if err != nil {
return nil, err
}
s, ok := state.(*stateData)
if !ok {
return nil, errors.New("unable to convert state into stateData")
}
return s, nil
}
// Filter invoked at the filter extension point.
// It evaluates if a pod can fit due to the volumes it requests,
// for both bound and unbound PVCs.
@ -98,12 +145,17 @@ func (pl *VolumeBinding) Filter(ctx context.Context, cs *framework.CycleState, p
if node == nil {
return framework.NewStatus(framework.Error, "node not found")
}
// If pod does not request any PVC, we don't need to do anything.
if !podHasPVCs(pod) {
state, err := getStateData(cs)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
if state.skip {
return nil
}
reasons, err := pl.Binder.FindPodVolumes(pod, node)
reasons, err := pl.Binder.FindPodVolumes(pod, state.boundClaims, state.claimsToBind, node)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
@ -125,7 +177,7 @@ func (pl *VolumeBinding) Reserve(ctx context.Context, cs *framework.CycleState,
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
cs.Write(allBoundStateKey, stateData{allBound: allBound})
cs.Write(stateKey, &stateData{allBound: allBound})
return nil
}
@ -135,14 +187,10 @@ func (pl *VolumeBinding) Reserve(ctx context.Context, cs *framework.CycleState,
// If binding errors, times out or gets undone, then an error will be returned to
// retry scheduling.
func (pl *VolumeBinding) PreBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
state, err := cs.Read(allBoundStateKey)
s, err := getStateData(cs)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
s, ok := state.(stateData)
if !ok {
return framework.NewStatus(framework.Error, "unable to convert state into stateData")
}
if s.allBound {
// no need to bind volumes
return nil

View File

@ -18,97 +18,287 @@ package volumebinding
import (
"context"
"fmt"
"reflect"
"testing"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
"k8s.io/kubernetes/pkg/controller/volume/scheduling"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/utils/pointer"
)
func TestVolumeBinding(t *testing.T) {
findErr := fmt.Errorf("find err")
volState := v1.PodSpec{
Volumes: []v1.Volume{
{
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{},
},
},
var (
immediate = storagev1.VolumeBindingImmediate
waitForFirstConsumer = storagev1.VolumeBindingWaitForFirstConsumer
immediateSC = &storagev1.StorageClass{
ObjectMeta: metav1.ObjectMeta{
Name: "immediate-sc",
},
VolumeBindingMode: &immediate,
}
waitSC = &storagev1.StorageClass{
ObjectMeta: metav1.ObjectMeta{
Name: "wait-sc",
},
VolumeBindingMode: &waitForFirstConsumer,
}
)
func makePV(name string) *v1.PersistentVolume {
return &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
}
}
func addPVNodeAffinity(pv *v1.PersistentVolume, volumeNodeAffinity *v1.VolumeNodeAffinity) *v1.PersistentVolume {
pv.Spec.NodeAffinity = volumeNodeAffinity
return pv
}
func makePVC(name string, boundPVName string, storageClassName string) *v1.PersistentVolumeClaim {
pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: v1.NamespaceDefault,
},
Spec: v1.PersistentVolumeClaimSpec{
StorageClassName: pointer.StringPtr(storageClassName),
},
}
if boundPVName != "" {
pvc.Spec.VolumeName = boundPVName
metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, pvutil.AnnBindCompleted, "true")
}
return pvc
}
func makePod(name string, pvcNames []string) *v1.Pod {
p := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: v1.NamespaceDefault,
},
}
p.Spec.Volumes = make([]v1.Volume, 0)
for _, pvcName := range pvcNames {
p.Spec.Volumes = append(p.Spec.Volumes, v1.Volume{
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: pvcName,
},
},
})
}
return p
}
func TestVolumeBinding(t *testing.T) {
table := []struct {
name string
pod *v1.Pod
node *v1.Node
volumeBinderConfig *scheduling.FakeVolumeBinderConfig
wantStatus *framework.Status
name string
pod *v1.Pod
node *v1.Node
pvcs []*v1.PersistentVolumeClaim
pvs []*v1.PersistentVolume
wantPreFilterStatus *framework.Status
wantStateAfterPreFilter *stateData
wantFilterStatus *framework.Status
}{
{
name: "nothing",
pod: &v1.Pod{},
node: &v1.Node{},
wantStatus: nil,
name: "pod has not pvcs",
pod: makePod("pod-a", nil),
node: &v1.Node{},
wantStateAfterPreFilter: &stateData{
skip: true,
},
},
{
name: "all bound",
pod: &v1.Pod{Spec: volState},
pod: makePod("pod-a", []string{"pvc-a"}),
node: &v1.Node{},
volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{
AllBound: true,
pvcs: []*v1.PersistentVolumeClaim{
makePVC("pvc-a", "pv-a", waitSC.Name),
},
pvs: []*v1.PersistentVolume{
makePV("pv-a"),
},
wantStateAfterPreFilter: &stateData{
boundClaims: []*v1.PersistentVolumeClaim{
makePVC("pvc-a", "pv-a", waitSC.Name),
},
claimsToBind: []*v1.PersistentVolumeClaim{},
},
wantStatus: nil,
},
{
name: "unbound/no matches",
pod: &v1.Pod{Spec: volState},
name: "immediate claims not bound",
pod: makePod("pod-a", []string{"pvc-a"}),
node: &v1.Node{},
volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{
FindReasons: []scheduling.ConflictReason{scheduling.ErrReasonBindConflict},
pvcs: []*v1.PersistentVolumeClaim{
makePVC("pvc-a", "", immediateSC.Name),
},
wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, string(scheduling.ErrReasonBindConflict)),
wantPreFilterStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, "pod has unbound immediate PersistentVolumeClaims"),
},
{
name: "unbound claims no matches",
pod: makePod("pod-a", []string{"pvc-a"}),
node: &v1.Node{},
pvcs: []*v1.PersistentVolumeClaim{
makePVC("pvc-a", "", waitSC.Name),
},
wantStateAfterPreFilter: &stateData{
boundClaims: []*v1.PersistentVolumeClaim{},
claimsToBind: []*v1.PersistentVolumeClaim{
makePVC("pvc-a", "", waitSC.Name),
},
},
wantFilterStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, string(scheduling.ErrReasonBindConflict)),
},
{
name: "bound and unbound unsatisfied",
pod: &v1.Pod{Spec: volState},
node: &v1.Node{},
volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{
FindReasons: []scheduling.ConflictReason{scheduling.ErrReasonBindConflict, scheduling.ErrReasonNodeConflict},
pod: makePod("pod-a", []string{"pvc-a", "pvc-b"}),
node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"foo": "barbar",
},
},
},
wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, string(scheduling.ErrReasonBindConflict), string(scheduling.ErrReasonNodeConflict)),
pvcs: []*v1.PersistentVolumeClaim{
makePVC("pvc-a", "pv-a", waitSC.Name),
makePVC("pvc-b", "", waitSC.Name),
},
pvs: []*v1.PersistentVolume{
addPVNodeAffinity(makePV("pv-a"), &v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "foo",
Operator: v1.NodeSelectorOpIn,
Values: []string{"bar"},
},
},
},
},
},
}),
},
wantStateAfterPreFilter: &stateData{
boundClaims: []*v1.PersistentVolumeClaim{
makePVC("pvc-a", "pv-a", waitSC.Name),
},
claimsToBind: []*v1.PersistentVolumeClaim{
makePVC("pvc-b", "", waitSC.Name),
},
},
wantFilterStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, string(scheduling.ErrReasonNodeConflict), string(scheduling.ErrReasonBindConflict)),
},
{
name: "unbound/found matches/bind succeeds",
pod: &v1.Pod{Spec: volState},
node: &v1.Node{},
volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{},
wantStatus: nil,
name: "pvc not found",
pod: makePod("pod-a", []string{"pvc-a"}),
node: &v1.Node{},
wantPreFilterStatus: framework.NewStatus(framework.Error, `error getting PVC "default/pvc-a": could not find v1.PersistentVolumeClaim "default/pvc-a"`),
wantFilterStatus: nil,
},
{
name: "predicate error",
pod: &v1.Pod{Spec: volState},
name: "pv not found",
pod: makePod("pod-a", []string{"pvc-a"}),
node: &v1.Node{},
volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{
FindErr: findErr,
pvcs: []*v1.PersistentVolumeClaim{
makePVC("pvc-a", "pv-a", waitSC.Name),
},
wantStatus: framework.NewStatus(framework.Error, findErr.Error()),
wantPreFilterStatus: nil,
wantStateAfterPreFilter: &stateData{
boundClaims: []*v1.PersistentVolumeClaim{
makePVC("pvc-a", "pv-a", waitSC.Name),
},
claimsToBind: []*v1.PersistentVolumeClaim{},
},
wantFilterStatus: framework.NewStatus(framework.Error, `could not find v1.PersistentVolume "pv-a"`),
},
}
for _, item := range table {
t.Run(item.name, func(t *testing.T) {
nodeInfo := framework.NewNodeInfo()
nodeInfo.SetNode(item.node)
fakeVolumeBinder := scheduling.NewFakeVolumeBinder(item.volumeBinderConfig)
p := &VolumeBinding{
Binder: fakeVolumeBinder,
ctx := context.Background()
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
opts := []framework.Option{
framework.WithClientSet(client),
framework.WithInformerFactory(informerFactory),
}
gotStatus := p.Filter(context.Background(), nil, item.pod, nodeInfo)
if !reflect.DeepEqual(gotStatus, item.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, item.wantStatus)
fh, err := framework.NewFramework(nil, nil, nil, opts...)
if err != nil {
t.Fatal(err)
}
pl, err := New(&config.VolumeBindingArgs{
BindTimeoutSeconds: 300,
}, fh)
if err != nil {
t.Fatal(err)
}
// Start informer factory after initialization
informerFactory.Start(ctx.Done())
// Feed testing data and wait for them to be synced
client.StorageV1().StorageClasses().Create(ctx, immediateSC, metav1.CreateOptions{})
client.StorageV1().StorageClasses().Create(ctx, waitSC, metav1.CreateOptions{})
if item.node != nil {
client.CoreV1().Nodes().Create(ctx, item.node, metav1.CreateOptions{})
}
if len(item.pvcs) > 0 {
for _, pvc := range item.pvcs {
client.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
}
}
if len(item.pvs) > 0 {
for _, pv := range item.pvs {
client.CoreV1().PersistentVolumes().Create(ctx, pv, metav1.CreateOptions{})
}
}
caches := informerFactory.WaitForCacheSync(ctx.Done())
for _, synced := range caches {
if !synced {
t.Errorf("error waiting for informer cache sync")
}
}
// Verify
p := pl.(*VolumeBinding)
nodeInfo := framework.NewNodeInfo()
nodeInfo.SetNode(item.node)
state := framework.NewCycleState()
t.Logf("call PreFilter and check status")
gotPreFilterStatus := p.PreFilter(ctx, state, item.pod)
if !reflect.DeepEqual(gotPreFilterStatus, item.wantPreFilterStatus) {
t.Errorf("filter prefilter status does not match: %v, want: %v", gotPreFilterStatus, item.wantPreFilterStatus)
}
if !gotPreFilterStatus.IsSuccess() {
// scheduler framework will skip Filter if PreFilter fails
return
}
t.Logf("check state after prefilter phase")
stateData, err := getStateData(state)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(stateData, item.wantStateAfterPreFilter) {
t.Errorf("state got after prefilter does not match: %v, want: %v", stateData, item.wantStateAfterPreFilter)
}
t.Logf("call Filter and check status")
gotStatus := p.Filter(ctx, state, item.pod, nodeInfo)
if !reflect.DeepEqual(gotStatus, item.wantFilterStatus) {
t.Errorf("filter status does not match: %v, want: %v", gotStatus, item.wantFilterStatus)
}
})
}
}

View File

@ -843,7 +843,7 @@ func setupTestSchedulerWithVolumeBinding(volumeBinder scheduling.SchedulerVolume
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
st.RegisterPluginAsExtensions(volumebinding.Name, func(plArgs runtime.Object, handle framework.FrameworkHandle) (framework.Plugin, error) {
return &volumebinding.VolumeBinding{Binder: volumeBinder}, nil
}, "Filter", "Reserve", "Unreserve", "PreBind", "PostBind"),
}, "PreFilter", "Filter", "Reserve", "Unreserve", "PreBind", "PostBind"),
}
s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, broadcaster, fns...)
informerFactory.Start(stop)

View File

@ -106,6 +106,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
"PreFilterPlugin": {
{Name: "NodeResourcesFit"},
{Name: "NodePorts"},
{Name: "VolumeBinding"},
{Name: "PodTopologySpread"},
{Name: "InterPodAffinity"},
},
@ -200,6 +201,7 @@ kind: Policy
"PreFilterPlugin": {
{Name: "NodeResourcesFit"},
{Name: "NodePorts"},
{Name: "VolumeBinding"},
{Name: "PodTopologySpread"},
{Name: "InterPodAffinity"},
},