Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-28 05:57:25 +00:00)
Move orphaned Pod deletion logic to PodGC
commit cb0a13c1e5 (parent dc06ceb87d)
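For orientation before the hunks: the sketch below is illustrative only (the helper startPodGC and its arguments are not part of this commit) and uses just the API the diff introduces. NewFromClient builds and owns a private Pod informer, while NewPodGC accepts an externally managed informer, which is how the first hunk below wires it up via sharedInformers.Pods().Informer().

package example

import (
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/pkg/controller/podgc"
)

// startPodGC is a hypothetical helper, not part of this commit. NewFromClient
// creates a private Pod informer internally; Run then starts that informer,
// the new Node informer, and the periodic gc loop, and blocks until stop is
// closed. A threshold of 0 disables terminated-Pod cleanup, but orphaned-Pod
// cleanup still runs on every gc pass.
func startPodGC(kubeClient clientset.Interface, terminatedPodGCThreshold int, stop <-chan struct{}) {
	podgc.NewFromClient(kubeClient, terminatedPodGCThreshold).Run(stop)
}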
@@ -222,8 +222,8 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, stop <
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
 	if s.TerminatedPodGCThreshold > 0 {
-		go podgc.New(client("pod-garbage-collector"), ResyncPeriod(s), int(s.TerminatedPodGCThreshold)).
-			Run(wait.NeverStop)
+		go podgc.NewPodGC(client("pod-garbage-collector"), sharedInformers.Pods().Informer(),
+			int(s.TerminatedPodGCThreshold)).Run(wait.NeverStop)
 		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 	}
 
@@ -145,10 +145,8 @@ func (s *CMServer) Run(_ []string) error {
 	go replicationcontroller.NewReplicationManagerFromClient(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replication-controller")), s.resyncPeriod, replicationcontroller.BurstReplicas, int(s.LookupCacheSizeForRC)).
 		Run(int(s.ConcurrentRCSyncs), wait.NeverStop)
 
-	if s.TerminatedPodGCThreshold > 0 {
-		go podgc.New(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-garbage-collector")), s.resyncPeriod, int(s.TerminatedPodGCThreshold)).
-			Run(wait.NeverStop)
-	}
+	go podgc.NewFromClient(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-garbage-collector")), int(s.TerminatedPodGCThreshold)).
+		Run(wait.NeverStop)
 
 	//TODO(jdef) should eventually support more cloud providers here
 	if s.CloudProvider != mesos.ProviderName {
@@ -40,22 +40,6 @@ const (
 	LargeClusterThreshold = 20
 )
 
-// cleanupOrphanedPods deletes pods that are bound to nodes that don't
-// exist.
-func cleanupOrphanedPods(pods []*api.Pod, nodeStore cache.Store, forcefulDeletePodFunc func(*api.Pod) error) {
-	for _, pod := range pods {
-		if pod.Spec.NodeName == "" {
-			continue
-		}
-		if _, exists, _ := nodeStore.GetByKey(pod.Spec.NodeName); exists {
-			continue
-		}
-		if err := forcefulDeletePodFunc(pod); err != nil {
-			utilruntime.HandleError(err)
-		}
-	}
-}
-
 // deletePods will delete all pods from master running on given node, and return true
 // if any pods were deleted, or were found pending deletion.
 func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore cache.StoreToDaemonSetLister) (bool, error) {
@@ -465,15 +465,6 @@ func (nc *NodeController) Run() {
 			})
 		}
 	}, nodeEvictionPeriod, wait.NeverStop)
-
-	go wait.Until(func() {
-		pods, err := nc.podStore.List(labels.Everything())
-		if err != nil {
-			utilruntime.HandleError(err)
-			return
-		}
-		cleanupOrphanedPods(pods, nc.nodeStore.Store, nc.forcefullyDeletePod)
-	}, 30*time.Second, wait.NeverStop)
 }
 
 // monitorNodeStatus verifies node status are constantly updated by kubelet, and if not,
@@ -512,7 +503,6 @@ func (nc *NodeController) monitorNodeStatus() error {
 		for i := range deleted {
 			glog.V(1).Infof("NodeController observed a Node deletion: %v", deleted[i].Name)
 			recordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), api.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", deleted[i].Name))
-			nc.evictPods(deleted[i])
 			delete(nc.knownNodeSet, deleted[i].Name)
 		}
 
@@ -17,6 +17,7 @@ limitations under the License.
 package node
 
 import (
+	"strings"
 	"testing"
 	"time"
 
@@ -1561,99 +1562,15 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) {
 	}
 }
 
-func TestNodeDeletion(t *testing.T) {
-	fakeNow := unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
-	fakeNodeHandler := &FakeNodeHandler{
-		Existing: []*api.Node{
-			{
-				ObjectMeta: api.ObjectMeta{
-					Name: "node0",
-					CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
-				},
-				Status: api.NodeStatus{
-					Conditions: []api.NodeCondition{
-						{
-							Type: api.NodeReady,
-							Status: api.ConditionTrue,
-							// Node status has just been updated.
-							LastHeartbeatTime: fakeNow,
-							LastTransitionTime: fakeNow,
-						},
-					},
-					Capacity: api.ResourceList{
-						api.ResourceName(api.ResourceCPU): resource.MustParse("10"),
-						api.ResourceName(api.ResourceMemory): resource.MustParse("10G"),
-					},
-				},
-				Spec: api.NodeSpec{
-					ExternalID: "node0",
-				},
-			},
-			{
-				ObjectMeta: api.ObjectMeta{
-					Name: "node1",
-					CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
-				},
-				Status: api.NodeStatus{
-					Conditions: []api.NodeCondition{
-						{
-							Type: api.NodeReady,
-							Status: api.ConditionTrue,
-							// Node status has just been updated.
-							LastHeartbeatTime: fakeNow,
-							LastTransitionTime: fakeNow,
-						},
-					},
-					Capacity: api.ResourceList{
-						api.ResourceName(api.ResourceCPU): resource.MustParse("10"),
-						api.ResourceName(api.ResourceMemory): resource.MustParse("10G"),
-					},
-				},
-				Spec: api.NodeSpec{
-					ExternalID: "node0",
-				},
-			},
-		},
-		Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}),
-	}
-
-	nodeController, _ := NewNodeControllerFromClient(nil, fakeNodeHandler, 5*time.Minute,
-		testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealtyThreshold,
-		testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
-		testNodeMonitorPeriod, nil, nil, 0, false)
-	nodeController.now = func() unversioned.Time { return fakeNow }
-	if err := nodeController.monitorNodeStatus(); err != nil {
-		t.Errorf("unexpected error: %v", err)
-	}
-	fakeNodeHandler.Delete("node1", nil)
-	if err := nodeController.monitorNodeStatus(); err != nil {
-		t.Errorf("unexpected error: %v", err)
-	}
-	nodeController.zonePodEvictor[""].Try(func(value TimedValue) (bool, time.Duration) {
-		uid, _ := value.UID.(string)
-		deletePods(fakeNodeHandler, nodeController.recorder, value.Value, uid, nodeController.daemonSetStore)
-		return true, 0
-	})
-	podEvicted := false
-	for _, action := range fakeNodeHandler.Actions() {
-		if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" {
-			podEvicted = true
-		}
-	}
-	if !podEvicted {
-		t.Error("expected pods to be evicted from the deleted node")
-	}
-}
-
 func TestNodeEventGeneration(t *testing.T) {
-	fakeNow := unversioned.Date(2016, 8, 10, 12, 0, 0, 0, time.UTC)
+	fakeNow := unversioned.Date(2016, 9, 10, 12, 0, 0, 0, time.UTC)
 	fakeNodeHandler := &FakeNodeHandler{
 		Existing: []*api.Node{
 			{
 				ObjectMeta: api.ObjectMeta{
 					Name: "node0",
 					UID: "1234567890",
-					CreationTimestamp: unversioned.Date(2016, 8, 10, 0, 0, 0, 0, time.UTC),
+					CreationTimestamp: unversioned.Date(2015, 8, 10, 0, 0, 0, 0, time.UTC),
 				},
 				Spec: api.NodeSpec{
 					ExternalID: "node0",
@@ -1662,16 +1579,11 @@ func TestNodeEventGeneration(t *testing.T) {
 					Conditions: []api.NodeCondition{
 						{
 							Type: api.NodeReady,
-							Status: api.ConditionTrue,
-							// Node status has just been updated.
-							LastHeartbeatTime: fakeNow,
-							LastTransitionTime: fakeNow,
+							Status: api.ConditionUnknown,
+							LastHeartbeatTime: unversioned.Date(2015, 8, 10, 0, 0, 0, 0, time.UTC),
+							LastTransitionTime: unversioned.Date(2015, 8, 10, 0, 0, 0, 0, time.UTC),
 						},
 					},
-					Capacity: api.ResourceList{
-						api.ResourceName(api.ResourceRequestsCPU): resource.MustParse("10"),
-						api.ResourceName(api.ResourceMemory): resource.MustParse("20G"),
-					},
 				},
 			},
 		},
@@ -1682,27 +1594,25 @@ func TestNodeEventGeneration(t *testing.T) {
 		testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealtyThreshold,
 		testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
 		testNodeMonitorPeriod, nil, nil, 0, false)
+	nodeController.cloud = &fakecloud.FakeCloud{}
+	nodeController.nodeExistsInCloudProvider = func(nodeName types.NodeName) (bool, error) {
+		return false, nil
+	}
 	nodeController.now = func() unversioned.Time { return fakeNow }
 	fakeRecorder := NewFakeRecorder()
 	nodeController.recorder = fakeRecorder
 	if err := nodeController.monitorNodeStatus(); err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
-	fakeNodeHandler.Delete("node0", nil)
-	if err := nodeController.monitorNodeStatus(); err != nil {
-		t.Errorf("unexpected error: %v", err)
-	}
-	nodeController.zonePodEvictor[""].Try(func(value TimedValue) (bool, time.Duration) {
-		nodeUid, _ := value.UID.(string)
-		deletePods(fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetStore)
-		return true, 0
-	})
-	if len(fakeRecorder.events) != 3 {
-		t.Fatalf("unexpected events: %v", fakeRecorder.events)
-	}
-	if fakeRecorder.events[0].Reason != "RegisteredNode" || fakeRecorder.events[1].Reason != "RemovingNode" || fakeRecorder.events[2].Reason != "DeletingAllPods" {
-		t.Fatalf("unexpected events generation: %v", fakeRecorder.events)
+	if len(fakeRecorder.events) != 2 {
+		t.Fatalf("unexpected events, got %v, expected %v: %+v", len(fakeRecorder.events), 2, fakeRecorder.events)
+	}
+	if fakeRecorder.events[0].Reason != "RegisteredNode" || fakeRecorder.events[1].Reason != "DeletingNode" {
+		var reasons []string
+		for _, event := range fakeRecorder.events {
+			reasons = append(reasons, event.Reason)
+		}
+		t.Fatalf("unexpected events generation: %v", strings.Join(reasons, ","))
 	}
 	for _, event := range fakeRecorder.events {
 		involvedObject := event.InvolvedObject
@@ -1852,38 +1762,6 @@ func TestCheckPod(t *testing.T) {
 	}
 }
 
-func TestCleanupOrphanedPods(t *testing.T) {
-	pods := []*api.Pod{
-		newPod("a", "foo"),
-		newPod("b", "bar"),
-		newPod("c", "gone"),
-	}
-	nc, _ := NewNodeControllerFromClient(nil, nil, 0, 0, 0, 0, 0, 0, 0, 0, nil, nil, 0, false)
-
-	nc.nodeStore.Store.Add(newNode("foo"))
-	nc.nodeStore.Store.Add(newNode("bar"))
-	for _, pod := range pods {
-		p := pod
-		nc.podStore.Indexer.Add(&p)
-	}
-
-	var deleteCalls int
-	var deletedPodName string
-	forcefullyDeletePodFunc := func(p *api.Pod) error {
-		deleteCalls++
-		deletedPodName = p.ObjectMeta.Name
-		return nil
-	}
-	cleanupOrphanedPods(pods, nc.nodeStore.Store, forcefullyDeletePodFunc)
-
-	if deleteCalls != 1 {
-		t.Fatalf("expected one delete, got: %v", deleteCalls)
-	}
-	if deletedPodName != "c" {
-		t.Fatalf("expected deleted pod name to be 'c', but got: %q", deletedPodName)
-	}
-}
-
 func TestCheckNodeKubeletVersionParsing(t *testing.T) {
 	tests := []struct {
 		version string
@@ -25,7 +25,7 @@ import (
 	"k8s.io/kubernetes/pkg/client/cache"
 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
 	"k8s.io/kubernetes/pkg/controller"
-	"k8s.io/kubernetes/pkg/fields"
+	"k8s.io/kubernetes/pkg/controller/informers"
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util/metrics"
@@ -42,60 +42,106 @@ const (
 
 type PodGCController struct {
 	kubeClient clientset.Interface
 
+	// internalPodInformer is used to hold a personal informer. If we're using
+	// a normal shared informer, then the informer will be started for us. If
+	// we have a personal informer, we must start it ourselves. If you start
+	// the controller using NewPodGC(..., passing SharedInformer, ...), this
+	// will be null
+	internalPodInformer cache.SharedIndexInformer
+
 	podStore cache.StoreToPodLister
-	podStoreSyncer *cache.Controller
+	nodeStore cache.StoreToNodeLister
 
+	podController cache.ControllerInterface
+	nodeController cache.ControllerInterface
+
 	deletePod func(namespace, name string) error
-	threshold int
+	terminatedPodThreshold int
 }
 
-func New(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, threshold int) *PodGCController {
+func NewPodGC(kubeClient clientset.Interface, podInformer cache.SharedIndexInformer, terminatedPodThreshold int) *PodGCController {
 	if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
 		metrics.RegisterMetricAndTrackRateLimiterUsage("gc_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
 	}
 	gcc := &PodGCController{
 		kubeClient: kubeClient,
-		threshold: threshold,
+		terminatedPodThreshold: terminatedPodThreshold,
 		deletePod: func(namespace, name string) error {
 			return kubeClient.Core().Pods(namespace).Delete(name, api.NewDeleteOptions(0))
 		},
 	}
 
-	terminatedSelector := fields.ParseSelectorOrDie("status.phase!=" + string(api.PodPending) + ",status.phase!=" + string(api.PodRunning) + ",status.phase!=" + string(api.PodUnknown))
+	gcc.podStore.Indexer = podInformer.GetIndexer()
+	gcc.podController = podInformer.GetController()
 
-	gcc.podStore.Indexer, gcc.podStoreSyncer = cache.NewIndexerInformer(
+	gcc.nodeStore.Store, gcc.nodeController = cache.NewInformer(
 		&cache.ListWatch{
 			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
-				options.FieldSelector = terminatedSelector
-				return gcc.kubeClient.Core().Pods(api.NamespaceAll).List(options)
+				return gcc.kubeClient.Core().Nodes().List(options)
 			},
 			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-				options.FieldSelector = terminatedSelector
-				return gcc.kubeClient.Core().Pods(api.NamespaceAll).Watch(options)
+				return gcc.kubeClient.Core().Nodes().Watch(options)
 			},
 		},
-		&api.Pod{},
-		resyncPeriod(),
+		&api.Node{},
+		controller.NoResyncPeriodFunc(),
 		cache.ResourceEventHandlerFuncs{},
-		// We don't need to build a index for podStore here actually, but build one for consistency.
-		// It will ensure that if people start making use of the podStore in more specific ways,
-		// they'll get the benefits they expect. It will also reserve the name for future refactorings.
-		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
 	)
 
 	return gcc
 }
 
+func NewFromClient(
+	kubeClient clientset.Interface,
+	terminatedPodThreshold int,
+) *PodGCController {
+	podInformer := informers.NewPodInformer(kubeClient, controller.NoResyncPeriodFunc())
+	controller := NewPodGC(kubeClient, podInformer, terminatedPodThreshold)
+	controller.internalPodInformer = podInformer
+	return controller
+}
+
 func (gcc *PodGCController) Run(stop <-chan struct{}) {
-	go gcc.podStoreSyncer.Run(stop)
+	if gcc.internalPodInformer != nil {
+		go gcc.podController.Run(stop)
+	}
+	go gcc.nodeController.Run(stop)
 	go wait.Until(gcc.gc, gcCheckPeriod, stop)
 	<-stop
 }
 
 func (gcc *PodGCController) gc() {
-	terminatedPods, _ := gcc.podStore.List(labels.Everything())
+	pods, err := gcc.podStore.List(labels.Everything())
+	if err != nil {
+		glog.Errorf("Error while listing all Pods: %v", err)
+		return
+	}
+	if gcc.terminatedPodThreshold > 0 {
+		gcc.gcTerminated(pods)
+	}
+	gcc.gcOrphaned(pods)
+}
+
+func isPodTerminated(pod *api.Pod) bool {
+	if phase := pod.Status.Phase; phase != api.PodPending && phase != api.PodRunning && phase != api.PodUnknown {
+		return true
+	}
+	return false
+}
+
+func (gcc *PodGCController) gcTerminated(pods []*api.Pod) {
+	terminatedPods := []*api.Pod{}
+	for _, pod := range pods {
+		if isPodTerminated(pod) {
+			terminatedPods = append(terminatedPods, pod)
+		}
+	}
+
 	terminatedPodCount := len(terminatedPods)
 	sort.Sort(byCreationTimestamp(terminatedPods))
 
-	deleteCount := terminatedPodCount - gcc.threshold
+	deleteCount := terminatedPodCount - gcc.terminatedPodThreshold
 
 	if deleteCount > terminatedPodCount {
 		deleteCount = terminatedPodCount
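A consequence of the hunk above that is easy to miss: Run only starts the Pod informer the controller created for itself (the internalPodInformer set by NewFromClient), so a caller that hands its own informer to NewPodGC has to start that informer as well. A minimal sketch of such caller-side wiring, using only names that appear in this hunk; the helper runPodGCWithOwnInformer itself is hypothetical.

package example

import (
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/informers"
	"k8s.io/kubernetes/pkg/controller/podgc"
)

// runPodGCWithOwnInformer builds a Pod informer the same way NewFromClient
// does, hands it to NewPodGC, and starts both the informer and the controller.
// Run will not start an externally supplied informer, so the caller does it.
func runPodGCWithOwnInformer(kubeClient clientset.Interface, threshold int, stop <-chan struct{}) {
	podInformer := informers.NewPodInformer(kubeClient, controller.NoResyncPeriodFunc())
	gcc := podgc.NewPodGC(kubeClient, podInformer, threshold)
	go podInformer.Run(stop)
	gcc.Run(stop)
}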
@@ -118,6 +164,26 @@ func (gcc *PodGCController) gc() {
 	wait.Wait()
 }
 
+// cleanupOrphanedPods deletes pods that are bound to nodes that don't exist.
+func (gcc *PodGCController) gcOrphaned(pods []*api.Pod) {
+	glog.V(4).Infof("GC'ing orphaned")
+
+	for _, pod := range pods {
+		if pod.Spec.NodeName == "" {
+			continue
+		}
+		if _, exists, _ := gcc.nodeStore.GetByKey(pod.Spec.NodeName); exists {
+			continue
+		}
+		glog.V(2).Infof("Found orphaned Pod %v assigned to the Node %v. Deleting.", pod.Name, pod.Spec.NodeName)
+		if err := gcc.deletePod(pod.Namespace, pod.Name); err != nil {
+			utilruntime.HandleError(err)
+		} else {
+			glog.V(4).Infof("Forced deletion of oprhaned Pod %s succeeded", pod.Name)
+		}
+	}
+}
+
 // byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker.
 type byCreationTimestamp []*api.Pod
 
@@ -23,12 +23,12 @@ import (
 
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/unversioned"
+	"k8s.io/kubernetes/pkg/client/cache"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
-	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/util/sets"
 )
 
-func TestGC(t *testing.T) {
+func TestGCTerminated(t *testing.T) {
 	type nameToPhase struct {
 		name string
 		phase api.PodPhase
@@ -45,8 +45,27 @@ func TestGC(t *testing.T) {
 				{name: "b", phase: api.PodSucceeded},
 			},
 			threshold: 0,
+			// threshold = 0 disables terminated pod deletion
+			deletedPodNames: sets.NewString(),
+		},
+		{
+			pods: []nameToPhase{
+				{name: "a", phase: api.PodFailed},
+				{name: "b", phase: api.PodSucceeded},
+				{name: "c", phase: api.PodFailed},
+			},
+			threshold: 1,
 			deletedPodNames: sets.NewString("a", "b"),
 		},
+		{
+			pods: []nameToPhase{
+				{name: "a", phase: api.PodRunning},
+				{name: "b", phase: api.PodSucceeded},
+				{name: "c", phase: api.PodFailed},
+			},
+			threshold: 1,
+			deletedPodNames: sets.NewString("b"),
+		},
 		{
 			pods: []nameToPhase{
 				{name: "a", phase: api.PodFailed},
@@ -67,7 +86,7 @@ func TestGC(t *testing.T) {
 
 	for i, test := range testCases {
 		client := fake.NewSimpleClientset()
-		gcc := New(client, controller.NoResyncPeriodFunc, test.threshold)
+		gcc := NewFromClient(client, test.threshold)
 		deletedPodNames := make([]string, 0)
 		var lock sync.Mutex
 		gcc.deletePod = func(_, name string) error {
@@ -83,9 +102,86 @@ func TestGC(t *testing.T) {
 			gcc.podStore.Indexer.Add(&api.Pod{
 				ObjectMeta: api.ObjectMeta{Name: pod.name, CreationTimestamp: unversioned.Time{Time: creationTime}},
 				Status: api.PodStatus{Phase: pod.phase},
+				Spec: api.PodSpec{NodeName: "node"},
 			})
 		}
 
+		store := cache.NewStore(cache.MetaNamespaceKeyFunc)
+		store.Add(&api.Node{
+			ObjectMeta: api.ObjectMeta{Name: "node"},
+		})
+		gcc.nodeStore = cache.StoreToNodeLister{Store: store}
+
+		gcc.gc()
+
+		pass := true
+		for _, pod := range deletedPodNames {
+			if !test.deletedPodNames.Has(pod) {
+				pass = false
+			}
+		}
+		if len(deletedPodNames) != len(test.deletedPodNames) {
+			pass = false
+		}
+		if !pass {
+			t.Errorf("[%v]pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v", i, test.deletedPodNames, deletedPodNames)
+		}
+	}
+}
+
+func TestGCOrphaned(t *testing.T) {
+	type nameToPhase struct {
+		name string
+		phase api.PodPhase
+	}
+
+	testCases := []struct {
+		pods []nameToPhase
+		threshold int
+		deletedPodNames sets.String
+	}{
+		{
+			pods: []nameToPhase{
+				{name: "a", phase: api.PodFailed},
+				{name: "b", phase: api.PodSucceeded},
+			},
+			threshold: 0,
+			deletedPodNames: sets.NewString("a", "b"),
+		},
+		{
+			pods: []nameToPhase{
+				{name: "a", phase: api.PodRunning},
+			},
+			threshold: 1,
+			deletedPodNames: sets.NewString("a"),
+		},
+	}
+
+	for i, test := range testCases {
+		client := fake.NewSimpleClientset()
+		gcc := NewFromClient(client, test.threshold)
+		deletedPodNames := make([]string, 0)
+		var lock sync.Mutex
+		gcc.deletePod = func(_, name string) error {
+			lock.Lock()
+			defer lock.Unlock()
+			deletedPodNames = append(deletedPodNames, name)
+			return nil
+		}
+
+		creationTime := time.Unix(0, 0)
+		for _, pod := range test.pods {
+			creationTime = creationTime.Add(1 * time.Hour)
+			gcc.podStore.Indexer.Add(&api.Pod{
+				ObjectMeta: api.ObjectMeta{Name: pod.name, CreationTimestamp: unversioned.Time{Time: creationTime}},
+				Status: api.PodStatus{Phase: pod.phase},
+				Spec: api.PodSpec{NodeName: "node"},
+			})
+		}
+
+		store := cache.NewStore(cache.MetaNamespaceKeyFunc)
+		gcc.nodeStore = cache.StoreToNodeLister{Store: store}
+
 		gcc.gc()
 
 		pass := true