diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go
index aa93d587cce..5b2eb46c0e9 100644
--- a/pkg/controller/node/nodecontroller.go
+++ b/pkg/controller/node/nodecontroller.go
@@ -20,20 +20,26 @@ import (
 	"errors"
 	"fmt"
 	"net"
+	"strings"
 	"sync"
 	"time"
 
 	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/unversioned"
+	"k8s.io/kubernetes/pkg/client/cache"
 	"k8s.io/kubernetes/pkg/client/record"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
 	"k8s.io/kubernetes/pkg/cloudprovider"
+	"k8s.io/kubernetes/pkg/controller"
+	"k8s.io/kubernetes/pkg/controller/framework"
 	"k8s.io/kubernetes/pkg/fields"
 	"k8s.io/kubernetes/pkg/labels"
+	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/util/sets"
+	"k8s.io/kubernetes/pkg/watch"
 )
 
 var (
@@ -98,6 +104,14 @@ type NodeController struct {
 	// The maximum duration before a pod evicted from a node can be forcefully terminated.
 	maximumGracePeriod time.Duration
 	recorder           record.EventRecorder
+	// Pod framework and store
+	podController *framework.Controller
+	podStore      cache.StoreToPodLister
+	// Node framework and store
+	nodeController *framework.Controller
+	nodeStore      cache.StoreToNodeLister
+
+	forcefullyDeletePod func(*api.Pod)
 }
 
 // NewNodeController returns a new node controller to sync instances from cloudprovider.
@@ -125,7 +139,8 @@ func NewNodeController(
 		glog.Fatal("NodeController: Must specify clusterCIDR if allocateNodeCIDRs == true.")
 	}
 	evictorLock := sync.Mutex{}
-	return &NodeController{
+
+	nc := &NodeController{
 		cloud:        cloud,
 		knownNodeSet: make(sets.String),
 		kubeClient:   kubeClient,
@@ -143,11 +158,45 @@ func NewNodeController(
 		now:                 unversioned.Now,
 		clusterCIDR:         clusterCIDR,
 		allocateNodeCIDRs:   allocateNodeCIDRs,
+		forcefullyDeletePod: func(p *api.Pod) { forcefullyDeletePod(kubeClient, p) },
 	}
+
+	nc.podStore.Store, nc.podController = framework.NewInformer(
+		&cache.ListWatch{
+			ListFunc: func() (runtime.Object, error) {
+				return nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything())
+			},
+			WatchFunc: func(rv string) (watch.Interface, error) {
+				return nc.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv)
+			},
+		},
+		&api.Pod{},
+		controller.NoResyncPeriodFunc(),
+		framework.ResourceEventHandlerFuncs{
+			AddFunc:    nc.maybeDeleteTerminatingPod,
+			UpdateFunc: func(_, obj interface{}) { nc.maybeDeleteTerminatingPod(obj) },
+		},
+	)
+	nc.nodeStore.Store, nc.nodeController = framework.NewInformer(
+		&cache.ListWatch{
+			ListFunc: func() (runtime.Object, error) {
+				return nc.kubeClient.Nodes().List(labels.Everything(), fields.Everything())
+			},
+			WatchFunc: func(rv string) (watch.Interface, error) {
+				return nc.kubeClient.Nodes().Watch(labels.Everything(), fields.Everything(), rv)
+			},
+		},
+		&api.Node{},
+		controller.NoResyncPeriodFunc(),
+		framework.ResourceEventHandlerFuncs{},
+	)
+	return nc
 }
 
 // Run starts an asynchronous loop that monitors the status of cluster nodes.
 func (nc *NodeController) Run(period time.Duration) {
+	go nc.nodeController.Run(util.NeverStop)
+	go nc.podController.Run(util.NeverStop)
 	// Incorporate the results of node status pushed from kubelet to master.
 	go util.Until(func() {
 		if err := nc.monitorNodeStatus(); err != nil {
@@ -176,6 +225,7 @@ func (nc *NodeController) Run(period time.Duration) {
 				util.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
 				return false, 0
 			}
+
 			if remaining {
 				nc.terminationEvictor.Add(value.Value)
 			}
@@ -238,6 +288,61 @@ func (nc *NodeController) getCondition(status *api.NodeStatus, conditionType api.NodeConditionType) *api.NodeCondition {
 	return nil
 }
 
+// maybeDeleteTerminatingPod non-gracefully deletes terminating pods that
+// should not be gracefully terminated.
+func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}) {
+	pod, ok := obj.(*api.Pod)
+	if !ok {
+		return
+	}
+
+	// consider only terminating pods
+	if pod.DeletionTimestamp == nil {
+		return
+	}
+
+	// delete terminating pods that have not yet been scheduled
+	if len(pod.Spec.NodeName) == 0 {
+		nc.forcefullyDeletePod(pod)
+		return
+	}
+
+	nodeObj, found, err := nc.nodeStore.GetByKey(pod.Spec.NodeName)
+	if err != nil {
+		// this can only happen if the Store.KeyFunc has a problem creating
+		// a key for the pod. If it happens once, it will happen again so
+		// don't bother requeuing the pod.
+		util.HandleError(err)
+		return
+	}
+
+	// delete terminating pods that have been scheduled on
+	// nonexistent nodes
+	if !found {
+		nc.forcefullyDeletePod(pod)
+		return
+	}
+
+	// delete terminating pods that have been scheduled on
+	// nodes that do not support graceful termination
+	// TODO(mikedanese): this can be removed when we no longer
+	// guarantee backwards compatibility of master API to kubelets with
+	// versions less than 1.1.0
+	node := nodeObj.(*api.Node)
+	if strings.HasPrefix(node.Status.NodeInfo.KubeletVersion, "v1.0") {
+		nc.forcefullyDeletePod(pod)
+		return
+	}
+}
+
+func forcefullyDeletePod(c client.Interface, pod *api.Pod) {
+	var zero int64
+	err := c.Pods(pod.Namespace).Delete(pod.Name, &api.DeleteOptions{GracePeriodSeconds: &zero})
+	if err != nil {
+		util.HandleError(err)
+	}
+}
+
 // monitorNodeStatus verifies node status are constantly updated by kubelet, and if not,
 // post "NodeReady==ConditionUnknown". It also evicts all pods if node is not ready or
 // not reachable for a long period of time.
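For reference, the force-delete decision in maybeDeleteTerminatingPod reduces to a pure predicate over three inputs (is the pod terminating, does its node still exist, and what kubelet version that node runs), which is what makes the table-driven test below possible. The following standalone sketch restates that predicate; it is illustrative only and not part of this patch, and podInfo and shouldForceDelete are hypothetical names.

// Illustrative sketch only: restates the force-delete predicate from
// maybeDeleteTerminatingPod above as a pure function. Hypothetical names.
package main

import (
	"fmt"
	"strings"
)

// podInfo captures the two pod fields the decision depends on.
type podInfo struct {
	terminating bool   // true when DeletionTimestamp != nil
	nodeName    string // Spec.NodeName; empty while unscheduled
}

// shouldForceDelete returns true when a terminating pod should be deleted
// with a zero grace period: it is unscheduled, its node is gone, or its
// node runs a v1.0.x kubelet that cannot terminate it gracefully.
func shouldForceDelete(p podInfo, nodeExists bool, kubeletVersion string) bool {
	if !p.terminating {
		return false // only terminating pods are considered
	}
	if p.nodeName == "" || !nodeExists {
		return true
	}
	return strings.HasPrefix(kubeletVersion, "v1.0")
}

func main() {
	// Mirrors two of the cases exercised by TestCheckPod below.
	fmt.Println(shouldForceDelete(podInfo{terminating: true, nodeName: "old"}, true, "v1.0.0")) // true
	fmt.Println(shouldForceDelete(podInfo{terminating: true, nodeName: "new"}, true, "v1.1.0")) // false
}

Note that forcefullyDeletePod is injected as a struct field rather than called directly; this is what lets the test below count deletions without a live API server.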
diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/node/nodecontroller_test.go
index b427aed70f6..f4f9eb03fe1 100644
--- a/pkg/controller/node/nodecontroller_test.go
+++ b/pkg/controller/node/nodecontroller_test.go
@@ -27,6 +27,7 @@ import (
 	apierrors "k8s.io/kubernetes/pkg/api/errors"
 	"k8s.io/kubernetes/pkg/api/resource"
 	"k8s.io/kubernetes/pkg/api/unversioned"
+	"k8s.io/kubernetes/pkg/client/cache"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
 	"k8s.io/kubernetes/pkg/client/unversioned/testclient"
 	"k8s.io/kubernetes/pkg/fields"
@@ -646,6 +647,111 @@ func TestNodeDeletion(t *testing.T) {
 	}
 }
 
+func TestCheckPod(t *testing.T) {
+
+	tcs := []struct {
+		pod   api.Pod
+		prune bool
+	}{
+
+		{
+			pod: api.Pod{
+				ObjectMeta: api.ObjectMeta{DeletionTimestamp: nil},
+				Spec:       api.PodSpec{NodeName: "new"},
+			},
+			prune: false,
+		},
+		{
+			pod: api.Pod{
+				ObjectMeta: api.ObjectMeta{DeletionTimestamp: nil},
+				Spec:       api.PodSpec{NodeName: "old"},
+			},
+			prune: false,
+		},
+		{
+			pod: api.Pod{
+				ObjectMeta: api.ObjectMeta{DeletionTimestamp: nil},
+				Spec:       api.PodSpec{NodeName: ""},
+			},
+			prune: false,
+		},
+		{
+			pod: api.Pod{
+				ObjectMeta: api.ObjectMeta{DeletionTimestamp: nil},
+				Spec:       api.PodSpec{NodeName: "nonexistent"},
+			},
+			prune: false,
+		},
+		{
+			pod: api.Pod{
+				ObjectMeta: api.ObjectMeta{DeletionTimestamp: &unversioned.Time{}},
+				Spec:       api.PodSpec{NodeName: "new"},
+			},
+			prune: false,
+		},
+		{
+			pod: api.Pod{
+				ObjectMeta: api.ObjectMeta{DeletionTimestamp: &unversioned.Time{}},
+				Spec:       api.PodSpec{NodeName: "old"},
+			},
+			prune: true,
+		},
+		{
+			pod: api.Pod{
+				ObjectMeta: api.ObjectMeta{DeletionTimestamp: &unversioned.Time{}},
+				Spec:       api.PodSpec{NodeName: ""},
+			},
+			prune: true,
+		},
+		{
+			pod: api.Pod{
+				ObjectMeta: api.ObjectMeta{DeletionTimestamp: &unversioned.Time{}},
+				Spec:       api.PodSpec{NodeName: "nonexistent"},
+			},
+			prune: true,
+		},
+	}
+
+	nc := NewNodeController(nil, nil, 0, nil, nil, 0, 0, 0, nil, false)
+	nc.nodeStore.Store = cache.NewStore(cache.MetaNamespaceKeyFunc)
+	nc.nodeStore.Store.Add(&api.Node{
+		ObjectMeta: api.ObjectMeta{
+			Name: "new",
+		},
+		Status: api.NodeStatus{
+			NodeInfo: api.NodeSystemInfo{
+				KubeletVersion: "v1.1.0",
+			},
+		},
+	})
+	nc.nodeStore.Store.Add(&api.Node{
+		ObjectMeta: api.ObjectMeta{
+			Name: "old",
+		},
+		Status: api.NodeStatus{
+			NodeInfo: api.NodeSystemInfo{
+				KubeletVersion: "v1.0.0",
+			},
+		},
+	})
+
+	for i, tc := range tcs {
+		var deleteCalls int
+		nc.forcefullyDeletePod = func(_ *api.Pod) {
+			deleteCalls++
+		}
+
+		nc.maybeDeleteTerminatingPod(&tc.pod)
+
+		if tc.prune && deleteCalls != 1 {
+			t.Errorf("[%v] expected number of delete calls to be 1 but got %v", i, deleteCalls)
+		}
+		if !tc.prune && deleteCalls != 0 {
+			t.Errorf("[%v] expected number of delete calls to be 0 but got %v", i, deleteCalls)
+		}
+	}
+}
+
 func newNode(name string) *api.Node {
 	return &api.Node{
 		ObjectMeta: api.ObjectMeta{Name: name},