From 5abdfcdfe64aa0538176b136d77a6337f5d64069 Mon Sep 17 00:00:00 2001 From: Yu-Ju Hong Date: Mon, 5 Oct 2015 18:20:57 -0700 Subject: [PATCH] kubelet: fix all sources ready condition The current implementation considers a source seen when it receives a SET at kubelet/config/config.go. However, the main kubelet sync loop may not have received the pod update from the source via the channel. This change ensures that kubelet would consider all sources are ready only after the sync loop has seen all the sources. --- pkg/kubelet/config/config.go | 10 +++++----- pkg/kubelet/kubelet.go | 20 +++++++++++++++++--- pkg/kubelet/kubelet_test.go | 5 +++-- 3 files changed, 25 insertions(+), 10 deletions(-) diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index 99a08b69d3c..de895cc98d7 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -90,14 +90,14 @@ func (c *PodConfig) Channel(source string) chan<- interface{} { return c.mux.Channel(source) } -// SeenAllSources returns true if this config has received a SET -// message from all configured sources, false otherwise. -func (c *PodConfig) SeenAllSources() bool { +// SeenAllSources returns true if seenSources contains all sources in the +// config, and also this config has received a SET message from each source. +func (c *PodConfig) SeenAllSources(seenSources sets.String) bool { if c.pods == nil { return false } - glog.V(6).Infof("Looking for %v, have seen %v", c.sources.List(), c.pods.sourcesSeen) - return c.pods.seenSources(c.sources.List()...) + glog.V(6).Infof("Looking for %v, have seen %v", c.sources.List(), seenSources) + return seenSources.HasAll(c.sources.List()...) && c.pods.seenSources(c.sources.List()...) } // Updates returns a channel of updates to the configuration, properly denormalized. diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index e6db9884d49..249a531a7a0 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -118,7 +118,7 @@ type SyncHandler interface { HandlePodCleanups() error } -type SourcesReadyFn func() bool +type SourcesReadyFn func(sourcesSeen sets.String) bool // Wait for the container runtime to be up with a timeout. func waitUntilRuntimeIsUp(cr kubecontainer.Runtime, timeout time.Duration) error { @@ -418,6 +418,7 @@ func NewMainKubelet( klet.backOff = util.NewBackOff(resyncInterval, maxContainerBackOff) klet.podKillingCh = make(chan *kubecontainer.Pod, podKillingChannelCapacity) + klet.sourcesSeen = sets.NewString() return klet, nil } @@ -441,6 +442,9 @@ type Kubelet struct { podWorkers PodWorkers resyncInterval time.Duration sourcesReady SourcesReadyFn + // sourcesSeen records the sources seen by kubelet. This set is not thread + // safe and should only be access by the main kubelet syncloop goroutine. + sourcesSeen sets.String podManager podManager @@ -597,6 +601,15 @@ type Kubelet struct { daemonEndpoints *api.NodeDaemonEndpoints } +func (kl *Kubelet) allSourcesReady() bool { + // Make a copy of the sourcesSeen list because it's not thread-safe. + return kl.sourcesReady(sets.NewString(kl.sourcesSeen.List()...)) +} + +func (kl *Kubelet) addSource(source string) { + kl.sourcesSeen.Insert(source) +} + // getRootDir returns the full path to the directory under which kubelet can // store data. These functions are useful to pass interfaces to other modules // that may need to know where to write data without getting a whole kubelet @@ -1618,7 +1631,7 @@ func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod, mirrorPods []*api. } func (kl *Kubelet) deletePod(uid types.UID) error { - if !kl.sourcesReady() { + if !kl.allSourcesReady() { // If the sources aren't ready, skip deletion, as we may accidentally delete pods // for sources that haven't reported yet. return fmt.Errorf("skipping delete because sources aren't ready yet") @@ -1906,7 +1919,7 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) { // housekeepingMinimumPeriod. // TODO (#13418): Investigate whether we can/should spawn a dedicated // goroutine for housekeeping - if !kl.sourcesReady() { + if !kl.allSourcesReady() { // If the sources aren't ready, skip housekeeping, as we may // accidentally delete pods from unready sources. glog.V(4).Infof("Skipping cleanup, sources aren't ready yet.") @@ -1930,6 +1943,7 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan PodUpdate, handler SyncHandl glog.Errorf("Update channel is closed. Exiting the sync loop.") return false } + kl.addSource(u.Source) switch u.Op { case ADD: glog.V(2).Infof("SyncLoop (ADD): %q", kubeletUtil.FormatPodNames(u.Pods)) diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 152b97c84eb..e554cb95c6b 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -51,6 +51,7 @@ import ( "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/bandwidth" + "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/pkg/volume" _ "k8s.io/kubernetes/pkg/volume/host_path" @@ -102,7 +103,7 @@ func newTestKubelet(t *testing.T) *TestKubelet { if err := os.MkdirAll(kubelet.rootDirectory, 0750); err != nil { t.Fatalf("can't mkdir(%q): %v", kubelet.rootDirectory, err) } - kubelet.sourcesReady = func() bool { return true } + kubelet.sourcesReady = func(_ sets.String) bool { return true } kubelet.masterServiceNamespace = api.NamespaceDefault kubelet.serviceLister = testServiceLister{} kubelet.nodeLister = testNodeLister{} @@ -393,7 +394,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorApiv2.FsInfo{}, nil) testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorApiv2.FsInfo{}, nil) kubelet := testKubelet.kubelet - kubelet.sourcesReady = func() bool { return ready } + kubelet.sourcesReady = func(_ sets.String) bool { return ready } fakeRuntime.PodList = []*kubecontainer.Pod{ {