Fail fast in PreFilter phase and return UnschedulableAndUnresolvable if immediate PVCs are not bound

This commit is contained in:
Yecheng Fu 2020-06-04 21:55:25 +08:00
parent 930b3a4df0
commit c4138361e4
12 changed files with 123 additions and 41 deletions

View File

@ -156,6 +156,7 @@ profiles:
{Name: "NodePorts"},
{Name: "PodTopologySpread"},
{Name: "InterPodAffinity"},
{Name: "VolumeBinding"},
},
"FilterPlugin": {
{Name: "NodeUnschedulable"},
@ -287,6 +288,7 @@ profiles:
{Name: "NodePorts"},
{Name: "PodTopologySpread"},
{Name: "InterPodAffinity"},
{Name: "VolumeBinding"},
},
"FilterPlugin": {
{Name: "NodeUnschedulable"},

View File

@ -78,17 +78,19 @@ type InTreeToCSITranslator interface {
//
// This integrates into the existing scheduler workflow as follows:
// 1. The scheduler takes a Pod off the scheduler queue and processes it serially:
// a. Invokes all filter plugins, parallelized across nodes. FindPodVolumes() is invoked here.
// b. Invokes all score plugins. Future/TBD
// c. Selects the best node for the Pod.
// d. Invokes all reserve plugins. AssumePodVolumes() is invoked here.
// a. Invokes all pre-filter plugins for the pod. GetPodVolumes() is invoked
// here, pod volume information will be saved in current scheduling cycle state for later use.
// b. Invokes all filter plugins, parallelized across nodes. FindPodVolumes() is invoked here.
// c. Invokes all score plugins. Future/TBD
// d. Selects the best node for the Pod.
// e. Invokes all reserve plugins. AssumePodVolumes() is invoked here.
// i. If PVC binding is required, cache in-memory only:
// * For manual binding: update PV objects for prebinding to the corresponding PVCs.
// * For dynamic provisioning: update PVC object with a selected node from c)
// * For the pod, which PVCs and PVs need API updates.
// ii. Afterwards, the main scheduler caches the Pod->Node binding in the scheduler's pod cache,
// This is handled in the scheduler and not here.
// e. Asynchronously bind volumes and pod in a separate goroutine
// f. Asynchronously bind volumes and pod in a separate goroutine
// i. BindPodVolumes() is called first in PreBind phase. It makes all the necessary API updates and waits for
// PV controller to fully bind and provision the PVCs. If binding fails, the Pod is sent
// back through the scheduler.
@ -96,6 +98,10 @@ type InTreeToCSITranslator interface {
// 2. Once all the assume operations are done in d), the scheduler processes the next Pod in the scheduler queue
// while the actual binding operation occurs in the background.
type SchedulerVolumeBinder interface {
// GetPodVolumes returns a pod's PVCs separated into bound, unbound with delayed binding (including provisioning)
// and unbound with immediate binding (including prebound)
GetPodVolumes(pod *v1.Pod) (boundClaims, unboundClaimsDelayBinding, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error)
// FindPodVolumes checks if all of a Pod's PVCs can be satisfied by the node.
//
// If a PVC is bound, it checks if the PV's NodeAffinity matches the Node.
@ -105,7 +111,7 @@ type SchedulerVolumeBinder interface {
// (currently) not usable for the pod.
//
// This function is called by the volume binding scheduler predicate and can be called in parallel
FindPodVolumes(pod *v1.Pod, node *v1.Node) (reasons ConflictReasons, err error)
FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*v1.PersistentVolumeClaim, node *v1.Node) (reasons ConflictReasons, err error)
// AssumePodVolumes will:
// 1. Take the PV matches for unbound PVCs and update the PV cache assuming
@ -194,7 +200,7 @@ func (b *volumeBinder) DeletePodBindings(pod *v1.Pod) {
// FindPodVolumes caches the matching PVs and PVCs to provision per node in podBindingCache.
// This method intentionally takes in a *v1.Node object instead of using volumebinder.nodeInformer.
// That's necessary because some operations will need to pass in to the predicate fake node objects.
func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (reasons ConflictReasons, err error) {
func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*v1.PersistentVolumeClaim, node *v1.Node) (reasons ConflictReasons, err error) {
podName := getPodName(pod)
// Warning: Below log needs high verbosity as it can be printed several times (#60933).
@ -248,20 +254,10 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (reasons Confl
b.podBindingCache.UpdateBindings(pod, node.Name, matchedBindings, provisionedClaims)
}()
// The pod's volumes need to be processed in one call to avoid the race condition where
// volumes can get bound/provisioned in between calls.
boundClaims, claimsToBind, unboundClaimsImmediate, err := b.getPodVolumes(pod)
if err != nil {
return nil, err
}
// Immediate claims should be bound
if len(unboundClaimsImmediate) > 0 {
return nil, fmt.Errorf("pod has unbound immediate PersistentVolumeClaims")
}
// Check PV node affinity on bound volumes
if len(boundClaims) > 0 {
// TODO if node affinity does not match, we should
// UnschedulableAndUnresolvable error back to scheduler framework
boundVolumesSatisfied, err = b.checkBoundClaims(boundClaims, node, podName)
if err != nil {
return nil, err
@ -684,9 +680,9 @@ func (b *volumeBinder) arePodVolumesBound(pod *v1.Pod) bool {
return true
}
// getPodVolumes returns a pod's PVCs separated into bound, unbound with delayed binding (including provisioning)
// GetPodVolumes returns a pod's PVCs separated into bound, unbound with delayed binding (including provisioning)
// and unbound with immediate binding (including prebound)
func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentVolumeClaim, unboundClaimsDelayBinding []*v1.PersistentVolumeClaim, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error) {
func (b *volumeBinder) GetPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentVolumeClaim, unboundClaimsDelayBinding []*v1.PersistentVolumeClaim, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error) {
boundClaims = []*v1.PersistentVolumeClaim{}
unboundClaimsImmediate = []*v1.PersistentVolumeClaim{}
unboundClaimsDelayBinding = []*v1.PersistentVolumeClaim{}

View File

@ -42,8 +42,13 @@ type FakeVolumeBinder struct {
BindCalled bool
}
// GetPodVolumes implements SchedulerVolumeBinder.GetPodVolumes.
func (b *FakeVolumeBinder) GetPodVolumes(pod *v1.Pod) (boundClaims, unboundClaimsDelayBinding, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error) {
return nil, nil, nil, nil
}
// FindPodVolumes implements SchedulerVolumeBinder.FindPodVolumes.
func (b *FakeVolumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (reasons ConflictReasons, err error) {
func (b *FakeVolumeBinder) FindPodVolumes(pod *v1.Pod, _, _ []*v1.PersistentVolumeClaim, node *v1.Node) (reasons ConflictReasons, err error) {
return b.config.FindReasons, b.config.FindErr
}

View File

@ -771,6 +771,18 @@ func checkReasons(t *testing.T, actual, expected ConflictReasons) {
}
}
// findPodVolumes gets and finds volumes for given pod and node
func findPodVolumes(binder SchedulerVolumeBinder, pod *v1.Pod, node *v1.Node) (ConflictReasons, error) {
boundClaims, claimsToBind, unboundClaimsImmediate, err := binder.GetPodVolumes(pod)
if err != nil {
return nil, err
}
if len(unboundClaimsImmediate) > 0 {
return nil, fmt.Errorf("pod has unbound immediate PersistentVolumeClaims")
}
return binder.FindPodVolumes(pod, boundClaims, claimsToBind, node)
}
func TestFindPodVolumesWithoutProvisioning(t *testing.T) {
type scenarioType struct {
// Inputs
@ -907,7 +919,7 @@ func TestFindPodVolumesWithoutProvisioning(t *testing.T) {
}
// Execute
reasons, err := testEnv.binder.FindPodVolumes(scenario.pod, testNode)
reasons, err := findPodVolumes(testEnv.binder, scenario.pod, testNode)
// Validate
if !scenario.shouldFail && err != nil {
@ -1012,7 +1024,7 @@ func TestFindPodVolumesWithProvisioning(t *testing.T) {
}
// Execute
reasons, err := testEnv.binder.FindPodVolumes(scenario.pod, testNode)
reasons, err := findPodVolumes(testEnv.binder, scenario.pod, testNode)
// Validate
if !scenario.shouldFail && err != nil {
@ -1112,7 +1124,7 @@ func TestFindPodVolumesWithCSIMigration(t *testing.T) {
}
// Execute
reasons, err := testEnv.binder.FindPodVolumes(scenario.pod, node)
reasons, err := findPodVolumes(testEnv.binder, scenario.pod, node)
// Validate
if !scenario.shouldFail && err != nil {
@ -1933,7 +1945,7 @@ func TestFindAssumeVolumes(t *testing.T) {
// Execute
// 1. Find matching PVs
reasons, err := testEnv.binder.FindPodVolumes(pod, testNode)
reasons, err := findPodVolumes(testEnv.binder, pod, testNode)
if err != nil {
t.Errorf("Test failed: FindPodVolumes returned error: %v", err)
}
@ -1959,7 +1971,7 @@ func TestFindAssumeVolumes(t *testing.T) {
// This should always return the original chosen pv
// Run this many times in case sorting returns different orders for the two PVs.
for i := 0; i < 50; i++ {
reasons, err := testEnv.binder.FindPodVolumes(pod, testNode)
reasons, err := findPodVolumes(testEnv.binder, pod, testNode)
if err != nil {
t.Errorf("Test failed: FindPodVolumes returned error: %v", err)
}

View File

@ -87,6 +87,7 @@ func getDefaultConfig() *schedulerapi.Plugins {
{Name: nodeports.Name},
{Name: podtopologyspread.Name},
{Name: interpodaffinity.Name},
{Name: volumebinding.Name},
},
},
Filter: &schedulerapi.PluginSet{

View File

@ -58,6 +58,7 @@ func TestClusterAutoscalerProvider(t *testing.T) {
{Name: nodeports.Name},
{Name: podtopologyspread.Name},
{Name: interpodaffinity.Name},
{Name: volumebinding.Name},
},
},
Filter: &schedulerapi.PluginSet{
@ -154,6 +155,7 @@ func TestApplyFeatureGates(t *testing.T) {
{Name: nodeports.Name},
{Name: podtopologyspread.Name},
{Name: interpodaffinity.Name},
{Name: volumebinding.Name},
},
},
Filter: &schedulerapi.PluginSet{
@ -238,6 +240,7 @@ func TestApplyFeatureGates(t *testing.T) {
{Name: nodeports.Name},
{Name: podtopologyspread.Name},
{Name: interpodaffinity.Name},
{Name: volumebinding.Name},
},
},
Filter: &schedulerapi.PluginSet{

View File

@ -666,6 +666,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodePorts"},
{Name: "NodeResourcesFit"},
{Name: "ServiceAffinity"},
{Name: "VolumeBinding"},
{Name: "InterPodAffinity"},
},
"FilterPlugin": {
@ -773,6 +774,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodePorts"},
{Name: "NodeResourcesFit"},
{Name: "ServiceAffinity"},
{Name: "VolumeBinding"},
{Name: "InterPodAffinity"},
},
"FilterPlugin": {
@ -892,6 +894,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodePorts"},
{Name: "NodeResourcesFit"},
{Name: "ServiceAffinity"},
{Name: "VolumeBinding"},
{Name: "InterPodAffinity"},
},
"FilterPlugin": {
@ -1013,6 +1016,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodePorts"},
{Name: "NodeResourcesFit"},
{Name: "ServiceAffinity"},
{Name: "VolumeBinding"},
{Name: "InterPodAffinity"},
},
"FilterPlugin": {
@ -1134,6 +1138,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodePorts"},
{Name: "NodeResourcesFit"},
{Name: "ServiceAffinity"},
{Name: "VolumeBinding"},
{Name: "InterPodAffinity"},
},
"FilterPlugin": {
@ -1260,6 +1265,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodePorts"},
{Name: "NodeResourcesFit"},
{Name: "ServiceAffinity"},
{Name: "VolumeBinding"},
{Name: "InterPodAffinity"},
},
"FilterPlugin": {
@ -1389,6 +1395,7 @@ func TestAlgorithmProviderCompatibility(t *testing.T) {
{Name: "NodePorts"},
{Name: "PodTopologySpread"},
{Name: "InterPodAffinity"},
{Name: "VolumeBinding"},
},
"FilterPlugin": {
{Name: "NodeUnschedulable"},
@ -1457,6 +1464,7 @@ func TestAlgorithmProviderCompatibility(t *testing.T) {
{Name: "NodePorts"},
{Name: "PodTopologySpread"},
{Name: "InterPodAffinity"},
{Name: "VolumeBinding"},
},
"FilterPlugin": {
{Name: "NodeUnschedulable"},
@ -1545,6 +1553,7 @@ func TestPluginsConfigurationCompatibility(t *testing.T) {
{Name: "NodePorts"},
{Name: "PodTopologySpread"},
{Name: "InterPodAffinity"},
{Name: "VolumeBinding"},
},
"FilterPlugin": {
{Name: "NodeUnschedulable"},
@ -1740,6 +1749,7 @@ func TestPluginsConfigurationCompatibility(t *testing.T) {
{Name: "NodePorts"},
{Name: "InterPodAffinity"},
{Name: "PodTopologySpread"},
{Name: "VolumeBinding"},
},
},
Filter: &config.PluginSet{

View File

@ -269,6 +269,7 @@ func NewLegacyRegistry() *LegacyRegistry {
})
registry.registerPredicateConfigProducer(CheckVolumeBindingPred,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, volumebinding.Name, nil)
plugins.Filter = appendToPluginSet(plugins.Filter, volumebinding.Name, nil)
plugins.Reserve = appendToPluginSet(plugins.Reserve, volumebinding.Name, nil)
plugins.PreBind = appendToPluginSet(plugins.PreBind, volumebinding.Name, nil)

View File

@ -18,6 +18,7 @@ package volumebinding
import (
"context"
"errors"
"fmt"
"time"
@ -35,14 +36,17 @@ const (
// DefaultBindTimeoutSeconds defines the default bind timeout in seconds
DefaultBindTimeoutSeconds = 600
allBoundStateKey framework.StateKey = "volumebinding:all-bound"
stateKey framework.StateKey = Name
)
type stateData struct {
allBound bool
skip bool // set true if pod does not have PVCs
boundClaims []*v1.PersistentVolumeClaim
claimsToBind []*v1.PersistentVolumeClaim
allBound bool
}
func (d stateData) Clone() framework.StateData {
func (d *stateData) Clone() framework.StateData {
return d
}
@ -58,6 +62,7 @@ type VolumeBinding struct {
Binder scheduling.SchedulerVolumeBinder
}
var _ framework.PreFilterPlugin = &VolumeBinding{}
var _ framework.FilterPlugin = &VolumeBinding{}
var _ framework.ReservePlugin = &VolumeBinding{}
var _ framework.PreBindPlugin = &VolumeBinding{}
@ -81,6 +86,48 @@ func podHasPVCs(pod *v1.Pod) bool {
return false
}
// PreFilter invoked at the prefilter extension point to check if pod has all
// immediate PVCs bound. If not all immediate PVCs are bound, an
// UnschedulableAndUnresolvable is returned.
func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status {
// If pod does not request any PVC, we don't need to do anything.
if !podHasPVCs(pod) {
state.Write(stateKey, &stateData{skip: true})
return nil
}
boundClaims, claimsToBind, unboundClaimsImmediate, err := pl.Binder.GetPodVolumes(pod)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
if len(unboundClaimsImmediate) > 0 {
// Return UnschedulableAndUnresolvable error if immediate claims are
// not bound. Pod will be moved to active/backoff queues once these
// claims are bound by PV controller.
status := framework.NewStatus(framework.UnschedulableAndUnresolvable)
status.AppendReason("pod has unbound immediate PersistentVolumeClaims")
return status
}
state.Write(stateKey, &stateData{boundClaims: boundClaims, claimsToBind: claimsToBind})
return nil
}
// PreFilterExtensions returns prefilter extensions, pod add and remove.
func (pl *VolumeBinding) PreFilterExtensions() framework.PreFilterExtensions {
return nil
}
func getStateData(cs *framework.CycleState) (*stateData, error) {
state, err := cs.Read(stateKey)
if err != nil {
return nil, err
}
s, ok := state.(*stateData)
if !ok {
return nil, errors.New("unable to convert state into stateData")
}
return s, nil
}
// Filter invoked at the filter extension point.
// It evaluates if a pod can fit due to the volumes it requests,
// for both bound and unbound PVCs.
@ -98,12 +145,17 @@ func (pl *VolumeBinding) Filter(ctx context.Context, cs *framework.CycleState, p
if node == nil {
return framework.NewStatus(framework.Error, "node not found")
}
// If pod does not request any PVC, we don't need to do anything.
if !podHasPVCs(pod) {
state, err := getStateData(cs)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
if state.skip {
return nil
}
reasons, err := pl.Binder.FindPodVolumes(pod, node)
reasons, err := pl.Binder.FindPodVolumes(pod, state.boundClaims, state.claimsToBind, node)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
@ -125,7 +177,7 @@ func (pl *VolumeBinding) Reserve(ctx context.Context, cs *framework.CycleState,
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
cs.Write(allBoundStateKey, stateData{allBound: allBound})
cs.Write(stateKey, &stateData{allBound: allBound})
return nil
}
@ -135,14 +187,10 @@ func (pl *VolumeBinding) Reserve(ctx context.Context, cs *framework.CycleState,
// If binding errors, times out or gets undone, then an error will be returned to
// retry scheduling.
func (pl *VolumeBinding) PreBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
state, err := cs.Read(allBoundStateKey)
s, err := getStateData(cs)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
s, ok := state.(stateData)
if !ok {
return framework.NewStatus(framework.Error, "unable to convert state into stateData")
}
if s.allBound {
// no need to bind volumes
return nil

View File

@ -104,7 +104,9 @@ func TestVolumeBinding(t *testing.T) {
p := &VolumeBinding{
Binder: fakeVolumeBinder,
}
gotStatus := p.Filter(context.Background(), nil, item.pod, nodeInfo)
state := framework.NewCycleState()
p.PreFilter(context.Background(), state, item.pod)
gotStatus := p.Filter(context.Background(), state, item.pod, nodeInfo)
if !reflect.DeepEqual(gotStatus, item.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, item.wantStatus)
}

View File

@ -843,7 +843,7 @@ func setupTestSchedulerWithVolumeBinding(volumeBinder scheduling.SchedulerVolume
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
st.RegisterPluginAsExtensions(volumebinding.Name, func(plArgs runtime.Object, handle framework.FrameworkHandle) (framework.Plugin, error) {
return &volumebinding.VolumeBinding{Binder: volumeBinder}, nil
}, "Filter", "Reserve", "Unreserve", "PreBind", "PostBind"),
}, "PreFilter", "Filter", "Reserve", "Unreserve", "PreBind", "PostBind"),
}
s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, broadcaster, fns...)
informerFactory.Start(stop)

View File

@ -106,6 +106,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
"PreFilterPlugin": {
{Name: "NodeResourcesFit"},
{Name: "NodePorts"},
{Name: "VolumeBinding"},
{Name: "PodTopologySpread"},
{Name: "InterPodAffinity"},
},
@ -200,6 +201,7 @@ kind: Policy
"PreFilterPlugin": {
{Name: "NodeResourcesFit"},
{Name: "NodePorts"},
{Name: "VolumeBinding"},
{Name: "PodTopologySpread"},
{Name: "InterPodAffinity"},
},