diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index b3c04ade5c8..723046de242 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -27,6 +27,7 @@ import (
 	"path"
 	"sort"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@@ -219,6 +220,9 @@ func NewMainKubelet(
 		clusterDNS:                     clusterDNS,
 		serviceLister:                  serviceLister,
 		nodeLister:                     nodeLister,
+		runtimeMutex:                   sync.Mutex{},
+		runtimeUpThreshold:             maxWaitForContainerRuntime,
+		lastTimestampRuntimeUp:         time.Time{},
 		masterServiceNamespace:         masterServiceNamespace,
 		streamingConnectionIdleTimeout: streamingConnectionIdleTimeout,
 		recorder:                       recorder,
@@ -269,6 +273,7 @@ func NewMainKubelet(
 	if err := waitUntilRuntimeIsUp(klet.containerRuntime, maxWaitForContainerRuntime); err != nil {
 		return nil, fmt.Errorf("timed out waiting for %q to come up: %v", containerRuntime, err)
 	}
+	klet.lastTimestampRuntimeUp = time.Now()
 
 	klet.runner = klet.containerRuntime
 	klet.podManager = newBasicPodManager(klet.kubeClient)
@@ -345,6 +350,12 @@ type Kubelet struct {
 	serviceLister serviceLister
 	nodeLister    nodeLister
 
+	// Last timestamp when the runtime responded to a ping.
+	// Mutex is used to protect this value.
+	runtimeMutex           sync.Mutex
+	runtimeUpThreshold     time.Duration
+	lastTimestampRuntimeUp time.Time
+
 	// Volume plugins.
 	volumePluginMgr volume.VolumePluginMgr
 
@@ -606,6 +617,7 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
 		glog.Errorf("Failed to start ImageManager, images may not be garbage collected: %v", err)
 	}
 
+	go util.Until(kl.updateRuntimeUp, 5*time.Second, util.NeverStop)
 	go kl.syncNodeStatus()
 	// Run the system oom watcher forever.
 	go util.Until(kl.runOOMWatcher, time.Second, util.NeverStop)
@@ -1444,6 +1456,15 @@ func (kl *Kubelet) GetPodByName(namespace, name string) (*api.Pod, bool) {
 	return kl.podManager.GetPodByName(namespace, name)
}
 
+func (kl *Kubelet) updateRuntimeUp() {
+	err := waitUntilRuntimeIsUp(kl.containerRuntime, 100*time.Millisecond)
+	kl.runtimeMutex.Lock()
+	defer kl.runtimeMutex.Unlock()
+	if err == nil {
+		kl.lastTimestampRuntimeUp = time.Now()
+	}
+}
+
 // updateNodeStatus updates node status to master with retries.
 func (kl *Kubelet) updateNodeStatus() error {
 	for i := 0; i < nodeStatusUpdateRetry; i++ {
@@ -1520,13 +1541,31 @@ func (kl *Kubelet) tryUpdateNodeStatus() error {
 		node.Status.NodeInfo.KubeProxyVersion = version.Get().String()
 	}
 
+	// Check whether container runtime can be reported as up.
+	containerRuntimeUp := func() bool {
+		kl.runtimeMutex.Lock()
+		defer kl.runtimeMutex.Unlock()
+		return kl.lastTimestampRuntimeUp.Add(kl.runtimeUpThreshold).After(time.Now())
+	}()
+
 	currentTime := util.Now()
-	newCondition := api.NodeCondition{
-		Type:              api.NodeReady,
-		Status:            api.ConditionTrue,
-		Reason:            fmt.Sprintf("kubelet is posting ready status"),
-		LastHeartbeatTime: currentTime,
+	var newCondition api.NodeCondition
+	if containerRuntimeUp {
+		newCondition = api.NodeCondition{
+			Type:              api.NodeReady,
+			Status:            api.ConditionTrue,
+			Reason:            fmt.Sprintf("kubelet is posting ready status"),
+			LastHeartbeatTime: currentTime,
+		}
+	} else {
+		newCondition = api.NodeCondition{
+			Type:              api.NodeReady,
+			Status:            api.ConditionFalse,
+			Reason:            fmt.Sprintf("container runtime is down"),
+			LastHeartbeatTime: currentTime,
+		}
 	}
+
 	updated := false
 	for i := range node.Status.Conditions {
 		if node.Status.Conditions[i].Type == api.NodeReady {
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 44b06c4f88a..7d6db7d293c 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -72,6 +72,7 @@ const testKubeletHostname = "testnode"
 
 func newTestKubelet(t *testing.T) *TestKubelet {
 	fakeDocker := &dockertools.FakeDockerClient{Errors: make(map[string]error), RemovedImages: util.StringSet{}}
+	fakeDocker.VersionInfo = []string{"ApiVersion=1.15"}
 	fakeRecorder := &record.FakeRecorder{}
 	fakeKubeClient := &testclient.Fake{}
 	kubelet := &Kubelet{}
@@ -80,6 +81,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
 	kubelet.os = kubecontainer.FakeOS{}
 	kubelet.hostname = "testnode"
+	kubelet.runtimeUpThreshold = maxWaitForContainerRuntime
 	kubelet.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
 	if tempDir, err := ioutil.TempDir("/tmp", "kubelet_test."); err != nil {
 		t.Fatalf("can't make a temp rootdir: %v", err)
@@ -3238,6 +3240,7 @@ func TestUpdateNewNodeStatus(t *testing.T) {
 		},
 	}
 
+	kubelet.updateRuntimeUp()
 	if err := kubelet.updateNodeStatus(); err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
@@ -3331,6 +3334,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
 		},
 	}
 
+	kubelet.updateRuntimeUp()
 	if err := kubelet.updateNodeStatus(); err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
@@ -3356,6 +3360,90 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
 	}
 }
 
+func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) {
+	testKubelet := newTestKubelet(t)
+	kubelet := testKubelet.kubelet
+	kubeClient := testKubelet.fakeKubeClient
+	fakeDocker := testKubelet.fakeDocker
+	// This makes GetContainerRuntimeVersion() return an error, which
+	// simulates that the container runtime is down.
+	fakeDocker.VersionInfo = []string{}
+
+	kubeClient.ReactFn = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{
+		{ObjectMeta: api.ObjectMeta{Name: "testnode"}},
+	}}).ReactFn
+	mockCadvisor := testKubelet.fakeCadvisor
+	machineInfo := &cadvisorApi.MachineInfo{
+		MachineID:      "123",
+		SystemUUID:     "abc",
+		BootID:         "1b3",
+		NumCores:       2,
+		MemoryCapacity: 1024,
+	}
+	mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
+	versionInfo := &cadvisorApi.VersionInfo{
+		KernelVersion:      "3.16.0-0.bpo.4-amd64",
+		ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
+		DockerVersion:      "1.5.0",
+	}
+	mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
+
+	expectedNode := &api.Node{
+		ObjectMeta: api.ObjectMeta{Name: "testnode"},
+		Spec:       api.NodeSpec{},
+		Status: api.NodeStatus{
+			Conditions: []api.NodeCondition{
+				{
+					Type:               api.NodeReady,
+					Status:             api.ConditionFalse,
+					Reason:             fmt.Sprintf("container runtime is down"),
+					LastHeartbeatTime:  util.Time{},
+					LastTransitionTime: util.Time{},
+				},
+			},
+			NodeInfo: api.NodeSystemInfo{
+				MachineID:               "123",
+				SystemUUID:              "abc",
+				BootID:                  "1b3",
+				KernelVersion:           "3.16.0-0.bpo.4-amd64",
+				OsImage:                 "Debian GNU/Linux 7 (wheezy)",
+				ContainerRuntimeVersion: "docker://1.5.0",
+				KubeletVersion:          version.Get().String(),
+				KubeProxyVersion:        version.Get().String(),
+			},
+			Capacity: api.ResourceList{
+				api.ResourceCPU:    *resource.NewMilliQuantity(2000, resource.DecimalSI),
+				api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI),
+			},
+		},
+	}
+
+	kubelet.runtimeUpThreshold = time.Duration(0)
+	kubelet.updateRuntimeUp()
+	if err := kubelet.updateNodeStatus(); err != nil {
+		t.Errorf("unexpected error: %v", err)
+	}
+	if len(kubeClient.Actions) != 2 || kubeClient.Actions[1].Action != "update-status-node" {
+		t.Fatalf("unexpected actions: %v", kubeClient.Actions)
+	}
+	updatedNode, ok := kubeClient.Actions[1].Value.(*api.Node)
+	if !ok {
+		t.Errorf("unexpected object type")
+	}
+
+	if updatedNode.Status.Conditions[0].LastHeartbeatTime.IsZero() {
+		t.Errorf("unexpected zero last probe timestamp")
+	}
+	if updatedNode.Status.Conditions[0].LastTransitionTime.IsZero() {
+		t.Errorf("unexpected zero last transition timestamp")
+	}
+	updatedNode.Status.Conditions[0].LastHeartbeatTime = util.Time{}
+	updatedNode.Status.Conditions[0].LastTransitionTime = util.Time{}
+	if !reflect.DeepEqual(expectedNode, updatedNode) {
+		t.Errorf("unexpected objects: %s", util.ObjectDiff(expectedNode, updatedNode))
+	}
+}
+
 func TestUpdateNodeStatusError(t *testing.T) {
 	testKubelet := newTestKubelet(t)
 	kubelet := testKubelet.kubelet
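For readers skimming the patch, the mechanism it adds distills to a small pattern: a background pinger stamps a mutex-protected "last time the runtime answered" value, and the node-status path reports NodeReady only while that stamp is younger than a threshold. The sketch below is a minimal standalone illustration of that pattern, not kubelet code; the runtimeHealth type and all of its names are invented for the example.

package main

import (
	"fmt"
	"sync"
	"time"
)

// runtimeHealth mirrors the runtimeMutex / lastTimestampRuntimeUp /
// runtimeUpThreshold trio the patch adds to the Kubelet struct.
type runtimeHealth struct {
	mu        sync.Mutex
	lastUp    time.Time
	threshold time.Duration
}

// markUp plays the role of updateRuntimeUp: called periodically, it records
// the time of the last successful runtime ping.
func (h *runtimeHealth) markUp() {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.lastUp = time.Now()
}

// isUp plays the role of the containerRuntimeUp closure in
// tryUpdateNodeStatus: the runtime counts as up only while the last
// successful ping is within the threshold.
func (h *runtimeHealth) isUp() bool {
	h.mu.Lock()
	defer h.mu.Unlock()
	return h.lastUp.Add(h.threshold).After(time.Now())
}

func main() {
	h := &runtimeHealth{threshold: 10 * time.Second}
	h.markUp()
	fmt.Println("runtime up:", h.isUp()) // true: the heartbeat is fresh

	// A zero threshold makes any heartbeat stale, which is how
	// TestUpdateNodeStatusWithoutContainerRuntime forces ConditionFalse.
	h.threshold = 0
	fmt.Println("runtime up:", h.isUp()) // false: the heartbeat is stale
}

One design consequence worth noting: because readiness is derived from the age of the last successful ping rather than a single failed call, a transient runtime hiccup shorter than the threshold never flips the node to NotReady.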