Implement KEP-3836

TL;DR: we want to start failing the LB HC if a node is tainted with ToBeDeletedByClusterAutoscaler.
The exact trigger might need refinement, but this taint is currently deemed our best signal that
a node is about to be deleted. We want to do this only for eTP:Cluster services.

The goal is to achieve connection draining for terminating nodes.
author Alexander Constantinescu 2022-07-15 01:21:39 +02:00
parent 0ae9aaacfa
commit 9b1c4c7b57
7 changed files with 200 additions and 11 deletions
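
For context: with `externalTrafficPolicy: Cluster` services, cloud load balancers generally health-check every node through kube-proxy's healthz endpoint on port 10256, so failing that endpoint is what actually drains a node. A minimal probe of that endpoint might look like the Go sketch below; the path, port, and JSON field names follow the handler added in this commit, while the node IP and timeout are purely illustrative.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// healthzResponse mirrors the JSON written by kube-proxy's healthzHandler in
// this commit: lastUpdated, currentTime and nodeEligible.
type healthzResponse struct {
	LastUpdated  string `json:"lastUpdated"`
	CurrentTime  string `json:"currentTime"`
	NodeEligible bool   `json:"nodeEligible"`
}

func main() {
	client := &http.Client{Timeout: 2 * time.Second}
	// The node IP below is illustrative; 10256 is kube-proxy's default healthz port.
	resp, err := client.Get("http://192.0.2.10:10256/healthz")
	if err != nil {
		fmt.Println("unhealthy:", err)
		return
	}
	defer resp.Body.Close()

	var payload healthzResponse
	if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
		fmt.Println("bad payload:", err)
		return
	}

	// A cloud LB health check keys off the status code alone: with this commit,
	// a node that is being deleted or is tainted with ToBeDeletedByClusterAutoscaler
	// answers 503 here and so gets drained from eTP:Cluster backends.
	fmt.Printf("status=%d nodeEligible=%v lastUpdated=%s\n",
		resp.StatusCode, payload.NodeEligible, payload.LastUpdated)
}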


@@ -27,6 +27,7 @@ import (
"strings"
"time"
"k8s.io/kubernetes/pkg/features"
utilnode "k8s.io/kubernetes/pkg/util/node"
"github.com/fsnotify/fsnotify"
@@ -869,6 +870,11 @@ func (s *ProxyServer) Run() error {
if s.Config.DetectLocalMode == kubeproxyconfig.LocalModeNodeCIDR {
nodeConfig.RegisterEventHandler(proxy.NewNodePodCIDRHandler(s.podCIDRs))
}
if utilfeature.DefaultFeatureGate.Enabled(features.KubeProxyDrainingTerminatingNodes) {
nodeConfig.RegisterEventHandler(&proxy.NodeEligibleHandler{
HealthServer: s.HealthzServer,
})
}
nodeConfig.RegisterEventHandler(s.Proxier)
go nodeConfig.Run(wait.NeverStop)


@@ -449,6 +449,14 @@ const (
// Add support for distributed tracing in the kubelet
KubeletTracing featuregate.Feature = "KubeletTracing"
// owner: @alexanderConstantinescu
// kep: http://kep.k8s.io/3836
// alpha: v1.28
//
// Implement connection draining for terminating nodes for
// `externalTrafficPolicy: Cluster` services.
KubeProxyDrainingTerminatingNodes featuregate.Feature = "KubeProxyDrainingTerminatingNodes"
// owner: @zshihang
// kep: https://kep.k8s.io/2800
// beta: v1.24
@@ -976,6 +984,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
KubeletTracing: {Default: true, PreRelease: featuregate.Beta},
KubeProxyDrainingTerminatingNodes: {Default: false, PreRelease: featuregate.Alpha},
LegacyServiceAccountTokenNoAutoGeneration: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29
LegacyServiceAccountTokenTracking: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.30
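
The gate ships alpha and disabled. On a cluster it would be turned on through kube-proxy's feature-gate configuration; in unit tests the usual pattern is the component-base helper shown in this sketch, which assumes the helper's signature at the time of this change (it returns a cleanup func to defer). The test name and body are placeholders.

package proxy_test

import (
	"testing"

	utilfeature "k8s.io/apiserver/pkg/util/feature"
	featuregatetesting "k8s.io/component-base/featuregate/testing"
	"k8s.io/kubernetes/pkg/features"
)

// TestWithDrainingGateEnabled only demonstrates flipping the alpha gate for
// the duration of a test.
func TestWithDrainingGateEnabled(t *testing.T) {
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate,
		features.KubeProxyDrainingTerminatingNodes, true)()

	// ... exercise code that consults the gate, e.g. the NodeEligibleHandler
	// registration guarded in the kube-proxy server setup above ...
}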


@@ -27,6 +27,8 @@ import (
"github.com/google/go-cmp/cmp"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/dump"
"k8s.io/apimachinery/pkg/util/sets"
@@ -131,6 +133,7 @@ type hcPayload struct {
type healthzPayload struct {
LastUpdated string
CurrentTime string
NodeHealthy bool
}
type fakeProxierHealthChecker struct {
@@ -427,6 +430,30 @@ func tHandler(hcs *server, nsn types.NamespacedName, status int, endpoints int,
}
}
type nodeTweak func(n *v1.Node)
func makeNode(tweaks ...nodeTweak) *v1.Node {
n := &v1.Node{}
for _, tw := range tweaks {
tw(n)
}
return n
}
func tweakDeleted() nodeTweak {
return func(n *v1.Node) {
n.DeletionTimestamp = &metav1.Time{
Time: time.Now(),
}
}
}
func tweakTainted(key string) nodeTweak {
return func(n *v1.Node) {
n.Spec.Taints = append(n.Spec.Taints, v1.Taint{Key: key})
}
}
func TestHealthzServer(t *testing.T) {
listener := newFakeListener()
httpFactory := newFakeHTTPServerFactory()
@@ -436,30 +463,99 @@ func TestHealthzServer(t *testing.T) {
server := hs.httpFactory.New(hs.addr, healthzHandler{hs: hs})
// Should return 200 "OK" by default.
testHealthzHandler(server, http.StatusOK, t)
testHTTPHandler(server, healthzURL, http.StatusOK, t)
// Should return 200 "OK" after first update
hs.Updated()
testHealthzHandler(server, http.StatusOK, t)
testHTTPHandler(server, healthzURL, http.StatusOK, t)
// Should continue to return 200 "OK" as long as no further updates are queued
fakeClock.Step(25 * time.Second)
testHealthzHandler(server, http.StatusOK, t)
testHTTPHandler(server, healthzURL, http.StatusOK, t)
// Should return 503 "ServiceUnavailable" if we exceed the max update-processing time
hs.QueuedUpdate()
fakeClock.Step(25 * time.Second)
testHealthzHandler(server, http.StatusServiceUnavailable, t)
testHTTPHandler(server, healthzURL, http.StatusServiceUnavailable, t)
// Should return 200 "OK" after processing update
hs.Updated()
fakeClock.Step(5 * time.Second)
testHealthzHandler(server, http.StatusOK, t)
testHTTPHandler(server, healthzURL, http.StatusOK, t)
// Should return 200 "OK" if we've synced a node, tainted in any other way
hs.SyncNode(makeNode(tweakTainted("other")))
testHTTPHandler(server, healthzURL, http.StatusOK, t)
// Should return 503 "ServiceUnavailable" if we've synced a ToBeDeletedTaint node
hs.SyncNode(makeNode(tweakTainted(ToBeDeletedTaint)))
testHTTPHandler(server, healthzURL, http.StatusServiceUnavailable, t)
// Should return 200 "OK" if we've synced a node, tainted in any other way
hs.SyncNode(makeNode(tweakTainted("other")))
testHTTPHandler(server, healthzURL, http.StatusOK, t)
// Should return 503 "ServiceUnavailable" if we've synced a deleted node
hs.SyncNode(makeNode(tweakDeleted()))
testHTTPHandler(server, healthzURL, http.StatusServiceUnavailable, t)
}
func testHealthzHandler(server httpServer, status int, t *testing.T) {
func TestLivezServer(t *testing.T) {
listener := newFakeListener()
httpFactory := newFakeHTTPServerFactory()
fakeClock := testingclock.NewFakeClock(time.Now())
hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second, nil, nil)
server := hs.httpFactory.New(hs.addr, livezHandler{hs: hs})
// Should return 200 "OK" by default.
testHTTPHandler(server, livezURL, http.StatusOK, t)
// Should return 200 "OK" after first update
hs.Updated()
testHTTPHandler(server, livezURL, http.StatusOK, t)
// Should continue to return 200 "OK" as long as no further updates are queued
fakeClock.Step(25 * time.Second)
testHTTPHandler(server, livezURL, http.StatusOK, t)
// Should return 503 "ServiceUnavailable" if we exceed the max update-processing time
hs.QueuedUpdate()
fakeClock.Step(25 * time.Second)
testHTTPHandler(server, livezURL, http.StatusServiceUnavailable, t)
// Should return 200 "OK" after processing update
hs.Updated()
fakeClock.Step(5 * time.Second)
testHTTPHandler(server, livezURL, http.StatusOK, t)
// Should return 200 "OK" irrespective of node syncs
hs.SyncNode(makeNode(tweakTainted("other")))
testHTTPHandler(server, livezURL, http.StatusOK, t)
// Should return 200 "OK" irrespective of node syncs
hs.SyncNode(makeNode(tweakTainted(ToBeDeletedTaint)))
testHTTPHandler(server, livezURL, http.StatusOK, t)
// Should return 200 "OK" irrespective of node syncs
hs.SyncNode(makeNode(tweakTainted("other")))
testHTTPHandler(server, livezURL, http.StatusOK, t)
// Should return 200 "OK" irrespective of node syncs
hs.SyncNode(makeNode(tweakDeleted()))
testHTTPHandler(server, livezURL, http.StatusOK, t)
}
type url string
var (
healthzURL url = "/healthz"
livezURL url = "/livez"
)
func testHTTPHandler(server httpServer, u url, status int, t *testing.T) {
handler := server.(*fakeHTTPServer).handler
req, err := http.NewRequest("GET", "/healthz", nil)
req, err := http.NewRequest("GET", string(u), nil)
if err != nil {
t.Fatal(err)
}


@@ -29,6 +29,12 @@ import (
"k8s.io/utils/clock"
)
const (
// ToBeDeletedTaint is a taint used by the Cluster Autoscaler before marking a node for deletion. Defined in
// https://github.com/kubernetes/autoscaler/blob/e80ab518340f88f364fe3ef063f8303755125971/cluster-autoscaler/utils/deletetaint/delete.go#L36
ToBeDeletedTaint = "ToBeDeletedByClusterAutoscaler"
)
// ProxierHealthUpdater allows callers to update healthz timestamp only.
type ProxierHealthUpdater interface {
// QueuedUpdate should be called when the proxier receives a Service or Endpoints
@@ -42,6 +48,10 @@ type ProxierHealthUpdater interface {
// Run starts the healthz HTTP server and blocks until it exits.
Run() error
// SyncNode syncs the node and determines whether it is eligible or not. Eligible is
// defined as: not tainted by ToBeDeletedTaint and not deleted.
SyncNode(node *v1.Node)
proxierHealthChecker
}
@@ -62,6 +72,7 @@ type proxierHealthServer struct {
lastUpdated atomic.Value
oldestPendingQueued atomic.Value
nodeEligible atomic.Bool
}
// NewProxierHealthServer returns a proxier health http server.
@@ -70,7 +81,7 @@ func NewProxierHealthServer(addr string, healthTimeout time.Duration, recorder e
}
func newProxierHealthServer(listener listener, httpServerFactory httpServerFactory, c clock.Clock, addr string, healthTimeout time.Duration, recorder events.EventRecorder, nodeRef *v1.ObjectReference) *proxierHealthServer {
return &proxierHealthServer{
hs := &proxierHealthServer{
listener: listener,
httpFactory: httpServerFactory,
clock: c,
@@ -79,6 +90,11 @@ func newProxierHealthServer(listener listener, httpServerFactory httpServerFacto
recorder: recorder,
nodeRef: nodeRef,
}
// The node is eligible (and thus the proxy healthy) while it's starting up
// and until we've processed the first node event that indicates the
// contrary.
hs.nodeEligible.Store(true)
return hs
}
// Updated indicates that kube-proxy has successfully updated its backend, so it should
@@ -96,8 +112,8 @@ func (hs *proxierHealthServer) QueuedUpdate() {
hs.oldestPendingQueued.CompareAndSwap(zeroTime, hs.clock.Now())
}
// IsHealthy returns the proxier's health state, following the same definition
// the HTTP server defines.
// IsHealthy returns only the proxier's health state, following the same
// definition the HTTP server defines, but ignoring the state of the Node.
func (hs *proxierHealthServer) IsHealthy() bool {
isHealthy, _, _ := hs.isHealthy()
return isHealthy
@@ -123,14 +139,28 @@ func (hs *proxierHealthServer) isHealthy() (bool, time.Time, time.Time) {
// There's an unprocessed update queued, but it's not late yet
healthy = true
}
return healthy, lastUpdated, currentTime
}
func (hs *proxierHealthServer) SyncNode(node *v1.Node) {
if !node.DeletionTimestamp.IsZero() {
hs.nodeEligible.Store(false)
return
}
for _, taint := range node.Spec.Taints {
if taint.Key == ToBeDeletedTaint {
hs.nodeEligible.Store(false)
return
}
}
hs.nodeEligible.Store(true)
}
// Run starts the healthz HTTP server and blocks until it exits.
func (hs *proxierHealthServer) Run() error {
serveMux := http.NewServeMux()
serveMux.Handle("/healthz", healthzHandler{hs: hs})
serveMux.Handle("/livez", livezHandler{hs: hs})
server := hs.httpFactory.New(hs.addr, serveMux)
listener, err := hs.listener.Listen(hs.addr)
@@ -156,6 +186,30 @@ type healthzHandler struct {
}
func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
nodeEligible := h.hs.nodeEligible.Load()
healthy, lastUpdated, currentTime := h.hs.isHealthy()
healthy = healthy && nodeEligible
resp.Header().Set("Content-Type", "application/json")
resp.Header().Set("X-Content-Type-Options", "nosniff")
if !healthy {
resp.WriteHeader(http.StatusServiceUnavailable)
} else {
resp.WriteHeader(http.StatusOK)
// In older releases, the returned "lastUpdated" time indicated the last
// time the proxier sync loop ran, even if nothing had changed. To
// preserve compatibility, we use the same semantics: the returned
// lastUpdated value is "recent" if the server is healthy. The kube-proxy
// metrics provide more detailed information.
lastUpdated = currentTime
}
fmt.Fprintf(resp, `{"lastUpdated": %q,"currentTime": %q, "nodeEligible": %v}`, lastUpdated, currentTime, nodeEligible)
}
type livezHandler struct {
hs *proxierHealthServer
}
func (h livezHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
healthy, lastUpdated, currentTime := h.hs.isHealthy()
resp.Header().Set("Content-Type", "application/json")
resp.Header().Set("X-Content-Type-Options", "nosniff")


@@ -652,6 +652,7 @@ func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
"eventNode", node.Name, "currentNode", proxier.hostname)
return
}
proxier.mu.Lock()
proxier.nodeLabels = nil
proxier.needFullSync = true


@@ -902,6 +902,7 @@ func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.hostname)
return
}
proxier.mu.Lock()
proxier.nodeLabels = nil
proxier.mu.Unlock()


@@ -23,6 +23,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/proxy/config"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
)
// NodePodCIDRHandler handles the life cycle of kube-proxy based on the node PodCIDR assigned
@@ -85,3 +86,23 @@ func (n *NodePodCIDRHandler) OnNodeDelete(node *v1.Node) {
// OnNodeSynced is a handler for Node syncs.
func (n *NodePodCIDRHandler) OnNodeSynced() {}
// NodeEligibleHandler tracks the node's eligibility for receiving load balancer
// traffic, as determined by the health server, across node lifecycle events.
type NodeEligibleHandler struct {
HealthServer healthcheck.ProxierHealthUpdater
}
var _ config.NodeHandler = &NodeEligibleHandler{}
// OnNodeAdd is a handler for Node creates.
func (n *NodeEligibleHandler) OnNodeAdd(node *v1.Node) { n.HealthServer.SyncNode(node) }
// OnNodeUpdate is a handler for Node updates.
func (n *NodeEligibleHandler) OnNodeUpdate(_, node *v1.Node) { n.HealthServer.SyncNode(node) }
// OnNodeDelete is a handler for Node deletes.
func (n *NodeEligibleHandler) OnNodeDelete(node *v1.Node) { n.HealthServer.SyncNode(node) }
// OnNodeSynced is a handler for Node syncs.
func (n *NodeEligibleHandler) OnNodeSynced() {}