Merge pull request #37365 from gmarek/nodecontroller

Automatic merge from submit-queue

gcOrphaned checks via the API that the node doesn't exist

It's needed to make sure we don't make invalid decisions when the system is overloaded and the cache is not keeping up.

@wojtek-t - this adds one `Node.List()` per 20 sec. Listing Nodes is an expensive operation, so I'd like you to chime in.
This commit is contained in:
Kubernetes Submit Queue 2016-12-06 04:49:03 -08:00 committed by GitHub
commit 2c63b6f5ca
3 changed files with 18 additions and 41 deletions

View File

@ -24,11 +24,10 @@ go_library(
"//pkg/controller:go_default_library", "//pkg/controller:go_default_library",
"//pkg/controller/informers:go_default_library", "//pkg/controller/informers:go_default_library",
"//pkg/labels:go_default_library", "//pkg/labels:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/util/metrics:go_default_library", "//pkg/util/metrics:go_default_library",
"//pkg/util/runtime:go_default_library", "//pkg/util/runtime:go_default_library",
"//pkg/util/sets:go_default_library",
"//pkg/util/wait:go_default_library", "//pkg/util/wait:go_default_library",
"//pkg/watch:go_default_library",
"//vendor:github.com/golang/glog", "//vendor:github.com/golang/glog",
], ],
) )
@ -41,8 +40,8 @@ go_test(
deps = [ deps = [
"//pkg/api/v1:go_default_library", "//pkg/api/v1:go_default_library",
"//pkg/apis/meta/v1:go_default_library", "//pkg/apis/meta/v1:go_default_library",
"//pkg/client/cache:go_default_library",
"//pkg/client/clientset_generated/release_1_5/fake:go_default_library", "//pkg/client/clientset_generated/release_1_5/fake:go_default_library",
"//pkg/controller/node/testutil:go_default_library",
"//pkg/labels:go_default_library", "//pkg/labels:go_default_library",
"//pkg/util/sets:go_default_library", "//pkg/util/sets:go_default_library",
], ],

View File

@ -27,11 +27,10 @@ import (
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/informers" "k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/metrics" "k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog" "github.com/golang/glog"
) )
@ -50,11 +49,8 @@ type PodGCController struct {
// will be null // will be null
internalPodInformer cache.SharedIndexInformer internalPodInformer cache.SharedIndexInformer
podStore cache.StoreToPodLister podStore cache.StoreToPodLister
nodeStore cache.StoreToNodeLister podController cache.ControllerInterface
podController cache.ControllerInterface
nodeController cache.ControllerInterface
deletePod func(namespace, name string) error deletePod func(namespace, name string) error
terminatedPodThreshold int terminatedPodThreshold int
@ -76,20 +72,6 @@ func NewPodGC(kubeClient clientset.Interface, podInformer cache.SharedIndexInfor
gcc.podStore.Indexer = podInformer.GetIndexer() gcc.podStore.Indexer = podInformer.GetIndexer()
gcc.podController = podInformer.GetController() gcc.podController = podInformer.GetController()
gcc.nodeStore.Store, gcc.nodeController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
return gcc.kubeClient.Core().Nodes().List(options)
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
return gcc.kubeClient.Core().Nodes().Watch(options)
},
},
&v1.Node{},
controller.NoResyncPeriodFunc(),
cache.ResourceEventHandlerFuncs{},
)
return gcc return gcc
} }
@ -107,13 +89,12 @@ func (gcc *PodGCController) Run(stop <-chan struct{}) {
if gcc.internalPodInformer != nil { if gcc.internalPodInformer != nil {
go gcc.podController.Run(stop) go gcc.podController.Run(stop)
} }
go gcc.nodeController.Run(stop)
go wait.Until(gcc.gc, gcCheckPeriod, stop) go wait.Until(gcc.gc, gcCheckPeriod, stop)
<-stop <-stop
} }
func (gcc *PodGCController) gc() { func (gcc *PodGCController) gc() {
if !gcc.podController.HasSynced() || !gcc.nodeController.HasSynced() { if !gcc.podController.HasSynced() {
glog.V(2).Infof("PodGCController is waiting for informer sync...") glog.V(2).Infof("PodGCController is waiting for informer sync...")
return return
} }
@ -173,12 +154,21 @@ func (gcc *PodGCController) gcTerminated(pods []*v1.Pod) {
// gcOrphaned deletes pods that are bound to nodes that don't exist. // gcOrphaned deletes pods that are bound to nodes that don't exist.
func (gcc *PodGCController) gcOrphaned(pods []*v1.Pod) { func (gcc *PodGCController) gcOrphaned(pods []*v1.Pod) {
glog.V(4).Infof("GC'ing orphaned") glog.V(4).Infof("GC'ing orphaned")
// We want to get list of Nodes from the etcd, to make sure that it's as fresh as possible.
nodes, err := gcc.kubeClient.Core().Nodes().List(v1.ListOptions{})
if err != nil {
return
}
nodeNames := sets.NewString()
for i := range nodes.Items {
nodeNames.Insert(nodes.Items[i].Name)
}
for _, pod := range pods { for _, pod := range pods {
if pod.Spec.NodeName == "" { if pod.Spec.NodeName == "" {
continue continue
} }
if _, exists, _ := gcc.nodeStore.GetByKey(pod.Spec.NodeName); exists { if nodeNames.Has(pod.Spec.NodeName) {
continue continue
} }
glog.V(2).Infof("Found orphaned Pod %v assigned to the Node %v. Deleting.", pod.Name, pod.Spec.NodeName) glog.V(2).Infof("Found orphaned Pod %v assigned to the Node %v. Deleting.", pod.Name, pod.Spec.NodeName)

View File

@ -23,8 +23,8 @@ import (
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/fake" "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/fake"
"k8s.io/kubernetes/pkg/controller/node/testutil"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
) )
@ -94,7 +94,7 @@ func TestGCTerminated(t *testing.T) {
} }
for i, test := range testCases { for i, test := range testCases {
client := fake.NewSimpleClientset() client := fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*testutil.NewNode("node")}})
gcc := NewFromClient(client, test.threshold) gcc := NewFromClient(client, test.threshold)
deletedPodNames := make([]string, 0) deletedPodNames := make([]string, 0)
var lock sync.Mutex var lock sync.Mutex
@ -115,13 +115,7 @@ func TestGCTerminated(t *testing.T) {
}) })
} }
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
store.Add(&v1.Node{
ObjectMeta: v1.ObjectMeta{Name: "node"},
})
gcc.nodeStore = cache.StoreToNodeLister{Store: store}
gcc.podController = &FakeController{} gcc.podController = &FakeController{}
gcc.nodeController = &FakeController{}
gcc.gc() gcc.gc()
@ -190,10 +184,7 @@ func TestGCOrphaned(t *testing.T) {
}) })
} }
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
gcc.nodeStore = cache.StoreToNodeLister{Store: store}
gcc.podController = &FakeController{} gcc.podController = &FakeController{}
gcc.nodeController = &FakeController{}
pods, err := gcc.podStore.List(labels.Everything()) pods, err := gcc.podStore.List(labels.Everything())
if err != nil { if err != nil {
@ -273,10 +264,7 @@ func TestGCUnscheduledTerminating(t *testing.T) {
}) })
} }
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
gcc.nodeStore = cache.StoreToNodeLister{Store: store}
gcc.podController = &FakeController{} gcc.podController = &FakeController{}
gcc.nodeController = &FakeController{}
pods, err := gcc.podStore.List(labels.Everything()) pods, err := gcc.podStore.List(labels.Everything())
if err != nil { if err != nil {