Pod GC controller - use node lister

This commit is contained in:
Jacek Kaniuk 2019-10-23 16:54:54 +02:00
parent e6e026f1ad
commit 39883f08bf
4 changed files with 315 additions and 107 deletions

View File

@ -347,6 +347,7 @@ func startPodGCController(ctx ControllerContext) (http.Handler, bool, error) {
go podgc.NewPodGC( go podgc.NewPodGC(
ctx.ClientBuilder.ClientOrDie("pod-garbage-collector"), ctx.ClientBuilder.ClientOrDie("pod-garbage-collector"),
ctx.InformerFactory.Core().V1().Pods(), ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Nodes(),
int(ctx.ComponentConfig.PodGCController.TerminatedPodGCThreshold), int(ctx.ComponentConfig.PodGCController.TerminatedPodGCThreshold),
).Run(ctx.Stop) ).Run(ctx.Stop)
return nil, true, nil return nil, true, nil

View File

@ -10,6 +10,7 @@ go_library(
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
deps = [ deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
@ -19,6 +20,7 @@ go_library(
"//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/component-base/metrics/prometheus/ratelimiter:go_default_library", "//staging/src/k8s.io/component-base/metrics/prometheus/ratelimiter:go_default_library",
"//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/klog:go_default_library",
], ],
@ -34,11 +36,14 @@ go_test(
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
], ],
) )

View File

