Merge pull request #15930 from mikedanese/nc

make nodecontroller delete terminating pods on 1.0 nodes
Saad Ali 2015-10-22 12:38:14 -07:00
commit f960b05fe1
2 changed files with 212 additions and 1 deletion
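The diff touches the node controller and its unit tests. It wires pod and node informers into the NodeController and force-deletes (zero grace period) any terminating pod that was never scheduled, whose node no longer exists, or whose node runs a v1.0.x kubelet, since those kubelets cannot honor graceful termination. Below is a minimal standalone sketch of that decision only, using simplified stand-in types and a hypothetical shouldForceDelete helper rather than the real Kubernetes API objects:

package main

import (
	"fmt"
	"strings"
)

// Illustrative stand-ins for the fields the controller actually inspects.
type pod struct {
	terminating bool   // pod.DeletionTimestamp != nil in the real controller
	nodeName    string // pod.Spec.NodeName
}

type node struct {
	kubeletVersion string // node.Status.NodeInfo.KubeletVersion
}

// shouldForceDelete (hypothetical helper) mirrors maybeDeleteTerminatingPod:
// force-delete a terminating pod if it is unscheduled, its node is gone, or
// its node runs a v1.0.x kubelet that cannot gracefully terminate pods.
func shouldForceDelete(p pod, nodes map[string]node) bool {
	if !p.terminating {
		return false
	}
	if p.nodeName == "" {
		return true
	}
	n, found := nodes[p.nodeName]
	if !found {
		return true
	}
	return strings.HasPrefix(n.kubeletVersion, "v1.0")
}

func main() {
	nodes := map[string]node{
		"new": {kubeletVersion: "v1.1.0"},
		"old": {kubeletVersion: "v1.0.6"},
	}
	fmt.Println(shouldForceDelete(pod{terminating: true, nodeName: "old"}, nodes))  // true
	fmt.Println(shouldForceDelete(pod{terminating: true, nodeName: "new"}, nodes))  // false
	fmt.Println(shouldForceDelete(pod{terminating: false, nodeName: "old"}, nodes)) // false
}

In the actual change, the delete hook is stored as the forcefullyDeletePod field on NodeController rather than being called directly, which lets TestCheckPod stub it out and count invocations.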


@@ -20,20 +20,26 @@ import (
"errors"
"fmt"
"net"
"strings"
"sync"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/watch"
)
var (
@@ -98,6 +104,14 @@ type NodeController struct {
// The maximum duration before a pod evicted from a node can be forcefully terminated.
maximumGracePeriod time.Duration
recorder record.EventRecorder
// Pod framework and store
podController *framework.Controller
podStore cache.StoreToPodLister
// Node framework and store
nodeController *framework.Controller
nodeStore cache.StoreToNodeLister
forcefullyDeletePod func(*api.Pod)
}
// NewNodeController returns a new node controller to sync instances from cloudprovider.
@@ -125,7 +139,8 @@ func NewNodeController(
glog.Fatal("NodeController: Must specify clusterCIDR if allocateNodeCIDRs == true.")
}
evictorLock := sync.Mutex{}
return &NodeController{
nc := &NodeController{
cloud: cloud,
knownNodeSet: make(sets.String),
kubeClient: kubeClient,
@@ -143,11 +158,45 @@ func NewNodeController(
now: unversioned.Now,
clusterCIDR: clusterCIDR,
allocateNodeCIDRs: allocateNodeCIDRs,
forcefullyDeletePod: func(p *api.Pod) { forcefullyDeletePod(kubeClient, p) },
}
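// Watch all pods; on every add or update, check whether a terminating pod
// must be force-deleted (see maybeDeleteTerminatingPod below).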
nc.podStore.Store, nc.podController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything())
},
WatchFunc: func(rv string) (watch.Interface, error) {
return nc.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv)
},
},
&api.Pod{},
controller.NoResyncPeriodFunc(),
framework.ResourceEventHandlerFuncs{
AddFunc: nc.maybeDeleteTerminatingPod,
UpdateFunc: func(_, obj interface{}) { nc.maybeDeleteTerminatingPod(obj) },
},
)
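// Mirror all nodes into a local store so the pod handler above can look up
// a node's kubelet version without extra calls to the API server.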
nc.nodeStore.Store, nc.nodeController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return nc.kubeClient.Nodes().List(labels.Everything(), fields.Everything())
},
WatchFunc: func(rv string) (watch.Interface, error) {
return nc.kubeClient.Nodes().Watch(labels.Everything(), fields.Everything(), rv)
},
},
&api.Node{},
controller.NoResyncPeriodFunc(),
framework.ResourceEventHandlerFuncs{},
)
return nc
}
// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *NodeController) Run(period time.Duration) {
go nc.nodeController.Run(util.NeverStop)
go nc.podController.Run(util.NeverStop)
// Incorporate the results of node status pushed from kubelet to master.
go util.Until(func() {
if err := nc.monitorNodeStatus(); err != nil {
@@ -176,6 +225,7 @@ func (nc *NodeController) Run(period time.Duration) {
util.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
return false, 0
}
if remaining {
nc.terminationEvictor.Add(value.Value)
}
@@ -238,6 +288,61 @@ func (nc *NodeController) getCondition(status *api.NodeStatus, conditionType api
return nil
}
// maybeDeleteTerminatingPod non-gracefully deletes terminating pods that
// should not be gracefully terminated.
func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}) {
pod, ok := obj.(*api.Pod)
if !ok {
return
}
// consider only terminating pods
if pod.DeletionTimestamp == nil {
return
}
// delete terminating pods that have not yet been scheduled
if len(pod.Spec.NodeName) == 0 {
nc.forcefullyDeletePod(pod)
return
}
nodeObj, found, err := nc.nodeStore.GetByKey(pod.Spec.NodeName)
if err != nil {
// this can only happen if the Store.KeyFunc has a problem creating
// a key for the pod. If it happens once, it will happen again so
// don't bother requeuing the pod.
util.HandleError(err)
return
}
// delete terminating pods that have been scheduled on
// nonexistent nodes
if !found {
nc.forcefullyDeletePod(pod)
return
}
// delete terminating pods that have been scheduled on
// nodes that do not support graceful termination
// TODO(mikedanese): this can be removed when we no longer
// guarantee backwards compatibility of master API to kubelets with
// versions less than 1.1.0
node := nodeObj.(*api.Node)
if strings.HasPrefix(node.Status.NodeInfo.KubeletVersion, "v1.0") {
nc.forcefullyDeletePod(pod)
return
}
}
func forcefullyDeletePod(c client.Interface, pod *api.Pod) {
var zero int64
err := c.Pods(pod.Namespace).Delete(pod.Name, &api.DeleteOptions{GracePeriodSeconds: &zero})
if err != nil {
util.HandleError(err)
}
}
// monitorNodeStatus verifies that node statuses are constantly updated by the kubelet, and if not,
// posts "NodeReady==ConditionUnknown". It also evicts all pods if the node is not ready or
// not reachable for a long period of time.


@@ -27,6 +27,7 @@ import (
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
"k8s.io/kubernetes/pkg/fields"
@@ -646,6 +647,111 @@ func TestNodeDeletion(t *testing.T) {
}
}
func TestCheckPod(t *testing.T) {
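// Each case pairs a pod with whether maybeDeleteTerminatingPod is expected to
// force-delete ("prune") it, given the "new" (v1.1.0) and "old" (v1.0.0) nodes
// added to the node store below.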
tcs := []struct {
pod api.Pod
prune bool
}{
{
pod: api.Pod{
ObjectMeta: api.ObjectMeta{DeletionTimestamp: nil},
Spec: api.PodSpec{NodeName: "new"},
},
prune: false,
},
{
pod: api.Pod{
ObjectMeta: api.ObjectMeta{DeletionTimestamp: nil},
Spec: api.PodSpec{NodeName: "old"},
},
prune: false,
},
{
pod: api.Pod{
ObjectMeta: api.ObjectMeta{DeletionTimestamp: nil},
Spec: api.PodSpec{NodeName: ""},
},
prune: false,
},
{
pod: api.Pod{
ObjectMeta: api.ObjectMeta{DeletionTimestamp: nil},
Spec: api.PodSpec{NodeName: "nonexistant"},
},
prune: false,
},
{
pod: api.Pod{
ObjectMeta: api.ObjectMeta{DeletionTimestamp: &unversioned.Time{}},
Spec: api.PodSpec{NodeName: "new"},
},
prune: false,
},
{
pod: api.Pod{
ObjectMeta: api.ObjectMeta{DeletionTimestamp: &unversioned.Time{}},
Spec: api.PodSpec{NodeName: "old"},
},
prune: true,
},
{
pod: api.Pod{
ObjectMeta: api.ObjectMeta{DeletionTimestamp: &unversioned.Time{}},
Spec: api.PodSpec{NodeName: ""},
},
prune: true,
},
{
pod: api.Pod{
ObjectMeta: api.ObjectMeta{DeletionTimestamp: &unversioned.Time{}},
Spec: api.PodSpec{NodeName: "nonexistant"},
},
prune: true,
},
}
nc := NewNodeController(nil, nil, 0, nil, nil, 0, 0, 0, nil, false)
nc.nodeStore.Store = cache.NewStore(cache.MetaNamespaceKeyFunc)
nc.nodeStore.Store.Add(&api.Node{
ObjectMeta: api.ObjectMeta{
Name: "new",
},
Status: api.NodeStatus{
NodeInfo: api.NodeSystemInfo{
KubeletVersion: "v1.1.0",
},
},
})
nc.nodeStore.Store.Add(&api.Node{
ObjectMeta: api.ObjectMeta{
Name: "old",
},
Status: api.NodeStatus{
NodeInfo: api.NodeSystemInfo{
KubeletVersion: "v1.0.0",
},
},
})
for i, tc := range tcs {
var deleteCalls int
nc.forcefullyDeletePod = func(_ *api.Pod) {
deleteCalls++
}
nc.maybeDeleteTerminatingPod(&tc.pod)
if tc.prune && deleteCalls != 1 {
t.Errorf("[%v] expected number of delete calls to be 1 but got %v", i, deleteCalls)
}
if !tc.prune && deleteCalls != 0 {
t.Errorf("[%v] expected number of delete calls to be 0 but got %v", i, deleteCalls)
}
}
}
func newNode(name string) *api.Node {
return &api.Node{
ObjectMeta: api.ObjectMeta{Name: name},