gcOrphaned check via the API that the node doesn’t exist

This commit is contained in:
gmarek 2016-11-23 10:33:52 +01:00
parent 8e8599fcd7
commit 15f2dbe13c
3 changed files with 18 additions and 41 deletions

View File

@ -24,11 +24,10 @@ go_library(
"//pkg/controller:go_default_library", "//pkg/controller:go_default_library",
"//pkg/controller/informers:go_default_library", "//pkg/controller/informers:go_default_library",
"//pkg/labels:go_default_library", "//pkg/labels:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/util/metrics:go_default_library", "//pkg/util/metrics:go_default_library",
"//pkg/util/runtime:go_default_library", "//pkg/util/runtime:go_default_library",
"//pkg/util/sets:go_default_library",
"//pkg/util/wait:go_default_library", "//pkg/util/wait:go_default_library",
"//pkg/watch:go_default_library",
"//vendor:github.com/golang/glog", "//vendor:github.com/golang/glog",
], ],
) )
@ -41,8 +40,8 @@ go_test(
deps = [ deps = [
"//pkg/api/v1:go_default_library", "//pkg/api/v1:go_default_library",
"//pkg/apis/meta/v1:go_default_library", "//pkg/apis/meta/v1:go_default_library",
"//pkg/client/cache:go_default_library",
"//pkg/client/clientset_generated/release_1_5/fake:go_default_library", "//pkg/client/clientset_generated/release_1_5/fake:go_default_library",
"//pkg/controller/node/testutil:go_default_library",
"//pkg/labels:go_default_library", "//pkg/labels:go_default_library",
"//pkg/util/sets:go_default_library", "//pkg/util/sets:go_default_library",
], ],

View File

@ -27,11 +27,10 @@ import (
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/informers" "k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/metrics" "k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog" "github.com/golang/glog"
) )
@ -50,11 +49,8 @@ type PodGCController struct {
// will be null // will be null
internalPodInformer cache.SharedIndexInformer internalPodInformer cache.SharedIndexInformer
podStore cache.StoreToPodLister podStore cache.StoreToPodLister
nodeStore cache.StoreToNodeLister podController cache.ControllerInterface
podController cache.ControllerInterface
nodeController cache.ControllerInterface
deletePod func(namespace, name string) error deletePod func(namespace, name string) error
terminatedPodThreshold int terminatedPodThreshold int
@ -76,20 +72,6 @@ func NewPodGC(kubeClient clientset.Interface, podInformer cache.SharedIndexInfor
gcc.podStore.Indexer = podInformer.GetIndexer() gcc.podStore.Indexer = podInformer.GetIndexer()
gcc.podController = podInformer.GetController() gcc.podController = podInformer.GetController()
gcc.nodeStore.Store, gcc.nodeController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
return gcc.kubeClient.Core().Nodes().List(options)
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
return gcc.kubeClient.Core().Nodes().Watch(options)
},
},
&v1.Node{},
controller.NoResyncPeriodFunc(),
cache.ResourceEventHandlerFuncs{},
)
return gcc return gcc
} }
@ -107,13 +89,12 @@ func (gcc *PodGCController) Run(stop <-chan struct{}) {
if gcc.internalPodInformer != nil { if gcc.internalPodInformer != nil {
go gcc.podController.Run(stop) go gcc.podController.Run(stop)
} }
go gcc.nodeController.Run(stop)
go wait.Until(gcc.gc, gcCheckPeriod, stop) go wait.Until(gcc.gc, gcCheckPeriod, stop)
<-stop <-stop
} }
func (gcc *PodGCController) gc() { func (gcc *PodGCController) gc() {
if !gcc.podController.HasSynced() || !gcc.nodeController.HasSynced() { if !gcc.podController.HasSynced() {
glog.V(2).Infof("PodGCController is waiting for informer sync...") glog.V(2).Infof("PodGCController is waiting for informer sync...")
return return
} }
@ -173,12 +154,21 @@ func (gcc *PodGCController) gcTerminated(pods []*v1.Pod) {
// gcOrphaned deletes pods that are bound to nodes that don't exist. // gcOrphaned deletes pods that are bound to nodes that don't exist.
func (gcc *PodGCController) gcOrphaned(pods []*v1.Pod) { func (gcc *PodGCController) gcOrphaned(pods []*v1.Pod) {
glog.V(4).Infof("GC'ing orphaned") glog.V(4).Infof("GC'ing orphaned")
// We want to get list of Nodes from the etcd, to make sure that it's as fresh as possible.
nodes, err := gcc.kubeClient.Core().Nodes().List(v1.ListOptions{})
if err != nil {
return
}
nodeNames := sets.NewString()
for i := range nodes.Items {
nodeNames.Insert(nodes.Items[i].Name)
}
for _, pod := range pods { for _, pod := range pods {
if pod.Spec.NodeName == "" { if pod.Spec.NodeName == "" {
continue continue
} }
if _, exists, _ := gcc.nodeStore.GetByKey(pod.Spec.NodeName); exists { if nodeNames.Has(pod.Spec.NodeName) {
continue continue
} }
glog.V(2).Infof("Found orphaned Pod %v assigned to the Node %v. Deleting.", pod.Name, pod.Spec.NodeName) glog.V(2).Infof("Found orphaned Pod %v assigned to the Node %v. Deleting.", pod.Name, pod.Spec.NodeName)

View File

@ -23,8 +23,8 @@ import (
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/fake" "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/fake"
"k8s.io/kubernetes/pkg/controller/node/testutil"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
) )
@ -94,7 +94,7 @@ func TestGCTerminated(t *testing.T) {
} }
for i, test := range testCases { for i, test := range testCases {
client := fake.NewSimpleClientset() client := fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*testutil.NewNode("node")}})
gcc := NewFromClient(client, test.threshold) gcc := NewFromClient(client, test.threshold)
deletedPodNames := make([]string, 0) deletedPodNames := make([]string, 0)
var lock sync.Mutex var lock sync.Mutex
@ -115,13 +115,7 @@ func TestGCTerminated(t *testing.T) {
}) })
} }
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
store.Add(&v1.Node{
ObjectMeta: v1.ObjectMeta{Name: "node"},
})
gcc.nodeStore = cache.StoreToNodeLister{Store: store}
gcc.podController = &FakeController{} gcc.podController = &FakeController{}
gcc.nodeController = &FakeController{}
gcc.gc() gcc.gc()
@ -190,10 +184,7 @@ func TestGCOrphaned(t *testing.T) {
}) })
} }
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
gcc.nodeStore = cache.StoreToNodeLister{Store: store}
gcc.podController = &FakeController{} gcc.podController = &FakeController{}
gcc.nodeController = &FakeController{}
pods, err := gcc.podStore.List(labels.Everything()) pods, err := gcc.podStore.List(labels.Everything())
if err != nil { if err != nil {
@ -273,10 +264,7 @@ func TestGCUnscheduledTerminating(t *testing.T) {
}) })
} }
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
gcc.nodeStore = cache.StoreToNodeLister{Store: store}
gcc.podController = &FakeController{} gcc.podController = &FakeController{}
gcc.nodeController = &FakeController{}
pods, err := gcc.podStore.List(labels.Everything()) pods, err := gcc.podStore.List(labels.Everything())
if err != nil { if err != nil {