Merge pull request #32495 from gmarek/podgc

Automatic merge from submit-queue

Move orphaned Pod deletion logic to PodGC

cc @mwielgus @mikedanese @davidopp
This commit is contained in:
Kubernetes Submit Queue 2016-09-28 06:55:46 -07:00 committed by GitHub
commit 96a7b0920a
7 changed files with 213 additions and 201 deletions

View File

@ -222,8 +222,8 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, stop <
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
if s.TerminatedPodGCThreshold > 0 { if s.TerminatedPodGCThreshold > 0 {
go podgc.New(client("pod-garbage-collector"), ResyncPeriod(s), int(s.TerminatedPodGCThreshold)). go podgc.NewPodGC(client("pod-garbage-collector"), sharedInformers.Pods().Informer(),
Run(wait.NeverStop) int(s.TerminatedPodGCThreshold)).Run(wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
} }

View File

@ -145,10 +145,8 @@ func (s *CMServer) Run(_ []string) error {
go replicationcontroller.NewReplicationManagerFromClient(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replication-controller")), s.resyncPeriod, replicationcontroller.BurstReplicas, int(s.LookupCacheSizeForRC)). go replicationcontroller.NewReplicationManagerFromClient(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replication-controller")), s.resyncPeriod, replicationcontroller.BurstReplicas, int(s.LookupCacheSizeForRC)).
Run(int(s.ConcurrentRCSyncs), wait.NeverStop) Run(int(s.ConcurrentRCSyncs), wait.NeverStop)
if s.TerminatedPodGCThreshold > 0 { go podgc.NewFromClient(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-garbage-collector")), int(s.TerminatedPodGCThreshold)).
go podgc.New(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-garbage-collector")), s.resyncPeriod, int(s.TerminatedPodGCThreshold)).
Run(wait.NeverStop) Run(wait.NeverStop)
}
//TODO(jdef) should eventually support more cloud providers here //TODO(jdef) should eventually support more cloud providers here
if s.CloudProvider != mesos.ProviderName { if s.CloudProvider != mesos.ProviderName {

View File

@ -40,22 +40,6 @@ const (
LargeClusterThreshold = 20 LargeClusterThreshold = 20
) )
// cleanupOrphanedPods deletes pods that are bound to nodes that don't
// exist.
func cleanupOrphanedPods(pods []*api.Pod, nodeStore cache.Store, forcefulDeletePodFunc func(*api.Pod) error) {
for _, pod := range pods {
if pod.Spec.NodeName == "" {
continue
}
if _, exists, _ := nodeStore.GetByKey(pod.Spec.NodeName); exists {
continue
}
if err := forcefulDeletePodFunc(pod); err != nil {
utilruntime.HandleError(err)
}
}
}
// deletePods will delete all pods from master running on given node, and return true // deletePods will delete all pods from master running on given node, and return true
// if any pods were deleted, or were found pending deletion. // if any pods were deleted, or were found pending deletion.
func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore cache.StoreToDaemonSetLister) (bool, error) { func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore cache.StoreToDaemonSetLister) (bool, error) {

View File

@ -465,15 +465,6 @@ func (nc *NodeController) Run() {
}) })
} }
}, nodeEvictionPeriod, wait.NeverStop) }, nodeEvictionPeriod, wait.NeverStop)
go wait.Until(func() {
pods, err := nc.podStore.List(labels.Everything())
if err != nil {
utilruntime.HandleError(err)
return
}
cleanupOrphanedPods(pods, nc.nodeStore.Store, nc.forcefullyDeletePod)
}, 30*time.Second, wait.NeverStop)
} }
// monitorNodeStatus verifies node status are constantly updated by kubelet, and if not, // monitorNodeStatus verifies node status are constantly updated by kubelet, and if not,
@ -512,7 +503,6 @@ func (nc *NodeController) monitorNodeStatus() error {
for i := range deleted { for i := range deleted {
glog.V(1).Infof("NodeController observed a Node deletion: %v", deleted[i].Name) glog.V(1).Infof("NodeController observed a Node deletion: %v", deleted[i].Name)
recordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), api.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", deleted[i].Name)) recordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), api.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", deleted[i].Name))
nc.evictPods(deleted[i])
delete(nc.knownNodeSet, deleted[i].Name) delete(nc.knownNodeSet, deleted[i].Name)
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package node package node
import ( import (
"strings"
"testing" "testing"
"time" "time"
@ -1561,99 +1562,15 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) {
} }
} }
func TestNodeDeletion(t *testing.T) {
fakeNow := unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
fakeNodeHandler := &FakeNodeHandler{
Existing: []*api.Node{
{
ObjectMeta: api.ObjectMeta{
Name: "node0",
CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionTrue,
// Node status has just been updated.
LastHeartbeatTime: fakeNow,
LastTransitionTime: fakeNow,
},
},
Capacity: api.ResourceList{
api.ResourceName(api.ResourceCPU): resource.MustParse("10"),
api.ResourceName(api.ResourceMemory): resource.MustParse("10G"),
},
},
Spec: api.NodeSpec{
ExternalID: "node0",
},
},
{
ObjectMeta: api.ObjectMeta{
Name: "node1",
CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionTrue,
// Node status has just been updated.
LastHeartbeatTime: fakeNow,
LastTransitionTime: fakeNow,
},
},
Capacity: api.ResourceList{
api.ResourceName(api.ResourceCPU): resource.MustParse("10"),
api.ResourceName(api.ResourceMemory): resource.MustParse("10G"),
},
},
Spec: api.NodeSpec{
ExternalID: "node0",
},
},
},
Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}),
}
nodeController, _ := NewNodeControllerFromClient(nil, fakeNodeHandler, 5*time.Minute,
testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealtyThreshold,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
fakeNodeHandler.Delete("node1", nil)
if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
nodeController.zonePodEvictor[""].Try(func(value TimedValue) (bool, time.Duration) {
uid, _ := value.UID.(string)
deletePods(fakeNodeHandler, nodeController.recorder, value.Value, uid, nodeController.daemonSetStore)
return true, 0
})
podEvicted := false
for _, action := range fakeNodeHandler.Actions() {
if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" {
podEvicted = true
}
}
if !podEvicted {
t.Error("expected pods to be evicted from the deleted node")
}
}
func TestNodeEventGeneration(t *testing.T) { func TestNodeEventGeneration(t *testing.T) {
fakeNow := unversioned.Date(2016, 8, 10, 12, 0, 0, 0, time.UTC) fakeNow := unversioned.Date(2016, 9, 10, 12, 0, 0, 0, time.UTC)
fakeNodeHandler := &FakeNodeHandler{ fakeNodeHandler := &FakeNodeHandler{
Existing: []*api.Node{ Existing: []*api.Node{
{ {
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: "node0", Name: "node0",
UID: "1234567890", UID: "1234567890",
CreationTimestamp: unversioned.Date(2016, 8, 10, 0, 0, 0, 0, time.UTC), CreationTimestamp: unversioned.Date(2015, 8, 10, 0, 0, 0, 0, time.UTC),
}, },
Spec: api.NodeSpec{ Spec: api.NodeSpec{
ExternalID: "node0", ExternalID: "node0",
@ -1662,16 +1579,11 @@ func TestNodeEventGeneration(t *testing.T) {
Conditions: []api.NodeCondition{ Conditions: []api.NodeCondition{
{ {
Type: api.NodeReady, Type: api.NodeReady,
Status: api.ConditionTrue, Status: api.ConditionUnknown,
// Node status has just been updated. LastHeartbeatTime: unversioned.Date(2015, 8, 10, 0, 0, 0, 0, time.UTC),
LastHeartbeatTime: fakeNow, LastTransitionTime: unversioned.Date(2015, 8, 10, 0, 0, 0, 0, time.UTC),
LastTransitionTime: fakeNow,
}, },
}, },
Capacity: api.ResourceList{
api.ResourceName(api.ResourceRequestsCPU): resource.MustParse("10"),
api.ResourceName(api.ResourceMemory): resource.MustParse("20G"),
},
}, },
}, },
}, },
@ -1682,27 +1594,25 @@ func TestNodeEventGeneration(t *testing.T) {
testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealtyThreshold, testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealtyThreshold,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
testNodeMonitorPeriod, nil, nil, 0, false) testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.cloud = &fakecloud.FakeCloud{}
nodeController.nodeExistsInCloudProvider = func(nodeName types.NodeName) (bool, error) {
return false, nil
}
nodeController.now = func() unversioned.Time { return fakeNow } nodeController.now = func() unversioned.Time { return fakeNow }
fakeRecorder := NewFakeRecorder() fakeRecorder := NewFakeRecorder()
nodeController.recorder = fakeRecorder nodeController.recorder = fakeRecorder
if err := nodeController.monitorNodeStatus(); err != nil { if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
if len(fakeRecorder.events) != 2 {
fakeNodeHandler.Delete("node0", nil) t.Fatalf("unexpected events, got %v, expected %v: %+v", len(fakeRecorder.events), 2, fakeRecorder.events)
if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
} }
nodeController.zonePodEvictor[""].Try(func(value TimedValue) (bool, time.Duration) { if fakeRecorder.events[0].Reason != "RegisteredNode" || fakeRecorder.events[1].Reason != "DeletingNode" {
nodeUid, _ := value.UID.(string) var reasons []string
deletePods(fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetStore) for _, event := range fakeRecorder.events {
return true, 0 reasons = append(reasons, event.Reason)
})
if len(fakeRecorder.events) != 3 {
t.Fatalf("unexpected events: %v", fakeRecorder.events)
} }
if fakeRecorder.events[0].Reason != "RegisteredNode" || fakeRecorder.events[1].Reason != "RemovingNode" || fakeRecorder.events[2].Reason != "DeletingAllPods" { t.Fatalf("unexpected events generation: %v", strings.Join(reasons, ","))
t.Fatalf("unexpected events generation: %v", fakeRecorder.events)
} }
for _, event := range fakeRecorder.events { for _, event := range fakeRecorder.events {
involvedObject := event.InvolvedObject involvedObject := event.InvolvedObject
@ -1852,38 +1762,6 @@ func TestCheckPod(t *testing.T) {
} }
} }
func TestCleanupOrphanedPods(t *testing.T) {
pods := []*api.Pod{
newPod("a", "foo"),
newPod("b", "bar"),
newPod("c", "gone"),
}
nc, _ := NewNodeControllerFromClient(nil, nil, 0, 0, 0, 0, 0, 0, 0, 0, nil, nil, 0, false)
nc.nodeStore.Store.Add(newNode("foo"))
nc.nodeStore.Store.Add(newNode("bar"))
for _, pod := range pods {
p := pod
nc.podStore.Indexer.Add(&p)
}
var deleteCalls int
var deletedPodName string
forcefullyDeletePodFunc := func(p *api.Pod) error {
deleteCalls++
deletedPodName = p.ObjectMeta.Name
return nil
}
cleanupOrphanedPods(pods, nc.nodeStore.Store, forcefullyDeletePodFunc)
if deleteCalls != 1 {
t.Fatalf("expected one delete, got: %v", deleteCalls)
}
if deletedPodName != "c" {
t.Fatalf("expected deleted pod name to be 'c', but got: %q", deletedPodName)
}
}
func TestCheckNodeKubeletVersionParsing(t *testing.T) { func TestCheckNodeKubeletVersionParsing(t *testing.T) {
tests := []struct { tests := []struct {
version string version string

View File

@ -25,7 +25,7 @@ import (
"k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/metrics" "k8s.io/kubernetes/pkg/util/metrics"
@ -42,60 +42,106 @@ const (
type PodGCController struct { type PodGCController struct {
kubeClient clientset.Interface kubeClient clientset.Interface
// internalPodInformer is used to hold a personal informer. If we're using
// a normal shared informer, then the informer will be started for us. If
// we have a personal informer, we must start it ourselves. If you start
// the controller using NewPodGC(..., passing SharedInformer, ...), this
// will be null
internalPodInformer cache.SharedIndexInformer
podStore cache.StoreToPodLister podStore cache.StoreToPodLister
podStoreSyncer *cache.Controller nodeStore cache.StoreToNodeLister
podController cache.ControllerInterface
nodeController cache.ControllerInterface
deletePod func(namespace, name string) error deletePod func(namespace, name string) error
threshold int terminatedPodThreshold int
} }
func New(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, threshold int) *PodGCController { func NewPodGC(kubeClient clientset.Interface, podInformer cache.SharedIndexInformer, terminatedPodThreshold int) *PodGCController {
if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil { if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("gc_controller", kubeClient.Core().GetRESTClient().GetRateLimiter()) metrics.RegisterMetricAndTrackRateLimiterUsage("gc_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
} }
gcc := &PodGCController{ gcc := &PodGCController{
kubeClient: kubeClient, kubeClient: kubeClient,
threshold: threshold, terminatedPodThreshold: terminatedPodThreshold,
deletePod: func(namespace, name string) error { deletePod: func(namespace, name string) error {
return kubeClient.Core().Pods(namespace).Delete(name, api.NewDeleteOptions(0)) return kubeClient.Core().Pods(namespace).Delete(name, api.NewDeleteOptions(0))
}, },
} }
terminatedSelector := fields.ParseSelectorOrDie("status.phase!=" + string(api.PodPending) + ",status.phase!=" + string(api.PodRunning) + ",status.phase!=" + string(api.PodUnknown)) gcc.podStore.Indexer = podInformer.GetIndexer()
gcc.podController = podInformer.GetController()
gcc.podStore.Indexer, gcc.podStoreSyncer = cache.NewIndexerInformer( gcc.nodeStore.Store, gcc.nodeController = cache.NewInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) { ListFunc: func(options api.ListOptions) (runtime.Object, error) {
options.FieldSelector = terminatedSelector return gcc.kubeClient.Core().Nodes().List(options)
return gcc.kubeClient.Core().Pods(api.NamespaceAll).List(options)
}, },
WatchFunc: func(options api.ListOptions) (watch.Interface, error) { WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
options.FieldSelector = terminatedSelector return gcc.kubeClient.Core().Nodes().Watch(options)
return gcc.kubeClient.Core().Pods(api.NamespaceAll).Watch(options)
}, },
}, },
&api.Pod{}, &api.Node{},
resyncPeriod(), controller.NoResyncPeriodFunc(),
cache.ResourceEventHandlerFuncs{}, cache.ResourceEventHandlerFuncs{},
// We don't need to build a index for podStore here actually, but build one for consistency.
// It will ensure that if people start making use of the podStore in more specific ways,
// they'll get the benefits they expect. It will also reserve the name for future refactorings.
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
) )
return gcc return gcc
} }
func NewFromClient(
kubeClient clientset.Interface,
terminatedPodThreshold int,
) *PodGCController {
podInformer := informers.NewPodInformer(kubeClient, controller.NoResyncPeriodFunc())
controller := NewPodGC(kubeClient, podInformer, terminatedPodThreshold)
controller.internalPodInformer = podInformer
return controller
}
func (gcc *PodGCController) Run(stop <-chan struct{}) { func (gcc *PodGCController) Run(stop <-chan struct{}) {
go gcc.podStoreSyncer.Run(stop) if gcc.internalPodInformer != nil {
go gcc.podController.Run(stop)
}
go gcc.nodeController.Run(stop)
go wait.Until(gcc.gc, gcCheckPeriod, stop) go wait.Until(gcc.gc, gcCheckPeriod, stop)
<-stop <-stop
} }
func (gcc *PodGCController) gc() { func (gcc *PodGCController) gc() {
terminatedPods, _ := gcc.podStore.List(labels.Everything()) pods, err := gcc.podStore.List(labels.Everything())
if err != nil {
glog.Errorf("Error while listing all Pods: %v", err)
return
}
if gcc.terminatedPodThreshold > 0 {
gcc.gcTerminated(pods)
}
gcc.gcOrphaned(pods)
}
func isPodTerminated(pod *api.Pod) bool {
if phase := pod.Status.Phase; phase != api.PodPending && phase != api.PodRunning && phase != api.PodUnknown {
return true
}
return false
}
func (gcc *PodGCController) gcTerminated(pods []*api.Pod) {
terminatedPods := []*api.Pod{}
for _, pod := range pods {
if isPodTerminated(pod) {
terminatedPods = append(terminatedPods, pod)
}
}
terminatedPodCount := len(terminatedPods) terminatedPodCount := len(terminatedPods)
sort.Sort(byCreationTimestamp(terminatedPods)) sort.Sort(byCreationTimestamp(terminatedPods))
deleteCount := terminatedPodCount - gcc.threshold deleteCount := terminatedPodCount - gcc.terminatedPodThreshold
if deleteCount > terminatedPodCount { if deleteCount > terminatedPodCount {
deleteCount = terminatedPodCount deleteCount = terminatedPodCount
@ -118,6 +164,26 @@ func (gcc *PodGCController) gc() {
wait.Wait() wait.Wait()
} }
// cleanupOrphanedPods deletes pods that are bound to nodes that don't exist.
func (gcc *PodGCController) gcOrphaned(pods []*api.Pod) {
glog.V(4).Infof("GC'ing orphaned")
for _, pod := range pods {
if pod.Spec.NodeName == "" {
continue
}
if _, exists, _ := gcc.nodeStore.GetByKey(pod.Spec.NodeName); exists {
continue
}
glog.V(2).Infof("Found orphaned Pod %v assigned to the Node %v. Deleting.", pod.Name, pod.Spec.NodeName)
if err := gcc.deletePod(pod.Namespace, pod.Name); err != nil {
utilruntime.HandleError(err)
} else {
glog.V(4).Infof("Forced deletion of oprhaned Pod %s succeeded", pod.Name)
}
}
}
// byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker. // byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker.
type byCreationTimestamp []*api.Pod type byCreationTimestamp []*api.Pod

View File

@ -23,12 +23,12 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
) )
func TestGC(t *testing.T) { func TestGCTerminated(t *testing.T) {
type nameToPhase struct { type nameToPhase struct {
name string name string
phase api.PodPhase phase api.PodPhase
@ -45,8 +45,27 @@ func TestGC(t *testing.T) {
{name: "b", phase: api.PodSucceeded}, {name: "b", phase: api.PodSucceeded},
}, },
threshold: 0, threshold: 0,
// threshold = 0 disables terminated pod deletion
deletedPodNames: sets.NewString(),
},
{
pods: []nameToPhase{
{name: "a", phase: api.PodFailed},
{name: "b", phase: api.PodSucceeded},
{name: "c", phase: api.PodFailed},
},
threshold: 1,
deletedPodNames: sets.NewString("a", "b"), deletedPodNames: sets.NewString("a", "b"),
}, },
{
pods: []nameToPhase{
{name: "a", phase: api.PodRunning},
{name: "b", phase: api.PodSucceeded},
{name: "c", phase: api.PodFailed},
},
threshold: 1,
deletedPodNames: sets.NewString("b"),
},
{ {
pods: []nameToPhase{ pods: []nameToPhase{
{name: "a", phase: api.PodFailed}, {name: "a", phase: api.PodFailed},
@ -67,7 +86,7 @@ func TestGC(t *testing.T) {
for i, test := range testCases { for i, test := range testCases {
client := fake.NewSimpleClientset() client := fake.NewSimpleClientset()
gcc := New(client, controller.NoResyncPeriodFunc, test.threshold) gcc := NewFromClient(client, test.threshold)
deletedPodNames := make([]string, 0) deletedPodNames := make([]string, 0)
var lock sync.Mutex var lock sync.Mutex
gcc.deletePod = func(_, name string) error { gcc.deletePod = func(_, name string) error {
@ -83,9 +102,86 @@ func TestGC(t *testing.T) {
gcc.podStore.Indexer.Add(&api.Pod{ gcc.podStore.Indexer.Add(&api.Pod{
ObjectMeta: api.ObjectMeta{Name: pod.name, CreationTimestamp: unversioned.Time{Time: creationTime}}, ObjectMeta: api.ObjectMeta{Name: pod.name, CreationTimestamp: unversioned.Time{Time: creationTime}},
Status: api.PodStatus{Phase: pod.phase}, Status: api.PodStatus{Phase: pod.phase},
Spec: api.PodSpec{NodeName: "node"},
}) })
} }
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
store.Add(&api.Node{
ObjectMeta: api.ObjectMeta{Name: "node"},
})
gcc.nodeStore = cache.StoreToNodeLister{Store: store}
gcc.gc()
pass := true
for _, pod := range deletedPodNames {
if !test.deletedPodNames.Has(pod) {
pass = false
}
}
if len(deletedPodNames) != len(test.deletedPodNames) {
pass = false
}
if !pass {
t.Errorf("[%v]pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v", i, test.deletedPodNames, deletedPodNames)
}
}
}
func TestGCOrphaned(t *testing.T) {
type nameToPhase struct {
name string
phase api.PodPhase
}
testCases := []struct {
pods []nameToPhase
threshold int
deletedPodNames sets.String
}{
{
pods: []nameToPhase{
{name: "a", phase: api.PodFailed},
{name: "b", phase: api.PodSucceeded},
},
threshold: 0,
deletedPodNames: sets.NewString("a", "b"),
},
{
pods: []nameToPhase{
{name: "a", phase: api.PodRunning},
},
threshold: 1,
deletedPodNames: sets.NewString("a"),
},
}
for i, test := range testCases {
client := fake.NewSimpleClientset()
gcc := NewFromClient(client, test.threshold)
deletedPodNames := make([]string, 0)
var lock sync.Mutex
gcc.deletePod = func(_, name string) error {
lock.Lock()
defer lock.Unlock()
deletedPodNames = append(deletedPodNames, name)
return nil
}
creationTime := time.Unix(0, 0)
for _, pod := range test.pods {
creationTime = creationTime.Add(1 * time.Hour)
gcc.podStore.Indexer.Add(&api.Pod{
ObjectMeta: api.ObjectMeta{Name: pod.name, CreationTimestamp: unversioned.Time{Time: creationTime}},
Status: api.PodStatus{Phase: pod.phase},
Spec: api.PodSpec{NodeName: "node"},
})
}
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
gcc.nodeStore = cache.StoreToNodeLister{Store: store}
gcc.gc() gcc.gc()
pass := true pass := true