Merge pull request #26016 from hongchaodeng/phantom
Automatic merge from submit-queue
scheduler: change phantom pod test from integration into unit test
This is part of the effort for #24440.
Why this PR?
- Integration tests are hard to debug. We can model this test as a unit test similar to [TestSchedulerForgetAssumedPodAfterDelete()](132ebb091a/plugin/pkg/scheduler/scheduler_test.go (L173)). That test currently covers the expiring case; we can change it to cover delete as well.
- Add a test similar to TestSchedulerForgetAssumedPodAfterDelete() to cover the phantom pod case.
- Refactor the scheduler tests to share code between TestSchedulerNoPhantomPodAfterExpire() and TestSchedulerNoPhantomPodAfterDelete().
- Decouple the scheduler tests from scheduler events, so the tests no longer watch events (see the sketch below).
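The last point is the key mechanical change: instead of watching for a "Scheduled" event, the test doubles report their results over buffered channels and the test `select`s on those channels against a timeout. A minimal standalone sketch of that pattern, using only the standard library (the `binding` and `fakeBinder` names below are illustrative stand-ins, not the PR's actual types):

```go
package main

import (
	"fmt"
	"time"
)

// binding is a stand-in for api.Binding in this sketch.
type binding struct{ podName, nodeName string }

// fakeBinder mirrors the idea used in the new tests: instead of emitting an
// event, the binder hands each binding to the test over a buffered channel.
type fakeBinder struct {
	bindingChan chan *binding
}

func (f fakeBinder) Bind(b *binding) error {
	f.bindingChan <- b // never blocks: the channel is buffered with capacity 1
	return nil
}

func main() {
	bindingChan := make(chan *binding, 1)
	binder := fakeBinder{bindingChan: bindingChan}

	// Something asynchronous (scheduleOne in the real tests) eventually binds.
	go binder.Bind(&binding{podName: "bar", nodeName: "machine1"})

	// The test side: take the binding or give up after a bounded timeout,
	// the same select/timeout shape the new unit tests use.
	select {
	case b := <-bindingChan:
		fmt.Printf("got binding %s -> %s\n", b.podName, b.nodeName)
	case <-time.After(30 * time.Second):
		fmt.Println("timeout waiting for binding")
	}
}
```

An error channel is handled the same way in the diff below, which is what lets the tests assert on a FitError without any event plumbing.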
Commit: e7f7a49bac
@@ -295,66 +295,6 @@ func podRunning(c *client.Client, podNamespace string, podName string) wait.Cond
 	}
 }
 
-func runSchedulerNoPhantomPodsTest(client *client.Client) {
-	pod := &api.Pod{
-		Spec: api.PodSpec{
-			Containers: []api.Container{
-				{
-					Name:  "c1",
-					Image: e2e.GetPauseImageName(client),
-					Ports: []api.ContainerPort{
-						{ContainerPort: 1234, HostPort: 9999},
-					},
-					ImagePullPolicy: api.PullIfNotPresent,
-				},
-			},
-		},
-	}
-
-	// Assuming we only have two kubelets, the third pod here won't schedule
-	// if the scheduler doesn't correctly handle the delete for the second
-	// pod.
-	pod.ObjectMeta.Name = "phantom.foo"
-	foo, err := client.Pods(api.NamespaceDefault).Create(pod)
-	if err != nil {
-		glog.Fatalf("Failed to create pod: %v, %v", pod, err)
-	}
-	if err := wait.Poll(time.Second, longTestTimeout, podRunning(client, foo.Namespace, foo.Name)); err != nil {
-		glog.Fatalf("FAILED: pod never started running %v", err)
-	}
-
-	pod.ObjectMeta.Name = "phantom.bar"
-	bar, err := client.Pods(api.NamespaceDefault).Create(pod)
-	if err != nil {
-		glog.Fatalf("Failed to create pod: %v, %v", pod, err)
-	}
-	if err := wait.Poll(time.Second, longTestTimeout, podRunning(client, bar.Namespace, bar.Name)); err != nil {
-		glog.Fatalf("FAILED: pod never started running %v", err)
-	}
-
-	// Delete a pod to free up room.
-	glog.Infof("Deleting pod %v", bar.Name)
-	err = client.Pods(api.NamespaceDefault).Delete(bar.Name, api.NewDeleteOptions(0))
-	if err != nil {
-		glog.Fatalf("FAILED: couldn't delete pod %q: %v", bar.Name, err)
-	}
-
-	pod.ObjectMeta.Name = "phantom.baz"
-	baz, err := client.Pods(api.NamespaceDefault).Create(pod)
-	if err != nil {
-		glog.Fatalf("Failed to create pod: %v, %v", pod, err)
-	}
-	if err := wait.Poll(time.Second, longTestTimeout, podRunning(client, baz.Namespace, baz.Name)); err != nil {
-		if pod, perr := client.Pods(api.NamespaceDefault).Get("phantom.bar"); perr == nil {
-			glog.Fatalf("FAILED: 'phantom.bar' was never deleted: %#v, err: %v", pod, err)
-		} else {
-			glog.Fatalf("FAILED: (Scheduler probably didn't process deletion of 'phantom.bar') Pod never started running: err: %v, perr: %v", err, perr)
-		}
-	}
-
-	glog.Info("Scheduler doesn't make phantom pods: test passed.")
-}
-
 type testFunc func(*client.Client)
 
 func addFlags(fs *pflag.FlagSet) {
@@ -457,11 +397,6 @@ func main() {
 	}
 	glog.Infof("OK - found created containers: %#v", createdConts.List())
 
-	// This test doesn't run with the others because it can't run in
-	// parallel and also it schedules extra pods which would change the
-	// above pod counting logic.
-	runSchedulerNoPhantomPodsTest(kubeClient)
-
 	glog.Infof("\n\nLogging high latency metrics from the 10250 kubelet")
 	e2e.HighLatencyKubeletOperations(nil, 1*time.Second, "localhost:10250")
 	glog.Infof("\n\nLogging high latency metrics from the 10251 kubelet")
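For contrast with the unit tests added below, the binary-level test removed above had to create real pods through the API server and block in wait.Poll until a podRunning condition reported true, aborting the whole binary with glog.Fatalf on failure. Here is a rough self-contained sketch of that polling shape; the body of podRunning and the value of longTestTimeout are not shown in this diff, so the condition and timeout below are assumptions:

```go
package main

import (
	"fmt"
	"time"
)

// conditionFunc mirrors wait.ConditionFunc: report done, or an error.
type conditionFunc func() (bool, error)

// poll is a simplified stand-in for wait.Poll(interval, timeout, cond).
func poll(interval, timeout time.Duration, cond conditionFunc) error {
	deadline := time.Now().Add(timeout)
	for {
		done, err := cond()
		if err != nil {
			return err
		}
		if done {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("timed out after %v", timeout)
		}
		time.Sleep(interval)
	}
}

func main() {
	started := time.Now()
	// Pretend "the pod is running" becomes true after roughly three seconds.
	podRunning := func() (bool, error) {
		return time.Since(started) > 3*time.Second, nil
	}
	if err := poll(time.Second, 30*time.Second, podRunning); err != nil {
		fmt.Println("FAILED: pod never started running:", err)
		return
	}
	fmt.Println("pod is running")
}
```

Every assertion in the removed test sits behind a wait like this on a live cluster, which is what made failures slow to reproduce and hard to debug compared with the in-process tests below.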
@@ -18,9 +18,7 @@ package scheduler
 
 import (
 	"errors"
-	"fmt"
 	"reflect"
-	"sync"
 	"testing"
 	"time"
 
@@ -30,6 +28,7 @@ import (
 	"k8s.io/kubernetes/pkg/client/record"
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/util/diff"
+	"k8s.io/kubernetes/pkg/util/wait"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@@ -169,163 +168,167 @@ func TestScheduler(t *testing.T) {
 	}
 }
 
-func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
-	// Set up a channel through which we'll funnel log messages from the watcher.
-	// This way, we can guarantee that when the test ends no thread will still be
-	// trying to write to t.Logf (which it would if we handed t.Logf directly to
-	// StartLogging).
-	ch := make(chan string)
-	done := make(chan struct{})
-	var wg sync.WaitGroup
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		for {
-			select {
-			case msg := <-ch:
-				t.Log(msg)
-			case <-done:
-				return
-			}
-		}
-	}()
-	eventBroadcaster := record.NewBroadcaster()
-	watcher := eventBroadcaster.StartLogging(func(format string, args ...interface{}) {
-		ch <- fmt.Sprintf(format, args...)
-	})
-	defer func() {
-		watcher.Stop()
-		close(done)
-		wg.Wait()
-	}()
-
-	// Setup stores to test pod's workflow:
-	// - queuedPodStore: pods queued before processing
-	// - scheduledPodStore: pods that has a scheduling decision
-	scheduledPodStore := clientcache.NewStore(clientcache.MetaNamespaceKeyFunc)
-	queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
-
-	// Port is the easiest way to cause a fit predicate failure
-	podPort := 8080
-	firstPod := podWithPort("foo", "", podPort)
-
+func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
 	stop := make(chan struct{})
 	defer close(stop)
-	cache := schedulercache.New(1*time.Second, stop)
+	queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
+	scache := schedulercache.New(100*time.Millisecond, stop)
+	pod := podWithPort("pod.Name", "", 8080)
+	scheduler, bindingChan, _ := setupTestSchedulerWithOnePod(t, queuedPodStore, scache, pod)
+
+	waitPodExpireChan := make(chan struct{})
+	timeout := make(chan struct{})
+	go func() {
+		for {
+			select {
+			case <-timeout:
+				return
+			default:
+			}
+			pods, err := scache.List(labels.Everything())
+			if err != nil {
+				t.Fatalf("cache.List failed: %v", err)
+			}
+			if len(pods) == 0 {
+				close(waitPodExpireChan)
+				return
+			}
+			time.Sleep(100 * time.Millisecond)
+		}
+	}()
+	// waiting for the assumed pod to expire
+	select {
+	case <-waitPodExpireChan:
+	case <-time.After(wait.ForeverTestTimeout):
+		close(timeout)
+		t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
+	}
+
+	// We use conflicted pod ports to incur fit predicate failure if first pod not removed.
+	secondPod := podWithPort("bar", "", 8080)
+	queuedPodStore.Add(secondPod)
+	scheduler.scheduleOne()
+	select {
+	case b := <-bindingChan:
+		expectBinding := &api.Binding{
+			ObjectMeta: api.ObjectMeta{Name: "bar"},
+			Target:     api.ObjectReference{Kind: "Node", Name: "machine1"},
+		}
+		if !reflect.DeepEqual(expectBinding, b) {
+			t.Errorf("binding want=%v, get=%v", expectBinding, b)
+		}
+	case <-time.After(wait.ForeverTestTimeout):
+		t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
+	}
+}
+
+func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
+	stop := make(chan struct{})
+	defer close(stop)
+	queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
+	scache := schedulercache.New(10*time.Minute, stop)
+	firstPod := podWithPort("pod.Name", "", 8080)
+	scheduler, bindingChan, errChan := setupTestSchedulerWithOnePod(t, queuedPodStore, scache, firstPod)
+
+	// We use conflicted pod ports to incur fit predicate failure.
+	secondPod := podWithPort("bar", "", 8080)
+	queuedPodStore.Add(secondPod)
+	// queuedPodStore: [bar:8080]
+	// cache: [(assumed)foo:8080]
+
+	scheduler.scheduleOne()
+	select {
+	case err := <-errChan:
+		expectErr := &FitError{
+			Pod:              secondPod,
+			FailedPredicates: FailedPredicateMap{"machine1": "PodFitsHostPorts"},
+		}
+		if !reflect.DeepEqual(expectErr, err) {
+			t.Errorf("err want=%v, get=%v", expectErr, err)
+		}
+	case <-time.After(wait.ForeverTestTimeout):
+		t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
+	}
+
+	// We mimic the workflow of cache behavior when a pod is removed by user.
+	// Note: if the schedulercache timeout would be super short, the first pod would expire
+	// and would be removed itself (without any explicit actions on schedulercache). Even in that case,
+	// explicitly AddPod will as well correct the behavior.
+	firstPod.Spec.NodeName = "machine1"
+	if err := scache.AddPod(firstPod); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if err := scache.RemovePod(firstPod); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	queuedPodStore.Add(secondPod)
+	scheduler.scheduleOne()
+	select {
+	case b := <-bindingChan:
+		expectBinding := &api.Binding{
+			ObjectMeta: api.ObjectMeta{Name: "bar"},
+			Target:     api.ObjectReference{Kind: "Node", Name: "machine1"},
+		}
+		if !reflect.DeepEqual(expectBinding, b) {
+			t.Errorf("binding want=%v, get=%v", expectBinding, b)
+		}
+	case <-time.After(wait.ForeverTestTimeout):
+		t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
+	}
+}
+
+// queuedPodStore: pods queued before processing.
+// cache: scheduler cache that might contain assumed pods.
+func setupTestSchedulerWithOnePod(t *testing.T, queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, pod *api.Pod) (*Scheduler, chan *api.Binding, chan error) {
 	// Create the scheduler config
 	algo := NewGenericScheduler(
-		cache,
+		scache,
 		map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts},
 		[]algorithm.PriorityConfig{},
 		[]algorithm.SchedulerExtender{})
-	var gotBinding *api.Binding
-	c := &Config{
-		SchedulerCache: cache,
+	bindingChan := make(chan *api.Binding, 1)
+	errChan := make(chan error, 1)
+	cfg := &Config{
+		SchedulerCache: scache,
 		NodeLister: algorithm.FakeNodeLister(
 			api.NodeList{Items: []api.Node{{ObjectMeta: api.ObjectMeta{Name: "machine1"}}}},
 		),
 		Algorithm: algo,
 		Binder: fakeBinder{func(b *api.Binding) error {
-			scheduledPodStore.Add(podWithPort(b.Name, b.Target.Name, podPort))
-			gotBinding = b
+			bindingChan <- b
 			return nil
 		}},
 		NextPod: func() *api.Pod {
 			return clientcache.Pop(queuedPodStore).(*api.Pod)
 		},
 		Error: func(p *api.Pod, err error) {
-			t.Errorf("Unexpected error when scheduling pod %+v: %v", p, err)
+			errChan <- err
 		},
-		Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}),
+		Recorder:            &record.FakeRecorder{},
+		PodConditionUpdater: fakePodConditionUpdater{},
 	}
+	scheduler := New(cfg)
 
-	// First scheduling pass should schedule the pod
-	s := New(c)
-	called := make(chan struct{})
-	events := eventBroadcaster.StartEventWatcher(func(e *api.Event) {
-		if e, a := "Scheduled", e.Reason; e != a {
-			t.Errorf("expected %v, got %v", e, a)
-		}
-		close(called)
-	})
-
-	queuedPodStore.Add(firstPod)
+	queuedPodStore.Add(pod)
 	// queuedPodStore: [foo:8080]
-	// scheduledPodStore: []
-	// assumedPods: []
+	// cache: []
 
-	s.scheduleOne()
-	<-called
+	scheduler.scheduleOne()
 	// queuedPodStore: []
-	// scheduledPodStore: [foo:8080]
-	// assumedPods: [foo:8080]
+	// cache: [(assumed)foo:8080]
 
-	pod, exists, _ := scheduledPodStore.GetByKey("foo")
-	if !exists {
-		t.Errorf("Expected scheduled pod store to contain pod")
-	}
-	pod, exists, _ = queuedPodStore.GetByKey("foo")
-	if exists {
-		t.Errorf("Did not expect a queued pod, found %+v", pod)
-	}
-
-	expectBind := &api.Binding{
-		ObjectMeta: api.ObjectMeta{Name: "foo"},
-		Target:     api.ObjectReference{Kind: "Node", Name: "machine1"},
-	}
-	if ex, ac := expectBind, gotBinding; !reflect.DeepEqual(ex, ac) {
-		t.Errorf("Expected exact match on binding: %s", diff.ObjectDiff(ex, ac))
-	}
-
-	events.Stop()
-
-	scheduledPodStore.Delete(pod)
-
-	secondPod := podWithPort("bar", "", podPort)
-	queuedPodStore.Add(secondPod)
-	// queuedPodStore: [bar:8080]
-	// scheduledPodStore: []
-	// assumedPods: [foo:8080]
-
-	var waitUntilExpired sync.WaitGroup
-	waitUntilExpired.Add(1)
-	// waiting for the assumed pod to expire
-	go func() {
-		for {
-			pods, err := cache.List(labels.Everything())
-			if err != nil {
-				t.Fatalf("cache.List failed: %v", err)
-			}
-			if len(pods) == 0 {
-				waitUntilExpired.Done()
-				return
-			}
-			time.Sleep(1 * time.Second)
-		}
-	}()
-	waitUntilExpired.Wait()
-
-	// Second scheduling pass will fail to schedule if the store hasn't expired
-	// the deleted pod. This would normally happen with a timeout.
-
-	called = make(chan struct{})
-	events = eventBroadcaster.StartEventWatcher(func(e *api.Event) {
-		if e, a := "Scheduled", e.Reason; e != a {
-			t.Errorf("expected %v, got %v", e, a)
-		}
-		close(called)
-	})
-
-	s.scheduleOne()
-	<-called
-
-	expectBind = &api.Binding{
-		ObjectMeta: api.ObjectMeta{Name: "bar"},
-		Target:     api.ObjectReference{Kind: "Node", Name: "machine1"},
-	}
-	if ex, ac := expectBind, gotBinding; !reflect.DeepEqual(ex, ac) {
-		t.Errorf("Expected exact match on binding: %s", diff.ObjectDiff(ex, ac))
-	}
-	events.Stop()
+	select {
+	case b := <-bindingChan:
+		expectBinding := &api.Binding{
+			ObjectMeta: api.ObjectMeta{Name: "pod.Name"},
+			Target:     api.ObjectReference{Kind: "Node", Name: "machine1"},
+		}
+		if !reflect.DeepEqual(expectBinding, b) {
+			t.Errorf("binding want=%v, get=%v", expectBinding, b)
+		}
+	case <-time.After(wait.ForeverTestTimeout):
+		t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
+	}
+	return scheduler, bindingChan, errChan
 }
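The subtlest of the new tests is TestSchedulerNoPhantomPodAfterExpire, which polls the scheduler cache until the assumed pod's TTL lapses before queueing a second pod that reuses the same host port. The standalone sketch below mirrors that wait-for-expiry idiom with a plain TTL map standing in for schedulercache and a hard-coded 30-second bound standing in for wait.ForeverTestTimeout; every name here is illustrative, not the PR's code.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// ttlCache drops entries after a fixed TTL, loosely like the scheduler
// cache's assumed-pod expiration.
type ttlCache struct {
	mu      sync.Mutex
	entries map[string]time.Time
	ttl     time.Duration
}

func newTTLCache(ttl time.Duration) *ttlCache {
	return &ttlCache{entries: map[string]time.Time{}, ttl: ttl}
}

// Assume records an entry with a deadline, like assuming a pod onto a node.
func (c *ttlCache) Assume(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[key] = time.Now().Add(c.ttl)
}

// Len lazily expires stale entries and reports how many remain.
func (c *ttlCache) Len() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	now := time.Now()
	for k, deadline := range c.entries {
		if now.After(deadline) {
			delete(c.entries, k)
		}
	}
	return len(c.entries)
}

func main() {
	cache := newTTLCache(100 * time.Millisecond)
	cache.Assume("foo:8080")

	expired := make(chan struct{})
	stopPolling := make(chan struct{})
	go func() {
		for {
			select {
			case <-stopPolling:
				return
			default:
			}
			if cache.Len() == 0 {
				close(expired) // the assumed entry is gone
				return
			}
			time.Sleep(100 * time.Millisecond)
		}
	}()

	// Bound the wait, as the real test does with wait.ForeverTestTimeout.
	select {
	case <-expired:
		fmt.Println("assumed entry expired; a conflicting pod can now be scheduled")
	case <-time.After(30 * time.Second):
		close(stopPolling)
		fmt.Println("timeout waiting for expiration")
	}
}
```

TestSchedulerNoPhantomPodAfterDelete avoids the wait entirely by using a long TTL and explicitly calling AddPod followed by RemovePod, which is why it can assert first on the FitError and then on the successful binding.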