Make cadvisor startup synchronous with container runtime initialization.

This is hopefully a temporary workaround.

Signed-off-by: Vishnu kannan <vishnuk@google.com>
This commit is contained in:
Vishnu kannan 2015-11-06 17:03:39 -08:00
parent 4ad3d6f5fe
commit b1770537ab
4 changed files with 69 additions and 28 deletions

View File

@ -31,7 +31,7 @@ var _ Interface = new(Mock)
func (c *Mock) Start() error { func (c *Mock) Start() error {
args := c.Called() args := c.Called()
return args.Error(1) return args.Error(0)
} }
// ContainerInfo is a mock implementation of Interface.ContainerInfo. // ContainerInfo is a mock implementation of Interface.ContainerInfo.

View File

@ -32,6 +32,7 @@ import (
"path/filepath" "path/filepath"
"sort" "sort"
"strings" "strings"
"sync"
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
@ -143,7 +144,8 @@ func waitUntilRuntimeIsUp(cr kubecontainer.Runtime, timeout time.Duration) error
return err return err
} }
// New creates a new Kubelet for use in main // New instantiates a new Kubelet object along with all the required internal modules.
// No initialization of Kubelet and its modules should happen here.
func NewMainKubelet( func NewMainKubelet(
hostname string, hostname string,
nodeName string, nodeName string,
@ -280,7 +282,6 @@ func NewMainKubelet(
clusterDNS: clusterDNS, clusterDNS: clusterDNS,
serviceLister: serviceLister, serviceLister: serviceLister,
nodeLister: nodeLister, nodeLister: nodeLister,
runtimeState: newRuntimeState(maxWaitForContainerRuntime, configureCBR0, podCIDR),
masterServiceNamespace: masterServiceNamespace, masterServiceNamespace: masterServiceNamespace,
streamingConnectionIdleTimeout: streamingConnectionIdleTimeout, streamingConnectionIdleTimeout: streamingConnectionIdleTimeout,
recorder: recorder, recorder: recorder,
@ -378,6 +379,8 @@ func NewMainKubelet(
return nil, fmt.Errorf("unsupported container runtime %q specified", containerRuntime) return nil, fmt.Errorf("unsupported container runtime %q specified", containerRuntime)
} }
klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, configureCBR0, podCIDR, klet.isContainerRuntimeVersionCompatible)
// setup containerGC // setup containerGC
containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy) containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy)
if err != nil { if err != nil {
@ -614,6 +617,9 @@ type Kubelet struct {
// A queue used to trigger pod workers. // A queue used to trigger pod workers.
workQueue queue.WorkQueue workQueue queue.WorkQueue
// oneTimeInitializer is used to initialize modules that are dependent on the runtime to be up.
oneTimeInitializer sync.Once
} }
func (kl *Kubelet) allSourcesReady() bool { func (kl *Kubelet) allSourcesReady() bool {
@ -786,20 +792,25 @@ func (kl *Kubelet) StartGarbageCollection() {
}, 5*time.Minute, util.NeverStop) }, 5*time.Minute, util.NeverStop)
} }
func (kl *Kubelet) preRun() error { // initializeModules will initialize internal modules that do not require the container runtime to be up.
// Note that the modules here must not depend on modules that are not initialized here.
func (kl *Kubelet) initializeModules() error {
// Promethues metrics.
metrics.Register(kl.runtimeCache) metrics.Register(kl.runtimeCache)
// Step 1: Setup filesystem directories.
if err := kl.setupDataDirs(); err != nil { if err := kl.setupDataDirs(); err != nil {
return err return err
} }
// If the container logs directory does not exist, create it.
// Step 2: If the container logs directory does not exist, create it.
if _, err := os.Stat(containerLogsDir); err != nil { if _, err := os.Stat(containerLogsDir); err != nil {
if err := kl.os.Mkdir(containerLogsDir, 0755); err != nil { if err := kl.os.Mkdir(containerLogsDir, 0755); err != nil {
glog.Errorf("Failed to create directory %q: %v", containerLogsDir, err) glog.Errorf("Failed to create directory %q: %v", containerLogsDir, err)
} }
} }
// Move Kubelet to a container. // Step 3: Move Kubelet to a container, if required.
if kl.resourceContainer != "" { if kl.resourceContainer != "" {
// Fixme: I need to reside inside ContainerManager interface. // Fixme: I need to reside inside ContainerManager interface.
err := util.RunInResourceContainer(kl.resourceContainer) err := util.RunInResourceContainer(kl.resourceContainer)
@ -809,24 +820,30 @@ func (kl *Kubelet) preRun() error {
glog.Infof("Running in container %q", kl.resourceContainer) glog.Infof("Running in container %q", kl.resourceContainer)
} }
// Step 4: Start the image manager.
if err := kl.imageManager.Start(); err != nil { if err := kl.imageManager.Start(); err != nil {
return fmt.Errorf("Failed to start ImageManager, images may not be garbage collected: %v", err) return fmt.Errorf("Failed to start ImageManager, images may not be garbage collected: %v", err)
} }
if err := kl.cadvisor.Start(); err != nil { // Step 5: Start container manager.
return fmt.Errorf("Failed to start CAdvisor %v", err)
}
if err := kl.containerManager.Start(kl.nodeConfig); err != nil { if err := kl.containerManager.Start(kl.nodeConfig); err != nil {
return fmt.Errorf("Failed to start ContainerManager %v", err) return fmt.Errorf("Failed to start ContainerManager %v", err)
} }
// Step 6: Start out of memory watcher.
if err := kl.oomWatcher.Start(kl.nodeRef); err != nil { if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
return fmt.Errorf("Failed to start OOM watcher %v", err) return fmt.Errorf("Failed to start OOM watcher %v", err)
} }
return nil return nil
} }
// initializeRuntimeDependentModules will initialize internal modules that require the container runtime to be up.
func (kl *Kubelet) initializeRuntimeDependentModules() {
if err := kl.cadvisor.Start(); err != nil {
kl.runtimeState.setInternalError(fmt.Errorf("Failed to start cAdvisor %v", err))
}
}
// Run starts the kubelet reacting to config updates // Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
if kl.logServer == nil { if kl.logServer == nil {
@ -835,7 +852,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
if kl.kubeClient == nil { if kl.kubeClient == nil {
glog.Warning("No api server defined - no node status update will be sent.") glog.Warning("No api server defined - no node status update will be sent.")
} }
if err := kl.preRun(); err != nil { if err := kl.initializeModules(); err != nil {
kl.recorder.Eventf(kl.nodeRef, kubecontainer.KubeletSetupFailed, err.Error()) kl.recorder.Eventf(kl.nodeRef, kubecontainer.KubeletSetupFailed, err.Error())
glog.Error(err) glog.Error(err)
kl.runtimeState.setInitError(err) kl.runtimeState.setInitError(err)
@ -2362,6 +2379,8 @@ func (kl *Kubelet) updateRuntimeUp() {
start := time.Now() start := time.Now()
err := waitUntilRuntimeIsUp(kl.containerRuntime, 100*time.Millisecond) err := waitUntilRuntimeIsUp(kl.containerRuntime, 100*time.Millisecond)
if err == nil { if err == nil {
// Errors in initialization will be synchronized internally.
kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
kl.runtimeState.setRuntimeSync(time.Now()) kl.runtimeState.setRuntimeSync(time.Now())
} else { } else {
glog.Errorf("Container runtime sanity check failed after %v, err: %v", time.Since(start), err) glog.Errorf("Container runtime sanity check failed after %v, err: %v", time.Since(start), err)
@ -2538,9 +2557,6 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error {
node.Status.DaemonEndpoints = *kl.daemonEndpoints node.Status.DaemonEndpoints = *kl.daemonEndpoints
// FIXME: Check whether runtime version meets the minimal requirements
_ = kl.containerRuntimeVersionRequirementMet()
currentTime := unversioned.Now() currentTime := unversioned.Now()
var newNodeReadyCondition api.NodeCondition var newNodeReadyCondition api.NodeCondition
var oldNodeReadyConditionStatus api.ConditionStatus var oldNodeReadyConditionStatus api.ConditionStatus
@ -2653,27 +2669,24 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error {
return nil return nil
} }
func (kl *Kubelet) containerRuntimeVersionRequirementMet() bool { // FIXME: Why not combine this with container runtime health check?
func (kl *Kubelet) isContainerRuntimeVersionCompatible() error {
switch kl.GetRuntime().Type() { switch kl.GetRuntime().Type() {
case "docker": case "docker":
version, err := kl.GetContainerRuntimeVersion() version, err := kl.GetContainerRuntimeVersion()
if err != nil { if err != nil {
return true return nil
} }
// Verify the docker version. // Verify the docker version.
result, err := version.Compare(dockertools.MinimumDockerAPIVersion) result, err := version.Compare(dockertools.MinimumDockerAPIVersion)
if err != nil { if err != nil {
glog.Errorf("Cannot compare current docker version %v with minimum support Docker version %q", version, dockertools.MinimumDockerAPIVersion) return fmt.Errorf("failed to compare current docker version %v with minimum support Docker version %q - %v", version, dockertools.MinimumDockerAPIVersion, err)
return false }
if result < 0 {
return fmt.Errorf("container runtime version is older than %s", dockertools.MinimumDockerAPIVersion)
} }
return (result >= 0)
case "rkt":
// TODO(dawnchen): Rkt support here
return true
default:
glog.Errorf("unsupported container runtime %s specified", kl.GetRuntime().Type())
return true
} }
return nil
} }
// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0 // tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0

View File

@ -97,7 +97,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.hostname = testKubeletHostname kubelet.hostname = testKubeletHostname
kubelet.nodeName = testKubeletHostname kubelet.nodeName = testKubeletHostname
kubelet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, false, "" /* Pod CIDR */) kubelet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, false, "" /* Pod CIDR */, func() error { return nil })
kubelet.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) kubelet.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
if tempDir, err := ioutil.TempDir("/tmp", "kubelet_test."); err != nil { if tempDir, err := ioutil.TempDir("/tmp", "kubelet_test."); err != nil {
t.Fatalf("can't make a temp rootdir: %v", err) t.Fatalf("can't make a temp rootdir: %v", err)
@ -2529,6 +2529,7 @@ func TestUpdateNewNodeStatus(t *testing.T) {
MemoryCapacity: 1024, MemoryCapacity: 1024,
} }
mockCadvisor := testKubelet.fakeCadvisor mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("Start").Return(nil)
mockCadvisor.On("MachineInfo").Return(machineInfo, nil) mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorapi.VersionInfo{ versionInfo := &cadvisorapi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64", KernelVersion: "3.16.0-0.bpo.4-amd64",
@ -2802,6 +2803,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
}, },
}}).ReactionChain }}).ReactionChain
mockCadvisor := testKubelet.fakeCadvisor mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("Start").Return(nil)
machineInfo := &cadvisorapi.MachineInfo{ machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123", MachineID: "123",
SystemUUID: "abc", SystemUUID: "abc",
@ -2922,6 +2924,7 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) {
{ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}}, {ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}},
}}).ReactionChain }}).ReactionChain
mockCadvisor := testKubelet.fakeCadvisor mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("Start").Return(nil)
machineInfo := &cadvisorapi.MachineInfo{ machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123", MachineID: "123",
SystemUUID: "abc", SystemUUID: "abc",
@ -2996,7 +2999,7 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) {
}, },
}, },
} }
kubelet.runtimeState = newRuntimeState(time.Duration(0), false, "" /* Pod CIDR */) kubelet.runtimeState = newRuntimeState(time.Duration(0), false, "" /* Pod CIDR */, func() error { return nil })
kubelet.updateRuntimeUp() kubelet.updateRuntimeUp()
if err := kubelet.updateNodeStatus(); err != nil { if err := kubelet.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
@ -3076,9 +3079,11 @@ func TestCreateMirrorPod(t *testing.T) {
func TestDeleteOutdatedMirrorPod(t *testing.T) { func TestDeleteOutdatedMirrorPod(t *testing.T) {
testKubelet := newTestKubelet(t) testKubelet := newTestKubelet(t)
testKubelet.fakeCadvisor.On("Start").Return(nil)
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil) testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil)
testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil) testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil) testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
kl := testKubelet.kubelet kl := testKubelet.kubelet
manager := testKubelet.fakeMirrorClient manager := testKubelet.fakeMirrorClient
pod := &api.Pod{ pod := &api.Pod{
@ -3129,9 +3134,11 @@ func TestDeleteOutdatedMirrorPod(t *testing.T) {
func TestDeleteOrphanedMirrorPods(t *testing.T) { func TestDeleteOrphanedMirrorPods(t *testing.T) {
testKubelet := newTestKubelet(t) testKubelet := newTestKubelet(t)
testKubelet.fakeCadvisor.On("Start").Return(nil)
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil) testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil)
testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil) testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil) testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
kl := testKubelet.kubelet kl := testKubelet.kubelet
manager := testKubelet.fakeMirrorClient manager := testKubelet.fakeMirrorClient
orphanPods := []*api.Pod{ orphanPods := []*api.Pod{

View File

@ -27,8 +27,10 @@ type runtimeState struct {
lastBaseRuntimeSync time.Time lastBaseRuntimeSync time.Time
baseRuntimeSyncThreshold time.Duration baseRuntimeSyncThreshold time.Duration
networkError error networkError error
internalError error
cidr string cidr string
initError error initError error
runtimeCompatibility func() error
} }
func (s *runtimeState) setRuntimeSync(t time.Time) { func (s *runtimeState) setRuntimeSync(t time.Time) {
@ -37,6 +39,12 @@ func (s *runtimeState) setRuntimeSync(t time.Time) {
s.lastBaseRuntimeSync = t s.lastBaseRuntimeSync = t
} }
func (s *runtimeState) setInternalError(err error) {
s.Lock()
defer s.Unlock()
s.internalError = err
}
func (s *runtimeState) setNetworkState(err error) { func (s *runtimeState) setNetworkState(err error) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
@ -74,10 +82,21 @@ func (s *runtimeState) errors() []string {
if !s.lastBaseRuntimeSync.Add(s.baseRuntimeSyncThreshold).After(time.Now()) { if !s.lastBaseRuntimeSync.Add(s.baseRuntimeSyncThreshold).After(time.Now()) {
ret = append(ret, "container runtime is down") ret = append(ret, "container runtime is down")
} }
if s.internalError != nil {
ret = append(ret, s.internalError.Error())
}
if err := s.runtimeCompatibility(); err != nil {
ret = append(ret, err.Error())
}
return ret return ret
} }
func newRuntimeState(runtimeSyncThreshold time.Duration, configureNetwork bool, cidr string) *runtimeState { func newRuntimeState(
runtimeSyncThreshold time.Duration,
configureNetwork bool,
cidr string,
runtimeCompatibility func() error,
) *runtimeState {
var networkError error = nil var networkError error = nil
if configureNetwork { if configureNetwork {
networkError = fmt.Errorf("network state unknown") networkError = fmt.Errorf("network state unknown")
@ -87,5 +106,7 @@ func newRuntimeState(runtimeSyncThreshold time.Duration, configureNetwork bool,
baseRuntimeSyncThreshold: runtimeSyncThreshold, baseRuntimeSyncThreshold: runtimeSyncThreshold,
networkError: networkError, networkError: networkError,
cidr: cidr, cidr: cidr,
internalError: nil,
runtimeCompatibility: runtimeCompatibility,
} }
} }