From 9b1c4c7b57f7fbdd776f5103c89ed1f461c295d0 Mon Sep 17 00:00:00 2001
From: Alexander Constantinescu
Date: Fri, 15 Jul 2022 01:21:39 +0200
Subject: [PATCH] Implement KEP-3836

TL;DR: we want to start failing the load balancer health check (LB HC) if a
node is tainted with ToBeDeletedByClusterAutoscaler. This taint might need
refinement, but it is currently deemed our best way of understanding whether
a node is about to be deleted. We want to do this only for eTP:Cluster
services.

The goal is to achieve connection draining for terminating nodes.
---
 cmd/kube-proxy/app/server.go              |   6 ++
 pkg/features/kube_features.go             |  10 ++
 pkg/proxy/healthcheck/healthcheck_test.go | 110 ++++++++++++++++++++--
 pkg/proxy/healthcheck/proxier_health.go   |  62 +++++++++++-
 pkg/proxy/iptables/proxier.go             |   1 +
 pkg/proxy/ipvs/proxier.go                 |   1 +
 pkg/proxy/node.go                         |  21 +++++
 7 files changed, 200 insertions(+), 11 deletions(-)

diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go
index 7fe01c87769..8dc8e4ce403 100644
--- a/cmd/kube-proxy/app/server.go
+++ b/cmd/kube-proxy/app/server.go
@@ -27,6 +27,7 @@ import (
 	"strings"
 	"time"
 
+	"k8s.io/kubernetes/pkg/features"
 	utilnode "k8s.io/kubernetes/pkg/util/node"
 
 	"github.com/fsnotify/fsnotify"
@@ -869,6 +870,11 @@ func (s *ProxyServer) Run() error {
 	if s.Config.DetectLocalMode == kubeproxyconfig.LocalModeNodeCIDR {
 		nodeConfig.RegisterEventHandler(proxy.NewNodePodCIDRHandler(s.podCIDRs))
 	}
+	if utilfeature.DefaultFeatureGate.Enabled(features.KubeProxyDrainingTerminatingNodes) {
+		nodeConfig.RegisterEventHandler(&proxy.NodeEligibleHandler{
+			HealthServer: s.HealthzServer,
+		})
+	}
 	nodeConfig.RegisterEventHandler(s.Proxier)
 
 	go nodeConfig.Run(wait.NeverStop)
diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go
index 7bb9cba5b20..c2f96425f60 100644
--- a/pkg/features/kube_features.go
+++ b/pkg/features/kube_features.go
@@ -449,6 +449,14 @@ const (
 	// Add support for distributed tracing in the kubelet
 	KubeletTracing featuregate.Feature = "KubeletTracing"
 
+	// owner: @alexanderConstantinescu
+	// kep: http://kep.k8s.io/3836
+	// alpha: v1.28
+	//
+	// Implement connection draining for terminating nodes for
+	// `externalTrafficPolicy: Cluster` services.
+ KubeProxyDrainingTerminatingNodes featuregate.Feature = "KubeProxyDrainingTerminatingNodes" + // owner: @zshihang // kep: https://kep.k8s.io/2800 // beta: v1.24 @@ -976,6 +984,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS KubeletTracing: {Default: true, PreRelease: featuregate.Beta}, + KubeProxyDrainingTerminatingNodes: {Default: false, PreRelease: featuregate.Alpha}, + LegacyServiceAccountTokenNoAutoGeneration: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29 LegacyServiceAccountTokenTracking: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.30 diff --git a/pkg/proxy/healthcheck/healthcheck_test.go b/pkg/proxy/healthcheck/healthcheck_test.go index 6b4a0468711..3d8edb63f12 100644 --- a/pkg/proxy/healthcheck/healthcheck_test.go +++ b/pkg/proxy/healthcheck/healthcheck_test.go @@ -27,6 +27,8 @@ import ( "github.com/google/go-cmp/cmp" "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/dump" "k8s.io/apimachinery/pkg/util/sets" @@ -131,6 +133,7 @@ type hcPayload struct { type healthzPayload struct { LastUpdated string CurrentTime string + NodeHealthy bool } type fakeProxierHealthChecker struct { @@ -427,6 +430,30 @@ func tHandler(hcs *server, nsn types.NamespacedName, status int, endpoints int, } } +type nodeTweak func(n *v1.Node) + +func makeNode(tweaks ...nodeTweak) *v1.Node { + n := &v1.Node{} + for _, tw := range tweaks { + tw(n) + } + return n +} + +func tweakDeleted() nodeTweak { + return func(n *v1.Node) { + n.DeletionTimestamp = &metav1.Time{ + Time: time.Now(), + } + } +} + +func tweakTainted(key string) nodeTweak { + return func(n *v1.Node) { + n.Spec.Taints = append(n.Spec.Taints, v1.Taint{Key: key}) + } +} + func TestHealthzServer(t *testing.T) { listener := newFakeListener() httpFactory := newFakeHTTPServerFactory() @@ -436,30 +463,99 @@ func TestHealthzServer(t *testing.T) { server := hs.httpFactory.New(hs.addr, healthzHandler{hs: hs}) // Should return 200 "OK" by default. 
- testHealthzHandler(server, http.StatusOK, t) + testHTTPHandler(server, healthzURL, http.StatusOK, t) // Should return 200 "OK" after first update hs.Updated() - testHealthzHandler(server, http.StatusOK, t) + testHTTPHandler(server, healthzURL, http.StatusOK, t) // Should continue to return 200 "OK" as long as no further updates are queued fakeClock.Step(25 * time.Second) - testHealthzHandler(server, http.StatusOK, t) + testHTTPHandler(server, healthzURL, http.StatusOK, t) // Should return 503 "ServiceUnavailable" if exceed max update-processing time hs.QueuedUpdate() fakeClock.Step(25 * time.Second) - testHealthzHandler(server, http.StatusServiceUnavailable, t) + testHTTPHandler(server, healthzURL, http.StatusServiceUnavailable, t) // Should return 200 "OK" after processing update hs.Updated() fakeClock.Step(5 * time.Second) - testHealthzHandler(server, http.StatusOK, t) + testHTTPHandler(server, healthzURL, http.StatusOK, t) + + // Should return 200 "OK" if we've synced a node, tainted in any other way + hs.SyncNode(makeNode(tweakTainted("other"))) + testHTTPHandler(server, healthzURL, http.StatusOK, t) + + // Should return 503 "ServiceUnavailable" if we've synced a ToBeDeletedTaint node + hs.SyncNode(makeNode(tweakTainted(ToBeDeletedTaint))) + testHTTPHandler(server, healthzURL, http.StatusServiceUnavailable, t) + + // Should return 200 "OK" if we've synced a node, tainted in any other way + hs.SyncNode(makeNode(tweakTainted("other"))) + testHTTPHandler(server, healthzURL, http.StatusOK, t) + + // Should return 503 "ServiceUnavailable" if we've synced a deleted node + hs.SyncNode(makeNode(tweakDeleted())) + testHTTPHandler(server, healthzURL, http.StatusServiceUnavailable, t) } -func testHealthzHandler(server httpServer, status int, t *testing.T) { +func TestLivezServer(t *testing.T) { + listener := newFakeListener() + httpFactory := newFakeHTTPServerFactory() + fakeClock := testingclock.NewFakeClock(time.Now()) + + hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second, nil, nil) + server := hs.httpFactory.New(hs.addr, livezHandler{hs: hs}) + + // Should return 200 "OK" by default. 
+	testHTTPHandler(server, livezURL, http.StatusOK, t)
+
+	// Should return 200 "OK" after first update
+	hs.Updated()
+	testHTTPHandler(server, livezURL, http.StatusOK, t)
+
+	// Should continue to return 200 "OK" as long as no further updates are queued
+	fakeClock.Step(25 * time.Second)
+	testHTTPHandler(server, livezURL, http.StatusOK, t)
+
+	// Should return 503 "ServiceUnavailable" if exceed max update-processing time
+	hs.QueuedUpdate()
+	fakeClock.Step(25 * time.Second)
+	testHTTPHandler(server, livezURL, http.StatusServiceUnavailable, t)
+
+	// Should return 200 "OK" after processing update
+	hs.Updated()
+	fakeClock.Step(5 * time.Second)
+	testHTTPHandler(server, livezURL, http.StatusOK, t)
+
+	// Should return 200 "OK" irrespective of node syncs
+	hs.SyncNode(makeNode(tweakTainted("other")))
+	testHTTPHandler(server, livezURL, http.StatusOK, t)
+
+	// Should return 200 "OK" irrespective of node syncs
+	hs.SyncNode(makeNode(tweakTainted(ToBeDeletedTaint)))
+	testHTTPHandler(server, livezURL, http.StatusOK, t)
+
+	// Should return 200 "OK" irrespective of node syncs
+	hs.SyncNode(makeNode(tweakTainted("other")))
+	testHTTPHandler(server, livezURL, http.StatusOK, t)
+
+	// Should return 200 "OK" irrespective of node syncs
+	hs.SyncNode(makeNode(tweakDeleted()))
+	testHTTPHandler(server, livezURL, http.StatusOK, t)
+}
+
+type url string
+
+var (
+	healthzURL url = "/healthz"
+	livezURL   url = "/livez"
+)
+
+func testHTTPHandler(server httpServer, u url, status int, t *testing.T) {
 	handler := server.(*fakeHTTPServer).handler
-	req, err := http.NewRequest("GET", "/healthz", nil)
+	req, err := http.NewRequest("GET", string(u), nil)
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/pkg/proxy/healthcheck/proxier_health.go b/pkg/proxy/healthcheck/proxier_health.go
index 7dc5e4e4b4d..9d4f81f965c 100644
--- a/pkg/proxy/healthcheck/proxier_health.go
+++ b/pkg/proxy/healthcheck/proxier_health.go
@@ -29,6 +29,12 @@ import (
 	"k8s.io/utils/clock"
 )
 
+const (
+	// ToBeDeletedTaint is a taint used by the Cluster Autoscaler before marking a node for deletion. Defined in
+	// https://github.com/kubernetes/autoscaler/blob/e80ab518340f88f364fe3ef063f8303755125971/cluster-autoscaler/utils/deletetaint/delete.go#L36
+	ToBeDeletedTaint = "ToBeDeletedByClusterAutoscaler"
+)
+
 // ProxierHealthUpdater allows callers to update healthz timestamp only.
 type ProxierHealthUpdater interface {
 	// QueuedUpdate should be called when the proxier receives a Service or Endpoints
@@ -42,6 +48,10 @@ type ProxierHealthUpdater interface {
 	// Run starts the healthz HTTP server and blocks until it exits.
 	Run() error
 
+	// Sync the node and determine whether it is eligible. Eligible is
+	// defined as: not tainted by ToBeDeletedTaint and not deleted.
+	SyncNode(node *v1.Node)
+
 	proxierHealthChecker
 }
 
@@ -62,6 +72,7 @@ type proxierHealthServer struct {
 
 	lastUpdated         atomic.Value
 	oldestPendingQueued atomic.Value
+	nodeEligible        atomic.Bool
 }
 
 // NewProxierHealthServer returns a proxier health http server.
@@ -70,7 +81,7 @@ func NewProxierHealthServer(addr string, healthTimeout time.Duration, recorder e } func newProxierHealthServer(listener listener, httpServerFactory httpServerFactory, c clock.Clock, addr string, healthTimeout time.Duration, recorder events.EventRecorder, nodeRef *v1.ObjectReference) *proxierHealthServer { - return &proxierHealthServer{ + hs := &proxierHealthServer{ listener: listener, httpFactory: httpServerFactory, clock: c, @@ -79,6 +90,11 @@ func newProxierHealthServer(listener listener, httpServerFactory httpServerFacto recorder: recorder, nodeRef: nodeRef, } + // The node is eligible (and thus the proxy healthy) while it's starting up + // and until we've processed the first node event that indicates the + // contrary. + hs.nodeEligible.Store(true) + return hs } // Updated indicates that kube-proxy has successfully updated its backend, so it should @@ -96,8 +112,8 @@ func (hs *proxierHealthServer) QueuedUpdate() { hs.oldestPendingQueued.CompareAndSwap(zeroTime, hs.clock.Now()) } -// IsHealthy returns the proxier's health state, following the same definition -// the HTTP server defines. +// IsHealthy returns only the proxier's health state, following the same +// definition the HTTP server defines, but ignoring the state of the Node. func (hs *proxierHealthServer) IsHealthy() bool { isHealthy, _, _ := hs.isHealthy() return isHealthy @@ -123,14 +139,28 @@ func (hs *proxierHealthServer) isHealthy() (bool, time.Time, time.Time) { // There's an unprocessed update queued, but it's not late yet healthy = true } - return healthy, lastUpdated, currentTime } +func (hs *proxierHealthServer) SyncNode(node *v1.Node) { + if !node.DeletionTimestamp.IsZero() { + hs.nodeEligible.Store(false) + return + } + for _, taint := range node.Spec.Taints { + if taint.Key == ToBeDeletedTaint { + hs.nodeEligible.Store(false) + return + } + } + hs.nodeEligible.Store(true) +} + // Run starts the healthz HTTP server and blocks until it exits. func (hs *proxierHealthServer) Run() error { serveMux := http.NewServeMux() serveMux.Handle("/healthz", healthzHandler{hs: hs}) + serveMux.Handle("/livez", livezHandler{hs: hs}) server := hs.httpFactory.New(hs.addr, serveMux) listener, err := hs.listener.Listen(hs.addr) @@ -156,6 +186,30 @@ type healthzHandler struct { } func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + nodeEligible := h.hs.nodeEligible.Load() + healthy, lastUpdated, currentTime := h.hs.isHealthy() + healthy = healthy && nodeEligible + resp.Header().Set("Content-Type", "application/json") + resp.Header().Set("X-Content-Type-Options", "nosniff") + if !healthy { + resp.WriteHeader(http.StatusServiceUnavailable) + } else { + resp.WriteHeader(http.StatusOK) + // In older releases, the returned "lastUpdated" time indicated the last + // time the proxier sync loop ran, even if nothing had changed. To + // preserve compatibility, we use the same semantics: the returned + // lastUpdated value is "recent" if the server is healthy. The kube-proxy + // metrics provide more detailed information. 
+ lastUpdated = currentTime + } + fmt.Fprintf(resp, `{"lastUpdated": %q,"currentTime": %q, "nodeEligible": %v}`, lastUpdated, currentTime, nodeEligible) +} + +type livezHandler struct { + hs *proxierHealthServer +} + +func (h livezHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { healthy, lastUpdated, currentTime := h.hs.isHealthy() resp.Header().Set("Content-Type", "application/json") resp.Header().Set("X-Content-Type-Options", "nosniff") diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 3093dac23e2..edba6f0347f 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -652,6 +652,7 @@ func (proxier *Proxier) OnNodeDelete(node *v1.Node) { "eventNode", node.Name, "currentNode", proxier.hostname) return } + proxier.mu.Lock() proxier.nodeLabels = nil proxier.needFullSync = true diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index cf3236bf1a5..875987716f1 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -902,6 +902,7 @@ func (proxier *Proxier) OnNodeDelete(node *v1.Node) { klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.hostname) return } + proxier.mu.Lock() proxier.nodeLabels = nil proxier.mu.Unlock() diff --git a/pkg/proxy/node.go b/pkg/proxy/node.go index 1845818945a..7cd24f6d4fc 100644 --- a/pkg/proxy/node.go +++ b/pkg/proxy/node.go @@ -23,6 +23,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/proxy/config" + "k8s.io/kubernetes/pkg/proxy/healthcheck" ) // NodePodCIDRHandler handles the life cycle of kube-proxy based on the node PodCIDR assigned @@ -85,3 +86,23 @@ func (n *NodePodCIDRHandler) OnNodeDelete(node *v1.Node) { // OnNodeSynced is a handler for Node syncs. func (n *NodePodCIDRHandler) OnNodeSynced() {} + +// NodeEligibleHandler handles the life cycle of the Node's eligibility, as +// determined by the health server for directing load balancer traffic. +type NodeEligibleHandler struct { + HealthServer healthcheck.ProxierHealthUpdater +} + +var _ config.NodeHandler = &NodeEligibleHandler{} + +// OnNodeAdd is a handler for Node creates. +func (n *NodeEligibleHandler) OnNodeAdd(node *v1.Node) { n.HealthServer.SyncNode(node) } + +// OnNodeUpdate is a handler for Node updates. +func (n *NodeEligibleHandler) OnNodeUpdate(_, node *v1.Node) { n.HealthServer.SyncNode(node) } + +// OnNodeDelete is a handler for Node deletes. +func (n *NodeEligibleHandler) OnNodeDelete(node *v1.Node) { n.HealthServer.SyncNode(node) } + +// OnNodeSynced is a handler for Node syncs. +func (n *NodeEligibleHandler) OnNodeSynced() {}
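
Below is a minimal, self-contained sketch, not part of the patch itself, that mirrors the eligibility rule introduced in pkg/proxy/healthcheck/proxier_health.go. The isNodeEligible helper, the lowercase toBeDeletedTaint constant and the main function are illustrative assumptions rather than code from this change; they only show which node states flip /healthz to 503 once the KubeProxyDrainingTerminatingNodes gate is enabled.

package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// toBeDeletedTaint matches the ToBeDeletedTaint constant added in
// pkg/proxy/healthcheck/proxier_health.go.
const toBeDeletedTaint = "ToBeDeletedByClusterAutoscaler"

// isNodeEligible is a hypothetical helper mirroring SyncNode: a node is
// ineligible for load balancer traffic if it is being deleted or carries
// the cluster-autoscaler deletion taint.
func isNodeEligible(node *v1.Node) bool {
	if !node.DeletionTimestamp.IsZero() {
		return false
	}
	for _, taint := range node.Spec.Taints {
		if taint.Key == toBeDeletedTaint {
			return false
		}
	}
	return true
}

func main() {
	healthy := &v1.Node{}

	tainted := &v1.Node{}
	tainted.Spec.Taints = append(tainted.Spec.Taints, v1.Taint{Key: toBeDeletedTaint})

	deleting := &v1.Node{}
	deleting.DeletionTimestamp = &metav1.Time{Time: time.Now()}

	fmt.Println(isNodeEligible(healthy))  // true  -> /healthz stays 200 (assuming the proxier itself is healthy)
	fmt.Println(isNodeEligible(tainted))  // false -> /healthz returns 503
	fmt.Println(isNodeEligible(deleting)) // false -> /healthz returns 503
}

Only /healthz takes node eligibility into account; the new /livez endpoint keeps the previous semantics and continues to return 200 for a tainted or deleting node, as exercised by TestLivezServer above.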