diff --git a/pkg/kubelet/cadvisor/cadvisor_mock.go b/pkg/kubelet/cadvisor/cadvisor_mock.go
index c698c6fb7ef..4cdbe05a01d 100644
--- a/pkg/kubelet/cadvisor/cadvisor_mock.go
+++ b/pkg/kubelet/cadvisor/cadvisor_mock.go
@@ -31,7 +31,7 @@ var _ Interface = new(Mock)
 
 func (c *Mock) Start() error {
 	args := c.Called()
-	return args.Error(1)
+	return args.Error(0)
 }
 
 // ContainerInfo is a mock implementation of Interface.ContainerInfo.
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index bdee7978e16..a6f263772a0 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -32,6 +32,7 @@ import (
 	"path/filepath"
 	"sort"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/golang/glog"
@@ -143,7 +144,8 @@ func waitUntilRuntimeIsUp(cr kubecontainer.Runtime, timeout time.Duration) error
 	return err
 }
 
-// New creates a new Kubelet for use in main
+// New instantiates a new Kubelet object along with all the required internal modules.
+// No initialization of Kubelet and its modules should happen here.
 func NewMainKubelet(
 	hostname string,
 	nodeName string,
@@ -280,7 +282,6 @@ func NewMainKubelet(
 		clusterDNS:     clusterDNS,
 		serviceLister:  serviceLister,
 		nodeLister:     nodeLister,
-		runtimeState:   newRuntimeState(maxWaitForContainerRuntime, configureCBR0, podCIDR),
 		masterServiceNamespace:         masterServiceNamespace,
 		streamingConnectionIdleTimeout: streamingConnectionIdleTimeout,
 		recorder:                       recorder,
@@ -378,6 +379,8 @@ func NewMainKubelet(
 		return nil, fmt.Errorf("unsupported container runtime %q specified", containerRuntime)
 	}
 
+	klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, configureCBR0, podCIDR, klet.isContainerRuntimeVersionCompatible)
+
 	// setup containerGC
 	containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy)
 	if err != nil {
@@ -614,6 +617,9 @@ type Kubelet struct {
 
 	// A queue used to trigger pod workers.
 	workQueue queue.WorkQueue
+
+	// oneTimeInitializer is used to initialize modules that depend on the runtime being up.
+	oneTimeInitializer sync.Once
 }
 
 func (kl *Kubelet) allSourcesReady() bool {
@@ -786,20 +792,25 @@ func (kl *Kubelet) StartGarbageCollection() {
 	}, 5*time.Minute, util.NeverStop)
 }
 