@ -22,6 +22,7 @@ import (
"time" "time"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@ -31,41 +32,53 @@ import (
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1" corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/component-base/metrics/prometheus/ratelimiter" "k8s.io/component-base/metrics/prometheus/ratelimiter"
"k8s.io/klog" "k8s.io/klog"
) )
const ( const (
// gcCheckPeriod defines frequency of running main controller loop
gcCheckPeriod = 20 * time.Second gcCheckPeriod = 20 * time.Second
// quarantineTime defines how long Orphaned GC waits for nodes to show up
// in an informer before issuing a GET call to check if they are truly gone
quarantineTime = 40 * time.Second
) )
type PodGCController struct { type PodGCController struct {
kubeClient clientset.Interface kubeClient clientset.Interface
podLister corelisters.PodLister podLister corelisters.PodLister
podListerSynced cache.InformerSynced podListerSynced cache.InformerSynced
nodeLister corelisters.NodeLister
nodeListerSynced cache.InformerSynced
nodeQueue workqueue.DelayingInterface
deletePod func(namespace, name string) error deletePod func(namespace, name string) error
terminatedPodThreshold int terminatedPodThreshold int
} }
func NewPodGC(kubeClient clientset.Interface, podInformer coreinformers.PodInformer, terminatedPodThreshold int) *PodGCController { func NewPodGC(kubeClient clientset.Interface, podInformer coreinformers.PodInformer,
nodeInformer coreinformers.NodeInformer, terminatedPodThreshold int) *PodGCController {
if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil { if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
ratelimiter.RegisterMetricAndTrackRateLimiterUsage("gc_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()) ratelimiter.RegisterMetricAndTrackRateLimiterUsage("gc_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
} }
gcc := &PodGCController{ gcc := &PodGCController{
kubeClient: kubeClient, kubeClient: kubeClient,
terminatedPodThreshold: terminatedPodThreshold, terminatedPodThreshold: terminatedPodThreshold,
podLister: podInformer.Lister(),
podListerSynced: podInformer.Informer().HasSynced,
nodeLister: nodeInformer.Lister(),
nodeListerSynced: nodeInformer.Informer().HasSynced,
nodeQueue: workqueue.NewNamedDelayingQueue("orphaned_pods_nodes"),
deletePod: func(namespace, name string) error { deletePod: func(namespace, name string) error {
klog.Infof("PodGC is force deleting Pod: %v/%v", namespace, name) klog.Infof("PodGC is force deleting Pod: %v/%v", namespace, name)
return kubeClient.CoreV1().Pods(namespace).Delete(name, metav1.NewDeleteOptions(0)) return kubeClient.CoreV1().Pods(namespace).Delete(name, metav1.NewDeleteOptions(0))
}, },
} }
gcc.podLister = podInformer.Lister()
gcc.podListerSynced = podInformer.Informer().HasSynced
return gcc return gcc
} }
@ -73,9 +86,10 @@ func (gcc *PodGCController) Run(stop <-chan struct{}) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
klog.Infof("Starting GC controller") klog.Infof("Starting GC controller")
defer gcc.nodeQueue.ShutDown()
defer klog.Infof("Shutting down GC controller") defer klog.Infof("Shutting down GC controller")
if !cache.WaitForNamedCacheSync("GC", stop, gcc.podListerSynced) { if !cache.WaitForNamedCacheSync("GC", stop, gcc.podListerSynced, gcc.nodeListerSynced) {
return return
} }
@ -87,13 +101,18 @@ func (gcc *PodGCController) Run(stop <-chan struct{}) {
func (gcc *PodGCController) gc() { func (gcc *PodGCController) gc() {
pods, err := gcc.podLister.List(labels.Everything()) pods, err := gcc.podLister.List(labels.Everything())
if err != nil { if err != nil {
klog.Errorf("Error while listing all Pods: %v", err) klog.Errorf("Error while listing all pods: %v", err)
return
}
nodes, err := gcc.nodeLister.List(labels.Everything())
if err != nil {
klog.Errorf("Error while listing all nodes: %v", err)
return return
} }
if gcc.terminatedPodThreshold > 0 { if gcc.terminatedPodThreshold > 0 {
gcc.gcTerminated(pods) gcc.gcTerminated(pods)
} }
gcc.gcOrphaned(pods) gcc.gcOrphaned(pods, nodes)
gcc.gcUnscheduledTerminating(pods) gcc.gcUnscheduledTerminating(pods)
} }
@ -118,12 +137,11 @@ func (gcc *PodGCController) gcTerminated(pods []*v1.Pod) {
if deleteCount > terminatedPodCount { if deleteCount > terminatedPodCount {
deleteCount = terminatedPodCount deleteCount = terminatedPodCount
} }
if deleteCount > 0 { if deleteCount <= 0 {
klog.Infof("garbage collecting %v pods", deleteCount)
} else {
return return
} }
klog.Infof("garbage collecting %v pods", deleteCount)
// sort only when necessary // sort only when necessary
sort.Sort(byCreationTimestamp(terminatedPods)) sort.Sort(byCreationTimestamp(terminatedPods))
var wait sync.WaitGroup var wait sync.WaitGroup
@ -141,23 +159,26 @@ func (gcc *PodGCController) gcTerminated(pods []*v1.Pod) {
} }
// gcOrphaned deletes pods that are bound to nodes that don't exist. // gcOrphaned deletes pods that are bound to nodes that don't exist.
func (gcc *PodGCController) gcOrphaned(pods []*v1.Pod) { func (gcc *PodGCController) gcOrphaned(pods []*v1.Pod, nodes []*v1.Node) {
klog.V(4).Infof("GC'ing orphaned") klog.V(4).Infof("GC'ing orphaned")
// We want to get list of Nodes from the etcd, to make sure that it's as fresh as possible. existingNodeNames := sets.NewString()
nodes, err := gcc.kubeClient.CoreV1().Nodes().List(metav1.ListOptions{}) for _, node := range nodes {
if err != nil { existingNodeNames.Insert(node.Name)
}
// Add newly found unknown nodes to quarantine
for _, pod := range pods {
if pod.Spec.NodeName != "" && !existingNodeNames.Has(pod.Spec.NodeName) {
gcc.nodeQueue.AddAfter(pod.Spec.NodeName, quarantineTime)
}
}
// Check if nodes are still missing after quarantine period
deletedNodesNames, quit := gcc.discoverDeletedNodes(existingNodeNames)
if quit {
return return
} }
nodeNames := sets.NewString() // Delete orphaned pods
for i := range nodes.Items {
nodeNames.Insert(nodes.Items[i].Name)
}
for _, pod := range pods { for _, pod := range pods {
if pod.Spec.NodeName == "" { if !deletedNodesNames.Has(pod.Spec.NodeName) {
continue
}
if nodeNames.Has(pod.Spec.NodeName) {
continue continue
} }
klog.V(2).Infof("Found orphaned Pod %v/%v assigned to the Node %v. Deleting.", pod.Namespace, pod.Name, pod.Spec.NodeName) klog.V(2).Infof("Found orphaned Pod %v/%v assigned to the Node %v. Deleting.", pod.Namespace, pod.Name, pod.Spec.NodeName)
@ -169,6 +190,37 @@ func (gcc *PodGCController) gcOrphaned(pods []*v1.Pod) {
} }
} }
func (gcc *PodGCController) discoverDeletedNodes(existingNodeNames sets.String) (sets.String, bool) {
deletedNodesNames := sets.NewString()
for gcc.nodeQueue.Len() > 0 {
item, quit := gcc.nodeQueue.Get()
if quit {
return nil, true
}
nodeName := item.(string)
if !existingNodeNames.Has(nodeName) {
exists, err := gcc.checkIfNodeExists(nodeName)
switch {
case err != nil:
klog.Errorf("Error while getting node %q: %v", nodeName, err)
// Node will be added back to the queue in the subsequent loop if still needed
case !exists:
deletedNodesNames.Insert(nodeName)
}
}
gcc.nodeQueue.Done(item)
}
return deletedNodesNames, false
}
func (gcc *PodGCController) checkIfNodeExists(name string) (bool, error) {
_, fetchErr := gcc.kubeClient.CoreV1().Nodes().Get(name, metav1.GetOptions{})
if errors.IsNotFound(fetchErr) {
return false, nil
}
return fetchErr == nil, fetchErr
}
// gcUnscheduledTerminating deletes pods that are terminating and haven't been scheduled to a particular node. // gcUnscheduledTerminating deletes pods that are terminating and haven't been scheduled to a particular node.
func (gcc *PodGCController) gcUnscheduledTerminating(pods []*v1.Pod) { func (gcc *PodGCController) gcUnscheduledTerminating(pods []*v1.Pod) {
klog.V(4).Infof("GC'ing unscheduled pods which are terminating.") klog.V(4).Infof("GC'ing unscheduled pods which are terminating.")

View File

@ -21,14 +21,17 @@ import (
"testing" "testing"
"time" "time"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1" coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/testutil" "k8s.io/kubernetes/pkg/controller/testutil"
) )
@ -47,12 +50,25 @@ func (*FakeController) LastSyncResourceVersion() string {
func alwaysReady() bool { return true } func alwaysReady() bool { return true }
func NewFromClient(kubeClient clientset.Interface, terminatedPodThreshold int) (*PodGCController, coreinformers.PodInformer) { func NewFromClient(kubeClient clientset.Interface, terminatedPodThreshold int) (*PodGCController, coreinformers.PodInformer, coreinformers.NodeInformer) {
informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
podInformer := informerFactory.Core().V1().Pods() podInformer := informerFactory.Core().V1().Pods()
controller := NewPodGC(kubeClient, podInformer, terminatedPodThreshold) nodeInformer := informerFactory.Core().V1().Nodes()
controller := NewPodGC(kubeClient, podInformer, nodeInformer, terminatedPodThreshold)
controller.podListerSynced = alwaysReady controller.podListerSynced = alwaysReady
return controller, podInformer return controller, podInformer, nodeInformer
}
func compareStringSetToList(set sets.String, list []string) bool {
for _, item := range list {
if !set.Has(item) {
return false
}
}
if len(list) != len(set) {
return false
}
return true
} }
func TestGCTerminated(t *testing.T) { func TestGCTerminated(t *testing.T) {
@ -113,7 +129,7 @@ func TestGCTerminated(t *testing.T) {
for i, test := range testCases { for i, test := range testCases {
client := fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*testutil.NewNode("node")}}) client := fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*testutil.NewNode("node")}})
gcc, podInformer := NewFromClient(client, test.threshold) gcc, podInformer, _ := NewFromClient(client, test.threshold)
deletedPodNames := make([]string, 0) deletedPodNames := make([]string, 0)
var lock sync.Mutex var lock sync.Mutex
gcc.deletePod = func(_, name string) error { gcc.deletePod = func(_, name string) error {
@ -135,90 +151,232 @@ func TestGCTerminated(t *testing.T) {
gcc.gc() gcc.gc()
pass := true if pass := compareStringSetToList(test.deletedPodNames, deletedPodNames); !pass {
for _, pod := range deletedPodNames { t.Errorf("[%v]pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v",
if !test.deletedPodNames.Has(pod) { i, test.deletedPodNames.List(), deletedPodNames)
pass = false
}
}
if len(deletedPodNames) != len(test.deletedPodNames) {
pass = false
}
if !pass {
t.Errorf("[%v]pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v", i, test.deletedPodNames, deletedPodNames)
} }
} }
} }
func TestGCOrphaned(t *testing.T) { func makePod(name string, nodeName string, phase v1.PodPhase) *v1.Pod {
type nameToPhase struct { return &v1.Pod{
name string ObjectMeta: metav1.ObjectMeta{
phase v1.PodPhase Name: name,
},
Spec: v1.PodSpec{NodeName: nodeName},
Status: v1.PodStatus{Phase: phase},
} }
}
func waitForAdded(q workqueue.DelayingInterface, depth int) error {
return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {
if q.Len() == depth {
return true, nil
}
return false, nil
})
}
func TestGCOrphaned(t *testing.T) {
testCases := []struct { testCases := []struct {
pods []nameToPhase name string
threshold int initialClientNodes []*v1.Node
deletedPodNames sets.String initialInformerNodes []*v1.Node
delay time.Duration
addedClientNodes []*v1.Node
deletedClientNodes []*v1.Node
addedInformerNodes []*v1.Node
deletedInformerNodes []*v1.Node
pods []*v1.Pod
itemsInQueue int
deletedPodNames sets.String
}{ }{
{ {
pods: []nameToPhase{ name: "nodes present in lister",
{name: "a", phase: v1.PodFailed}, initialInformerNodes: []*v1.Node{
{name: "b", phase: v1.PodSucceeded}, testutil.NewNode("existing1"),
testutil.NewNode("existing2"),
}, },
threshold: 0, delay: 2 * quarantineTime,
pods: []*v1.Pod{
makePod("a", "existing1", v1.PodRunning),
makePod("b", "existing2", v1.PodFailed),
makePod("c", "existing2", v1.PodSucceeded),
},
itemsInQueue: 0,
deletedPodNames: sets.NewString(),
},
{
name: "nodes present in client",
initialClientNodes: []*v1.Node{
testutil.NewNode("existing1"),
testutil.NewNode("existing2"),
},
delay: 2 * quarantineTime,
pods: []*v1.Pod{
makePod("a", "existing1", v1.PodRunning),
makePod("b", "existing2", v1.PodFailed),
makePod("c", "existing2", v1.PodSucceeded),
},
itemsInQueue: 2,
deletedPodNames: sets.NewString(),
},
{
name: "no nodes",
delay: 2 * quarantineTime,
pods: []*v1.Pod{
makePod("a", "deleted", v1.PodFailed),
makePod("b", "deleted", v1.PodSucceeded),
},
itemsInQueue: 1,
deletedPodNames: sets.NewString("a", "b"), deletedPodNames: sets.NewString("a", "b"),
}, },
{ {
pods: []nameToPhase{ name: "quarantine not finished",
{name: "a", phase: v1.PodRunning}, delay: quarantineTime / 2,
pods: []*v1.Pod{
makePod("a", "deleted", v1.PodFailed),
}, },
threshold: 1, itemsInQueue: 0,
deletedPodNames: sets.NewString(),
},
{
name: "wrong nodes",
initialInformerNodes: []*v1.Node{testutil.NewNode("existing")},
delay: 2 * quarantineTime,
pods: []*v1.Pod{
makePod("a", "deleted", v1.PodRunning),
},
itemsInQueue: 1,
deletedPodNames: sets.NewString("a"), deletedPodNames: sets.NewString("a"),
}, },
{
name: "some nodes missing",
initialInformerNodes: []*v1.Node{testutil.NewNode("existing")},
delay: 2 * quarantineTime,
pods: []*v1.Pod{
makePod("a", "deleted", v1.PodFailed),
makePod("b", "existing", v1.PodFailed),
makePod("c", "deleted", v1.PodSucceeded),
makePod("d", "deleted", v1.PodRunning),
},
itemsInQueue: 1,
deletedPodNames: sets.NewString("a", "c", "d"),
},
{
name: "node added to client after quarantine",
delay: 2 * quarantineTime,
addedClientNodes: []*v1.Node{testutil.NewNode("node")},
pods: []*v1.Pod{
makePod("a", "node", v1.PodRunning),
},
itemsInQueue: 1,
deletedPodNames: sets.NewString(),
},
{
name: "node added to informer after quarantine",
delay: 2 * quarantineTime,
addedInformerNodes: []*v1.Node{testutil.NewNode("node")},
pods: []*v1.Pod{
makePod("a", "node", v1.PodFailed),
},
itemsInQueue: 1,
deletedPodNames: sets.NewString(),
},
{
// It shouldn't happen that client will be lagging behind informer.
// This test case is more a sanity check.
name: "node deleted from client after quarantine",
initialClientNodes: []*v1.Node{testutil.NewNode("node")},
delay: 2 * quarantineTime,
deletedClientNodes: []*v1.Node{testutil.NewNode("node")},
pods: []*v1.Pod{
makePod("a", "node", v1.PodFailed),
},
itemsInQueue: 1,
deletedPodNames: sets.NewString("a"),
},
{
name: "node deleted from informer after quarantine",
initialInformerNodes: []*v1.Node{testutil.NewNode("node")},
delay: 2 * quarantineTime,
deletedInformerNodes: []*v1.Node{testutil.NewNode("node")},
pods: []*v1.Pod{
makePod("a", "node", v1.PodSucceeded),
},
itemsInQueue: 0,
deletedPodNames: sets.NewString(),
},
} }
for i, test := range testCases { for _, test := range testCases {
client := fake.NewSimpleClientset() t.Run(test.name, func(t *testing.T) {
gcc, podInformer := NewFromClient(client, test.threshold) nodeList := &v1.NodeList{}
deletedPodNames := make([]string, 0) for _, node := range test.initialClientNodes {
var lock sync.Mutex nodeList.Items = append(nodeList.Items, *node)
gcc.deletePod = func(_, name string) error {
lock.Lock()
defer lock.Unlock()
deletedPodNames = append(deletedPodNames, name)
return nil
}
creationTime := time.Unix(0, 0)
for _, pod := range test.pods {
creationTime = creationTime.Add(1 * time.Hour)
podInformer.Informer().GetStore().Add(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: pod.name, CreationTimestamp: metav1.Time{Time: creationTime}},
Status: v1.PodStatus{Phase: pod.phase},
Spec: v1.PodSpec{NodeName: "node"},
})
}
pods, err := podInformer.Lister().List(labels.Everything())
if err != nil {
t.Errorf("Error while listing all Pods: %v", err)
return
}
gcc.gcOrphaned(pods)
pass := true
for _, pod := range deletedPodNames {
if !test.deletedPodNames.Has(pod) {
pass = false
} }
} client := fake.NewSimpleClientset(nodeList)
if len(deletedPodNames) != len(test.deletedPodNames) { gcc, podInformer, nodeInformer := NewFromClient(client, -1)
pass = false for _, node := range test.initialInformerNodes {
} nodeInformer.Informer().GetStore().Add(node)
if !pass { }
t.Errorf("[%v]pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v", i, test.deletedPodNames, deletedPodNames) for _, pod := range test.pods {
} podInformer.Informer().GetStore().Add(pod)
}
// Overwrite queue
fakeClock := clock.NewFakeClock(time.Now())
gcc.nodeQueue.ShutDown()
gcc.nodeQueue = workqueue.NewDelayingQueueWithCustomClock(fakeClock, "podgc_test_queue")
deletedPodNames := make([]string, 0)
var lock sync.Mutex
gcc.deletePod = func(_, name string) error {
lock.Lock()
defer lock.Unlock()
deletedPodNames = append(deletedPodNames, name)
return nil
}
// First GC of orphaned pods
gcc.gc()
if len(deletedPodNames) > 0 {
t.Errorf("no pods should be deleted at this point.\n\tactual: %v", deletedPodNames)
}
// Move clock forward
fakeClock.Step(test.delay)
// Wait for queue goroutine to process items
if test.itemsInQueue > 0 {
err := waitForAdded(gcc.nodeQueue, test.itemsInQueue)
if err != nil {
t.Errorf("wrong number of items in the node queue.\n\texpected: %v\n\tactual: %v",
test.itemsInQueue, gcc.nodeQueue.Len())
}
}
// Execute planned nodes changes
for _, node := range test.addedClientNodes {
client.CoreV1().Nodes().Create(node)
}
for _, node := range test.deletedClientNodes {
client.CoreV1().Nodes().Delete(node.Name, &metav1.DeleteOptions{})
}
for _, node := range test.addedInformerNodes {
nodeInformer.Informer().GetStore().Add(node)
}
for _, node := range test.deletedInformerNodes {
nodeInformer.Informer().GetStore().Delete(node)
}
// Actual pod deletion
gcc.gc()
if pass := compareStringSetToList(test.deletedPodNames, deletedPodNames); !pass {
t.Errorf("pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v",
test.deletedPodNames.List(), deletedPodNames)
}
})
} }
} }
@ -257,7 +415,7 @@ func TestGCUnscheduledTerminating(t *testing.T) {
for i, test := range testCases { for i, test := range testCases {
client := fake.NewSimpleClientset() client := fake.NewSimpleClientset()
gcc, podInformer := NewFromClient(client, -1) gcc, podInformer, _ := NewFromClient(client, -1)
deletedPodNames := make([]string, 0) deletedPodNames := make([]string, 0)
var lock sync.Mutex var lock sync.Mutex
gcc.deletePod = func(_, name string) error { gcc.deletePod = func(_, name string) error {
@ -285,17 +443,9 @@ func TestGCUnscheduledTerminating(t *testing.T) {
} }
gcc.gcUnscheduledTerminating(pods) gcc.gcUnscheduledTerminating(pods)
pass := true if pass := compareStringSetToList(test.deletedPodNames, deletedPodNames); !pass {
for _, pod := range deletedPodNames { t.Errorf("[%v]pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v, test: %v",
if !test.deletedPodNames.Has(pod) { i, test.deletedPodNames.List(), deletedPodNames, test.name)
pass = false
}
}
if len(deletedPodNames) != len(test.deletedPodNames) {
pass = false
}
if !pass {
t.Errorf("[%v]pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v, test: %v", i, test.deletedPodNames, deletedPodNames, test.name)
} }
} }
} }