Merge pull request #24810 from derekwaynecarr/sources_cleanup

Automatic merge from submit-queue

Clean-up sources ready tracking in kubelet

moved sources ready tracking behind an interface, made it thread-safe.
This commit is contained in:
k8s-merge-robot 2016-05-09 05:48:09 -07:00
commit 545d56a63b
3 changed files with 79 additions and 30 deletions

View File

@ -0,0 +1,67 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package config implements the pod configuration readers.
package config
import (
"sync"
"k8s.io/kubernetes/pkg/util/sets"
)
// SourcesReadyFn is function that returns true if the specified sources have been seen.
type SourcesReadyFn func(sourcesSeen sets.String) bool
// SourcesReady tracks the set of configured sources seen by the kubelet.
type SourcesReady interface {
// AddSource adds the specified source to the set of sources managed.
AddSource(source string)
// AllReady returns true if the currently configured sources have all been seen.
AllReady() bool
}
// NewSourcesReady returns a SourcesReady with the specified function.
func NewSourcesReady(sourcesReadyFn SourcesReadyFn) SourcesReady {
return &sourcesImpl{
sourcesSeen: sets.NewString(),
sourcesReadyFn: sourcesReadyFn,
}
}
// sourcesImpl implements SourcesReady. It is thread-safe.
type sourcesImpl struct {
// lock protects access to sources seen.
lock sync.RWMutex
// set of sources seen.
sourcesSeen sets.String
// sourcesReady is a function that evaluates if the sources are ready.
sourcesReadyFn SourcesReadyFn
}
// Add adds the specified source to the set of sources managed.
func (s *sourcesImpl) AddSource(source string) {
s.lock.Lock()
defer s.lock.Unlock()
s.sourcesSeen.Insert(source)
}
// AllReady returns true if each configured source is ready.
func (s *sourcesImpl) AllReady() bool {
s.lock.RLock()
defer s.lock.RUnlock()
return s.sourcesReadyFn(s.sourcesSeen)
}

View File

@ -50,6 +50,7 @@ import (
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/envvars"
@ -157,12 +158,10 @@ type SyncHandler interface {
HandlePodCleanups() error
}
type SourcesReadyFn func(sourcesSeen sets.String) bool
// Option is a functional option type for Kubelet
type Option func(*Kubelet)
// New instantiates a new Kubelet object along with all the required internal modules.
// NewMainKubelet instantiates a new Kubelet object along with all the required internal modules.
// No initialization of Kubelet and its modules should happen here.
func NewMainKubelet(
hostname string,
@ -177,7 +176,7 @@ func NewMainKubelet(
eventQPS float32,
eventBurst int,
containerGCPolicy kubecontainer.ContainerGCPolicy,
sourcesReady SourcesReadyFn,
sourcesReadyFn config.SourcesReadyFn,
registerNode bool,
registerSchedulable bool,
standaloneMode bool,
@ -306,7 +305,7 @@ func NewMainKubelet(
resyncInterval: resyncInterval,
containerRefManager: containerRefManager,
httpClient: &http.Client{},
sourcesReady: sourcesReady,
sourcesReady: config.NewSourcesReady(sourcesReadyFn),
registerNode: registerNode,
registerSchedulable: registerSchedulable,
standaloneMode: standaloneMode,
@ -490,7 +489,6 @@ func NewMainKubelet(
klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity)
klet.sourcesSeen = sets.NewString()
klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()
// apply functional Option's
@ -558,12 +556,8 @@ type Kubelet struct {
// pods on this node.
resyncInterval time.Duration
// sourcesReady is a function to call to determine if all config sources
// are ready.
sourcesReady SourcesReadyFn
// sourcesSeen records the sources seen by kubelet. This set is not thread
// safe and should only be access by the main kubelet syncloop goroutine.
sourcesSeen sets.String
// sourcesReady records the sources seen by the kubelet, it is thread-safe.
sourcesReady config.SourcesReady
// podManager is a facade that abstracts away the various sources of pods
// this Kubelet services.
@ -828,18 +822,6 @@ func (kl *Kubelet) validateNodeIP() error {
return fmt.Errorf("Node IP: %q not found in the host's network interfaces", kl.nodeIP.String())
}
// allSourcesReady returns whether all seen pod sources are ready.
func (kl *Kubelet) allSourcesReady() bool {
// Make a copy of the sourcesSeen list because it's not thread-safe.
return kl.sourcesReady(sets.NewString(kl.sourcesSeen.List()...))
}
// addSource adds a new pod source to the list of sources seen.
// TODO: reassess in the context of kubernetes/24810
func (kl *Kubelet) addSource(source string) {
kl.sourcesSeen.Insert(source)
}
// dirExists returns true if the path exists and represents a directory.
func dirExists(path string) bool {
s, err := os.Stat(path)
@ -2149,7 +2131,7 @@ func (kl *Kubelet) deletePod(pod *api.Pod) error {
if pod == nil {
return fmt.Errorf("deletePod does not allow nil pod")
}
if !kl.allSourcesReady() {
if !kl.sourcesReady.AllReady() {
// If the sources aren't ready, skip deletion, as we may accidentally delete pods
// for sources that haven't reported yet.
return fmt.Errorf("skipping delete because sources aren't ready yet")
@ -2496,7 +2478,7 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
glog.Errorf("Update channel is closed. Exiting the sync loop.")
return false
}
kl.addSource(u.Source)
kl.sourcesReady.AddSource(u.Source)
switch u.Op {
case kubetypes.ADD:
@ -2553,8 +2535,7 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
handler.HandlePodSyncs([]*api.Pod{pod})
}
case <-housekeepingCh:
// It's time to do housekeeping
if !kl.allSourcesReady() {
if !kl.sourcesReady.AllReady() {
// If the sources aren't ready, skip housekeeping, as we may
// accidentally delete pods from unready sources.
glog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")

View File

@ -43,6 +43,7 @@ import (
"k8s.io/kubernetes/pkg/client/testing/core"
cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
@ -136,7 +137,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
if err := os.MkdirAll(kubelet.rootDirectory, 0750); err != nil {
t.Fatalf("can't mkdir(%q): %v", kubelet.rootDirectory, err)
}
kubelet.sourcesReady = func(_ sets.String) bool { return true }
kubelet.sourcesReady = config.NewSourcesReady(func(_ sets.String) bool { return true })
kubelet.masterServiceNamespace = api.NamespaceDefault
kubelet.serviceLister = testServiceLister{}
kubelet.nodeLister = testNodeLister{}
@ -309,7 +310,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
kubelet := testKubelet.kubelet
kubelet.sourcesReady = func(_ sets.String) bool { return ready }
kubelet.sourcesReady = config.NewSourcesReady(func(_ sets.String) bool { return ready })
fakeRuntime.PodList = []*kubecontainer.Pod{
{