Merge pull request #52176 from liggitt/heartbeat-timeout

Automatic merge from submit-queue (batch tested with PRs 52176, 43152). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Eliminate hangs/throttling of node heartbeat

Fixes https://github.com/kubernetes/kubernetes/issues/48638
Fixes #50304

Stops the kubelet from wedging when updating node status if it is unable to establish a TCP connection.

Note that this only affects the node status loop. The pod sync loop would still hang until the dead TCP connections time out, so more work is needed to keep the sync loop responsive in the face of network issues, but this change lets existing pods coast without the node controller trying to evict them.
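The heart of the fix, distilled: build a second client from the same kubeconfig, attach a per-request timeout, and disable client-side throttling. A minimal sketch of that pattern (the `newHeartbeatClient` helper and its packaging are hypothetical; the config fields are the ones the server.go diff below sets):

```go
package heartbeat

import (
	"time"

	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/rest"
)

// newHeartbeatClient is a hypothetical helper (the PR does this inline in
// cmd/kubelet/app/server.go): clone the base client config, bound every
// request by the node status update interval, and disable client-side
// throttling so heartbeat retries are never queued behind other traffic.
func newHeartbeatClient(base *rest.Config, interval time.Duration) (v1core.CoreV1Interface, error) {
	cfg := *base           // shallow copy; the shared config is untouched
	cfg.Timeout = interval // abort any single request after one update period
	cfg.QPS = -1           // a negative QPS disables the client rate limiter
	return v1core.NewForConfig(&cfg)
}
```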

```release-note
Kubelet-to-master communication when doing node status updates now has a timeout to prevent indefinite hangs.
```
Kubernetes Submit Queue, 2017-09-16 09:45:29 -07:00, committed by GitHub
commit 3277de69b4
10 changed files with 102 additions and 16 deletions


@@ -158,6 +158,7 @@ func UnsecuredDependencies(s *options.KubeletServer) (*kubelet.Dependencies, err
         ContainerManager: nil,
         DockerClient: dockerClient,
         KubeClient: nil,
+        HeartbeatClient: nil,
         ExternalKubeClient: nil,
         EventClient: nil,
         Mounter: mounter,
@@ -319,11 +320,13 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
         kubeDeps.KubeClient = nil
         kubeDeps.ExternalKubeClient = nil
         kubeDeps.EventClient = nil
+        kubeDeps.HeartbeatClient = nil
         glog.Warningf("standalone mode, no API client")
-    } else if kubeDeps.KubeClient == nil || kubeDeps.ExternalKubeClient == nil || kubeDeps.EventClient == nil {
+    } else if kubeDeps.KubeClient == nil || kubeDeps.ExternalKubeClient == nil || kubeDeps.EventClient == nil || kubeDeps.HeartbeatClient == nil {
         // initialize clients if not standalone mode and any of the clients are not provided
         var kubeClient clientset.Interface
         var eventClient v1core.EventsGetter
+        var heartbeatClient v1core.CoreV1Interface
         var externalKubeClient clientgoclientset.Interface
         clientConfig, err := CreateAPIServerClientConfig(s)
@@ -352,16 +355,24 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
         if err != nil {
             glog.Warningf("New kubeClient from clientConfig error: %v", err)
         }

         // make a separate client for events
         eventClientConfig := *clientConfig
         eventClientConfig.QPS = float32(s.EventRecordQPS)
         eventClientConfig.Burst = int(s.EventBurst)
-        tmpClient, err := clientgoclientset.NewForConfig(&eventClientConfig)
+        eventClient, err = v1core.NewForConfig(&eventClientConfig)
         if err != nil {
             glog.Warningf("Failed to create API Server client for Events: %v", err)
         }
-        eventClient = tmpClient.CoreV1()
+
+        // make a separate client for heartbeat with throttling disabled and a timeout attached
+        heartbeatClientConfig := *clientConfig
+        heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
+        heartbeatClientConfig.QPS = float32(-1)
+        heartbeatClient, err = v1core.NewForConfig(&heartbeatClientConfig)
+        if err != nil {
+            glog.Warningf("Failed to create API Server client for heartbeat: %v", err)
+        }
     } else {
         switch {
         case s.RequireKubeConfig:
@@ -373,6 +384,9 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
         kubeDeps.KubeClient = kubeClient
         kubeDeps.ExternalKubeClient = externalKubeClient
+        if heartbeatClient != nil {
+            kubeDeps.HeartbeatClient = heartbeatClient
+        }
         if eventClient != nil {
             kubeDeps.EventClient = eventClient
         }


@@ -183,7 +183,7 @@ func (cnc *CloudNodeController) updateNodeAddress(node *v1.Node, instances cloud
     if !nodeAddressesChangeDetected(node.Status.Addresses, newNode.Status.Addresses) {
         return
     }
-    _, err = nodeutil.PatchNodeStatus(cnc.kubeClient, types.NodeName(node.Name), node, newNode)
+    _, err = nodeutil.PatchNodeStatus(cnc.kubeClient.CoreV1(), types.NodeName(node.Name), node, newNode)
     if err != nil {
         glog.Errorf("Error patching node with cloud ip addresses = [%v]", err)
     }


@@ -230,6 +230,8 @@ go_test(
         "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
+        "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
+        "//vendor/k8s.io/client-go/rest:go_default_library",
         "//vendor/k8s.io/client-go/testing:go_default_library",
         "//vendor/k8s.io/client-go/tools/record:go_default_library",
         "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",


@@ -237,6 +237,7 @@ type Dependencies struct {
     ContainerManager cm.ContainerManager
     DockerClient libdocker.Interface
     EventClient v1core.EventsGetter
+    HeartbeatClient v1core.CoreV1Interface
     KubeClient clientset.Interface
     ExternalKubeClient clientgoclientset.Interface
     Mounter mount.Interface
@@ -452,6 +453,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
         hostname: hostname,
         nodeName: nodeName,
         kubeClient: kubeDeps.KubeClient,
+        heartbeatClient: kubeDeps.HeartbeatClient,
         rootDirectory: rootDirectory,
         resyncInterval: kubeCfg.SyncFrequency.Duration,
         sourcesReady: config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
@@ -885,12 +887,13 @@ type serviceLister interface {

 type Kubelet struct {
     kubeletConfiguration kubeletconfiginternal.KubeletConfiguration

-    hostname      string
-    nodeName      types.NodeName
-    runtimeCache  kubecontainer.RuntimeCache
-    kubeClient    clientset.Interface
-    iptClient     utilipt.Interface
-    rootDirectory string
+    hostname        string
+    nodeName        types.NodeName
+    runtimeCache    kubecontainer.RuntimeCache
+    kubeClient      clientset.Interface
+    heartbeatClient v1core.CoreV1Interface
+    iptClient       utilipt.Interface
+    rootDirectory   string

     // podWorkers handle syncing Pods in response to events.
     podWorkers PodWorkers

@@ -139,7 +139,7 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
     requiresUpdate := kl.reconcileCMADAnnotationWithExistingNode(node, existingNode)
     requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate
     if requiresUpdate {
-        if _, err := nodeutil.PatchNodeStatus(kl.kubeClient, types.NodeName(kl.nodeName),
+        if _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName),
             originalNode, existingNode); err != nil {
             glog.Errorf("Unable to reconcile node %q with API server: error updating node: %v", kl.nodeName, err)
             return false
@@ -367,7 +367,7 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) {
 // It synchronizes node status to master, registering the kubelet first if
 // necessary.
 func (kl *Kubelet) syncNodeStatus() {
-    if kl.kubeClient == nil {
+    if kl.kubeClient == nil || kl.heartbeatClient == nil {
         return
     }
     if kl.registerNode {
@@ -404,7 +404,7 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
     if tryNumber == 0 {
         util.FromApiserverCache(&opts)
     }
-    node, err := kl.kubeClient.Core().Nodes().Get(string(kl.nodeName), opts)
+    node, err := kl.heartbeatClient.Nodes().Get(string(kl.nodeName), opts)
     if err != nil {
         return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
     }
@@ -423,7 +423,7 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
     kl.setNodeStatus(node)

     // Patch the current status on the API server
-    updatedNode, err := nodeutil.PatchNodeStatus(kl.kubeClient, types.NodeName(kl.nodeName), originalNode, node)
+    updatedNode, err := nodeutil.PatchNodeStatus(kl.heartbeatClient, types.NodeName(kl.nodeName), originalNode, node)
     if err != nil {
         return err
     }
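For orientation, `tryUpdateNodeStatus` above is driven by a bounded retry loop in `updateNodeStatus` (the test further down counts on exactly `nodeStatusUpdateRetry` attempts). A rough sketch of that shape, with illustrative names only:

```go
package heartbeat

import "fmt"

// updateWithRetry is an illustrative stand-in for the kubelet's
// updateNodeStatus loop (which calls tryUpdateNodeStatus up to
// nodeStatusUpdateRetry times): with the heartbeat client in place, a
// wedged connection now costs at most one client timeout per attempt
// instead of hanging the loop indefinitely.
func updateWithRetry(tries int, attempt func(try int) error) error {
	for i := 0; i < tries; i++ {
		if err := attempt(i); err == nil {
			return nil
		}
	}
	return fmt.Errorf("update node status exceeds retry count %d", tries)
}
```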


@@ -23,6 +23,7 @@ import (
     goruntime "runtime"
     "sort"
     "strconv"
+    "sync/atomic"
     "testing"
     "time"
@@ -43,6 +44,8 @@ import (
     "k8s.io/apimachinery/pkg/util/uuid"
     "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/kubernetes/fake"
+    v1core "k8s.io/client-go/kubernetes/typed/core/v1"
+    "k8s.io/client-go/rest"
     core "k8s.io/client-go/testing"
     fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
     kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
@@ -133,6 +136,7 @@ func TestNodeStatusWithCloudProviderNodeIP(t *testing.T) {
     testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
     defer testKubelet.Cleanup()
     kubelet := testKubelet.kubelet
+    kubelet.kubeClient = nil // ensure only the heartbeat client is used
     kubelet.hostname = testKubeletHostname
     existingNode := v1.Node{
@@ -215,6 +219,7 @@ func TestUpdateNewNodeStatus(t *testing.T) {
         t, inputImageList, false /* controllerAttachDetachEnabled */)
     defer testKubelet.Cleanup()
     kubelet := testKubelet.kubelet
+    kubelet.kubeClient = nil // ensure only the heartbeat client is used
     kubelet.containerManager = &localCM{
         ContainerManager: cm.NewStubContainerManager(),
         allocatable: v1.ResourceList{
@@ -339,6 +344,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
     testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
     defer testKubelet.Cleanup()
     kubelet := testKubelet.kubelet
+    kubelet.kubeClient = nil // ensure only the heartbeat client is used
     kubelet.containerManager = &localCM{
         ContainerManager: cm.NewStubContainerManager(),
         allocatable: v1.ResourceList{
@@ -525,10 +531,64 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
     assert.True(t, apiequality.Semantic.DeepEqual(expectedNode, updatedNode), "%s", diff.ObjectDiff(expectedNode, updatedNode))
 }

+func TestUpdateExistingNodeStatusTimeout(t *testing.T) {
+    attempts := int64(0)
+
+    // set up a listener that hangs connections
+    ln, err := net.Listen("tcp", "127.0.0.1:0")
+    assert.NoError(t, err)
+    defer ln.Close()
+    go func() {
+        // accept connections and just let them hang
+        for {
+            _, err := ln.Accept()
+            if err != nil {
+                t.Log(err)
+                return
+            }
+            t.Log("accepted connection")
+            atomic.AddInt64(&attempts, 1)
+        }
+    }()
+
+    config := &rest.Config{
+        Host: "http://" + ln.Addr().String(),
+        QPS: -1,
+        Timeout: time.Second,
+    }
+    assert.NoError(t, err)
+
+    testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
+    defer testKubelet.Cleanup()
+    kubelet := testKubelet.kubelet
+    kubelet.kubeClient = nil // ensure only the heartbeat client is used
+    kubelet.heartbeatClient, err = v1core.NewForConfig(config)
+    kubelet.containerManager = &localCM{
+        ContainerManager: cm.NewStubContainerManager(),
+        allocatable: v1.ResourceList{
+            v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
+            v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI),
+        },
+        capacity: v1.ResourceList{
+            v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
+            v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI),
+        },
+    }
+
+    // should return an error, but not hang
+    assert.Error(t, kubelet.updateNodeStatus())
+
+    // should have attempted multiple times
+    if actualAttempts := atomic.LoadInt64(&attempts); actualAttempts != nodeStatusUpdateRetry {
+        t.Errorf("Expected %d attempts, got %d", nodeStatusUpdateRetry, actualAttempts)
+    }
+}
+
 func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
     testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
     defer testKubelet.Cleanup()
     kubelet := testKubelet.kubelet
+    kubelet.kubeClient = nil // ensure only the heartbeat client is used
     kubelet.containerManager = &localCM{
         ContainerManager: cm.NewStubContainerManager(),
         allocatable: v1.ResourceList{
@@ -737,6 +797,7 @@ func TestUpdateNodeStatusError(t *testing.T) {
     testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
     defer testKubelet.Cleanup()
     kubelet := testKubelet.kubelet
+    kubelet.kubeClient = nil // ensure only the heartbeat client is used
     // No matching node for the kubelet
     testKubelet.fakeKubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{}}).ReactionChain
     assert.Error(t, kubelet.updateNodeStatus())
@@ -999,6 +1060,7 @@ func TestUpdateNewNodeStatusTooLargeReservation(t *testing.T) {
         t, inputImageList, false /* controllerAttachDetachEnabled */)
     defer testKubelet.Cleanup()
     kubelet := testKubelet.kubelet
+    kubelet.kubeClient = nil // ensure only the heartbeat client is used
     kubelet.containerManager = &localCM{
         ContainerManager: cm.NewStubContainerManager(),
         allocatable: v1.ResourceList{
@@ -1059,6 +1121,7 @@ func TestUpdateNewNodeStatusTooLargeReservation(t *testing.T) {
 func TestUpdateDefaultLabels(t *testing.T) {
     testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
+    testKubelet.kubelet.kubeClient = nil // ensure only the heartbeat client is used
     cases := []struct {
         name string

@@ -159,6 +159,7 @@ func newTestKubeletWithImageList(
     kubelet := &Kubelet{}
     kubelet.recorder = fakeRecorder
     kubelet.kubeClient = fakeKubeClient
+    kubelet.heartbeatClient = fakeKubeClient.CoreV1()
     kubelet.os = &containertest.FakeOS{}
     kubelet.mounter = &mount.FakeMounter{}

@@ -67,6 +67,7 @@ func NewHollowKubelet(
     volumePlugins = append(volumePlugins, secret.ProbeVolumePlugins()...)
     d := &kubelet.Dependencies{
         KubeClient: client,
+        HeartbeatClient: client.CoreV1(),
         DockerClient: dockerClient,
         CAdvisorInterface: cadvisorInterface,
         Cloud: nil,


@@ -18,6 +18,7 @@ go_library(
         "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
         "//vendor/k8s.io/client-go/kubernetes:go_default_library",
+        "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
     ],
 )


@@ -30,6 +30,7 @@ import (
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/apimachinery/pkg/util/strategicpatch"
     clientset "k8s.io/client-go/kubernetes"
+    v1core "k8s.io/client-go/kubernetes/typed/core/v1"
     "k8s.io/kubernetes/pkg/api"
     kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
 )
@@ -150,7 +151,7 @@ func SetNodeCondition(c clientset.Interface, node types.NodeName, condition v1.N
 }

 // PatchNodeStatus patches node status.
-func PatchNodeStatus(c clientset.Interface, nodeName types.NodeName, oldNode *v1.Node, newNode *v1.Node) (*v1.Node, error) {
+func PatchNodeStatus(c v1core.CoreV1Interface, nodeName types.NodeName, oldNode *v1.Node, newNode *v1.Node) (*v1.Node, error) {
     oldData, err := json.Marshal(oldNode)
     if err != nil {
         return nil, fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNode, nodeName, err)
@@ -171,7 +172,7 @@ func PatchNodeStatus(c clientset.Interface, nodeName types.NodeName, oldNode *v1
         return nil, fmt.Errorf("failed to create patch for node %q: %v", nodeName, err)
     }

-    updatedNode, err := c.Core().Nodes().Patch(string(nodeName), types.StrategicMergePatchType, patchBytes, "status")
+    updatedNode, err := c.Nodes().Patch(string(nodeName), types.StrategicMergePatchType, patchBytes, "status")
     if err != nil {
         return nil, fmt.Errorf("failed to patch status %q for node %q: %v", patchBytes, nodeName, err)
     }
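With the parameter widened to `v1core.CoreV1Interface`, callers holding a full clientset pass its `CoreV1()` accessor (as the cloud node controller hunk above does), while the kubelet passes its heartbeat-only client directly. A hypothetical call site under those assumptions:

```go
package heartbeat

import (
	"github.com/golang/glog"
	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	nodeutil "k8s.io/kubernetes/pkg/util/node"
)

// patchStatus is a hypothetical wrapper showing the new signature in use:
// any CoreV1Interface satisfies the parameter, whether it came from a full
// clientset's CoreV1() accessor or the kubelet's dedicated heartbeat client.
func patchStatus(c v1core.CoreV1Interface, name types.NodeName, oldNode, newNode *v1.Node) (*v1.Node, error) {
	updated, err := nodeutil.PatchNodeStatus(c, name, oldNode, newNode)
	if err != nil {
		glog.Errorf("failed to patch status for node %q: %v", name, err)
	}
	return updated, err
}
```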