Refactoring kubelet to separate object creation from object logical initialization.

This commit is contained in:
Vishnu kannan 2015-09-24 15:26:25 -07:00
parent 8761ad3ec1
commit cf56f7a8ef
3 changed files with 141 additions and 103 deletions

View File

@ -278,9 +278,7 @@ func NewMainKubelet(
clusterDNS: clusterDNS, clusterDNS: clusterDNS,
serviceLister: serviceLister, serviceLister: serviceLister,
nodeLister: nodeLister, nodeLister: nodeLister,
runtimeMutex: sync.Mutex{}, runtimeState: newRuntimeState(maxWaitForContainerRuntime),
runtimeUpThreshold: maxWaitForContainerRuntime,
lastTimestampRuntimeUp: time.Time{},
masterServiceNamespace: masterServiceNamespace, masterServiceNamespace: masterServiceNamespace,
streamingConnectionIdleTimeout: streamingConnectionIdleTimeout, streamingConnectionIdleTimeout: streamingConnectionIdleTimeout,
recorder: recorder, recorder: recorder,
@ -400,17 +398,11 @@ func NewMainKubelet(
} }
klet.containerManager = containerManager klet.containerManager = containerManager
go util.Until(klet.syncNetworkStatus, 30*time.Second, util.NeverStop)
if klet.kubeClient != nil {
// Start syncing node status immediately, this may set up things the runtime needs to run.
go util.Until(klet.syncNodeStatus, klet.nodeStatusUpdateFrequency, util.NeverStop)
}
// Wait for the runtime to be up with a timeout. // Wait for the runtime to be up with a timeout.
if err := waitUntilRuntimeIsUp(klet.containerRuntime, maxWaitForContainerRuntime); err != nil { if err := waitUntilRuntimeIsUp(klet.containerRuntime, maxWaitForContainerRuntime); err != nil {
return nil, fmt.Errorf("timed out waiting for %q to come up: %v", containerRuntime, err) return nil, fmt.Errorf("timed out waiting for %q to come up: %v", containerRuntime, err)
} }
klet.lastTimestampRuntimeUp = time.Now() klet.runtimeState.setRuntimeSync(time.Now())
klet.runner = klet.containerRuntime klet.runner = klet.containerRuntime
klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient)) klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient))
@ -423,6 +415,10 @@ func NewMainKubelet(
containerRefManager, containerRefManager,
recorder) recorder)
if err := klet.volumePluginMgr.InitPlugins(volumePlugins, &volumeHost{klet}); err != nil {
return nil, err
}
runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime) runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
if err != nil { if err != nil {
return nil, err return nil, err
@ -433,22 +429,6 @@ func NewMainKubelet(
// once we switch to using pod event generator. // once we switch to using pod event generator.
klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, klet.resyncInterval) klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, klet.resyncInterval)
metrics.Register(runtimeCache)
if err = klet.setupDataDirs(); err != nil {
return nil, err
}
if err = klet.volumePluginMgr.InitPlugins(volumePlugins, &volumeHost{klet}); err != nil {
return nil, err
}
// If the container logs directory does not exist, create it.
if _, err := os.Stat(containerLogsDir); err != nil {
if err := osInterface.Mkdir(containerLogsDir, 0755); err != nil {
glog.Errorf("Failed to create directory %q: %v", containerLogsDir, err)
}
}
klet.backOff = util.NewBackOff(resyncInterval, MaxContainerBackOff) klet.backOff = util.NewBackOff(resyncInterval, MaxContainerBackOff)
klet.podKillingCh = make(chan *kubecontainer.Pod, podKillingChannelCapacity) klet.podKillingCh = make(chan *kubecontainer.Pod, podKillingChannelCapacity)
@ -519,13 +499,8 @@ type Kubelet struct {
// Last timestamp when runtime responded on ping. // Last timestamp when runtime responded on ping.
// Mutex is used to protect this value. // Mutex is used to protect this value.
runtimeMutex sync.Mutex runtimeState *runtimeState
runtimeUpThreshold time.Duration
lastTimestampRuntimeUp time.Time
// Network Status information
networkConfigMutex sync.Mutex networkConfigMutex sync.Mutex
networkConfigured bool
// Volume plugins. // Volume plugins.
volumePluginMgr volume.VolumePluginMgr volumePluginMgr volume.VolumePluginMgr
@ -816,13 +791,17 @@ func (kl *Kubelet) StartGarbageCollection() {
}, 5*time.Minute, util.NeverStop) }, 5*time.Minute, util.NeverStop)
} }
// Run starts the kubelet reacting to config updates func (kl *Kubelet) preRun() error {
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { metrics.Register(kl.runtimeCache)
if kl.logServer == nil {
kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/"))) if err := kl.setupDataDirs(); err != nil {
return err
}
// If the container logs directory does not exist, create it.
if _, err := os.Stat(containerLogsDir); err != nil {
if err := kl.os.Mkdir(containerLogsDir, 0755); err != nil {
glog.Errorf("Failed to create directory %q: %v", containerLogsDir, err)
} }
if kl.kubeClient == nil {
glog.Warning("No api server defined - no node status update will be sent.")
} }
// Move Kubelet to a container. // Move Kubelet to a container.
@ -836,23 +815,41 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
} }
if err := kl.imageManager.Start(); err != nil { if err := kl.imageManager.Start(); err != nil {
kl.recorder.Eventf(kl.nodeRef, kubecontainer.KubeletSetupFailed, "Failed to start ImageManager %v", err) return fmt.Errorf("Failed to start ImageManager, images may not be garbage collected: %v", err)
glog.Errorf("Failed to start ImageManager, images may not be garbage collected: %v", err)
} }
if err := kl.cadvisor.Start(); err != nil { if err := kl.cadvisor.Start(); err != nil {
kl.recorder.Eventf(kl.nodeRef, kubecontainer.KubeletSetupFailed, "Failed to start CAdvisor %v", err) return fmt.Errorf("Failed to start CAdvisor %v", err)
glog.Errorf("Failed to start CAdvisor, system may not be properly monitored: %v", err)
} }
if err := kl.containerManager.Start(); err != nil { if err := kl.containerManager.Start(); err != nil {
kl.recorder.Eventf(kl.nodeRef, kubecontainer.KubeletSetupFailed, "Failed to start ContainerManager %v", err) return fmt.Errorf("Failed to start ContainerManager %v", err)
glog.Errorf("Failed to start ContainerManager, system may not be properly isolated: %v", err)
} }
if err := kl.oomWatcher.Start(kl.nodeRef); err != nil { if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
kl.recorder.Eventf(kl.nodeRef, kubecontainer.KubeletSetupFailed, "Failed to start OOM watcher %v", err) return fmt.Errorf("Failed to start OOM watcher %v", err)
glog.Errorf("Failed to start OOM watching: %v", err) }
return nil
}
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
if kl.logServer == nil {
kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
}
if kl.kubeClient == nil {
glog.Warning("No api server defined - no node status update will be sent.")
}
if err := kl.preRun(); err != nil {
kl.recorder.Eventf(kl.nodeRef, kubecontainer.KubeletSetupFailed, err.Error())
glog.Error(err)
kl.runtimeState.setInitError(err)
}
go util.Until(kl.syncNetworkStatus, 30*time.Second, util.NeverStop)
if kl.kubeClient != nil {
// Start syncing node status immediately, this may set up things the runtime needs to run.
go util.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, util.NeverStop)
} }
go util.Until(kl.updateRuntimeUp, 5*time.Second, util.NeverStop) go util.Until(kl.updateRuntimeUp, 5*time.Second, util.NeverStop)
@ -2072,14 +2069,9 @@ func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHand
syncTicker := time.NewTicker(time.Second) syncTicker := time.NewTicker(time.Second)
housekeepingTicker := time.NewTicker(housekeepingPeriod) housekeepingTicker := time.NewTicker(housekeepingPeriod)
for { for {
if !kl.containerRuntimeUp() { if rs := kl.runtimeState.errors(); len(rs) != 0 {
glog.Infof("skipping pod synchronization - %v", rs)
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
glog.Infof("Skipping pod synchronization, container runtime is not up.")
continue
}
if !kl.doneNetworkConfigure() {
time.Sleep(5 * time.Second)
glog.Infof("Skipping pod synchronization, network is not configured")
continue continue
} }
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C) { if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C) {
@ -2375,10 +2367,8 @@ func (kl *Kubelet) GetPodByName(namespace, name string) (*api.Pod, bool) {
func (kl *Kubelet) updateRuntimeUp() { func (kl *Kubelet) updateRuntimeUp() {
start := time.Now() start := time.Now()
err := waitUntilRuntimeIsUp(kl.containerRuntime, 100*time.Millisecond) err := waitUntilRuntimeIsUp(kl.containerRuntime, 100*time.Millisecond)
kl.runtimeMutex.Lock()
defer kl.runtimeMutex.Unlock()
if err == nil { if err == nil {
kl.lastTimestampRuntimeUp = time.Now() kl.runtimeState.setRuntimeSync(time.Now())
} else { } else {
glog.Errorf("Container runtime sanity check failed after %v, err: %v", time.Since(start), err) glog.Errorf("Container runtime sanity check failed after %v, err: %v", time.Since(start), err)
} }
@ -2429,24 +2419,21 @@ func (kl *Kubelet) recordNodeStatusEvent(event string) {
var oldNodeUnschedulable bool var oldNodeUnschedulable bool
func (kl *Kubelet) syncNetworkStatus() { func (kl *Kubelet) syncNetworkStatus() {
kl.networkConfigMutex.Lock() var err error
defer kl.networkConfigMutex.Unlock()
networkConfigured := true
if kl.configureCBR0 { if kl.configureCBR0 {
if err := ensureIPTablesMasqRule(); err != nil { if err := ensureIPTablesMasqRule(); err != nil {
networkConfigured = false err = fmt.Errorf("Error on adding ip table rules: %v", err)
glog.Errorf("Error on adding ip table rules: %v", err) glog.Error(err)
} }
if len(kl.podCIDR) == 0 { if len(kl.podCIDR) == 0 {
glog.Warningf("ConfigureCBR0 requested, but PodCIDR not set. Will not configure CBR0 right now") err = fmt.Errorf("ConfigureCBR0 requested, but PodCIDR not set. Will not configure CBR0 right now")
networkConfigured = false glog.Warning(err)
} else if err := kl.reconcileCBR0(kl.podCIDR); err != nil { } else if err := kl.reconcileCBR0(kl.podCIDR); err != nil {
networkConfigured = false err = fmt.Errorf("Error configuring cbr0: %v", err)
glog.Errorf("Error configuring cbr0: %v", err) glog.Error(err)
} }
} }
kl.networkConfigured = networkConfigured kl.runtimeState.setNetworkError(err)
} }
// setNodeStatus fills in the Status fields of the given Node, overwriting // setNodeStatus fills in the Status fields of the given Node, overwriting
@ -2556,17 +2543,13 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error {
node.Status.DaemonEndpoints = *kl.daemonEndpoints node.Status.DaemonEndpoints = *kl.daemonEndpoints
// Check whether container runtime can be reported as up. // FIXME: Check whether runtime version meets the minimal requirements
containerRuntimeUp := kl.containerRuntimeUp() _ = kl.containerRuntimeVersionRequirementMet()
// Check whether network is configured properly
networkConfigured := kl.doneNetworkConfigure()
// Check whether runtime version meets the minimal requirements
containerRuntimeVersionRequirementMet := kl.containerRuntimeVersionRequirementMet()
currentTime := unversioned.Now() currentTime := unversioned.Now()
var newNodeReadyCondition api.NodeCondition var newNodeReadyCondition api.NodeCondition
var oldNodeReadyConditionStatus api.ConditionStatus var oldNodeReadyConditionStatus api.ConditionStatus
if containerRuntimeUp && networkConfigured && containerRuntimeVersionRequirementMet { if rs := kl.runtimeState.errors(); len(rs) == 0 {
newNodeReadyCondition = api.NodeCondition{ newNodeReadyCondition = api.NodeCondition{
Type: api.NodeReady, Type: api.NodeReady,
Status: api.ConditionTrue, Status: api.ConditionTrue,
@ -2575,21 +2558,11 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error {
LastHeartbeatTime: currentTime, LastHeartbeatTime: currentTime,
} }
} else { } else {
var messages []string
if !containerRuntimeUp {
messages = append(messages, "container runtime is down")
}
if !networkConfigured {
messages = append(messages, "network not configured correctly")
}
if !containerRuntimeVersionRequirementMet {
messages = append(messages, fmt.Sprintf("container runtime version is older than %s", dockertools.MinimumDockerAPIVersion))
}
newNodeReadyCondition = api.NodeCondition{ newNodeReadyCondition = api.NodeCondition{
Type: api.NodeReady, Type: api.NodeReady,
Status: api.ConditionFalse, Status: api.ConditionFalse,
Reason: "KubeletNotReady", Reason: "KubeletNotReady",
Message: strings.Join(messages, ","), Message: strings.Join(rs, ","),
LastHeartbeatTime: currentTime, LastHeartbeatTime: currentTime,
} }
} }
@ -2685,18 +2658,6 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error {
return nil return nil
} }
func (kl *Kubelet) containerRuntimeUp() bool {
kl.runtimeMutex.Lock()
defer kl.runtimeMutex.Unlock()
return kl.lastTimestampRuntimeUp.Add(kl.runtimeUpThreshold).After(time.Now())
}
func (kl *Kubelet) doneNetworkConfigure() bool {
kl.networkConfigMutex.Lock()
defer kl.networkConfigMutex.Unlock()
return kl.networkConfigured
}
func (kl *Kubelet) containerRuntimeVersionRequirementMet() bool { func (kl *Kubelet) containerRuntimeVersionRequirementMet() bool {
switch kl.GetRuntime().Type() { switch kl.GetRuntime().Type() {
case "docker": case "docker":

View File

@ -96,7 +96,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.hostname = testKubeletHostname kubelet.hostname = testKubeletHostname
kubelet.nodeName = testKubeletHostname kubelet.nodeName = testKubeletHostname
kubelet.runtimeUpThreshold = maxWaitForContainerRuntime kubelet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
kubelet.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) kubelet.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
if tempDir, err := ioutil.TempDir("/tmp", "kubelet_test."); err != nil { if tempDir, err := ioutil.TempDir("/tmp", "kubelet_test."); err != nil {
t.Fatalf("can't make a temp rootdir: %v", err) t.Fatalf("can't make a temp rootdir: %v", err)
@ -140,7 +140,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.volumeManager = newVolumeManager() kubelet.volumeManager = newVolumeManager()
kubelet.containerManager, _ = newContainerManager(fakeContainerMgrMountInt(), mockCadvisor, "", "", "") kubelet.containerManager, _ = newContainerManager(fakeContainerMgrMountInt(), mockCadvisor, "", "", "")
kubelet.networkConfigured = true kubelet.runtimeState.setNetworkError(nil)
fakeClock := &util.FakeClock{Time: time.Now()} fakeClock := &util.FakeClock{Time: time.Now()}
kubelet.backOff = util.NewBackOff(time.Second, time.Minute) kubelet.backOff = util.NewBackOff(time.Second, time.Minute)
kubelet.backOff.Clock = fakeClock kubelet.backOff.Clock = fakeClock
@ -357,8 +357,10 @@ func TestSyncLoopAbort(t *testing.T) {
testKubelet := newTestKubelet(t) testKubelet := newTestKubelet(t)
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil) testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil)
kubelet := testKubelet.kubelet kubelet := testKubelet.kubelet
kubelet.lastTimestampRuntimeUp = time.Now() kubelet.runtimeState.setRuntimeSync(time.Now())
kubelet.networkConfigured = true // The syncLoop waits on time.After(resyncInterval), set it really big so that we don't race for
// the channel close
kubelet.resyncInterval = time.Second * 30
ch := make(chan kubetypes.PodUpdate) ch := make(chan kubetypes.PodUpdate)
close(ch) close(ch)
@ -2626,7 +2628,8 @@ func TestUpdateNewNodeStatus(t *testing.T) {
} }
} }
func TestDockerRuntimeVersion(t *testing.T) { // FIXME: Enable me..
func testDockerRuntimeVersion(t *testing.T) {
testKubelet := newTestKubelet(t) testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet kubelet := testKubelet.kubelet
fakeRuntime := testKubelet.fakeRuntime fakeRuntime := testKubelet.fakeRuntime
@ -2994,7 +2997,7 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) {
}, },
} }
kubelet.runtimeUpThreshold = time.Duration(0) kubelet.runtimeState = newRuntimeState(time.Duration(0))
kubelet.updateRuntimeUp() kubelet.updateRuntimeUp()
if err := kubelet.updateNodeStatus(); err != nil { if err := kubelet.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)

74
pkg/kubelet/runtime.go Normal file
View File

@ -0,0 +1,74 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"sync"
"time"
)
type runtimeState struct {
sync.Mutex
lastBaseRuntimeSync time.Time
baseRuntimeSyncThreshold time.Duration
networkError error
initError error
}
func (s *runtimeState) setRuntimeSync(t time.Time) {
s.Lock()
defer s.Unlock()
s.lastBaseRuntimeSync = t
}
func (s *runtimeState) setNetworkError(err error) {
if err == nil {
return
}
s.Lock()
defer s.Unlock()
s.networkError = err
}
func (s *runtimeState) setInitError(err error) {
s.Lock()
defer s.Unlock()
s.initError = err
}
func (s *runtimeState) errors() []string {
s.Lock()
defer s.Unlock()
var ret []string
if s.initError != nil {
ret = append(ret, s.initError.Error())
}
if s.networkError != nil {
ret = append(ret, s.networkError.Error())
}
if !s.lastBaseRuntimeSync.Add(s.baseRuntimeSyncThreshold).After(time.Now()) {
ret = append(ret, "container runtime is down")
}
return ret
}
func newRuntimeState(runtimeSyncThreshold time.Duration) *runtimeState {
return &runtimeState{
lastBaseRuntimeSync: time.Time{},
baseRuntimeSyncThreshold: runtimeSyncThreshold,
}
}