Merge pull request #79681 from tedyu/clean-pods-param

Pass desiredPods to CleanupPods
This commit is contained in:
Kubernetes Prow Robot 2019-07-11 17:01:47 -07:00 committed by GitHub
commit fc9db7a042
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 24 additions and 21 deletions

View File

@ -1013,14 +1013,14 @@ func (kl *Kubelet) HandlePodCleanups() error {
// These two conditions could be alleviated by checkpointing kubelet. // These two conditions could be alleviated by checkpointing kubelet.
activePods := kl.filterOutTerminatedPods(allPods) activePods := kl.filterOutTerminatedPods(allPods)
desiredPods := make(map[types.UID]empty) desiredPods := make(map[types.UID]sets.Empty)
for _, pod := range activePods { for _, pod := range activePods {
desiredPods[pod.UID] = empty{} desiredPods[pod.UID] = sets.Empty{}
} }
// Stop the workers for no-longer existing pods. // Stop the workers for no-longer existing pods.
// TODO: is here the best place to forget pod workers? // TODO: is here the best place to forget pod workers?
kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods) kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods)
kl.probeManager.CleanupPods(activePods) kl.probeManager.CleanupPods(desiredPods)
runningPods, err := kl.runtimeCache.GetPods() runningPods, err := kl.runtimeCache.GetPods()
if err != nil { if err != nil {

View File

@ -25,6 +25,7 @@ import (
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/klog" "k8s.io/klog"
@ -72,7 +73,7 @@ type UpdatePodOptions struct {
// PodWorkers is an abstract interface for testability. // PodWorkers is an abstract interface for testability.
type PodWorkers interface { type PodWorkers interface {
UpdatePod(options *UpdatePodOptions) UpdatePod(options *UpdatePodOptions)
ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) ForgetNonExistingPodWorkers(desiredPods map[types.UID]sets.Empty)
ForgetWorker(uid types.UID) ForgetWorker(uid types.UID)
} }
@ -251,7 +252,7 @@ func (p *podWorkers) ForgetWorker(uid types.UID) {
p.removeWorker(uid) p.removeWorker(uid)
} }
func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) { func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]sets.Empty) {
p.podLock.Lock() p.podLock.Lock()
defer p.podLock.Unlock() defer p.podLock.Unlock()
for key := range p.podUpdates { for key := range p.podUpdates {

View File

@ -26,6 +26,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
@ -57,7 +58,7 @@ func (f *fakePodWorkers) UpdatePod(options *UpdatePodOptions) {
} }
} }
func (f *fakePodWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) {} func (f *fakePodWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]sets.Empty) {}
func (f *fakePodWorkers) ForgetWorker(uid types.UID) {} func (f *fakePodWorkers) ForgetWorker(uid types.UID) {}
@ -219,9 +220,9 @@ func TestForgetNonExistingPodWorkers(t *testing.T) {
t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates)) t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates))
} }
desiredPods := map[types.UID]empty{} desiredPods := map[types.UID]sets.Empty{}
desiredPods[types.UID(2)] = empty{} desiredPods[types.UID(2)] = sets.Empty{}
desiredPods[types.UID(14)] = empty{} desiredPods[types.UID(14)] = sets.Empty{}
podWorkers.ForgetNonExistingPodWorkers(desiredPods) podWorkers.ForgetNonExistingPodWorkers(desiredPods)
if len(podWorkers.podUpdates) != 2 { if len(podWorkers.podUpdates) != 2 {
t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates)) t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates))
@ -233,7 +234,7 @@ func TestForgetNonExistingPodWorkers(t *testing.T) {
t.Errorf("No updates channel for pod 14") t.Errorf("No updates channel for pod 14")
} }
podWorkers.ForgetNonExistingPodWorkers(map[types.UID]empty{}) podWorkers.ForgetNonExistingPodWorkers(map[types.UID]sets.Empty{})
if len(podWorkers.podUpdates) != 0 { if len(podWorkers.podUpdates) != 0 {
t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates)) t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates))
} }

View File

@ -61,6 +61,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library",

View File

@ -62,8 +62,8 @@ type Manager interface {
RemovePod(pod *v1.Pod) RemovePod(pod *v1.Pod)
// CleanupPods handles cleaning up pods which should no longer be running. // CleanupPods handles cleaning up pods which should no longer be running.
// It takes a list of "active pods" which should not be cleaned up. // It takes a map of "desired pods" which should not be cleaned up.
CleanupPods(activePods []*v1.Pod) CleanupPods(desiredPods map[types.UID]sets.Empty)
// UpdatePodStatus modifies the given PodStatus with the appropriate Ready state for each // UpdatePodStatus modifies the given PodStatus with the appropriate Ready state for each
// container based on container running status, cached probe results and worker states. // container based on container running status, cached probe results and worker states.
@ -198,12 +198,7 @@ func (m *manager) RemovePod(pod *v1.Pod) {
} }
} }
func (m *manager) CleanupPods(activePods []*v1.Pod) { func (m *manager) CleanupPods(desiredPods map[types.UID]sets.Empty) {
desiredPods := make(map[types.UID]sets.Empty)
for _, pod := range activePods {
desiredPods[pod.UID] = sets.Empty{}
}
m.workerLock.RLock() m.workerLock.RLock()
defer m.workerLock.RUnlock() defer m.workerLock.RUnlock()

View File

@ -26,6 +26,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog" "k8s.io/klog"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@ -158,7 +159,9 @@ func TestCleanupPods(t *testing.T) {
m.AddPod(&podToCleanup) m.AddPod(&podToCleanup)
m.AddPod(&podToKeep) m.AddPod(&podToKeep)
m.CleanupPods([]*v1.Pod{&podToKeep}) desiredPods := map[types.UID]sets.Empty{}
desiredPods[podToKeep.UID] = sets.Empty{}
m.CleanupPods(desiredPods)
removedProbes := []probeKey{ removedProbes := []probeKey{
{"pod_cleanup", "prober1", readiness}, {"pod_cleanup", "prober1", readiness},
@ -197,7 +200,7 @@ func TestCleanupRepeated(t *testing.T) {
} }
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
m.CleanupPods([]*v1.Pod{}) m.CleanupPods(map[types.UID]sets.Empty{})
} }
} }

View File

@ -12,6 +12,7 @@ go_library(
deps = [ deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
], ],
) )

View File

@ -19,6 +19,7 @@ package testing
import ( import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
) )
// FakeManager simulates a prober.Manager for testing. // FakeManager simulates a prober.Manager for testing.
@ -33,7 +34,7 @@ func (FakeManager) AddPod(_ *v1.Pod) {}
func (FakeManager) RemovePod(_ *v1.Pod) {} func (FakeManager) RemovePod(_ *v1.Pod) {}
// CleanupPods simulates cleaning up Pods. // CleanupPods simulates cleaning up Pods.
func (FakeManager) CleanupPods(_ []*v1.Pod) {} func (FakeManager) CleanupPods(_ map[types.UID]sets.Empty) {}
// Start simulates start syncing the probe status // Start simulates start syncing the probe status
func (FakeManager) Start() {} func (FakeManager) Start() {}