Skip pods that refer to PVCs that are being deleted

Scheduler should ignore pods that refer to PVCs that either do not exist or
are being deleted.
This commit is contained in:
Jan Safranek 2017-11-23 10:01:23 +01:00
parent 7f8521819d
commit 19caa9c50d
7 changed files with 148 additions and 7 deletions

View File

@ -117,6 +117,12 @@ type ReplicaSetLister interface {
GetPodReplicaSets(*v1.Pod) ([]*extensions.ReplicaSet, error) GetPodReplicaSets(*v1.Pod) ([]*extensions.ReplicaSet, error)
} }
// PersistentVolumeClaimLister interface represents anything that can list PVCs for a scheduler.
type PersistentVolumeClaimLister interface {
// Gets given PVC
Get(namespace, name string) (*v1.PersistentVolumeClaim, error)
}
var _ ControllerLister = &EmptyControllerLister{} var _ ControllerLister = &EmptyControllerLister{}
// EmptyControllerLister implements ControllerLister on []v1.ReplicationController returning empty data // EmptyControllerLister implements ControllerLister on []v1.ReplicationController returning empty data

View File

@ -317,7 +317,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
} }
queue := NewSchedulingQueue() queue := NewSchedulingQueue()
scheduler := NewGenericScheduler( scheduler := NewGenericScheduler(
cache, nil, queue, test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders, nil) cache, nil, queue, test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders, nil, schedulertesting.FakePersistentVolumeClaimLister{})
podIgnored := &v1.Pod{} podIgnored := &v1.Pod{}
machine, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes))) machine, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
if test.expectsErr { if test.expectsErr {

View File

@ -98,6 +98,7 @@ type genericScheduler struct {
extenders []algorithm.SchedulerExtender extenders []algorithm.SchedulerExtender
lastNodeIndexLock sync.Mutex lastNodeIndexLock sync.Mutex
lastNodeIndex uint64 lastNodeIndex uint64
pvcLister algorithm.PersistentVolumeClaimLister
cachedNodeInfoMap map[string]*schedulercache.NodeInfo cachedNodeInfoMap map[string]*schedulercache.NodeInfo
volumeBinder *volumebinder.VolumeBinder volumeBinder *volumebinder.VolumeBinder
@ -110,6 +111,10 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", pod.Namespace, pod.Name)) trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", pod.Namespace, pod.Name))
defer trace.LogIfLong(100 * time.Millisecond) defer trace.LogIfLong(100 * time.Millisecond)
if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
return "", err
}
nodes, err := nodeLister.List() nodes, err := nodeLister.List()
if err != nil { if err != nil {
return "", err return "", err
@ -995,6 +1000,32 @@ func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedule
return true return true
} }
// podPassesBasicChecks makes sanity checks on the pod if it can be scheduled.
func podPassesBasicChecks(pod *v1.Pod, pvcLister algorithm.PersistentVolumeClaimLister) error {
// Check PVCs used by the pod
namespace := pod.Namespace
manifest := &(pod.Spec)
for i := range manifest.Volumes {
volume := &manifest.Volumes[i]
if volume.PersistentVolumeClaim == nil {
// Volume is not a PVC, ignore
continue
}
pvcName := volume.PersistentVolumeClaim.ClaimName
pvc, err := pvcLister.Get(namespace, pvcName)
if err != nil {
// The error has already enough context ("persistentvolumeclaim "myclaim" not found")
return err
}
if pvc.DeletionTimestamp != nil {
return fmt.Errorf("persistentvolumeclaim %q is being deleted", pvc.Name)
}
}
return nil
}
func NewGenericScheduler( func NewGenericScheduler(
cache schedulercache.Cache, cache schedulercache.Cache,
eCache *EquivalenceCache, eCache *EquivalenceCache,
@ -1004,7 +1035,8 @@ func NewGenericScheduler(
prioritizers []algorithm.PriorityConfig, prioritizers []algorithm.PriorityConfig,
priorityMetaProducer algorithm.MetadataProducer, priorityMetaProducer algorithm.MetadataProducer,
extenders []algorithm.SchedulerExtender, extenders []algorithm.SchedulerExtender,
volumeBinder *volumebinder.VolumeBinder) algorithm.ScheduleAlgorithm { volumeBinder *volumebinder.VolumeBinder,
pvcLister algorithm.PersistentVolumeClaimLister) algorithm.ScheduleAlgorithm {
return &genericScheduler{ return &genericScheduler{
cache: cache, cache: cache,
equivalenceCache: eCache, equivalenceCache: eCache,
@ -1016,5 +1048,6 @@ func NewGenericScheduler(
extenders: extenders, extenders: extenders,
cachedNodeInfoMap: make(map[string]*schedulercache.NodeInfo), cachedNodeInfoMap: make(map[string]*schedulercache.NodeInfo),
volumeBinder: volumeBinder, volumeBinder: volumeBinder,
pvcLister: pvcLister,
} }
} }

View File

@ -186,6 +186,7 @@ func TestGenericScheduler(t *testing.T) {
predicates map[string]algorithm.FitPredicate predicates map[string]algorithm.FitPredicate
prioritizers []algorithm.PriorityConfig prioritizers []algorithm.PriorityConfig
nodes []string nodes []string
pvcs []*v1.PersistentVolumeClaim
pod *v1.Pod pod *v1.Pod
pods []*v1.Pod pods []*v1.Pod
expectedHosts sets.String expectedHosts sets.String
@ -300,6 +301,77 @@ func TestGenericScheduler(t *testing.T) {
}, },
}, },
}, },
{
// Pod with existing PVC
predicates: map[string]algorithm.FitPredicate{"true": truePredicate},
prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}},
nodes: []string{"machine1", "machine2"},
pvcs: []*v1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{Name: "existingPVC"}}},
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "ignore"},
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: "existingPVC",
},
},
},
},
},
},
expectedHosts: sets.NewString("machine1", "machine2"),
name: "existing PVC",
wErr: nil,
},
{
// Pod with non existing PVC
predicates: map[string]algorithm.FitPredicate{"true": truePredicate},
prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}},
nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "ignore"},
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: "unknownPVC",
},
},
},
},
},
},
name: "unknown PVC",
expectsErr: true,
wErr: fmt.Errorf("persistentvolumeclaim \"unknownPVC\" not found"),
},
{
// Pod with deleting PVC
predicates: map[string]algorithm.FitPredicate{"true": truePredicate},
prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}},
nodes: []string{"machine1", "machine2"},
pvcs: []*v1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{Name: "existingPVC", DeletionTimestamp: &metav1.Time{}}}},
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "ignore"},
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: "existingPVC",
},
},
},
},
},
},
name: "deleted PVC",
expectsErr: true,
wErr: fmt.Errorf("persistentvolumeclaim \"existingPVC\" is being deleted"),
},
} }
for _, test := range tests { for _, test := range tests {
cache := schedulercache.New(time.Duration(0), wait.NeverStop) cache := schedulercache.New(time.Duration(0), wait.NeverStop)
@ -309,9 +381,14 @@ func TestGenericScheduler(t *testing.T) {
for _, name := range test.nodes { for _, name := range test.nodes {
cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}}) cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}})
} }
pvcs := []*v1.PersistentVolumeClaim{}
for _, pvc := range test.pvcs {
pvcs = append(pvcs, pvc)
}
pvcLister := schedulertesting.FakePersistentVolumeClaimLister(pvcs)
scheduler := NewGenericScheduler( scheduler := NewGenericScheduler(
cache, nil, NewSchedulingQueue(), test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, []algorithm.SchedulerExtender{}, nil) cache, nil, NewSchedulingQueue(), test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, []algorithm.SchedulerExtender{}, nil, pvcLister)
machine, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes))) machine, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
if !reflect.DeepEqual(err, test.wErr) { if !reflect.DeepEqual(err, test.wErr) {
@ -1190,7 +1267,7 @@ func TestPreempt(t *testing.T) {
extenders = append(extenders, extender) extenders = append(extenders, extender)
} }
scheduler := NewGenericScheduler( scheduler := NewGenericScheduler(
cache, nil, NewSchedulingQueue(), map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}}, algorithm.EmptyMetadataProducer, extenders, nil) cache, nil, NewSchedulingQueue(), map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}}, algorithm.EmptyMetadataProducer, extenders, nil, schedulertesting.FakePersistentVolumeClaimLister{})
// Call Preempt and check the expected results. // Call Preempt and check the expected results.
node, victims, _, err := scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap})) node, victims, _, err := scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
if err != nil { if err != nil {

View File

@ -903,7 +903,7 @@ func (f *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
glog.Info("Created equivalence class cache") glog.Info("Created equivalence class cache")
} }
algo := core.NewGenericScheduler(f.schedulerCache, f.equivalencePodCache, f.podQueue, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders, f.volumeBinder) algo := core.NewGenericScheduler(f.schedulerCache, f.equivalencePodCache, f.podQueue, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders, f.volumeBinder, &pvcLister{f.pVCLister})
podBackoff := util.CreateDefaultPodBackoff() podBackoff := util.CreateDefaultPodBackoff()
return &scheduler.Config{ return &scheduler.Config{
@ -935,6 +935,14 @@ func (n *nodeLister) List() ([]*v1.Node, error) {
return n.NodeLister.List(labels.Everything()) return n.NodeLister.List(labels.Everything())
} }
type pvcLister struct {
corelisters.PersistentVolumeClaimLister
}
func (p *pvcLister) Get(namespace, name string) (*v1.PersistentVolumeClaim, error) {
return p.PersistentVolumeClaimLister.PersistentVolumeClaims(namespace).Get(name)
}
func (f *configFactory) GetPriorityFunctionConfigs(priorityKeys sets.String) ([]algorithm.PriorityConfig, error) { func (f *configFactory) GetPriorityFunctionConfigs(priorityKeys sets.String) ([]algorithm.PriorityConfig, error) {
pluginArgs, err := f.getPluginArgs() pluginArgs, err := f.getPluginArgs()
if err != nil { if err != nil {

View File

@ -532,7 +532,8 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.
[]algorithm.PriorityConfig{}, []algorithm.PriorityConfig{},
algorithm.EmptyMetadataProducer, algorithm.EmptyMetadataProducer,
[]algorithm.SchedulerExtender{}, []algorithm.SchedulerExtender{},
nil) nil,
schedulertesting.FakePersistentVolumeClaimLister{})
bindingChan := make(chan *v1.Binding, 1) bindingChan := make(chan *v1.Binding, 1)
errChan := make(chan error, 1) errChan := make(chan error, 1)
configurator := &FakeConfigurator{ configurator := &FakeConfigurator{
@ -575,7 +576,8 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
[]algorithm.PriorityConfig{}, []algorithm.PriorityConfig{},
algorithm.EmptyMetadataProducer, algorithm.EmptyMetadataProducer,
[]algorithm.SchedulerExtender{}, []algorithm.SchedulerExtender{},
nil) nil,
schedulertesting.FakePersistentVolumeClaimLister{})
bindingChan := make(chan *v1.Binding, 2) bindingChan := make(chan *v1.Binding, 2)
configurator := &FakeConfigurator{ configurator := &FakeConfigurator{
Config: &Config{ Config: &Config{

View File

@ -176,3 +176,18 @@ func (f FakeStatefulSetLister) GetPodStatefulSets(pod *v1.Pod) (sss []*apps.Stat
} }
return return
} }
// FakePersistentVolumeClaimLister implements PersistentVolumeClaimLister on []*v1.PersistentVolumeClaim for test purposes.
type FakePersistentVolumeClaimLister []*v1.PersistentVolumeClaim
var _ PersistentVolumeClaimLister = FakePersistentVolumeClaimLister{}
// List returns nodes as a []string.
func (f FakePersistentVolumeClaimLister) Get(namespace, name string) (*v1.PersistentVolumeClaim, error) {
for _, pvc := range f {
if pvc.Name == name && pvc.Namespace == namespace {
return pvc, nil
}
}
return nil, fmt.Errorf("persistentvolumeclaim %q not found", name)
}