-func (kl *Kubelet) preRun() error {
+// initializeModules will initialize internal modules that do not require the container runtime to be up.
+// Note that the modules here must not depend on modules that are not initialized here.
+func (kl *Kubelet) initializeModules() error {
+	// Prometheus metrics.
 	metrics.Register(kl.runtimeCache)
+	// Step 1: Setup filesystem directories.
 	if err := kl.setupDataDirs(); err != nil {
 		return err
 	}
-	// If the container logs directory does not exist, create it.
+
+	// Step 2: If the container logs directory does not exist, create it.
 	if _, err := os.Stat(containerLogsDir); err != nil {
 		if err := kl.os.Mkdir(containerLogsDir, 0755); err != nil {
 			glog.Errorf("Failed to create directory %q: %v", containerLogsDir, err)
 		}
 	}
 
-	// Move Kubelet to a container.
+	// Step 3: Move Kubelet to a container, if required.
 	if kl.resourceContainer != "" {
 		// Fixme: I need to reside inside ContainerManager interface.
 		err := util.RunInResourceContainer(kl.resourceContainer)
@@ -809,24 +820,30 @@ func (kl *Kubelet) preRun() error {
 		glog.Infof("Running in container %q", kl.resourceContainer)
 	}
 
+	// Step 4: Start the image manager.
 	if err := kl.imageManager.Start(); err != nil {
 		return fmt.Errorf("Failed to start ImageManager, images may not be garbage collected: %v", err)
 	}
 
-	if err := kl.cadvisor.Start(); err != nil {
-		return fmt.Errorf("Failed to start CAdvisor %v", err)
-	}
-
+	// Step 5: Start container manager.
 	if err := kl.containerManager.Start(kl.nodeConfig); err != nil {
 		return fmt.Errorf("Failed to start ContainerManager %v", err)
 	}
 
+	// Step 6: Start out of memory watcher.
 	if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
 		return fmt.Errorf("Failed to start OOM watcher %v", err)
 	}
 
 	return nil
 }
 
+// initializeRuntimeDependentModules will initialize internal modules that require the container runtime to be up.
+func (kl *Kubelet) initializeRuntimeDependentModules() {
+	if err := kl.cadvisor.Start(); err != nil {
+		kl.runtimeState.setInternalError(fmt.Errorf("Failed to start cAdvisor %v", err))
+	}
+}
+
 // Run starts the kubelet reacting to config updates
 func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
 	if kl.logServer == nil {
@@ -835,7 +852,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
 	if kl.kubeClient == nil {
 		glog.Warning("No api server defined - no node status update will be sent.")
 	}
-	if err := kl.preRun(); err != nil {
+	if err := kl.initializeModules(); err != nil {
 		kl.recorder.Eventf(kl.nodeRef, kubecontainer.KubeletSetupFailed, err.Error())
 		glog.Error(err)
 		kl.runtimeState.setInitError(err)
@@ -2362,6 +2379,8 @@ func (kl *Kubelet) updateRuntimeUp() {
 	start := time.Now()
 	err := waitUntilRuntimeIsUp(kl.containerRuntime, 100*time.Millisecond)
 	if err == nil {
+		// Errors in initialization will be synchronized internally.
+		kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
 		kl.runtimeState.setRuntimeSync(time.Now())
 	} else {
 		glog.Errorf("Container runtime sanity check failed after %v, err: %v", time.Since(start), err)
@@ -2538,9 +2557,6 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error {
 
 	node.Status.DaemonEndpoints = *kl.daemonEndpoints
 
-	// FIXME: Check whether runtime version meets the minimal requirements
-	_ = kl.containerRuntimeVersionRequirementMet()
-
 	currentTime := unversioned.Now()
 	var newNodeReadyCondition api.NodeCondition
 	var oldNodeReadyConditionStatus api.ConditionStatus
@@ -2653,27 +2669,24 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error {
 	return nil
 }
 
-func (kl *Kubelet) containerRuntimeVersionRequirementMet() bool {
+// FIXME: Why not combine this with container runtime health check?
+func (kl *Kubelet) isContainerRuntimeVersionCompatible() error {
 	switch kl.GetRuntime().Type() {
 	case "docker":
 		version, err := kl.GetContainerRuntimeVersion()
 		if err != nil {
-			return true
+			return nil
 		}
 		// Verify the docker version.
 		result, err := version.Compare(dockertools.MinimumDockerAPIVersion)
 		if err != nil {
-			glog.Errorf("Cannot compare current docker version %v with minimum support Docker version %q", version, dockertools.MinimumDockerAPIVersion)
-			return false
+			return fmt.Errorf("failed to compare current docker version %v with minimum supported Docker version %q - %v", version, dockertools.MinimumDockerAPIVersion, err)
+		}
+		if result < 0 {
+			return fmt.Errorf("container runtime version is older than %s", dockertools.MinimumDockerAPIVersion)
 		}
-		return (result >= 0)
-	case "rkt":
-		// TODO(dawnchen): Rkt support here
-		return true
-	default:
-		glog.Errorf("unsupported container runtime %s specified", kl.GetRuntime().Type())
-		return true
 	}
+	return nil
 }
 
 // tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 09426a153ed..56e271aa87a 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -97,7 +97,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
 	kubelet.hostname = testKubeletHostname
 	kubelet.nodeName = testKubeletHostname
-	kubelet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, false, "" /* Pod CIDR */)
+	kubelet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, false, "" /* Pod CIDR */, func() error { return nil })
 	kubelet.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
 	if tempDir, err := ioutil.TempDir("/tmp", "kubelet_test."); err != nil {
 		t.Fatalf("can't make a temp rootdir: %v", err)
@@ -2529,6 +2529,7 @@ func TestUpdateNewNodeStatus(t *testing.T) {
 		MemoryCapacity: 1024,
 	}
 	mockCadvisor := testKubelet.fakeCadvisor
+	mockCadvisor.On("Start").Return(nil)
 	mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
 	versionInfo := &cadvisorapi.VersionInfo{
 		KernelVersion: "3.16.0-0.bpo.4-amd64",
@@ -2802,6 +2803,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
 		},
 	}}).ReactionChain
 	mockCadvisor := testKubelet.fakeCadvisor
+	mockCadvisor.On("Start").Return(nil)
 	machineInfo := &cadvisorapi.MachineInfo{
 		MachineID:  "123",
 		SystemUUID: "abc",
@@ -2922,6 +2924,7 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) {
 		{ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}},
 	}}).ReactionChain
 	mockCadvisor := testKubelet.fakeCadvisor
+	mockCadvisor.On("Start").Return(nil)
 	machineInfo := &cadvisorapi.MachineInfo{
 		MachineID:  "123",
 		SystemUUID: "abc",
@@ -2996,7 +2999,7 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) {
 			},
 		},
 	}
