Merge pull request #90660 from Huang-Wei/synced-sched-err-call

Move unschedulable Pod to internal schedulingQ synchronously
Authored by Kubernetes Prow Robot on 2020-05-16 19:00:06 -07:00, committed by GitHub.
commit d4ce66fe0b
4 changed files with 150 additions and 190 deletions


@@ -29,8 +29,6 @@ import (
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/fields"
     "k8s.io/apimachinery/pkg/runtime"
-    "k8s.io/apimachinery/pkg/types"
-    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
     "k8s.io/apimachinery/pkg/util/sets"
     utilfeature "k8s.io/apiserver/pkg/util/feature"
     "k8s.io/client-go/informers"
@@ -58,11 +56,6 @@ import (
     "k8s.io/kubernetes/pkg/scheduler/profile"
 )
 
-const (
-    initialGetBackoff = 100 * time.Millisecond
-    maximalGetBackoff = time.Minute
-)
-
 // Binder knows how to write a binding.
 type Binder interface {
     Bind(binding *v1.Binding) error
@@ -206,7 +199,7 @@ func (c *Configurator) create() (*Scheduler, error) {
         Algorithm:       algo,
         Profiles:        profiles,
         NextPod:         internalqueue.MakeNextPodFunc(podQueue),
-        Error:           MakeDefaultErrorFunc(c.client, podQueue, c.schedulerCache),
+        Error:           MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache),
         StopEverything:  c.StopEverything,
         SchedulingQueue: podQueue,
     }, nil
@@ -476,7 +469,7 @@ func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) core
 }
 
 // MakeDefaultErrorFunc construct a function to handle pod scheduler error
-func MakeDefaultErrorFunc(client clientset.Interface, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache) func(*framework.QueuedPodInfo, error) {
+func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodLister, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache) func(*framework.QueuedPodInfo, error) {
     return func(podInfo *framework.QueuedPodInfo, err error) {
         pod := podInfo.Pod
         if err == core.ErrNoNodesAvailable {
@@ -501,40 +494,17 @@ func MakeDefaultErrorFunc(client clientset.Interface, podQueue internalqueue.Sch
             klog.Errorf("Error scheduling %v/%v: %v; retrying", pod.Namespace, pod.Name, err)
         }
 
-        podSchedulingCycle := podQueue.SchedulingCycle()
-        // Retry asynchronously.
-        // Note that this is extremely rudimentary and we need a more real error handling path.
-        go func() {
-            defer utilruntime.HandleCrash()
-            podID := types.NamespacedName{
-                Namespace: pod.Namespace,
-                Name:      pod.Name,
-            }
-
-            // Get the pod again; it may have changed/been scheduled already.
-            getBackoff := initialGetBackoff
-            for {
-                pod, err := client.CoreV1().Pods(podID.Namespace).Get(context.TODO(), podID.Name, metav1.GetOptions{})
-                if err == nil {
-                    if len(pod.Spec.NodeName) == 0 {
-                        podInfo.Pod = pod
-                        if err := podQueue.AddUnschedulableIfNotPresent(podInfo, podSchedulingCycle); err != nil {
-                            klog.Error(err)
-                        }
-                    }
-                    break
-                }
-                if apierrors.IsNotFound(err) {
-                    klog.Warningf("A pod %v no longer exists", podID)
-                    return
-                }
-                klog.Errorf("Error getting pod %v for retry: %v; retrying...", podID, err)
-                if getBackoff = getBackoff * 2; getBackoff > maximalGetBackoff {
-                    getBackoff = maximalGetBackoff
-                }
-                time.Sleep(getBackoff)
-            }
-        }()
+        // Check if the Pod exists in informer cache.
+        cachedPod, err := podLister.Pods(pod.Namespace).Get(pod.Name)
+        if err != nil {
+            klog.Warningf("Pod %v/%v doesn't exist in informer cache: %v", pod.Namespace, pod.Name, err)
+            return
+        }
+        // As <cachedPod> is from SharedInformer, we need to do a DeepCopy() here.
+        podInfo.Pod = cachedPod.DeepCopy()
+        if err := podQueue.AddUnschedulableIfNotPresent(podInfo, podQueue.SchedulingCycle()); err != nil {
+            klog.Error(err)
+        }
     }
 }
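Note on the `DeepCopy()` above: objects returned by a SharedInformer's lister are pointers into the shared informer cache, so client-go requires copying them before any mutation. A minimal sketch of that pattern outside the scheduler; the fake clientset and the names `default`/`example` are illustrative, not part of this change:

    // Sketch: read a Pod via an informer lister and copy it before mutating.
    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes/fake"
        "k8s.io/client-go/tools/cache"
    )

    func main() {
        client := fake.NewSimpleClientset(
            &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "example", Namespace: "default"}},
        )
        factory := informers.NewSharedInformerFactory(client, 0)
        podLister := factory.Core().V1().Pods().Lister()

        stopCh := make(chan struct{})
        defer close(stopCh)
        factory.Start(stopCh)
        cache.WaitForCacheSync(stopCh, factory.Core().V1().Pods().Informer().HasSynced)

        cached, err := podLister.Pods("default").Get("example")
        if err != nil {
            fmt.Println("not in informer cache:", err)
            return
        }
        // The lister hands back a pointer into the shared cache: copy before touching it.
        copied := cached.DeepCopy()
        copied.Labels = map[string]string{"requeued": "true"}
        fmt.Println(copied.Name, copied.Labels)
    }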


@@ -19,7 +19,6 @@ package scheduler
 import (
     "context"
     "errors"
-    "reflect"
     "testing"
     "time"
 
@@ -29,10 +28,10 @@ import (
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/util/clock"
+    "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/client-go/informers"
     clientset "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/kubernetes/fake"
-    clienttesting "k8s.io/client-go/testing"
     "k8s.io/client-go/tools/cache"
     "k8s.io/client-go/tools/events"
     extenderv1 "k8s.io/kube-scheduler/extender/v1"
@@ -284,134 +283,152 @@ func TestCreateFromConfigWithUnspecifiedPredicatesOrPriorities(t *testing.T) {
 }
 
 func TestDefaultErrorFunc(t *testing.T) {
-    grace := int64(30)
-    testPod := &v1.Pod{
-        ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"},
-        Spec: v1.PodSpec{
-            RestartPolicy:                 v1.RestartPolicyAlways,
-            DNSPolicy:                     v1.DNSClusterFirst,
-            TerminationGracePeriodSeconds: &grace,
-            SecurityContext:               &v1.PodSecurityContext{},
-        },
-    }
-    nodeBar, nodeFoo :=
-        &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "bar"}},
-        &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
-
-    testPodInfo := &framework.QueuedPodInfo{Pod: testPod}
-    client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}}, &v1.NodeList{Items: []v1.Node{*nodeBar}})
-    stopCh := make(chan struct{})
-    defer close(stopCh)
-
-    timestamp := time.Now()
-    queue := internalqueue.NewPriorityQueue(nil, internalqueue.WithClock(clock.NewFakeClock(timestamp)))
-    schedulerCache := internalcache.New(30*time.Second, stopCh)
-    errFunc := MakeDefaultErrorFunc(client, queue, schedulerCache)
-    _ = schedulerCache.AddNode(nodeFoo)
-
-    // assume nodeFoo was not found
-    err := apierrors.NewNotFound(apicore.Resource("node"), nodeFoo.Name)
-    errFunc(testPodInfo, err)
-    dump := schedulerCache.Dump()
-    for _, n := range dump.Nodes {
-        if e, a := nodeFoo, n.Node(); reflect.DeepEqual(e, a) {
-            t.Errorf("Node %s is still in schedulerCache", e.Name)
-            break
-        }
-    }
-
-    // Try up to a minute to retrieve the error pod from priority queue
-    foundPodFlag := false
-    maxIterations := 10 * 60
-    for i := 0; i < maxIterations; i++ {
-        time.Sleep(100 * time.Millisecond)
-        got := getPodfromPriorityQueue(queue, testPod)
-        if got == nil {
-            continue
-        }
-
-        testClientGetPodRequest(client, t, testPod.Namespace, testPod.Name)
-
-        if e, a := testPod, got; !reflect.DeepEqual(e, a) {
-            t.Errorf("Expected %v, got %v", e, a)
-        }
-
-        foundPodFlag = true
-        break
-    }
-    if !foundPodFlag {
-        t.Errorf("Failed to get pod from the unschedulable queue after waiting for a minute: %v", testPod)
-    }
-    _ = queue.Delete(testPod)
-    // Trigger error handling again to put the pod in unschedulable queue
-    errFunc(testPodInfo, nil)
-
-    // Try up to a minute to retrieve the error pod from priority queue
-    foundPodFlag = false
-    maxIterations = 10 * 60
-    for i := 0; i < maxIterations; i++ {
-        time.Sleep(100 * time.Millisecond)
-        got := getPodfromPriorityQueue(queue, testPod)
-        if got == nil {
-            continue
-        }
-
-        testClientGetPodRequest(client, t, testPod.Namespace, testPod.Name)
-
-        if e, a := testPod, got; !reflect.DeepEqual(e, a) {
-            t.Errorf("Expected %v, got %v", e, a)
-        }
-
-        foundPodFlag = true
-        break
-    }
-    if !foundPodFlag {
-        t.Errorf("Failed to get pod from the unschedulable queue after waiting for a minute: %v", testPod)
-    }
-
-    // Remove the pod from priority queue to test putting error
-    // pod in backoff queue.
-    queue.Delete(testPod)
-
-    // Trigger a move request
-    queue.MoveAllToActiveOrBackoffQueue("test")
-
-    // Trigger error handling again to put the pod in backoff queue
-    errFunc(testPodInfo, nil)
-
-    foundPodFlag = false
-    for i := 0; i < maxIterations; i++ {
-        time.Sleep(100 * time.Millisecond)
-        // The pod should be found from backoff queue at this time
-        got := getPodfromPriorityQueue(queue, testPod)
-        if got == nil {
-            continue
-        }
-
-        testClientGetPodRequest(client, t, testPod.Namespace, testPod.Name)
-
-        if e, a := testPod, got; !reflect.DeepEqual(e, a) {
-            t.Errorf("Expected %v, got %v", e, a)
-        }
-
-        foundPodFlag = true
-        break
-    }
-    if !foundPodFlag {
-        t.Errorf("Failed to get pod from the backoff queue after waiting for a minute: %v", testPod)
-    }
+    testPod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod", Namespace: "default"}}
+    testPodUpdated := testPod.DeepCopy()
+    testPodUpdated.Labels = map[string]string{"foo": ""}
+
+    tests := []struct {
+        name                       string
+        injectErr                  error
+        podUpdatedDuringScheduling bool // pod is updated during a scheduling cycle
+        podDeletedDuringScheduling bool // pod is deleted during a scheduling cycle
+        expect                     *v1.Pod
+    }{
+        {
+            name:                       "pod is updated during a scheduling cycle",
+            injectErr:                  nil,
+            podUpdatedDuringScheduling: true,
+            expect:                     testPodUpdated,
+        },
+        {
+            name:      "pod is not updated during a scheduling cycle",
+            injectErr: nil,
+            expect:    testPod,
+        },
+        {
+            name:                       "pod is deleted during a scheduling cycle",
+            injectErr:                  nil,
+            podDeletedDuringScheduling: true,
+            expect:                     nil,
+        },
+    }
+
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            stopCh := make(chan struct{})
+            defer close(stopCh)
+
+            client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}})
+            informerFactory := informers.NewSharedInformerFactory(client, 0)
+            podInformer := informerFactory.Core().V1().Pods()
+            // Need to add/update/delete testPod to the store.
+            podInformer.Informer().GetStore().Add(testPod)
+
+            queue := internalqueue.NewPriorityQueue(nil, internalqueue.WithClock(clock.NewFakeClock(time.Now())))
+            schedulerCache := internalcache.New(30*time.Second, stopCh)
+
+            queue.Add(testPod)
+            queue.Pop()
+
+            if tt.podUpdatedDuringScheduling {
+                podInformer.Informer().GetStore().Update(testPodUpdated)
+                queue.Update(testPod, testPodUpdated)
+            }
+            if tt.podDeletedDuringScheduling {
+                podInformer.Informer().GetStore().Delete(testPod)
+                queue.Delete(testPod)
+            }
+
+            testPodInfo := &framework.QueuedPodInfo{Pod: testPod}
+            errFunc := MakeDefaultErrorFunc(client, podInformer.Lister(), queue, schedulerCache)
+            errFunc(testPodInfo, tt.injectErr)
+
+            var got *v1.Pod
+            if tt.podUpdatedDuringScheduling {
+                head, e := queue.Pop()
+                if e != nil {
+                    t.Fatalf("Cannot pop pod from the activeQ: %v", e)
+                }
+                got = head.Pod
+            } else {
+                got = getPodFromPriorityQueue(queue, testPod)
+            }
+
+            if diff := cmp.Diff(tt.expect, got); diff != "" {
+                t.Errorf("Unexpected pod (-want, +got): %s", diff)
+            }
+        })
+    }
 }
 
+func TestDefaultErrorFunc_NodeNotFound(t *testing.T) {
+    nodeFoo := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
+    nodeBar := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
+    testPod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod", Namespace: "default"}}
+    tests := []struct {
+        name             string
+        nodes            []v1.Node
+        nodeNameToDelete string
+        injectErr        error
+        expectNodeNames  sets.String
+    }{
+        {
+            name:             "node is deleted during a scheduling cycle",
+            nodes:            []v1.Node{*nodeFoo, *nodeBar},
+            nodeNameToDelete: "foo",
+            injectErr:        apierrors.NewNotFound(apicore.Resource("node"), nodeFoo.Name),
+            expectNodeNames:  sets.NewString("bar"),
+        },
+        {
+            name:            "node is not deleted but NodeNotFound is received incorrectly",
+            nodes:           []v1.Node{*nodeFoo, *nodeBar},
+            injectErr:       apierrors.NewNotFound(apicore.Resource("node"), nodeFoo.Name),
+            expectNodeNames: sets.NewString("foo", "bar"),
+        },
+    }
+
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            stopCh := make(chan struct{})
+            defer close(stopCh)
+
+            client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}}, &v1.NodeList{Items: tt.nodes})
+            informerFactory := informers.NewSharedInformerFactory(client, 0)
+            podInformer := informerFactory.Core().V1().Pods()
+            // Need to add testPod to the store.
+            podInformer.Informer().GetStore().Add(testPod)
+
+            queue := internalqueue.NewPriorityQueue(nil, internalqueue.WithClock(clock.NewFakeClock(time.Now())))
+            schedulerCache := internalcache.New(30*time.Second, stopCh)
+
+            for i := range tt.nodes {
+                node := tt.nodes[i]
+                // Add node to schedulerCache no matter it's deleted in API server or not.
+                schedulerCache.AddNode(&node)
+                if node.Name == tt.nodeNameToDelete {
+                    client.CoreV1().Nodes().Delete(context.TODO(), node.Name, metav1.DeleteOptions{})
+                }
+            }
+
+            testPodInfo := &framework.QueuedPodInfo{Pod: testPod}
+            errFunc := MakeDefaultErrorFunc(client, podInformer.Lister(), queue, schedulerCache)
+            errFunc(testPodInfo, tt.injectErr)
+
+            gotNodes := schedulerCache.Dump().Nodes
+            gotNodeNames := sets.NewString()
+            for _, nodeInfo := range gotNodes {
+                gotNodeNames.Insert(nodeInfo.Node().Name)
+            }
+            if diff := cmp.Diff(tt.expectNodeNames, gotNodeNames); diff != "" {
+                t.Errorf("Unexpected nodes (-want, +got): %s", diff)
+            }
+        })
+    }
+}
+
-// getPodfromPriorityQueue is the function used in the TestDefaultErrorFunc test to get
+// getPodFromPriorityQueue is the function used in the TestDefaultErrorFunc test to get
 // the specific pod from the given priority queue. It returns the found pod in the priority queue.
-func getPodfromPriorityQueue(queue *internalqueue.PriorityQueue, pod *v1.Pod) *v1.Pod {
+func getPodFromPriorityQueue(queue *internalqueue.PriorityQueue, pod *v1.Pod) *v1.Pod {
     podList := queue.PendingPods()
     if len(podList) == 0 {
         return nil
@@ -436,33 +453,6 @@ func getPodfromPriorityQueue(queue *internalqueue.PriorityQueue, pod *v1.Pod) *v
     return nil
 }
 
-// testClientGetPodRequest function provides a routine used by TestDefaultErrorFunc test.
-// It tests whether the fake client can receive request and correctly "get" the namespace
-// and name of the error pod.
-func testClientGetPodRequest(client *fake.Clientset, t *testing.T, podNs string, podName string) {
-    requestReceived := false
-    actions := client.Actions()
-    for _, a := range actions {
-        if a.GetVerb() == "get" && a.GetResource().Resource == "pods" {
-            getAction, ok := a.(clienttesting.GetAction)
-            if !ok {
-                t.Errorf("Can't cast action object to GetAction interface")
-                break
-            }
-            name := getAction.GetName()
-            ns := a.GetNamespace()
-            if name != podName || ns != podNs {
-                t.Errorf("Expected name %s namespace %s, got %s %s",
-                    podName, podNs, name, ns)
-            }
-            requestReceived = true
-        }
-    }
-
-    if !requestReceived {
-        t.Errorf("Get pod request not received")
-    }
-}
-
 func newConfigFactoryWithFrameworkRegistry(
     client clientset.Interface, stopCh <-chan struct{},
     registry framework.Registry) *Configurator {
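A detail worth calling out in the rewritten test: the informer factory is never started. The test seeds the informer's local store directly with `GetStore().Add/Update/Delete`, which is all the lister handed to `MakeDefaultErrorFunc` reads from. A minimal sketch of that pattern, assuming a fake clientset; the pod name `demo` and namespace `default` are illustrative:

    // Sketch: drive a lister from the informer store directly, without Start().
    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes/fake"
    )

    func main() {
        client := fake.NewSimpleClientset()
        factory := informers.NewSharedInformerFactory(client, 0)
        podInformer := factory.Core().V1().Pods()

        pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"}}
        // Populate the informer's store by hand; no factory.Start is needed,
        // because the lister reads straight from this local indexer.
        podInformer.Informer().GetStore().Add(pod)

        got, err := podInformer.Lister().Pods("default").Get("demo")
        fmt.Println(got.GetName(), err) // "demo" <nil>
    }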


@@ -563,7 +563,7 @@ func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*v1.Pod {
 func (p *PriorityQueue) PendingPods() []*v1.Pod {
     p.lock.RLock()
     defer p.lock.RUnlock()
-    result := []*v1.Pod{}
+    var result []*v1.Pod
     for _, pInfo := range p.activeQ.List() {
         result = append(result, pInfo.(*framework.QueuedPodInfo).Pod)
     }
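The switch from `result := []*v1.Pod{}` to `var result []*v1.Pod` means `PendingPods` returns a nil slice when nothing is pending rather than an allocated empty one. Callers that only range or append see no difference, but `reflect.DeepEqual` (and `cmp.Diff` without `cmpopts.EquateEmpty`) do distinguish nil from empty, which is worth keeping in mind when asserting on the result. A standalone illustration:

    package main

    import (
        "fmt"
        "reflect"
    )

    func main() {
        var nilSlice []int    // nil slice: len 0, compares equal to nil
        emptySlice := []int{} // empty but non-nil slice: len 0, not nil

        fmt.Println(len(nilSlice), len(emptySlice))          // 0 0
        fmt.Println(nilSlice == nil, emptySlice == nil)      // true false
        fmt.Println(reflect.DeepEqual(nilSlice, emptySlice)) // false: DeepEqual tells them apart

        // append and range behave identically on both.
        nilSlice = append(nilSlice, 1)
        emptySlice = append(emptySlice, 1)
        fmt.Println(nilSlice, emptySlice) // [1] [1]
    }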


@@ -1394,7 +1394,7 @@ func TestFilterPlugin(t *testing.T) {
     }
 
     // Create the master and the scheduler with the test plugin set.
-    testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestMaster(t, "filter-plugin", nil), 2,
+    testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestMaster(t, "filter-plugin", nil), 1,
         scheduler.WithProfiles(prof),
         scheduler.WithFrameworkOutOfTreeRegistry(registry))
     defer testutils.CleanupTest(t, testCtx)
@@ -1418,8 +1418,8 @@ func TestFilterPlugin(t *testing.T) {
         }
     }
 
-    if filterPlugin.numFilterCalled == 0 {
-        t.Errorf("Expected the filter plugin to be called.")
+    if filterPlugin.numFilterCalled != 1 {
+        t.Errorf("Expected the filter plugin to be called 1 time, but got %v.", filterPlugin.numFilterCalled)
     }
 
     filterPlugin.reset()