From ec917850af8c0efc630572fa9b0ae6c05435e92c Mon Sep 17 00:00:00 2001 From: Alexander Constantinescu Date: Thu, 2 Mar 2023 16:34:42 +0100 Subject: [PATCH] Add proxy healthz result to ETP=local health check Today, the health check response to the load balancers asking Kube-proxy for the status of ETP:Local services does not include the healthz state of Kube- proxy. This means that Kube-proxy might indicate to load balancers that they should forward traffic to the node in question, simply because the endpoint is running on the node - this overlooks the fact that Kube-proxy might be not-healthy and hasn't successfully written the rules enabling traffic to reach the endpoint. --- pkg/proxy/healthcheck/healthcheck_test.go | 40 +++++++++++++++-- pkg/proxy/healthcheck/proxier_health.go | 54 ++++++++++++++--------- pkg/proxy/healthcheck/service_health.go | 27 ++++++++---- pkg/proxy/iptables/proxier.go | 2 +- pkg/proxy/ipvs/proxier.go | 2 +- pkg/proxy/winkernel/proxier.go | 2 +- 6 files changed, 93 insertions(+), 34 deletions(-) diff --git a/pkg/proxy/healthcheck/healthcheck_test.go b/pkg/proxy/healthcheck/healthcheck_test.go index c0bfe48081d..8c7189cc44b 100644 --- a/pkg/proxy/healthcheck/healthcheck_test.go +++ b/pkg/proxy/healthcheck/healthcheck_test.go @@ -121,7 +121,8 @@ type hcPayload struct { Namespace string Name string } - LocalEndpoints int + LocalEndpoints int + ServiceProxyHealthy bool } type healthzPayload struct { @@ -129,12 +130,21 @@ type healthzPayload struct { CurrentTime string } +type fakeProxierHealthChecker struct { + healthy bool +} + +func (fake fakeProxierHealthChecker) IsHealthy() bool { + return fake.healthy +} + func TestServer(t *testing.T) { listener := newFakeListener() httpFactory := newFakeHTTPServerFactory() nodePortAddresses := utilproxy.NewNodePortAddresses([]string{}) + proxyChecker := &fakeProxierHealthChecker{true} - hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory, nodePortAddresses) + hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory, nodePortAddresses, proxyChecker) hcs := hcsi.(*server) if len(hcs.services) != 0 { t.Errorf("expected 0 services, got %d", len(hcs.services)) @@ -351,9 +361,29 @@ func TestServer(t *testing.T) { testHandler(hcs, nsn2, http.StatusServiceUnavailable, 0, t) testHandler(hcs, nsn3, http.StatusOK, 7, t) testHandler(hcs, nsn4, http.StatusOK, 6, t) + + // fake a temporary unhealthy proxy + proxyChecker.healthy = false + testHandlerWithHealth(hcs, nsn2, http.StatusServiceUnavailable, 0, false, t) + testHandlerWithHealth(hcs, nsn3, http.StatusServiceUnavailable, 7, false, t) + testHandlerWithHealth(hcs, nsn4, http.StatusServiceUnavailable, 6, false, t) + + // fake a healthy proxy + proxyChecker.healthy = true + testHandlerWithHealth(hcs, nsn2, http.StatusServiceUnavailable, 0, true, t) + testHandlerWithHealth(hcs, nsn3, http.StatusOK, 7, true, t) + testHandlerWithHealth(hcs, nsn4, http.StatusOK, 6, true, t) } func testHandler(hcs *server, nsn types.NamespacedName, status int, endpoints int, t *testing.T) { + tHandler(hcs, nsn, status, endpoints, true, t) +} + +func testHandlerWithHealth(hcs *server, nsn types.NamespacedName, status int, endpoints int, kubeProxyHealthy bool, t *testing.T) { + tHandler(hcs, nsn, status, endpoints, kubeProxyHealthy, t) +} + +func tHandler(hcs *server, nsn types.NamespacedName, status int, endpoints int, kubeProxyHealthy bool, t *testing.T) { instance := hcs.services[nsn] for _, h := range instance.httpServers { handler := h.(*fakeHTTPServer).handler @@ -379,6 +409,9 @@ func testHandler(hcs *server, nsn types.NamespacedName, status int, endpoints in if payload.LocalEndpoints != endpoints { t.Errorf("expected %d endpoints, got %d", endpoints, payload.LocalEndpoints) } + if payload.ServiceProxyHealthy != kubeProxyHealthy { + t.Errorf("expected %v kubeProxyHealthy, got %v", kubeProxyHealthy, payload.ServiceProxyHealthy) + } } } @@ -434,12 +467,13 @@ func testHealthzHandler(server httpServer, status int, t *testing.T) { func TestServerWithSelectiveListeningAddress(t *testing.T) { listener := newFakeListener() httpFactory := newFakeHTTPServerFactory() + proxyChecker := &fakeProxierHealthChecker{true} // limiting addresses to loop back. We don't want any cleverness here around getting IP for // machine nor testing ipv6 || ipv4. using loop back guarantees the test will work on any machine nodePortAddresses := utilproxy.NewNodePortAddresses([]string{"127.0.0.0/8"}) - hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory, nodePortAddresses) + hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory, nodePortAddresses, proxyChecker) hcs := hcsi.(*server) if len(hcs.services) != 0 { t.Errorf("expected 0 services, got %d", len(hcs.services)) diff --git a/pkg/proxy/healthcheck/proxier_health.go b/pkg/proxy/healthcheck/proxier_health.go index 01f0d70d544..7dc5e4e4b4d 100644 --- a/pkg/proxy/healthcheck/proxier_health.go +++ b/pkg/proxy/healthcheck/proxier_health.go @@ -41,6 +41,8 @@ type ProxierHealthUpdater interface { // Run starts the healthz HTTP server and blocks until it exits. Run() error + + proxierHealthChecker } var _ ProxierHealthUpdater = &proxierHealthServer{} @@ -94,6 +96,37 @@ func (hs *proxierHealthServer) QueuedUpdate() { hs.oldestPendingQueued.CompareAndSwap(zeroTime, hs.clock.Now()) } +// IsHealthy returns the proxier's health state, following the same definition +// the HTTP server defines. +func (hs *proxierHealthServer) IsHealthy() bool { + isHealthy, _, _ := hs.isHealthy() + return isHealthy +} + +func (hs *proxierHealthServer) isHealthy() (bool, time.Time, time.Time) { + var oldestPendingQueued, lastUpdated time.Time + if val := hs.oldestPendingQueued.Load(); val != nil { + oldestPendingQueued = val.(time.Time) + } + if val := hs.lastUpdated.Load(); val != nil { + lastUpdated = val.(time.Time) + } + currentTime := hs.clock.Now() + + healthy := false + switch { + case oldestPendingQueued.IsZero(): + // The proxy is healthy while it's starting up + // or the proxy is fully synced. + healthy = true + case currentTime.Sub(oldestPendingQueued) < hs.healthTimeout: + // There's an unprocessed update queued, but it's not late yet + healthy = true + } + + return healthy, lastUpdated, currentTime +} + // Run starts the healthz HTTP server and blocks until it exits. func (hs *proxierHealthServer) Run() error { serveMux := http.NewServeMux() @@ -123,26 +156,7 @@ type healthzHandler struct { } func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { - var oldestPendingQueued, lastUpdated time.Time - if val := h.hs.oldestPendingQueued.Load(); val != nil { - oldestPendingQueued = val.(time.Time) - } - if val := h.hs.lastUpdated.Load(); val != nil { - lastUpdated = val.(time.Time) - } - currentTime := h.hs.clock.Now() - - healthy := false - switch { - case oldestPendingQueued.IsZero(): - // The proxy is healthy while it's starting up - // or the proxy is fully synced. - healthy = true - case currentTime.Sub(oldestPendingQueued) < h.hs.healthTimeout: - // There's an unprocessed update queued, but it's not late yet - healthy = true - } - + healthy, lastUpdated, currentTime := h.hs.isHealthy() resp.Header().Set("Content-Type", "application/json") resp.Header().Set("X-Content-Type-Options", "nosniff") if !healthy { diff --git a/pkg/proxy/healthcheck/service_health.go b/pkg/proxy/healthcheck/service_health.go index 1b9328a8831..a6f2afe765d 100644 --- a/pkg/proxy/healthcheck/service_health.go +++ b/pkg/proxy/healthcheck/service_health.go @@ -52,7 +52,13 @@ type ServiceHealthServer interface { SyncEndpoints(newEndpoints map[types.NamespacedName]int) error } -func newServiceHealthServer(hostname string, recorder events.EventRecorder, listener listener, factory httpServerFactory, nodePortAddresses *utilproxy.NodePortAddresses) ServiceHealthServer { +type proxierHealthChecker interface { + // IsHealthy returns the proxier's health state, following the same + // definition the HTTP server defines. + IsHealthy() bool +} + +func newServiceHealthServer(hostname string, recorder events.EventRecorder, listener listener, factory httpServerFactory, nodePortAddresses *utilproxy.NodePortAddresses, healthzServer proxierHealthChecker) ServiceHealthServer { nodeAddresses, err := nodePortAddresses.GetNodeAddresses(utilproxy.RealNetwork{}) if err != nil || nodeAddresses.Len() == 0 { @@ -75,14 +81,15 @@ func newServiceHealthServer(hostname string, recorder events.EventRecorder, list recorder: recorder, listener: listener, httpFactory: factory, + healthzServer: healthzServer, services: map[types.NamespacedName]*hcInstance{}, nodeAddresses: nodeAddresses, } } // NewServiceHealthServer allocates a new service healthcheck server manager -func NewServiceHealthServer(hostname string, recorder events.EventRecorder, nodePortAddresses *utilproxy.NodePortAddresses) ServiceHealthServer { - return newServiceHealthServer(hostname, recorder, stdNetListener{}, stdHTTPServerFactory{}, nodePortAddresses) +func NewServiceHealthServer(hostname string, recorder events.EventRecorder, nodePortAddresses *utilproxy.NodePortAddresses, healthzServer proxierHealthChecker) ServiceHealthServer { + return newServiceHealthServer(hostname, recorder, stdNetListener{}, stdHTTPServerFactory{}, nodePortAddresses, healthzServer) } type server struct { @@ -93,6 +100,8 @@ type server struct { listener listener httpFactory httpServerFactory + healthzServer proxierHealthChecker + lock sync.RWMutex services map[types.NamespacedName]*hcInstance } @@ -226,14 +235,15 @@ func (h hcHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { return } count := svc.endpoints + kubeProxyHealthy := h.hcs.healthzServer.IsHealthy() h.hcs.lock.RUnlock() resp.Header().Set("Content-Type", "application/json") resp.Header().Set("X-Content-Type-Options", "nosniff") - if count == 0 { - resp.WriteHeader(http.StatusServiceUnavailable) - } else { + if count != 0 && kubeProxyHealthy { resp.WriteHeader(http.StatusOK) + } else { + resp.WriteHeader(http.StatusServiceUnavailable) } fmt.Fprint(resp, strings.Trim(dedent.Dedent(fmt.Sprintf(` { @@ -241,9 +251,10 @@ func (h hcHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { "namespace": %q, "name": %q }, - "localEndpoints": %d + "localEndpoints": %d, + "serviceProxyHealthy": %v } - `, h.name.Namespace, h.name.Name, count)), "\n")) + `, h.name.Namespace, h.name.Name, count, kubeProxyHealthy)), "\n")) } func (hcs *server) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error { diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 7a4e48369dc..680ffa7909d 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -268,7 +268,7 @@ func NewProxier(ipFamily v1.IPFamily, masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue) klog.V(2).InfoS("Using iptables mark for masquerade", "ipFamily", ipt.Protocol(), "mark", masqueradeMark) - serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses) + serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer) proxier := &Proxier{ svcPortMap: make(proxy.ServicePortMap), diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 305d003a8d3..a755ecd4c62 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -411,7 +411,7 @@ func NewProxier(ipFamily v1.IPFamily, nodePortAddresses := utilproxy.NewNodePortAddresses(nodePortAddressStrings) - serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses) + serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer) // excludeCIDRs has been validated before, here we just parse it to IPNet list parsedExcludeCIDRs, _ := netutils.ParseCIDRs(excludeCIDRs) diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go index 2318539fe9d..6b137336a44 100644 --- a/pkg/proxy/winkernel/proxier.go +++ b/pkg/proxy/winkernel/proxier.go @@ -701,7 +701,7 @@ func NewProxier( // windows listens to all node addresses nodePortAddresses := utilproxy.NewNodePortAddresses(nil) - serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses) + serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer) hns, supportedFeatures := newHostNetworkService() hnsNetworkName, err := getNetworkName(config.NetworkName)