diff --git a/pkg/proxy/healthcheck/healthcheck_test.go b/pkg/proxy/healthcheck/healthcheck_test.go index 6c7a02b6788..f756cb85b74 100644 --- a/pkg/proxy/healthcheck/healthcheck_test.go +++ b/pkg/proxy/healthcheck/healthcheck_test.go @@ -374,13 +374,21 @@ func TestHealthzServer(t *testing.T) { // Should return 200 "OK" by default. testHealthzHandler(server, http.StatusOK, t) - // Should return 503 "ServiceUnavailable" if exceed max no respond duration. - hs.UpdateTimestamp() + // Should return 200 "OK" after first update + hs.Updated() + testHealthzHandler(server, http.StatusOK, t) + + // Should continue to return 200 "OK" as long as no further updates are queued + fakeClock.Step(25 * time.Second) + testHealthzHandler(server, http.StatusOK, t) + + // Should return 503 "ServiceUnavailable" if exceed max update-processing time + hs.QueuedUpdate() fakeClock.Step(25 * time.Second) testHealthzHandler(server, http.StatusServiceUnavailable, t) - // Should return 200 "OK" if timestamp is valid. - hs.UpdateTimestamp() + // Should return 200 "OK" after processing update + hs.Updated() fakeClock.Step(5 * time.Second) testHealthzHandler(server, http.StatusOK, t) } diff --git a/pkg/proxy/healthcheck/proxier_health.go b/pkg/proxy/healthcheck/proxier_health.go index e4ff2304f53..44dc873fac3 100644 --- a/pkg/proxy/healthcheck/proxier_health.go +++ b/pkg/proxy/healthcheck/proxier_health.go @@ -35,12 +35,17 @@ var proxierHealthzRetryInterval = 60 * time.Second // ProxierHealthUpdater allows callers to update healthz timestamp only. type ProxierHealthUpdater interface { - UpdateTimestamp() + // QueuedUpdate should be called when the proxier receives a Service or Endpoints + // event containing information that requires updating service rules. + QueuedUpdate() + + // Updated should be called when the proxier has successfully updated the service + // rules to reflect the current state. + Updated() } -// ProxierHealthServer returns 200 "OK" by default. Once timestamp has been -// updated, it verifies we don't exceed max no respond duration since -// last update. +// ProxierHealthServer returns 200 "OK" by default. It verifies that the delay between +// QueuedUpdate() calls and Updated() calls never exceeds healthTimeout. type ProxierHealthServer struct { listener listener httpFactory httpServerFactory @@ -53,6 +58,7 @@ type ProxierHealthServer struct { nodeRef *v1.ObjectReference lastUpdated atomic.Value + lastQueued atomic.Value } // NewProxierHealthServer returns a proxier health http server. @@ -72,11 +78,16 @@ func newProxierHealthServer(listener listener, httpServerFactory httpServerFacto } } -// UpdateTimestamp updates the lastUpdated timestamp. -func (hs *ProxierHealthServer) UpdateTimestamp() { +// Updated updates the lastUpdated timestamp. +func (hs *ProxierHealthServer) Updated() { hs.lastUpdated.Store(hs.clock.Now()) } +// QueuedUpdate updates the lastQueued timestamp. +func (hs *ProxierHealthServer) QueuedUpdate() { + hs.lastQueued.Store(hs.clock.Now()) +} + // Run starts the healthz http server and returns. func (hs *ProxierHealthServer) Run() { serveMux := http.NewServeMux() @@ -109,18 +120,44 @@ type healthzHandler struct { } func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { - lastUpdated := time.Time{} + var lastQueued, lastUpdated time.Time + if val := h.hs.lastQueued.Load(); val != nil { + lastQueued = val.(time.Time) + } if val := h.hs.lastUpdated.Load(); val != nil { lastUpdated = val.(time.Time) } currentTime := h.hs.clock.Now() + healthy := false + switch { + case lastUpdated.IsZero(): + // The proxy is healthy while it's starting up + // TODO: this makes it useless as a readinessProbe. Consider changing + // to only become healthy after the proxy is fully synced. + healthy = true + case lastUpdated.After(lastQueued): + // We've processed all updates + healthy = true + case currentTime.Sub(lastQueued) < h.hs.healthTimeout: + // There's an unprocessed update queued, but it's not late yet + healthy = true + } + resp.Header().Set("Content-Type", "application/json") resp.Header().Set("X-Content-Type-Options", "nosniff") - if !lastUpdated.IsZero() && currentTime.After(lastUpdated.Add(h.hs.healthTimeout)) { + if !healthy { resp.WriteHeader(http.StatusServiceUnavailable) } else { resp.WriteHeader(http.StatusOK) + + // In older releases, the returned "lastUpdated" time indicated the last + // time the proxier sync loop ran, even if nothing had changed. To + // preserve compatibility, we use the same semantics: the returned + // lastUpdated value is "recent" if the server is healthy. The kube-proxy + // metrics provide more detailed information. + lastUpdated = currentTime + } fmt.Fprintf(resp, fmt.Sprintf(`{"lastUpdated": %q,"currentTime": %q}`, lastUpdated, currentTime)) } diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index caea525c7b2..097776f4f08 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -462,6 +462,9 @@ func (proxier *Proxier) probability(n int) string { // Sync is called to synchronize the proxier state to iptables as soon as possible. func (proxier *Proxier) Sync() { + if proxier.healthzServer != nil { + proxier.healthzServer.QueuedUpdate() + } proxier.syncRunner.Run() } @@ -469,7 +472,7 @@ func (proxier *Proxier) Sync() { func (proxier *Proxier) SyncLoop() { // Update healthz timestamp at beginning in case Sync() never succeeds. if proxier.healthzServer != nil { - proxier.healthzServer.UpdateTimestamp() + proxier.healthzServer.Updated() } proxier.syncRunner.Loop(wait.NeverStop) } @@ -496,7 +499,7 @@ func (proxier *Proxier) OnServiceAdd(service *v1.Service) { // service object is observed. func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) { if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() { - proxier.syncRunner.Run() + proxier.Sync() } } @@ -1451,7 +1454,7 @@ func (proxier *Proxier) syncProxyRules() { proxier.portsMap = replacementPortsMap if proxier.healthzServer != nil { - proxier.healthzServer.UpdateTimestamp() + proxier.healthzServer.Updated() } metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime() diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index d1ac96629ee..605c79856e3 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -777,6 +777,9 @@ func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset // Sync is called to synchronize the proxier state to iptables and ipvs as soon as possible. func (proxier *Proxier) Sync() { + if proxier.healthzServer != nil { + proxier.healthzServer.QueuedUpdate() + } proxier.syncRunner.Run() } @@ -784,7 +787,7 @@ func (proxier *Proxier) Sync() { func (proxier *Proxier) SyncLoop() { // Update healthz timestamp at beginning in case Sync() never succeeds. if proxier.healthzServer != nil { - proxier.healthzServer.UpdateTimestamp() + proxier.healthzServer.Updated() } proxier.syncRunner.Loop(wait.NeverStop) } @@ -809,7 +812,7 @@ func (proxier *Proxier) OnServiceAdd(service *v1.Service) { // OnServiceUpdate is called whenever modification of an existing service object is observed. func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) { if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() { - proxier.syncRunner.Run() + proxier.Sync() } } @@ -841,7 +844,7 @@ func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) { // OnEndpointsUpdate is called whenever modification of an existing endpoints object is observed. func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) { if proxier.endpointsChanges.Update(oldEndpoints, endpoints) && proxier.isInitialized() { - proxier.syncRunner.Run() + proxier.Sync() } } @@ -1492,7 +1495,7 @@ func (proxier *Proxier) syncProxyRules() { proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices, legacyBindAddrs) if proxier.healthzServer != nil { - proxier.healthzServer.UpdateTimestamp() + proxier.healthzServer.Updated() } metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime() diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go index 89b175e65d7..eb481a81e6b 100644 --- a/pkg/proxy/winkernel/proxier.go +++ b/pkg/proxy/winkernel/proxier.go @@ -726,6 +726,9 @@ func getHnsNetworkInfo(hnsNetworkName string) (*hnsNetworkInfo, error) { // Sync is called to synchronize the proxier state to hns as soon as possible. func (proxier *Proxier) Sync() { + if proxier.healthzServer != nil { + proxier.healthzServer.QueuedUpdate() + } proxier.syncRunner.Run() } @@ -733,7 +736,7 @@ func (proxier *Proxier) Sync() { func (proxier *Proxier) SyncLoop() { // Update healthz timestamp at beginning in case Sync() never succeeds. if proxier.healthzServer != nil { - proxier.healthzServer.UpdateTimestamp() + proxier.healthzServer.Updated() } proxier.syncRunner.Loop(wait.NeverStop) } @@ -753,21 +756,21 @@ func (proxier *Proxier) isInitialized() bool { func (proxier *Proxier) OnServiceAdd(service *v1.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} if proxier.serviceChanges.update(&namespacedName, nil, service, proxier.hns) && proxier.isInitialized() { - proxier.syncRunner.Run() + proxier.Sync() } } func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} if proxier.serviceChanges.update(&namespacedName, oldService, service, proxier.hns) && proxier.isInitialized() { - proxier.syncRunner.Run() + proxier.Sync() } } func (proxier *Proxier) OnServiceDelete(service *v1.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} if proxier.serviceChanges.update(&namespacedName, service, nil, proxier.hns) && proxier.isInitialized() { - proxier.syncRunner.Run() + proxier.Sync() } } @@ -828,21 +831,21 @@ func (proxier *Proxier) updateServiceMap() (result updateServiceMapResult) { func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} if proxier.endpointsChanges.update(&namespacedName, nil, endpoints, proxier.hns) && proxier.isInitialized() { - proxier.syncRunner.Run() + proxier.Sync() } } func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints, proxier.hns) && proxier.isInitialized() { - proxier.syncRunner.Run() + proxier.Sync() } } func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} if proxier.endpointsChanges.update(&namespacedName, endpoints, nil, proxier.hns) && proxier.isInitialized() { - proxier.syncRunner.Run() + proxier.Sync() } } @@ -1278,7 +1281,7 @@ func (proxier *Proxier) syncProxyRules() { } if proxier.healthzServer != nil { - proxier.healthzServer.UpdateTimestamp() + proxier.healthzServer.Updated() } SyncProxyRulesLastTimestamp.SetToCurrentTime()