Set NodeReady=False when docker is dead

Wojciech Tyczynski 2015-05-05 12:19:54 +02:00
parent 7882d1eeb2
commit e26da316dc
2 changed files with 132 additions and 5 deletions
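
The change below makes the kubelet stop reporting NodeReady=True when the container runtime stops responding: a goroutine pings the runtime every 5 seconds, records the last successful ping under a mutex, and tryUpdateNodeStatus sets the NodeReady condition to False once that timestamp is older than runtimeUpThreshold. The following sketch illustrates that mechanism in isolation; the runtimeMonitor type and its ping field are made up for the example, while the real fields and methods appear in the diff.

// A minimal, self-contained sketch of the mechanism (illustrative names,
// not the kubelet's actual API): a periodic probe records when the runtime
// last answered, and the node counts as ready only while that timestamp is
// within a configurable threshold.
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

type runtimeMonitor struct {
	mu          sync.Mutex
	lastUp      time.Time     // last time the runtime answered a ping
	upThreshold time.Duration // how stale lastUp may be before the node is NotReady
	ping        func() error  // stands in for the kubelet's runtime version check
}

// updateRuntimeUp probes the runtime; on success it refreshes lastUp.
func (m *runtimeMonitor) updateRuntimeUp() {
	err := m.ping()
	m.mu.Lock()
	defer m.mu.Unlock()
	if err == nil {
		m.lastUp = time.Now()
	}
}

// runtimeUp reports whether the last successful ping is still within the threshold.
func (m *runtimeMonitor) runtimeUp() bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.lastUp.Add(m.upThreshold).After(time.Now())
}

func main() {
	dockerDead := false
	m := &runtimeMonitor{
		upThreshold: 10 * time.Second,
		ping: func() error {
			if dockerDead {
				return errors.New("cannot connect to the Docker daemon")
			}
			return nil
		},
	}

	m.updateRuntimeUp()
	fmt.Println("runtime up:", m.runtimeUp()) // true  -> NodeReady=True

	// Simulate a dead docker: pings fail and, with a zero threshold (the same
	// trick the new test below uses), the node immediately reports NotReady.
	dockerDead = true
	m.upThreshold = 0
	m.updateRuntimeUp()
	fmt.Println("runtime up:", m.runtimeUp()) // false -> NodeReady=False
}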

pkg/kubelet/kubelet.go

@@ -27,6 +27,7 @@ import (
"path"
"sort"
"strings"
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@@ -219,6 +220,9 @@ func NewMainKubelet(
clusterDNS: clusterDNS,
serviceLister: serviceLister,
nodeLister: nodeLister,
runtimeMutex: sync.Mutex{},
runtimeUpThreshold: maxWaitForContainerRuntime,
lastTimestampRuntimeUp: time.Time{},
masterServiceNamespace: masterServiceNamespace,
streamingConnectionIdleTimeout: streamingConnectionIdleTimeout,
recorder: recorder,
@@ -269,6 +273,7 @@ func NewMainKubelet(
if err := waitUntilRuntimeIsUp(klet.containerRuntime, maxWaitForContainerRuntime); err != nil {
return nil, fmt.Errorf("timed out waiting for %q to come up: %v", containerRuntime, err)
}
klet.lastTimestampRuntimeUp = time.Now()
klet.runner = klet.containerRuntime
klet.podManager = newBasicPodManager(klet.kubeClient)
@@ -345,6 +350,12 @@ type Kubelet struct {
serviceLister serviceLister
nodeLister nodeLister
// Last timestamp when runtime responded to a ping.
// Mutex is used to protect this value.
runtimeMutex sync.Mutex
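// Maximum age of the last successful runtime ping before NodeReady is reported false.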
runtimeUpThreshold time.Duration
lastTimestampRuntimeUp time.Time
// Volume plugins.
volumePluginMgr volume.VolumePluginMgr
@@ -606,6 +617,7 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
glog.Errorf("Failed to start ImageManager, images may not be garbage collected: %v", err)
}
go util.Until(kl.updateRuntimeUp, 5*time.Second, util.NeverStop)
go kl.syncNodeStatus()
// Run the system oom watcher forever.
go util.Until(kl.runOOMWatcher, time.Second, util.NeverStop)
@@ -1444,6 +1456,15 @@ func (kl *Kubelet) GetPodByName(namespace, name string) (*api.Pod, bool) {
return kl.podManager.GetPodByName(namespace, name)
}
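// updateRuntimeUp pings the container runtime with a short (100 ms) timeout and,
// on success, refreshes lastTimestampRuntimeUp under runtimeMutex.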
func (kl *Kubelet) updateRuntimeUp() {
err := waitUntilRuntimeIsUp(kl.containerRuntime, 100*time.Millisecond)
kl.runtimeMutex.Lock()
defer kl.runtimeMutex.Unlock()
if err == nil {
kl.lastTimestampRuntimeUp = time.Now()
}
}
// updateNodeStatus updates node status to master with retries.
func (kl *Kubelet) updateNodeStatus() error {
for i := 0; i < nodeStatusUpdateRetry; i++ {
@@ -1520,13 +1541,31 @@ func (kl *Kubelet) tryUpdateNodeStatus() error {
node.Status.NodeInfo.KubeProxyVersion = version.Get().String()
}
// Check whether container runtime can be reported as up.
containerRuntimeUp := func() bool {
kl.runtimeMutex.Lock()
defer kl.runtimeMutex.Unlock()
return kl.lastTimestampRuntimeUp.Add(kl.runtimeUpThreshold).After(time.Now())
}()
currentTime := util.Now()
newCondition := api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: fmt.Sprintf("kubelet is posting ready status"),
LastHeartbeatTime: currentTime,
var newCondition api.NodeCondition
if containerRuntimeUp {
newCondition = api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: fmt.Sprintf("kubelet is posting ready status"),
LastHeartbeatTime: currentTime,
}
} else {
newCondition = api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionFalse,
Reason: fmt.Sprintf("container runtime is down"),
LastHeartbeatTime: currentTime,
}
}
updated := false
for i := range node.Status.Conditions {
if node.Status.Conditions[i].Type == api.NodeReady {

pkg/kubelet/kubelet_test.go

@@ -72,6 +72,7 @@ const testKubeletHostname = "testnode"
func newTestKubelet(t *testing.T) *TestKubelet {
fakeDocker := &dockertools.FakeDockerClient{Errors: make(map[string]error), RemovedImages: util.StringSet{}}
fakeDocker.VersionInfo = []string{"ApiVersion=1.15"}
fakeRecorder := &record.FakeRecorder{}
fakeKubeClient := &testclient.Fake{}
kubelet := &Kubelet{}
@@ -80,6 +81,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.os = kubecontainer.FakeOS{}
kubelet.hostname = "testnode"
kubelet.runtimeUpThreshold = maxWaitForContainerRuntime
kubelet.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
if tempDir, err := ioutil.TempDir("/tmp", "kubelet_test."); err != nil {
t.Fatalf("can't make a temp rootdir: %v", err)
@@ -3238,6 +3240,7 @@ func TestUpdateNewNodeStatus(t *testing.T) {
},
}
kubelet.updateRuntimeUp()
if err := kubelet.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -3331,6 +3334,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
},
}
kubelet.updateRuntimeUp()
if err := kubelet.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -3356,6 +3360,90 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
}
}
func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
fakeDocker := testKubelet.fakeDocker
// This causes GetContainerRuntimeVersion() to return an error, which
// simulates that the container runtime is down.
fakeDocker.VersionInfo = []string{}
kubeClient.ReactFn = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: "testnode"}},
}}).ReactFn
mockCadvisor := testKubelet.fakeCadvisor
machineInfo := &cadvisorApi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 1024,
}
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorApi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
DockerVersion: "1.5.0",
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
expectedNode := &api.Node{
ObjectMeta: api.ObjectMeta{Name: "testnode"},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionFalse,
Reason: fmt.Sprintf("container runtime is down"),
LastHeartbeatTime: util.Time{},
LastTransitionTime: util.Time{},
},
},
NodeInfo: api.NodeSystemInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
KernelVersion: "3.16.0-0.bpo.4-amd64",
OsImage: "Debian GNU/Linux 7 (wheezy)",
ContainerRuntimeVersion: "docker://1.5.0",
KubeletVersion: version.Get().String(),
KubeProxyVersion: version.Get().String(),
},
Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI),
},
},
}
kubelet.runtimeUpThreshold = time.Duration(0)
kubelet.updateRuntimeUp()
if err := kubelet.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(kubeClient.Actions) != 2 || kubeClient.Actions[1].Action != "update-status-node" {
t.Fatalf("unexpected actions: %v", kubeClient.Actions)
}
updatedNode, ok := kubeClient.Actions[1].Value.(*api.Node)
if !ok {
t.Errorf("unexpected object type")
}
if updatedNode.Status.Conditions[0].LastHeartbeatTime.IsZero() {
t.Errorf("unexpected zero last probe timestamp")
}
if updatedNode.Status.Conditions[0].LastTransitionTime.IsZero() {
t.Errorf("unexpected zero last transition timestamp")
}
updatedNode.Status.Conditions[0].LastHeartbeatTime = util.Time{}
updatedNode.Status.Conditions[0].LastTransitionTime = util.Time{}
if !reflect.DeepEqual(expectedNode, updatedNode) {
t.Errorf("unexpected objects: %s", util.ObjectDiff(expectedNode, updatedNode))
}
}
func TestUpdateNodeStatusError(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet