diff --git a/pkg/kubelet/config/sources.go b/pkg/kubelet/config/sources.go new file mode 100644 index 00000000000..e26b9de28a4 --- /dev/null +++ b/pkg/kubelet/config/sources.go @@ -0,0 +1,67 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package config implements the pod configuration readers. +package config + +import ( + "sync" + + "k8s.io/kubernetes/pkg/util/sets" +) + +// SourcesReadyFn is function that returns true if the specified sources have been seen. +type SourcesReadyFn func(sourcesSeen sets.String) bool + +// SourcesReady tracks the set of configured sources seen by the kubelet. +type SourcesReady interface { + // AddSource adds the specified source to the set of sources managed. + AddSource(source string) + // AllReady returns true if the currently configured sources have all been seen. + AllReady() bool +} + +// NewSourcesReady returns a SourcesReady with the specified function. +func NewSourcesReady(sourcesReadyFn SourcesReadyFn) SourcesReady { + return &sourcesImpl{ + sourcesSeen: sets.NewString(), + sourcesReadyFn: sourcesReadyFn, + } +} + +// sourcesImpl implements SourcesReady. It is thread-safe. +type sourcesImpl struct { + // lock protects access to sources seen. + lock sync.RWMutex + // set of sources seen. + sourcesSeen sets.String + // sourcesReady is a function that evaluates if the sources are ready. + sourcesReadyFn SourcesReadyFn +} + +// Add adds the specified source to the set of sources managed. +func (s *sourcesImpl) AddSource(source string) { + s.lock.Lock() + defer s.lock.Unlock() + s.sourcesSeen.Insert(source) +} + +// AllReady returns true if each configured source is ready. +func (s *sourcesImpl) AllReady() bool { + s.lock.RLock() + defer s.lock.RUnlock() + return s.sourcesReadyFn(s.sourcesSeen) +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 39687d1d1c7..e6a7fea5d54 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -50,6 +50,7 @@ import ( "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/cm" + "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockertools" "k8s.io/kubernetes/pkg/kubelet/envvars" @@ -157,12 +158,10 @@ type SyncHandler interface { HandlePodCleanups() error } -type SourcesReadyFn func(sourcesSeen sets.String) bool - // Option is a functional option type for Kubelet type Option func(*Kubelet) -// New instantiates a new Kubelet object along with all the required internal modules. +// NewMainKubelet instantiates a new Kubelet object along with all the required internal modules. // No initialization of Kubelet and its modules should happen here. func NewMainKubelet( hostname string, @@ -177,7 +176,7 @@ func NewMainKubelet( eventQPS float32, eventBurst int, containerGCPolicy kubecontainer.ContainerGCPolicy, - sourcesReady SourcesReadyFn, + sourcesReadyFn config.SourcesReadyFn, registerNode bool, registerSchedulable bool, standaloneMode bool, @@ -306,7 +305,7 @@ func NewMainKubelet( resyncInterval: resyncInterval, containerRefManager: containerRefManager, httpClient: &http.Client{}, - sourcesReady: sourcesReady, + sourcesReady: config.NewSourcesReady(sourcesReadyFn), registerNode: registerNode, registerSchedulable: registerSchedulable, standaloneMode: standaloneMode, @@ -490,7 +489,6 @@ func NewMainKubelet( klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff) klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity) - klet.sourcesSeen = sets.NewString() klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs() // apply functional Option's @@ -558,12 +556,8 @@ type Kubelet struct { // pods on this node. resyncInterval time.Duration - // sourcesReady is a function to call to determine if all config sources - // are ready. - sourcesReady SourcesReadyFn - // sourcesSeen records the sources seen by kubelet. This set is not thread - // safe and should only be access by the main kubelet syncloop goroutine. - sourcesSeen sets.String + // sourcesReady records the sources seen by the kubelet, it is thread-safe. + sourcesReady config.SourcesReady // podManager is a facade that abstracts away the various sources of pods // this Kubelet services. @@ -828,18 +822,6 @@ func (kl *Kubelet) validateNodeIP() error { return fmt.Errorf("Node IP: %q not found in the host's network interfaces", kl.nodeIP.String()) } -// allSourcesReady returns whether all seen pod sources are ready. -func (kl *Kubelet) allSourcesReady() bool { - // Make a copy of the sourcesSeen list because it's not thread-safe. - return kl.sourcesReady(sets.NewString(kl.sourcesSeen.List()...)) -} - -// addSource adds a new pod source to the list of sources seen. -// TODO: reassess in the context of kubernetes/24810 -func (kl *Kubelet) addSource(source string) { - kl.sourcesSeen.Insert(source) -} - // dirExists returns true if the path exists and represents a directory. func dirExists(path string) bool { s, err := os.Stat(path) @@ -2149,7 +2131,7 @@ func (kl *Kubelet) deletePod(pod *api.Pod) error { if pod == nil { return fmt.Errorf("deletePod does not allow nil pod") } - if !kl.allSourcesReady() { + if !kl.sourcesReady.AllReady() { // If the sources aren't ready, skip deletion, as we may accidentally delete pods // for sources that haven't reported yet. return fmt.Errorf("skipping delete because sources aren't ready yet") @@ -2496,7 +2478,7 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle glog.Errorf("Update channel is closed. Exiting the sync loop.") return false } - kl.addSource(u.Source) + kl.sourcesReady.AddSource(u.Source) switch u.Op { case kubetypes.ADD: @@ -2553,8 +2535,7 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle handler.HandlePodSyncs([]*api.Pod{pod}) } case <-housekeepingCh: - // It's time to do housekeeping - if !kl.allSourcesReady() { + if !kl.sourcesReady.AllReady() { // If the sources aren't ready, skip housekeeping, as we may // accidentally delete pods from unready sources. glog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.") diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index fe707862ca4..453e2be4624 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -43,6 +43,7 @@ import ( "k8s.io/kubernetes/pkg/client/testing/core" cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing" "k8s.io/kubernetes/pkg/kubelet/cm" + "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" "k8s.io/kubernetes/pkg/kubelet/lifecycle" @@ -136,7 +137,7 @@ func newTestKubelet(t *testing.T) *TestKubelet { if err := os.MkdirAll(kubelet.rootDirectory, 0750); err != nil { t.Fatalf("can't mkdir(%q): %v", kubelet.rootDirectory, err) } - kubelet.sourcesReady = func(_ sets.String) bool { return true } + kubelet.sourcesReady = config.NewSourcesReady(func(_ sets.String) bool { return true }) kubelet.masterServiceNamespace = api.NamespaceDefault kubelet.serviceLister = testServiceLister{} kubelet.nodeLister = testNodeLister{} @@ -309,7 +310,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil) testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil) kubelet := testKubelet.kubelet - kubelet.sourcesReady = func(_ sets.String) bool { return ready } + kubelet.sourcesReady = config.NewSourcesReady(func(_ sets.String) bool { return ready }) fakeRuntime.PodList = []*kubecontainer.Pod{ {