mirror of
				https://github.com/k3s-io/kubernetes.git
				synced 2025-10-31 05:40:42 +00:00 
			
		
		
		
	add pod garbage collection
This commit is contained in:
		| @@ -38,6 +38,7 @@ import ( | ||||
| 	"k8s.io/kubernetes/pkg/controller/daemon" | ||||
| 	"k8s.io/kubernetes/pkg/controller/deployment" | ||||
| 	"k8s.io/kubernetes/pkg/controller/endpoint" | ||||
| 	"k8s.io/kubernetes/pkg/controller/gc" | ||||
| 	"k8s.io/kubernetes/pkg/controller/job" | ||||
| 	"k8s.io/kubernetes/pkg/controller/namespace" | ||||
| 	"k8s.io/kubernetes/pkg/controller/node" | ||||
| @@ -74,6 +75,7 @@ type CMServer struct { | ||||
| 	NamespaceSyncPeriod               time.Duration | ||||
| 	PVClaimBinderSyncPeriod           time.Duration | ||||
| 	VolumeConfigFlags                 VolumeConfigFlags | ||||
| 	TerminatedPodGCThreshold          int | ||||
| 	HorizontalPodAutoscalerSyncPeriod time.Duration | ||||
| 	DeploymentControllerSyncPeriod    time.Duration | ||||
| 	RegisterRetryCount                int | ||||
| @@ -164,6 +166,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { | ||||
| 	fs.StringVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerPodTemplateFilePathHostPath, "pv-recycler-pod-template-filepath-hostpath", s.VolumeConfigFlags.PersistentVolumeRecyclerPodTemplateFilePathHostPath, "The file path to a pod definition used as a template for HostPath persistent volume recycling. This is for development and testing only and will not work in a multi-node cluster.") | ||||
| 	fs.IntVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerMinimumTimeoutHostPath, "pv-recycler-minimum-timeout-hostpath", s.VolumeConfigFlags.PersistentVolumeRecyclerMinimumTimeoutHostPath, "The minimum ActiveDeadlineSeconds to use for a HostPath Recycler pod.  This is for development and testing only and will not work in a multi-node cluster.") | ||||
| 	fs.IntVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerIncrementTimeoutHostPath, "pv-recycler-timeout-increment-hostpath", s.VolumeConfigFlags.PersistentVolumeRecyclerIncrementTimeoutHostPath, "the increment of time added per Gi to ActiveDeadlineSeconds for a HostPath scrubber pod.  This is for development and testing only and will not work in a multi-node cluster.") | ||||
| 	fs.IntVar(&s.TerminatedPodGCThreshold, "terminated-pod-gc-threshold", s.TerminatedPodGCThreshold, "Number of terminated pods that can exist before the terminated pod garbage collector starts deleting terminated pods. If <= 0, the terminated pod garbage collector is disabled.") | ||||
| 	fs.DurationVar(&s.HorizontalPodAutoscalerSyncPeriod, "horizontal-pod-autoscaler-sync-period", s.HorizontalPodAutoscalerSyncPeriod, "The period for syncing the number of pods in horizontal pod autoscaler.") | ||||
| 	fs.DurationVar(&s.DeploymentControllerSyncPeriod, "deployment-controller-sync-period", s.DeploymentControllerSyncPeriod, "Period for syncing the deployments.") | ||||
| 	fs.DurationVar(&s.PodEvictionTimeout, "pod-eviction-timeout", s.PodEvictionTimeout, "The grace period for deleting pods on failed nodes.") | ||||
| @@ -244,6 +247,11 @@ func (s *CMServer) Run(_ []string) error { | ||||
| 	go job.NewJobController(kubeClient). | ||||
| 		Run(s.ConcurrentJobSyncs, util.NeverStop) | ||||
|  | ||||
| 	if s.TerminatedPodGCThreshold > 0 { | ||||
| 		go gc.New(kubeClient, s.TerminatedPodGCThreshold). | ||||
| 			Run(util.NeverStop) | ||||
| 	} | ||||
|  | ||||
| 	cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile) | ||||
| 	if err != nil { | ||||
| 		glog.Fatalf("Cloud provider could not be initialized: %v", err) | ||||
|   | ||||
| @@ -292,4 +292,5 @@ www-prefix | ||||
| retry_time | ||||
| file_content_in_loop | ||||
| cpu-cfs-quota | ||||
| terminated-pod-gc-threshold | ||||
|  | ||||
|   | ||||
							
								
								
									
										24
									
								
								pkg/controller/gc/doc.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										24
									
								
								pkg/controller/gc/doc.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,24 @@ | ||||
| /* | ||||
| Copyright 2015 The Kubernetes Authors All rights reserved. | ||||
|  | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
|  | ||||
|     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
|  | ||||
| // Package gc contains a very simple pod "garbage collector" implementation, | ||||
| // GCController, that runs in the controller manager. If the number of pods | ||||
| // in terminated phases (right now either Failed or Succeeded) surpasses a | ||||
| // configurable threshold, the controller will delete pods in terminated state | ||||
| // until the system reaches the allowed threshold again. The GCController | ||||
| // prioritizes pods to delete by sorting by creation timestamp and deleting the | ||||
| // oldest objects first. The GCController will not delete non-terminated pods. | ||||
| package gc | ||||
							
								
								
									
										137
									
								
								pkg/controller/gc/gc_controller.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										137
									
								
								pkg/controller/gc/gc_controller.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,137 @@ | ||||
| /* | ||||
| Copyright 2015 The Kubernetes Authors All rights reserved. | ||||
|  | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
|  | ||||
|     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
|  | ||||
| package gc | ||||
|  | ||||
| import ( | ||||
| 	"sort" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"k8s.io/kubernetes/pkg/api" | ||||
| 	"k8s.io/kubernetes/pkg/client/cache" | ||||
| 	"k8s.io/kubernetes/pkg/client/record" | ||||
| 	client "k8s.io/kubernetes/pkg/client/unversioned" | ||||
| 	"k8s.io/kubernetes/pkg/controller" | ||||
| 	"k8s.io/kubernetes/pkg/controller/framework" | ||||
| 	"k8s.io/kubernetes/pkg/fields" | ||||
| 	"k8s.io/kubernetes/pkg/labels" | ||||
| 	"k8s.io/kubernetes/pkg/runtime" | ||||
| 	"k8s.io/kubernetes/pkg/util" | ||||
| 	"k8s.io/kubernetes/pkg/watch" | ||||
|  | ||||
| 	"github.com/golang/glog" | ||||
| ) | ||||
|  | ||||
| const ( | ||||
| 	fullResyncPeriod = 0 | ||||
| 	gcCheckPeriod    = 20 * time.Second | ||||
| ) | ||||
|  | ||||
| type GCController struct { | ||||
| 	kubeClient     client.Interface | ||||
| 	podControl     controller.PodControlInterface | ||||
| 	podStore       cache.StoreToPodLister | ||||
| 	podStoreSyncer *framework.Controller | ||||
| 	threshold      int | ||||
| } | ||||
|  | ||||
| func New(kubeClient client.Interface, threshold int) *GCController { | ||||
| 	eventBroadcaster := record.NewBroadcaster() | ||||
| 	eventBroadcaster.StartLogging(glog.Infof) | ||||
| 	eventBroadcaster.StartRecordingToSink(kubeClient.Events("")) | ||||
|  | ||||
| 	gcc := &GCController{ | ||||
| 		kubeClient: kubeClient, | ||||
| 		podControl: controller.RealPodControl{ | ||||
| 			Recorder:   eventBroadcaster.NewRecorder(api.EventSource{Component: "pod-garbage-collector"}), | ||||
| 			KubeClient: kubeClient, | ||||
| 		}, | ||||
| 		threshold: threshold, | ||||
| 	} | ||||
|  | ||||
| 	terminatedSelector := compileTerminatedPodSelector() | ||||
|  | ||||
| 	gcc.podStore.Store, gcc.podStoreSyncer = framework.NewInformer( | ||||
| 		&cache.ListWatch{ | ||||
| 			ListFunc: func() (runtime.Object, error) { | ||||
| 				return gcc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), terminatedSelector) | ||||
| 			}, | ||||
| 			WatchFunc: func(rv string) (watch.Interface, error) { | ||||
| 				return gcc.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), terminatedSelector, rv) | ||||
| 			}, | ||||
| 		}, | ||||
| 		&api.Pod{}, | ||||
| 		fullResyncPeriod, | ||||
| 		framework.ResourceEventHandlerFuncs{}, | ||||
| 	) | ||||
| 	return gcc | ||||
| } | ||||
|  | ||||
| func (gcc *GCController) Run(stop <-chan struct{}) { | ||||
| 	go gcc.podStoreSyncer.Run(stop) | ||||
| 	go util.Until(gcc.gc, gcCheckPeriod, stop) | ||||
| 	<-stop | ||||
| } | ||||
|  | ||||
| func (gcc *GCController) gc() { | ||||
| 	terminatedPods, _ := gcc.podStore.List(labels.Everything()) | ||||
| 	terminatedPodCount := len(terminatedPods) | ||||
| 	sort.Sort(byCreationTimestamp(terminatedPods)) | ||||
|  | ||||
| 	deleteCount := terminatedPodCount - gcc.threshold | ||||
|  | ||||
| 	if deleteCount > terminatedPodCount { | ||||
| 		deleteCount = terminatedPodCount | ||||
| 	} | ||||
| 	if deleteCount > 0 { | ||||
| 		glog.Infof("garbage collecting %v pods", deleteCount) | ||||
| 	} | ||||
|  | ||||
| 	var wait sync.WaitGroup | ||||
| 	for i := 0; i < deleteCount; i++ { | ||||
| 		wait.Add(1) | ||||
| 		go func(namespace string, name string) { | ||||
| 			defer wait.Done() | ||||
| 			if err := gcc.podControl.DeletePod(namespace, name); err != nil { | ||||
| 				// ignore not founds | ||||
| 				defer util.HandleError(err) | ||||
| 			} | ||||
| 		}(terminatedPods[i].Namespace, terminatedPods[i].Name) | ||||
| 	} | ||||
| 	wait.Wait() | ||||
| } | ||||
|  | ||||
| func compileTerminatedPodSelector() fields.Selector { | ||||
| 	selector, err := fields.ParseSelector("status.phase!=" + string(api.PodPending) + ",status.phase!=" + string(api.PodRunning) + ",status.phase!=" + string(api.PodUnknown)) | ||||
| 	if err != nil { | ||||
| 		panic("terminatedSelector must compile: " + err.Error()) | ||||
| 	} | ||||
| 	return selector | ||||
| } | ||||
|  | ||||
| // byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker. | ||||
| type byCreationTimestamp []*api.Pod | ||||
|  | ||||
| func (o byCreationTimestamp) Len() int      { return len(o) } | ||||
| func (o byCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] } | ||||
|  | ||||
| func (o byCreationTimestamp) Less(i, j int) bool { | ||||
| 	if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) { | ||||
| 		return o[i].Name < o[j].Name | ||||
| 	} | ||||
| 	return o[i].CreationTimestamp.Before(o[j].CreationTimestamp) | ||||
| } | ||||
							
								
								
									
										129
									
								
								pkg/controller/gc/gc_controller_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										129
									
								
								pkg/controller/gc/gc_controller_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,129 @@ | ||||
| /* | ||||
| Copyright 2015 The Kubernetes Authors All rights reserved. | ||||
|  | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
|  | ||||
|     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
|  | ||||
| package gc | ||||
|  | ||||
| import ( | ||||
| 	"sync" | ||||
| 	"testing" | ||||
| 	"time" | ||||
|  | ||||
| 	"k8s.io/kubernetes/pkg/api" | ||||
| 	"k8s.io/kubernetes/pkg/api/unversioned" | ||||
| 	"k8s.io/kubernetes/pkg/client/unversioned/testclient" | ||||
| 	"k8s.io/kubernetes/pkg/runtime" | ||||
| 	"k8s.io/kubernetes/pkg/util/sets" | ||||
| ) | ||||
|  | ||||
| type FakePodControl struct { | ||||
| 	podSpec       []api.PodTemplateSpec | ||||
| 	deletePodName []string | ||||
| 	lock          sync.Mutex | ||||
| 	err           error | ||||
| } | ||||
|  | ||||
| func (f *FakePodControl) CreatePods(namespace string, spec *api.PodTemplateSpec, object runtime.Object) error { | ||||
| 	panic("unimplemented") | ||||
| } | ||||
|  | ||||
| func (f *FakePodControl) CreatePodsOnNode(nodeName, namespace string, spec *api.PodTemplateSpec, object runtime.Object) error { | ||||
| 	panic("unimplemented") | ||||
| } | ||||
|  | ||||
| func (f *FakePodControl) DeletePod(namespace string, podName string) error { | ||||
| 	f.lock.Lock() | ||||
| 	defer f.lock.Unlock() | ||||
| 	if f.err != nil { | ||||
| 		return f.err | ||||
| 	} | ||||
| 	f.deletePodName = append(f.deletePodName, podName) | ||||
| 	return nil | ||||
| } | ||||
| func (f *FakePodControl) clear() { | ||||
| 	f.lock.Lock() | ||||
| 	defer f.lock.Unlock() | ||||
| 	f.deletePodName = []string{} | ||||
| 	f.podSpec = []api.PodTemplateSpec{} | ||||
| } | ||||
|  | ||||
| func TestGC(t *testing.T) { | ||||
|  | ||||
| 	testCases := []struct { | ||||
| 		pods            map[string]api.PodPhase | ||||
| 		threshold       int | ||||
| 		deletedPodNames sets.String | ||||
| 	}{ | ||||
| 		{ | ||||
| 			pods: map[string]api.PodPhase{ | ||||
| 				"a": api.PodFailed, | ||||
| 				"b": api.PodSucceeded, | ||||
| 			}, | ||||
| 			threshold:       0, | ||||
| 			deletedPodNames: sets.NewString("a", "b"), | ||||
| 		}, | ||||
| 		{ | ||||
| 			pods: map[string]api.PodPhase{ | ||||
| 				"a": api.PodFailed, | ||||
| 				"b": api.PodSucceeded, | ||||
| 			}, | ||||
| 			threshold:       1, | ||||
| 			deletedPodNames: sets.NewString("a"), | ||||
| 		}, | ||||
| 		{ | ||||
| 			pods: map[string]api.PodPhase{ | ||||
| 				"a": api.PodFailed, | ||||
| 				"b": api.PodSucceeded, | ||||
| 			}, | ||||
| 			threshold:       5, | ||||
| 			deletedPodNames: sets.NewString(), | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	for i, test := range testCases { | ||||
| 		client := testclient.NewSimpleFake() | ||||
| 		gcc := New(client, test.threshold) | ||||
| 		fake := &FakePodControl{} | ||||
| 		gcc.podControl = fake | ||||
|  | ||||
| 		creationTime := time.Unix(0, 0) | ||||
| 		for name, phase := range test.pods { | ||||
| 			creationTime = creationTime.Add(1 * time.Hour) | ||||
| 			gcc.podStore.Store.Add(&api.Pod{ | ||||
| 				ObjectMeta: api.ObjectMeta{Name: name, CreationTimestamp: unversioned.Time{creationTime}}, | ||||
| 				Status:     api.PodStatus{Phase: phase}, | ||||
| 			}) | ||||
| 		} | ||||
|  | ||||
| 		gcc.gc() | ||||
|  | ||||
| 		pass := true | ||||
| 		for _, pod := range fake.deletePodName { | ||||
| 			if !test.deletedPodNames.Has(pod) { | ||||
| 				pass = false | ||||
| 			} | ||||
| 		} | ||||
| 		if len(fake.deletePodName) != len(test.deletedPodNames) { | ||||
| 			pass = false | ||||
| 		} | ||||
| 		if !pass { | ||||
| 			t.Errorf("[%v]pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v", i, test.deletedPodNames.List(), fake.deletePodName) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestTerminatedPodSelectorCompiles(t *testing.T) { | ||||
| 	compileTerminatedPodSelector() | ||||
| } | ||||
		Reference in New Issue
	
	Block a user