From 4b0607cf0b7619faba71e2f30b5240c42e562e4a Mon Sep 17 00:00:00 2001
From: Prashanth Balasubramanian
Date: Fri, 1 May 2015 17:00:37 -0700
Subject: [PATCH] Scheduler ignores nodes with unknown condition status

---
 pkg/client/cache/listers.go             |  49 ++++++
 plugin/pkg/scheduler/factory/factory.go |  17 +-
 test/integration/scheduler_test.go      | 212 +++++++++++++++++++-----
 3 files changed, 230 insertions(+), 48 deletions(-)

diff --git a/pkg/client/cache/listers.go b/pkg/client/cache/listers.go
index 0a610047802..5c7724ba0f8 100644
--- a/pkg/client/cache/listers.go
+++ b/pkg/client/cache/listers.go
@@ -21,6 +21,7 @@ import (
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
+	"github.com/golang/glog"
 )
 
 // TODO: generate these classes and methods for all resources of interest using
@@ -106,6 +107,54 @@ func (s *StoreToNodeLister) List() (machines api.NodeList, err error) {
 	return machines, nil
 }
 
+// NodeCondition returns a storeToNodeConditionLister
+func (s *StoreToNodeLister) NodeCondition(conditionType api.NodeConditionType, conditionStatus api.ConditionStatus) storeToNodeConditionLister {
+	// TODO: Move this filtering server side. Currently our selectors don't facilitate searching through a list so we
+	// have the reflector filter out the Unschedulable field and sift through node conditions in the lister.
+	return storeToNodeConditionLister{s.Store, conditionType, conditionStatus}
+}
+
+// storeToNodeConditionLister filters and returns nodes matching the given type and status from the store.
+type storeToNodeConditionLister struct {
+	store           Store
+	conditionType   api.NodeConditionType
+	conditionStatus api.ConditionStatus
+}
+
+// List returns a list of nodes that match the condition type/status in the storeToNodeConditionLister.
+func (s storeToNodeConditionLister) List() (nodes api.NodeList, err error) {
+	for _, m := range s.store.List() {
+		node := *m.(*api.Node)
+
+		// We currently only use a conditionType of "Ready". If the kubelet doesn't
+		// periodically report the status of a node, the nodecontroller sets its
+		// ConditionStatus to "Unknown". If the kubelet thinks a node is unhealthy
+		// it can (in theory) set its ConditionStatus to "False".
+		var nodeCondition *api.NodeCondition
+
+		// Get the last condition of the required type (point at the slice element, not the loop variable)
+		for i, cond := range node.Status.Conditions {
+			if cond.Type == s.conditionType {
+				nodeCondition = &node.Status.Conditions[i]
+			} else {
+				glog.V(4).Infof("Ignoring condition type %v for node %v", cond.Type, node.Name)
+			}
+		}
+
+		// Check that the condition has the required status
+		if nodeCondition != nil {
+			if nodeCondition.Status == s.conditionStatus {
+				nodes.Items = append(nodes.Items, node)
+			} else {
+				glog.V(4).Infof("Ignoring node %v with condition status %v", node.Name, nodeCondition.Status)
+			}
+		} else {
+			glog.V(2).Infof("Node %s doesn't have conditions of type %v", node.Name, s.conditionType)
+		}
+	}
+	return
+}
+
 // TODO Move this back to scheduler as a helper function that takes a Store,
 // rather than a method of StoreToNodeLister.
 // GetNodeInfo returns cached data for the minion 'id'.
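
A minimal usage sketch of the new NodeCondition lister (not part of the diff), assuming only the in-tree cache and api packages shown above; the node names and the standalone main are illustrative. A node whose Ready condition is Unknown is dropped from List(), which is the same filter the scheduler factory below wires in with NodeCondition(api.NodeReady, api.ConditionTrue).

package main

import (
	"fmt"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
)

func main() {
	// Populate a plain store the way the reflector would.
	store := cache.NewStore(cache.MetaNamespaceKeyFunc)
	store.Add(&api.Node{
		ObjectMeta: api.ObjectMeta{Name: "ready-node"},
		Status: api.NodeStatus{
			Conditions: []api.NodeCondition{{Type: api.NodeReady, Status: api.ConditionTrue}},
		},
	})
	store.Add(&api.Node{
		ObjectMeta: api.ObjectMeta{Name: "unknown-node"},
		Status: api.NodeStatus{
			Conditions: []api.NodeCondition{{Type: api.NodeReady, Status: api.ConditionUnknown}},
		},
	})

	// List only nodes whose Ready condition is True.
	lister := cache.StoreToNodeLister{store}
	nodes, _ := lister.NodeCondition(api.NodeReady, api.ConditionTrue).List()
	for _, n := range nodes.Items {
		fmt.Println(n.Name) // expected output: only "ready-node"
	}
}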
diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go
index 421b0e61316..74f500668a1 100644
--- a/plugin/pkg/scheduler/factory/factory.go
+++ b/plugin/pkg/scheduler/factory/factory.go
@@ -64,9 +64,10 @@ func NewConfigFactory(client *client.Client) *ConfigFactory {
 		Client:             client,
 		PodQueue:           cache.NewFIFO(cache.MetaNamespaceKeyFunc),
 		ScheduledPodLister: &cache.StoreToPodLister{},
-		NodeLister:         &cache.StoreToNodeLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
-		ServiceLister:      &cache.StoreToServiceLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
-		StopEverything:     make(chan struct{}),
+		// Only nodes in the "Ready" condition with status == "True" are schedulable
+		NodeLister:     &cache.StoreToNodeLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
+		ServiceLister:  &cache.StoreToServiceLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
+		StopEverything: make(chan struct{}),
 	}
 	modeler := scheduler.NewSimpleModeler(&cache.StoreToPodLister{c.PodQueue}, c.ScheduledPodLister)
 	c.modeler = modeler
@@ -150,8 +151,9 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe
 	pluginArgs := PluginFactoryArgs{
 		PodLister:     f.PodLister,
 		ServiceLister: f.ServiceLister,
-		NodeLister:    f.NodeLister,
-		NodeInfo:      f.NodeLister,
+		// All fit predicates only need to consider schedulable nodes.
+		NodeLister: f.NodeLister.NodeCondition(api.NodeReady, api.ConditionTrue),
+		NodeInfo:   f.NodeLister,
 	}
 	predicateFuncs, err := getFitPredicateFunctions(predicateKeys, pluginArgs)
 	if err != nil {
@@ -191,8 +193,9 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe
 	}
 
 	return &scheduler.Config{
-		Modeler:      f.modeler,
-		MinionLister: f.NodeLister,
+		Modeler: f.modeler,
+		// The scheduler only needs to consider schedulable nodes.
+		MinionLister: f.NodeLister.NodeCondition(api.NodeReady, api.ConditionTrue),
 		Algorithm:    algo,
 		Binder:       &binder{f.Client},
 		NextPod: func() *api.Pod {
diff --git a/test/integration/scheduler_test.go b/test/integration/scheduler_test.go
index 00a2880388f..4ce9efe4341 100644
--- a/test/integration/scheduler_test.go
+++ b/test/integration/scheduler_test.go
@@ -21,6 +21,7 @@ package integration
 // This file tests the scheduler.
 
 import (
+	"fmt"
 	"net/http"
 	"net/http/httptest"
 	"testing"
@@ -31,9 +32,11 @@ import (
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
 	"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/admission/admit"
 	"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler"
@@ -45,6 +48,13 @@ func init() {
 	requireEtcd()
 }
 
+type nodeMutationFunc func(t *testing.T, n *api.Node, nodeStore cache.Store, c *client.Client)
+
+type nodeStateManager struct {
+	makeSchedulable   nodeMutationFunc
+	makeUnSchedulable nodeMutationFunc
+}
+
 func TestUnschedulableNodes(t *testing.T) {
 	helper, err := master.NewEtcdHelper(newEtcdClient(), testapi.Version(), etcdtest.PathPrefix())
 	if err != nil {
@@ -69,21 +79,21 @@ func TestUnschedulableNodes(t *testing.T) {
 		AdmissionControl: admit.NewAlwaysAdmit(),
 	})
 
-	client := client.NewOrDie(&client.Config{Host: s.URL, Version: testapi.Version()})
+	restClient := client.NewOrDie(&client.Config{Host: s.URL, Version: testapi.Version()})
 
-	schedulerConfigFactory := factory.NewConfigFactory(client)
+	schedulerConfigFactory := factory.NewConfigFactory(restClient)
 	schedulerConfig, err := schedulerConfigFactory.Create()
 	if err != nil {
 		t.Fatalf("Couldn't create scheduler config: %v", err)
 	}
 	eventBroadcaster := record.NewBroadcaster()
 	schedulerConfig.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"})
-	eventBroadcaster.StartRecordingToSink(client.Events(""))
+	eventBroadcaster.StartRecordingToSink(restClient.Events(""))
 	scheduler.New(schedulerConfig).Run()
 
 	defer close(schedulerConfig.StopEverything)
 
-	DoTestUnschedulableNodes(t, client)
+	DoTestUnschedulableNodes(t, restClient, schedulerConfigFactory.NodeLister.Store)
 }
 
 func podScheduled(c *client.Client, podNamespace, podName string) wait.ConditionFunc {
@@ -103,50 +113,170 @@ func podScheduled(c *client.Client, podNamespace, podName string) wait.Condition
 	}
 }
 
-func DoTestUnschedulableNodes(t *testing.T, client *client.Client) {
-	node := &api.Node{
-		ObjectMeta: api.ObjectMeta{Name: "node"},
-		Spec:       api.NodeSpec{Unschedulable: true},
-	}
-	if _, err := client.Nodes().Create(node); err != nil {
-		t.Fatalf("Failed to create node: %v", err)
-	}
+// Wait until the passFunc confirms that the object it expects to see is in the store.
+// Used to observe reflected events.
+func waitForReflection(s cache.Store, key string, passFunc func(n interface{}) bool) error {
+	return wait.Poll(time.Millisecond*10, time.Second*20, func() (bool, error) {
+		if n, _, err := s.GetByKey(key); err == nil && passFunc(n) {
+			return true, nil
+		}
+		return false, nil
+	})
+}
 
-	pod := &api.Pod{
-		ObjectMeta: api.ObjectMeta{Name: "my-pod"},
-		Spec: api.PodSpec{
-			Containers: []api.Container{{Name: "container", Image: "kubernetes/pause:go"}},
+func DoTestUnschedulableNodes(t *testing.T, restClient *client.Client, nodeStore cache.Store) {
+	goodCondition := api.NodeCondition{
+		Type:              api.NodeReady,
+		Status:            api.ConditionTrue,
+		Reason:            fmt.Sprintf("schedulable condition"),
+		LastHeartbeatTime: util.Time{time.Now()},
+	}
+	badCondition := api.NodeCondition{
+		Type:              api.NodeReady,
+		Status:            api.ConditionUnknown,
+		Reason:            fmt.Sprintf("unschedulable condition"),
+		LastHeartbeatTime: util.Time{time.Now()},
+	}
+	// Create a new schedulable node, since we're first going to apply
+	// the unschedulable condition and verify that pods aren't scheduled.
+	node := &api.Node{
+		ObjectMeta: api.ObjectMeta{Name: "node-scheduling-test-node"},
+		Spec:       api.NodeSpec{Unschedulable: false},
+		Status: api.NodeStatus{
+			Conditions: []api.NodeCondition{goodCondition},
 		},
 	}
-	myPod, err := client.Pods(api.NamespaceDefault).Create(pod)
+	nodeKey, err := cache.MetaNamespaceKeyFunc(node)
 	if err != nil {
-		t.Fatalf("Failed to create pod: %v", err)
-	}
-	// There are no schedulable nodes - the pod shouldn't be scheduled.
-	err = wait.Poll(time.Second, time.Second*10, podScheduled(client, myPod.Namespace, myPod.Name))
-	if err == nil {
-		t.Errorf("Pod scheduled successfully on unschedulable nodes")
-	}
-	if err != wait.ErrWaitTimeout {
-		t.Errorf("Failed while waiting for scheduled pod: %v", err)
+		t.Fatalf("Couldn't retrieve key for node %v", node.Name)
 	}
-	// Make the node schedulable and wait until the pod is scheduled.
-	newNode, err := client.Nodes().Get(node.Name)
-	if err != nil {
-		t.Fatalf("Failed to get node: %v", err)
-	}
-	newNode.Spec.Unschedulable = false
-	if _, err = client.Nodes().Update(newNode); err != nil {
-		t.Fatalf("Failed to update node: %v", err)
-	}
-	err = wait.Poll(time.Second, time.Second*10, podScheduled(client, myPod.Namespace, myPod.Name))
-	if err != nil {
-		t.Errorf("Failed to schedule a pod: %v", err)
+	// The test does the following for each nodeStateManager in this list:
+	//	1. Create a new node
+	//	2. Apply the makeUnSchedulable function
+	//	3. Create a new pod
+	//	4. Check that the pod doesn't get assigned to the node
+	//	5. Apply the makeSchedulable function
+	//	6. Check that the pod *does* get assigned to the node
+	//	7. Delete the pod and node.
+
+	nodeModifications := []nodeStateManager{
+		// Test node.Spec.Unschedulable=true/false
+		{
+			makeUnSchedulable: func(t *testing.T, n *api.Node, s cache.Store, c *client.Client) {
+				n.Spec.Unschedulable = true
+				if _, err := c.Nodes().Update(n); err != nil {
+					t.Fatalf("Failed to update node with unschedulable=true: %v", err)
+				}
+				err = waitForReflection(s, nodeKey, func(node interface{}) bool {
+					// An unschedulable node should get deleted from the store
+					return node == nil
+				})
+				if err != nil {
+					t.Fatalf("Failed to observe reflected update for setting unschedulable=true: %v", err)
+				}
+			},
+			makeSchedulable: func(t *testing.T, n *api.Node, s cache.Store, c *client.Client) {
+				n.Spec.Unschedulable = false
+				if _, err := c.Nodes().Update(n); err != nil {
+					t.Fatalf("Failed to update node with unschedulable=false: %v", err)
+				}
+				err = waitForReflection(s, nodeKey, func(node interface{}) bool {
+					return node != nil && node.(*api.Node).Spec.Unschedulable == false
+				})
+				if err != nil {
+					t.Fatalf("Failed to observe reflected update for setting unschedulable=false: %v", err)
+				}
+			},
+		},
+		// Test node.Status.Conditions=ConditionTrue/Unknown
+		{
+			makeUnSchedulable: func(t *testing.T, n *api.Node, s cache.Store, c *client.Client) {
+				n.Status = api.NodeStatus{
+					Conditions: []api.NodeCondition{badCondition},
+				}
+				if _, err = c.Nodes().UpdateStatus(n); err != nil {
+					t.Fatalf("Failed to update node with bad status condition: %v", err)
+				}
+				err = waitForReflection(s, nodeKey, func(node interface{}) bool {
+					return node != nil && node.(*api.Node).Status.Conditions[0].Status == api.ConditionUnknown
+				})
+				if err != nil {
+					t.Fatalf("Failed to observe reflected update for status condition update: %v", err)
+				}
+			},
+			makeSchedulable: func(t *testing.T, n *api.Node, s cache.Store, c *client.Client) {
+				n.Status = api.NodeStatus{
+					Conditions: []api.NodeCondition{goodCondition},
+				}
+				if _, err = c.Nodes().UpdateStatus(n); err != nil {
+					t.Fatalf("Failed to update node with healthy status condition: %v", err)
+				}
+				err = waitForReflection(s, nodeKey, func(node interface{}) bool {
+					return node != nil && node.(*api.Node).Status.Conditions[0].Status == api.ConditionTrue
+				})
+				if err != nil {
+					t.Fatalf("Failed to observe reflected update for status condition update: %v", err)
+				}
+			},
+		},
 	}
-	err = client.Pods(api.NamespaceDefault).Delete(myPod.Name, nil)
-	if err != nil {
-		t.Errorf("Failed to delete pod: %v", err)
+	for i, mod := range nodeModifications {
+		unSchedNode, err := restClient.Nodes().Create(node)
+		if err != nil {
+			t.Fatalf("Failed to create node: %v", err)
+		}
+
+		// Apply the unschedulable modification to the node, and wait for the reflection
+		mod.makeUnSchedulable(t, unSchedNode, nodeStore, restClient)
+
+		// Create the new pod, note that this needs to happen after the unschedulable
+		// modification, or we have a race in the test.
+		pod := &api.Pod{
+			ObjectMeta: api.ObjectMeta{Name: "node-scheduling-test-pod"},
+			Spec: api.PodSpec{
+				Containers: []api.Container{{Name: "container", Image: "kubernetes/pause:go"}},
+			},
+		}
+		myPod, err := restClient.Pods(api.NamespaceDefault).Create(pod)
+		if err != nil {
+			t.Fatalf("Failed to create pod: %v", err)
+		}
+
+		// There are no schedulable nodes - the pod shouldn't be scheduled.
+		err = wait.Poll(time.Second, time.Second*10, podScheduled(restClient, myPod.Namespace, myPod.Name))
+		if err == nil {
+			t.Errorf("Pod scheduled successfully on unschedulable nodes")
+		}
+		if err != wait.ErrWaitTimeout {
+			t.Errorf("Test %d: failed while trying to confirm the pod does not get scheduled on the node: %v", i, err)
+		} else {
+			t.Logf("Test %d: Pod did not get scheduled on an unschedulable node", i)
+		}
+
+		// Apply the schedulable modification to the node, and wait for the reflection
+		schedNode, err := restClient.Nodes().Get(unSchedNode.Name)
+		if err != nil {
+			t.Fatalf("Failed to get node: %v", err)
+		}
+		mod.makeSchedulable(t, schedNode, nodeStore, restClient)
+
+		// Wait until the pod is scheduled.
+		err = wait.Poll(time.Second, time.Second*10, podScheduled(restClient, myPod.Namespace, myPod.Name))
+		if err != nil {
+			t.Errorf("Test %d: failed to schedule a pod: %v", i, err)
+		} else {
+			t.Logf("Test %d: Pod got scheduled on a schedulable node", i)
+		}
+
+		err = restClient.Pods(api.NamespaceDefault).Delete(myPod.Name, nil)
+		if err != nil {
+			t.Errorf("Failed to delete pod: %v", err)
+		}
+		err = restClient.Nodes().Delete(schedNode.Name)
+		if err != nil {
+			t.Errorf("Failed to delete node: %v", err)
+		}
 	}
 }
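
A minimal sketch (not part of the diff) of the polling idiom the test and waitForReflection rely on, assuming only the in-tree wait package used above; the intervals and the standalone main are illustrative. When the condition never becomes true, wait.Poll returns wait.ErrWaitTimeout, which the test treats as confirmation that the pod was never scheduled onto an unschedulable node.

package main

import (
	"fmt"
	"time"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
)

func main() {
	// Poll a condition that never passes; after the timeout wait.Poll
	// gives up and reports wait.ErrWaitTimeout.
	err := wait.Poll(10*time.Millisecond, 50*time.Millisecond, func() (bool, error) {
		return false, nil // condition not yet satisfied, keep polling
	})
	fmt.Println(err == wait.ErrWaitTimeout) // expected output: true
}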