-	kubelet.runtimeState = newRuntimeState(time.Duration(0), false, "" /* Pod CIDR */)
+	kubelet.runtimeState = newRuntimeState(time.Duration(0), false, "" /* Pod CIDR */, func() error { return nil })
 	kubelet.updateRuntimeUp()
 	if err := kubelet.updateNodeStatus(); err != nil {
 		t.Errorf("unexpected error: %v", err)
@@ -3076,9 +3079,11 @@ func TestCreateMirrorPod(t *testing.T) {
 
 func TestDeleteOutdatedMirrorPod(t *testing.T) {
 	testKubelet := newTestKubelet(t)
+	testKubelet.fakeCadvisor.On("Start").Return(nil)
 	testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil)
 	testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
 	testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
+
 	kl := testKubelet.kubelet
 	manager := testKubelet.fakeMirrorClient
 	pod := &api.Pod{
@@ -3129,9 +3134,11 @@ func TestDeleteOutdatedMirrorPod(t *testing.T) {
 
 func TestDeleteOrphanedMirrorPods(t *testing.T) {
 	testKubelet := newTestKubelet(t)
+	testKubelet.fakeCadvisor.On("Start").Return(nil)
 	testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil)
 	testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
 	testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
+
 	kl := testKubelet.kubelet
 	manager := testKubelet.fakeMirrorClient
 	orphanPods := []*api.Pod{
diff --git a/pkg/kubelet/runtime.go b/pkg/kubelet/runtime.go
index bf574f923e4..e97e8245780 100644
--- a/pkg/kubelet/runtime.go
+++ b/pkg/kubelet/runtime.go
@@ -27,8 +27,10 @@ type runtimeState struct {
 	lastBaseRuntimeSync      time.Time
 	baseRuntimeSyncThreshold time.Duration
 	networkError             error
+	internalError            error
 	cidr                     string
 	initError                error
+	runtimeCompatibility     func() error
 }
 
 func (s *runtimeState) setRuntimeSync(t time.Time) {
@@ -37,6 +39,12 @@ func (s *runtimeState) setRuntimeSync(t time.Time) {
 	s.lastBaseRuntimeSync = t
 }
 
+func (s *runtimeState) setInternalError(err error) {
+	s.Lock()
+	defer s.Unlock()
+	s.internalError = err
+}
+
 func (s *runtimeState) setNetworkState(err error) {
 	s.Lock()
 	defer s.Unlock()
 	s.networkError = err
 }
@@ -74,10 +82,21 @@ func (s *runtimeState) errors() []string {
 	if !s.lastBaseRuntimeSync.Add(s.baseRuntimeSyncThreshold).After(time.Now()) {
 		ret = append(ret, "container runtime is down")
 	}
+	if s.internalError != nil {
+		ret = append(ret, s.internalError.Error())
+	}
+	if err := s.runtimeCompatibility(); err != nil {
+		ret = append(ret, err.Error())
+	}
 	return ret
 }
 
-func newRuntimeState(runtimeSyncThreshold time.Duration, configureNetwork bool, cidr string) *runtimeState {
+func newRuntimeState(
+	runtimeSyncThreshold time.Duration,
+	configureNetwork bool,
+	cidr string,
+	runtimeCompatibility func() error,
+) *runtimeState {
 	var networkError error = nil
 	if configureNetwork {
 		networkError = fmt.Errorf("network state unknown")
@@ -87,5 +106,7 @@ func newRuntimeState(runtimeSyncThreshold time.Duration, configureNetwork bool,
 		baseRuntimeSyncThreshold: runtimeSyncThreshold,
 		networkError:             networkError,
 		cidr:                     cidr,
+		internalError:            nil,
+		runtimeCompatibility:     runtimeCompatibility,
 	}
 }