From 07b21c50facc9ca365fd1a3c31e737b83f8ee4de Mon Sep 17 00:00:00 2001 From: Random-Liu Date: Thu, 3 Mar 2016 02:01:15 -0800 Subject: [PATCH] Add Status in runtime interface and use it for runtime health check. --- pkg/kubelet/container/runtime.go | 3 + pkg/kubelet/container/testing/fake_runtime.go | 10 + pkg/kubelet/container/testing/runtime_mock.go | 5 + pkg/kubelet/dockertools/fake_docker_client.go | 22 +- pkg/kubelet/dockertools/manager.go | 26 +- pkg/kubelet/dockertools/manager_test.go | 45 ++- pkg/kubelet/kubelet.go | 24 +- pkg/kubelet/kubelet_test.go | 316 +++++------------- pkg/kubelet/rkt/rkt.go | 14 +- pkg/kubelet/runtime.go | 6 - 10 files changed, 191 insertions(+), 280 deletions(-) diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go index 33bea5bf5c4..fb8ddb24da9 100644 --- a/pkg/kubelet/container/runtime.go +++ b/pkg/kubelet/container/runtime.go @@ -57,7 +57,10 @@ type Runtime interface { Version() (Version, error) // APIVersion returns the API version information of the container // runtime. This may be different from the runtime engine's version. + // TODO(random-liu): We should fold this into Version() APIVersion() (Version, error) + // Status returns error if the runtime is unhealthy; nil otherwise. + Status() error // GetPods returns a list containers group by pods. The boolean parameter // specifies whether the runtime returns all containers including those already // exited and dead containers (used for garbage collection). diff --git a/pkg/kubelet/container/testing/fake_runtime.go b/pkg/kubelet/container/testing/fake_runtime.go index c01b15603d4..95d8ac0137a 100644 --- a/pkg/kubelet/container/testing/fake_runtime.go +++ b/pkg/kubelet/container/testing/fake_runtime.go @@ -48,6 +48,7 @@ type FakeRuntime struct { RuntimeType string Err error InspectErr error + StatusErr error } // FakeRuntime should implement Runtime. @@ -108,6 +109,7 @@ func (f *FakeRuntime) ClearCalls() { f.RuntimeType = "" f.Err = nil f.InspectErr = nil + f.StatusErr = nil } func (f *FakeRuntime) assertList(expect []string, test []string) error { @@ -168,6 +170,14 @@ func (f *FakeRuntime) APIVersion() (Version, error) { return &FakeVersion{Version: f.APIVersionInfo}, f.Err } +func (f *FakeRuntime) Status() error { + f.Lock() + defer f.Unlock() + + f.CalledFunctions = append(f.CalledFunctions, "Status") + return f.StatusErr +} + func (f *FakeRuntime) GetPods(all bool) ([]*Pod, error) { f.Lock() defer f.Unlock() diff --git a/pkg/kubelet/container/testing/runtime_mock.go b/pkg/kubelet/container/testing/runtime_mock.go index b99ba270d4b..13294886bef 100644 --- a/pkg/kubelet/container/testing/runtime_mock.go +++ b/pkg/kubelet/container/testing/runtime_mock.go @@ -53,6 +53,11 @@ func (r *Mock) APIVersion() (Version, error) { return args.Get(0).(Version), args.Error(1) } +func (r *Mock) Status() error { + args := r.Called() + return args.Error(0) +} + func (r *Mock) GetPods(all bool) ([]*Pod, error) { args := r.Called(all) return args.Get(0).([]*Pod), args.Error(1) diff --git a/pkg/kubelet/dockertools/fake_docker_client.go b/pkg/kubelet/dockertools/fake_docker_client.go index f67d2416d95..1a905cb90be 100644 --- a/pkg/kubelet/dockertools/fake_docker_client.go +++ b/pkg/kubelet/dockertools/fake_docker_client.go @@ -68,6 +68,26 @@ func NewFakeDockerClientWithVersion(version, apiVersion string) *FakeDockerClien } } +func (f *FakeDockerClient) InjectError(fn string, err error) { + f.Lock() + defer f.Unlock() + f.Errors[fn] = err +} + +func (f *FakeDockerClient) InjectErrors(errs map[string]error) { + f.Lock() + defer f.Unlock() + for fn, err := range errs { + f.Errors[fn] = err + } +} + +func (f *FakeDockerClient) ClearErrors() { + f.Lock() + defer f.Unlock() + f.Errors = map[string]error{} +} + func (f *FakeDockerClient) ClearCalls() { f.Lock() defer f.Unlock() @@ -382,7 +402,7 @@ func (f *FakeDockerClient) PullImage(opts docker.PullImageOptions, auth docker.A } func (f *FakeDockerClient) Version() (*docker.Env, error) { - return &f.VersionInfo, nil + return &f.VersionInfo, f.popError("version") } func (f *FakeDockerClient) Info() (*docker.Env, error) { diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index 0929da9fc3a..eadf56842d1 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -60,7 +60,7 @@ import ( const ( DockerType = "docker" - MinimumDockerAPIVersion = "1.18" + minimumDockerAPIVersion = "1.18" // ndots specifies the minimum number of dots that a domain name must contain for the resolver to consider it as FQDN (fully-qualified) // we want to able to consider SRV lookup names like _dns._udp.kube-dns.default.svc to be considered relative. @@ -941,6 +941,30 @@ func (dm *DockerManager) APIVersion() (kubecontainer.Version, error) { return dockerAPIVersion(version), nil } +// Status returns error if docker daemon is unhealthy, nil otherwise. +// Now we do this by checking whether: +// 1) `docker version` works +// 2) docker version is compatible with minimum requirement +func (dm *DockerManager) Status() error { + return dm.checkVersionCompatibility() +} + +func (dm *DockerManager) checkVersionCompatibility() error { + version, err := dm.APIVersion() + if err != nil { + return err + } + // Verify the docker version. + result, err := version.Compare(minimumDockerAPIVersion) + if err != nil { + return fmt.Errorf("failed to compare current docker version %v with minimum support Docker version %q - %v", version, minimumDockerAPIVersion, err) + } + if result < 0 { + return fmt.Errorf("container runtime version is older than %s", minimumDockerAPIVersion) + } + return nil +} + // The first version of docker that supports exec natively is 1.3.0 == API 1.15 var dockerAPIVersionWithExec = "1.15" diff --git a/pkg/kubelet/dockertools/manager_test.go b/pkg/kubelet/dockertools/manager_test.go index 50fd265c9a4..f6d28374d8c 100644 --- a/pkg/kubelet/dockertools/manager_test.go +++ b/pkg/kubelet/dockertools/manager_test.go @@ -546,7 +546,7 @@ func TestKillContainerInPodWithError(t *testing.T) { }, } fakeDocker.SetFakeRunningContainers(containers) - fakeDocker.Errors["stop"] = fmt.Errorf("sample error") + fakeDocker.InjectError("stop", fmt.Errorf("sample error")) if err := manager.KillContainerInPod(kubecontainer.ContainerID{}, &pod.Spec.Containers[0], pod, "test kill container with error."); err == nil { t.Errorf("expected error, found nil") @@ -1744,7 +1744,7 @@ func TestSyncPodWithFailure(t *testing.T) { ID: "9876", Name: "/k8s_POD." + strconv.FormatUint(generatePodInfraContainerHash(pod), 16) + "_foo_new_12345678_0", }}) - fakeDocker.Errors = test.dockerError + fakeDocker.InjectErrors(test.dockerError) puller.ErrorsToInject = test.pullerError pod.Spec.Containers = []api.Container{test.container} result := runSyncPod(t, dm, fakeDocker, pod, nil, true) @@ -1865,3 +1865,44 @@ func TestSecurityOptsAreNilWithDockerV19(t *testing.T) { } assert.NotContains(t, newContainer.HostConfig.SecurityOpt, "seccomp:unconfined", "Pods with Docker versions < 1.10 must not have seccomp disabled by default") } + +func TestCheckVersionCompatibility(t *testing.T) { + apiVersion, err := docker.NewAPIVersion(minimumDockerAPIVersion) + if err != nil { + t.Fatalf("unexpected error %v", err) + } + type test struct { + version string + compatible bool + } + tests := []test{ + // Minimum apiversion + {minimumDockerAPIVersion, true}, + // Invalid apiversion + {"invalid_api_version", false}, + } + for i := range apiVersion { + apiVersion[i]++ + // Newer apiversion + tests = append(tests, test{apiVersion.String(), true}) + apiVersion[i] -= 2 + // Older apiversion + if apiVersion[i] >= 0 { + tests = append(tests, test{apiVersion.String(), false}) + } + apiVersion[i]++ + } + + for i, tt := range tests { + testCase := fmt.Sprintf("test case #%d test version %q", i, tt.version) + dm, fakeDocker := newTestDockerManagerWithHTTPClientWithVersion(&fakeHTTP{}, "", tt.version) + err := dm.checkVersionCompatibility() + assert.Equal(t, tt.compatible, err == nil, testCase) + if tt.compatible == true { + // Get docker version error + fakeDocker.InjectError("version", fmt.Errorf("injected version error")) + err := dm.checkVersionCompatibility() + assert.NotNil(t, err, testCase+" version error check") + } + } +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 49a9b8458ef..fb7484c845d 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -457,7 +457,7 @@ func NewMainKubelet( } klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, util.RealClock{}) - klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, configureCBR0, klet.isContainerRuntimeVersionCompatible) + klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, configureCBR0) klet.updatePodCIDR(podCIDR) // setup containerGC @@ -2659,7 +2659,7 @@ func (kl *Kubelet) GetPodByName(namespace, name string) (*api.Pod, bool) { } func (kl *Kubelet) updateRuntimeUp() { - if _, err := kl.containerRuntime.Version(); err != nil { + if err := kl.containerRuntime.Status(); err != nil { glog.Errorf("Container runtime sanity check failed: %v", err) return } @@ -3076,26 +3076,6 @@ func SetNodeStatus(f func(*api.Node) error) Option { } } -// FIXME: Why not combine this with container runtime health check? -func (kl *Kubelet) isContainerRuntimeVersionCompatible() error { - switch kl.GetRuntime().Type() { - case "docker": - version, err := kl.GetRuntime().APIVersion() - if err != nil { - return nil - } - // Verify the docker version. - result, err := version.Compare(dockertools.MinimumDockerAPIVersion) - if err != nil { - return fmt.Errorf("failed to compare current docker version %v with minimum support Docker version %q - %v", version, dockertools.MinimumDockerAPIVersion, err) - } - if result < 0 { - return fmt.Errorf("container runtime version is older than %s", dockertools.MinimumDockerAPIVersion) - } - } - return nil -} - // tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0 // is set, this function will also confirm that cbr0 is configured correctly. func (kl *Kubelet) tryUpdateNodeStatus() error { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index db0dbc04946..89310ce8d2d 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -102,7 +102,8 @@ type TestKubelet struct { func newTestKubelet(t *testing.T) *TestKubelet { fakeRuntime := &containertest.FakeRuntime{} - fakeRuntime.VersionInfo = "1.15" + fakeRuntime.RuntimeType = "test" + fakeRuntime.VersionInfo = "1.5.0" fakeRuntime.ImageList = []kubecontainer.Image{ { ID: "abc", @@ -123,7 +124,7 @@ func newTestKubelet(t *testing.T) *TestKubelet { kubelet.hostname = testKubeletHostname kubelet.nodeName = testKubeletHostname - kubelet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, false, func() error { return nil }) + kubelet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, false) kubelet.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", nettest.NewFakeHost(nil)) if tempDir, err := ioutil.TempDir("/tmp", "kubelet_test."); err != nil { t.Fatalf("can't make a temp rootdir: %v", err) @@ -2654,9 +2655,6 @@ func updateDiskSpacePolicy(kubelet *Kubelet, mockCadvisor *cadvisortest.Mock, ro func TestUpdateNewNodeStatus(t *testing.T) { testKubelet := newTestKubelet(t) kubelet := testKubelet.kubelet - fakeRuntime := testKubelet.fakeRuntime - fakeRuntime.RuntimeType = "docker" - fakeRuntime.VersionInfo = "1.5.0" kubeClient := testKubelet.fakeKubeClient kubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{ {ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}}, @@ -2710,7 +2708,7 @@ func TestUpdateNewNodeStatus(t *testing.T) { BootID: "1b3", KernelVersion: "3.16.0-0.bpo.4-amd64", OSImage: "Debian GNU/Linux 7 (wheezy)", - ContainerRuntimeVersion: "docker://1.5.0", + ContainerRuntimeVersion: "test://1.5.0", KubeletVersion: version.Get().String(), KubeProxyVersion: version.Get().String(), }, @@ -2852,197 +2850,9 @@ func TestUpdateNewNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) { } } -func TestDockerRuntimeVersion(t *testing.T) { - testKubelet := newTestKubelet(t) - kubelet := testKubelet.kubelet - fakeRuntime := testKubelet.fakeRuntime - fakeRuntime.RuntimeType = "docker" - fakeRuntime.VersionInfo = "1.10.0-rc1-fc24" - fakeRuntime.APIVersionInfo = "1.22" - kubeClient := testKubelet.fakeKubeClient - kubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{ - { - ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, - Spec: api.NodeSpec{}, - Status: api.NodeStatus{ - Conditions: []api.NodeCondition{ - { - Type: api.NodeOutOfDisk, - Status: api.ConditionFalse, - Reason: "KubeletHasSufficientDisk", - Message: fmt.Sprintf("kubelet has sufficient disk space available"), - LastHeartbeatTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - LastTransitionTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - }, - { - Type: api.NodeReady, - Status: api.ConditionTrue, - Reason: "KubeletReady", - Message: fmt.Sprintf("kubelet is posting ready status"), - LastHeartbeatTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - LastTransitionTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - }, - }, - Capacity: api.ResourceList{ - api.ResourceCPU: *resource.NewMilliQuantity(3000, resource.DecimalSI), - api.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI), - api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), - }, - Allocatable: api.ResourceList{ - api.ResourceCPU: *resource.NewMilliQuantity(2800, resource.DecimalSI), - api.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI), - api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), - }, - }, - }, - }}).ReactionChain - mockCadvisor := testKubelet.fakeCadvisor - mockCadvisor.On("Start").Return(nil) - machineInfo := &cadvisorapi.MachineInfo{ - MachineID: "123", - SystemUUID: "abc", - BootID: "1b3", - NumCores: 2, - MemoryCapacity: 20E9, - } - mockCadvisor.On("MachineInfo").Return(machineInfo, nil) - versionInfo := &cadvisorapi.VersionInfo{ - KernelVersion: "3.16.0-0.bpo.4-amd64", - ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", - } - mockCadvisor.On("VersionInfo").Return(versionInfo, nil) - - // Make kubelet report that it has sufficient disk space. - if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, 200, 200, 100, 100); err != nil { - t.Fatalf("can't update disk space manager: %v", err) - } - - expectedNode := &api.Node{ - ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, - Spec: api.NodeSpec{}, - Status: api.NodeStatus{ - Conditions: []api.NodeCondition{ - { - Type: api.NodeOutOfDisk, - Status: api.ConditionFalse, - Reason: "KubeletHasSufficientDisk", - Message: fmt.Sprintf("kubelet has sufficient disk space available"), - LastHeartbeatTime: unversioned.Time{}, - LastTransitionTime: unversioned.Time{}, - }, - { - Type: api.NodeReady, - Status: api.ConditionTrue, - Reason: "KubeletReady", - Message: fmt.Sprintf("kubelet is posting ready status"), - LastHeartbeatTime: unversioned.Time{}, - LastTransitionTime: unversioned.Time{}, - }, - }, - NodeInfo: api.NodeSystemInfo{ - MachineID: "123", - SystemUUID: "abc", - BootID: "1b3", - KernelVersion: "3.16.0-0.bpo.4-amd64", - OSImage: "Debian GNU/Linux 7 (wheezy)", - ContainerRuntimeVersion: "docker://1.10.0-rc1-fc24", - KubeletVersion: version.Get().String(), - KubeProxyVersion: version.Get().String(), - }, - Capacity: api.ResourceList{ - api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), - api.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI), - api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), - }, - Allocatable: api.ResourceList{ - api.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI), - api.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI), - api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), - }, - Addresses: []api.NodeAddress{ - {Type: api.NodeLegacyHostIP, Address: "127.0.0.1"}, - {Type: api.NodeInternalIP, Address: "127.0.0.1"}, - }, - Images: []api.ContainerImage{ - { - Names: []string{"gcr.io/google_containers:v1", "gcr.io/google_containers:v2"}, - SizeBytes: 123, - }, - { - Names: []string{"gcr.io/google_containers:v3", "gcr.io/google_containers:v4"}, - SizeBytes: 456, - }, - }, - }, - } - - kubelet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, false, kubelet.isContainerRuntimeVersionCompatible) - kubelet.updateRuntimeUp() - if err := kubelet.updateNodeStatus(); err != nil { - t.Errorf("unexpected error: %v", err) - } - actions := kubeClient.Actions() - if len(actions) != 2 { - t.Fatalf("unexpected actions: %v", actions) - } - if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" { - t.Fatalf("unexpected actions: %v", actions) - } - updatedNode, ok := actions[1].(testclient.UpdateAction).GetObject().(*api.Node) - if !ok { - t.Errorf("unexpected object type") - } - for i, cond := range updatedNode.Status.Conditions { - if cond.LastHeartbeatTime.IsZero() { - t.Errorf("unexpected zero last probe timestamp") - } - if cond.LastTransitionTime.IsZero() { - t.Errorf("unexpected zero last transition timestamp") - } - updatedNode.Status.Conditions[i].LastHeartbeatTime = unversioned.Time{} - updatedNode.Status.Conditions[i].LastTransitionTime = unversioned.Time{} - } - - // Version skew workaround. See: https://github.com/kubernetes/kubernetes/issues/16961 - if updatedNode.Status.Conditions[len(updatedNode.Status.Conditions)-1].Type != api.NodeReady { - t.Errorf("unexpected node condition order. NodeReady should be last.") - } - - if !api.Semantic.DeepEqual(expectedNode, updatedNode) { - t.Errorf("expected \n%v\n, got \n%v", expectedNode, updatedNode) - } - - // Downgrade docker version, node should be NotReady - fakeRuntime.RuntimeType = "docker" - fakeRuntime.VersionInfo = "1.5.0" - fakeRuntime.APIVersionInfo = "1.17" - kubelet.updateRuntimeUp() - if err := kubelet.updateNodeStatus(); err != nil { - t.Errorf("unexpected error: %v", err) - } - actions = kubeClient.Actions() - if len(actions) != 4 { - t.Fatalf("unexpected actions: %v", actions) - } - if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" { - t.Fatalf("unexpected actions: %v", actions) - } - updatedNode, ok = actions[3].(testclient.UpdateAction).GetObject().(*api.Node) - if !ok { - t.Errorf("unexpected object type") - } - if updatedNode.Status.Conditions[1].Reason != "KubeletNotReady" && - !strings.Contains(updatedNode.Status.Conditions[1].Message, "container runtime version is older than") { - t.Errorf("unexpect NodeStatus due to container runtime version") - } -} - func TestUpdateExistingNodeStatus(t *testing.T) { testKubelet := newTestKubelet(t) kubelet := testKubelet.kubelet - fakeRuntime := testKubelet.fakeRuntime - fakeRuntime.RuntimeType = "docker" - fakeRuntime.VersionInfo = "1.5.0" kubeClient := testKubelet.fakeKubeClient kubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{ { @@ -3129,7 +2939,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) { BootID: "1b3", KernelVersion: "3.16.0-0.bpo.4-amd64", OSImage: "Debian GNU/Linux 7 (wheezy)", - ContainerRuntimeVersion: "docker://1.5.0", + ContainerRuntimeVersion: "test://1.5.0", KubeletVersion: version.Get().String(), KubeProxyVersion: version.Get().String(), }, @@ -3350,13 +3160,11 @@ func TestUpdateExistingNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) } } -func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) { +func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) { testKubelet := newTestKubelet(t) kubelet := testKubelet.kubelet + clock := testKubelet.fakeClock kubeClient := testKubelet.fakeKubeClient - fakeRuntime := testKubelet.fakeRuntime - fakeRuntime.RuntimeType = "docker" - fakeRuntime.VersionInfo = "1.5.0" kubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{ {ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}}, }}).ReactionChain @@ -3394,14 +3202,7 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) { LastHeartbeatTime: unversioned.Time{}, LastTransitionTime: unversioned.Time{}, }, - { - Type: api.NodeReady, - Status: api.ConditionFalse, - Reason: "KubeletNotReady", - Message: fmt.Sprintf("container runtime is down"), - LastHeartbeatTime: unversioned.Time{}, - LastTransitionTime: unversioned.Time{}, - }, + {}, //placeholder }, NodeInfo: api.NodeSystemInfo{ MachineID: "123", @@ -3409,7 +3210,7 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) { BootID: "1b3", KernelVersion: "3.16.0-0.bpo.4-amd64", OSImage: "Debian GNU/Linux 7 (wheezy)", - ContainerRuntimeVersion: "docker://1.5.0", + ContainerRuntimeVersion: "test://1.5.0", KubeletVersion: version.Get().String(), KubeProxyVersion: version.Get().String(), }, @@ -3439,42 +3240,77 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) { }, }, } - kubelet.runtimeState = newRuntimeState(time.Duration(0), false, func() error { return nil }) + + checkNodeStatus := func(status api.ConditionStatus, reason, message string) { + kubeClient.ClearActions() + if err := kubelet.updateNodeStatus(); err != nil { + t.Errorf("unexpected error: %v", err) + } + actions := kubeClient.Actions() + if len(actions) != 2 { + t.Fatalf("unexpected actions: %v", actions) + } + if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" { + t.Fatalf("unexpected actions: %v", actions) + } + updatedNode, ok := actions[1].(testclient.UpdateAction).GetObject().(*api.Node) + if !ok { + t.Errorf("unexpected action type. expected UpdateAction, got %#v", actions[1]) + } + + for i, cond := range updatedNode.Status.Conditions { + if cond.LastHeartbeatTime.IsZero() { + t.Errorf("unexpected zero last probe timestamp") + } + if cond.LastTransitionTime.IsZero() { + t.Errorf("unexpected zero last transition timestamp") + } + updatedNode.Status.Conditions[i].LastHeartbeatTime = unversioned.Time{} + updatedNode.Status.Conditions[i].LastTransitionTime = unversioned.Time{} + } + + // Version skew workaround. See: https://github.com/kubernetes/kubernetes/issues/16961 + if updatedNode.Status.Conditions[len(updatedNode.Status.Conditions)-1].Type != api.NodeReady { + t.Errorf("unexpected node condition order. NodeReady should be last.") + } + expectedNode.Status.Conditions[1] = api.NodeCondition{ + Type: api.NodeReady, + Status: status, + Reason: reason, + Message: message, + LastHeartbeatTime: unversioned.Time{}, + LastTransitionTime: unversioned.Time{}, + } + if !api.Semantic.DeepEqual(expectedNode, updatedNode) { + t.Errorf("unexpected objects: %s", util.ObjectDiff(expectedNode, updatedNode)) + } + } + + readyMessage := "kubelet is posting ready status" + downMessage := "container runtime is down" + + // Should report kubelet not ready if the runtime check is out of date + clock.SetTime(time.Now().Add(-maxWaitForContainerRuntime)) kubelet.updateRuntimeUp() - if err := kubelet.updateNodeStatus(); err != nil { - t.Errorf("unexpected error: %v", err) - } - actions := kubeClient.Actions() - if len(actions) != 2 { - t.Fatalf("unexpected actions: %v", actions) - } - if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" { - t.Fatalf("unexpected actions: %v", actions) - } - updatedNode, ok := actions[1].(testclient.UpdateAction).GetObject().(*api.Node) - if !ok { - t.Errorf("unexpected action type. expected UpdateAction, got %#v", actions[1]) - } + checkNodeStatus(api.ConditionFalse, "KubeletNotReady", downMessage) - for i, cond := range updatedNode.Status.Conditions { - if cond.LastHeartbeatTime.IsZero() { - t.Errorf("unexpected zero last probe timestamp") - } - if cond.LastTransitionTime.IsZero() { - t.Errorf("unexpected zero last transition timestamp") - } - updatedNode.Status.Conditions[i].LastHeartbeatTime = unversioned.Time{} - updatedNode.Status.Conditions[i].LastTransitionTime = unversioned.Time{} - } + // Should report kubelet ready if the runtime check is updated + clock.SetTime(time.Now()) + kubelet.updateRuntimeUp() + checkNodeStatus(api.ConditionTrue, "KubeletReady", readyMessage) - // Version skew workaround. See: https://github.com/kubernetes/kubernetes/issues/16961 - if updatedNode.Status.Conditions[len(updatedNode.Status.Conditions)-1].Type != api.NodeReady { - t.Errorf("unexpected node condition order. NodeReady should be last.") - } + // Should report kubelet not ready if the runtime check is out of date + clock.SetTime(time.Now().Add(-maxWaitForContainerRuntime)) + kubelet.updateRuntimeUp() + checkNodeStatus(api.ConditionFalse, "KubeletNotReady", downMessage) - if !api.Semantic.DeepEqual(expectedNode, updatedNode) { - t.Errorf("unexpected objects: %s", util.ObjectDiff(expectedNode, updatedNode)) - } + // Should report kubelet not ready if the runtime check failed + fakeRuntime := testKubelet.fakeRuntime + // Inject error into fake runtime status check, node should be NotReady + fakeRuntime.StatusErr = fmt.Errorf("injected runtime status error") + clock.SetTime(time.Now()) + kubelet.updateRuntimeUp() + checkNodeStatus(api.ConditionFalse, "KubeletNotReady", downMessage) } func TestUpdateNodeStatusError(t *testing.T) { diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index 87059ef7dfb..9ce77854610 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -184,13 +184,6 @@ func New(config *Config, rkt.imagePuller = kubecontainer.NewImagePuller(recorder, rkt, imageBackOff) } - if err := rkt.checkVersion(minimumRktBinVersion, recommendedRktBinVersion, minimumAppcVersion, minimumRktApiVersion, minimumSystemdVersion); err != nil { - // TODO(yifan): Latest go-systemd version have the ability to close the - // dbus connection. However the 'docker/libcontainer' package is using - // the older go-systemd version, so we can't update the go-systemd version. - rkt.apisvcConn.Close() - return nil, err - } return rkt, nil } @@ -1062,7 +1055,12 @@ func (r *Runtime) Version() (kubecontainer.Version, error) { } func (r *Runtime) APIVersion() (kubecontainer.Version, error) { - return r.binVersion, nil + return r.apiVersion, nil +} + +// Status returns error if rkt is unhealthy, nil otherwise. +func (r *Runtime) Status() error { + return r.checkVersion(minimumRktBinVersion, recommendedRktBinVersion, minimumAppcVersion, minimumRktApiVersion, minimumSystemdVersion) } // SyncPod syncs the running pod to match the specified desired pod. diff --git a/pkg/kubelet/runtime.go b/pkg/kubelet/runtime.go index 6e3c413ad41..9b26f11166e 100644 --- a/pkg/kubelet/runtime.go +++ b/pkg/kubelet/runtime.go @@ -30,7 +30,6 @@ type runtimeState struct { internalError error cidr string initError error - runtimeCompatibility func() error } func (s *runtimeState) setRuntimeSync(t time.Time) { @@ -85,16 +84,12 @@ func (s *runtimeState) errors() []string { if s.internalError != nil { ret = append(ret, s.internalError.Error()) } - if err := s.runtimeCompatibility(); err != nil { - ret = append(ret, err.Error()) - } return ret } func newRuntimeState( runtimeSyncThreshold time.Duration, configureNetwork bool, - runtimeCompatibility func() error, ) *runtimeState { var networkError error = nil if configureNetwork { @@ -105,6 +100,5 @@ func newRuntimeState( baseRuntimeSyncThreshold: runtimeSyncThreshold, networkError: networkError, internalError: nil, - runtimeCompatibility: runtimeCompatibility, } }