diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index 99a08b69d3c..de895cc98d7 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -90,14 +90,14 @@ func (c *PodConfig) Channel(source string) chan<- interface{} { return c.mux.Channel(source) } -// SeenAllSources returns true if this config has received a SET -// message from all configured sources, false otherwise. -func (c *PodConfig) SeenAllSources() bool { +// SeenAllSources returns true if seenSources contains all sources in the +// config, and also this config has received a SET message from each source. +func (c *PodConfig) SeenAllSources(seenSources sets.String) bool { if c.pods == nil { return false } - glog.V(6).Infof("Looking for %v, have seen %v", c.sources.List(), c.pods.sourcesSeen) - return c.pods.seenSources(c.sources.List()...) + glog.V(6).Infof("Looking for %v, have seen %v", c.sources.List(), seenSources) + return seenSources.HasAll(c.sources.List()...) && c.pods.seenSources(c.sources.List()...) } // Updates returns a channel of updates to the configuration, properly denormalized. diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index e6db9884d49..249a531a7a0 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -118,7 +118,7 @@ type SyncHandler interface { HandlePodCleanups() error } -type SourcesReadyFn func() bool +type SourcesReadyFn func(sourcesSeen sets.String) bool // Wait for the container runtime to be up with a timeout. func waitUntilRuntimeIsUp(cr kubecontainer.Runtime, timeout time.Duration) error { @@ -418,6 +418,7 @@ func NewMainKubelet( klet.backOff = util.NewBackOff(resyncInterval, maxContainerBackOff) klet.podKillingCh = make(chan *kubecontainer.Pod, podKillingChannelCapacity) + klet.sourcesSeen = sets.NewString() return klet, nil } @@ -441,6 +442,9 @@ type Kubelet struct { podWorkers PodWorkers resyncInterval time.Duration sourcesReady SourcesReadyFn + // sourcesSeen records the sources seen by kubelet. This set is not thread + // safe and should only be access by the main kubelet syncloop goroutine. + sourcesSeen sets.String podManager podManager @@ -597,6 +601,15 @@ type Kubelet struct { daemonEndpoints *api.NodeDaemonEndpoints } +func (kl *Kubelet) allSourcesReady() bool { + // Make a copy of the sourcesSeen list because it's not thread-safe. + return kl.sourcesReady(sets.NewString(kl.sourcesSeen.List()...)) +} + +func (kl *Kubelet) addSource(source string) { + kl.sourcesSeen.Insert(source) +} + // getRootDir returns the full path to the directory under which kubelet can // store data. These functions are useful to pass interfaces to other modules // that may need to know where to write data without getting a whole kubelet @@ -1618,7 +1631,7 @@ func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod, mirrorPods []*api. } func (kl *Kubelet) deletePod(uid types.UID) error { - if !kl.sourcesReady() { + if !kl.allSourcesReady() { // If the sources aren't ready, skip deletion, as we may accidentally delete pods // for sources that haven't reported yet. return fmt.Errorf("skipping delete because sources aren't ready yet") @@ -1906,7 +1919,7 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) { // housekeepingMinimumPeriod. // TODO (#13418): Investigate whether we can/should spawn a dedicated // goroutine for housekeeping - if !kl.sourcesReady() { + if !kl.allSourcesReady() { // If the sources aren't ready, skip housekeeping, as we may // accidentally delete pods from unready sources. glog.V(4).Infof("Skipping cleanup, sources aren't ready yet.") @@ -1930,6 +1943,7 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan PodUpdate, handler SyncHandl glog.Errorf("Update channel is closed. Exiting the sync loop.") return false } + kl.addSource(u.Source) switch u.Op { case ADD: glog.V(2).Infof("SyncLoop (ADD): %q", kubeletUtil.FormatPodNames(u.Pods)) diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 152b97c84eb..e554cb95c6b 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -51,6 +51,7 @@ import ( "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/bandwidth" + "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/pkg/volume" _ "k8s.io/kubernetes/pkg/volume/host_path" @@ -102,7 +103,7 @@ func newTestKubelet(t *testing.T) *TestKubelet { if err := os.MkdirAll(kubelet.rootDirectory, 0750); err != nil { t.Fatalf("can't mkdir(%q): %v", kubelet.rootDirectory, err) } - kubelet.sourcesReady = func() bool { return true } + kubelet.sourcesReady = func(_ sets.String) bool { return true } kubelet.masterServiceNamespace = api.NamespaceDefault kubelet.serviceLister = testServiceLister{} kubelet.nodeLister = testNodeLister{} @@ -393,7 +394,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorApiv2.FsInfo{}, nil) testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorApiv2.FsInfo{}, nil) kubelet := testKubelet.kubelet - kubelet.sourcesReady = func() bool { return ready } + kubelet.sourcesReady = func(_ sets.String) bool { return ready } fakeRuntime.PodList = []*kubecontainer.Pod{ {