mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 15:05:27 +00:00
Fix kube-proxy healthz server for proxier sync loop changes
The proxy healthz server assumed that kube-proxy would regularly call UpdateTimestamp() even when nothing changed, but that's no longer true. Fix it to only report unhealthiness when updates have been received from the apiserver but not promptly pushed out to iptables/ipvs.
This commit is contained in:
parent
0f10102c16
commit
f83474916e
@ -374,13 +374,21 @@ func TestHealthzServer(t *testing.T) {
|
||||
// Should return 200 "OK" by default.
|
||||
testHealthzHandler(server, http.StatusOK, t)
|
||||
|
||||
// Should return 503 "ServiceUnavailable" if exceed max no respond duration.
|
||||
hs.UpdateTimestamp()
|
||||
// Should return 200 "OK" after first update
|
||||
hs.Updated()
|
||||
testHealthzHandler(server, http.StatusOK, t)
|
||||
|
||||
// Should continue to return 200 "OK" as long as no further updates are queued
|
||||
fakeClock.Step(25 * time.Second)
|
||||
testHealthzHandler(server, http.StatusOK, t)
|
||||
|
||||
// Should return 503 "ServiceUnavailable" if exceed max update-processing time
|
||||
hs.QueuedUpdate()
|
||||
fakeClock.Step(25 * time.Second)
|
||||
testHealthzHandler(server, http.StatusServiceUnavailable, t)
|
||||
|
||||
// Should return 200 "OK" if timestamp is valid.
|
||||
hs.UpdateTimestamp()
|
||||
// Should return 200 "OK" after processing update
|
||||
hs.Updated()
|
||||
fakeClock.Step(5 * time.Second)
|
||||
testHealthzHandler(server, http.StatusOK, t)
|
||||
}
|
||||
|
@ -35,12 +35,17 @@ var proxierHealthzRetryInterval = 60 * time.Second
|
||||
|
||||
// ProxierHealthUpdater allows callers to update healthz timestamp only.
|
||||
type ProxierHealthUpdater interface {
|
||||
UpdateTimestamp()
|
||||
// QueuedUpdate should be called when the proxier receives a Service or Endpoints
|
||||
// event containing information that requires updating service rules.
|
||||
QueuedUpdate()
|
||||
|
||||
// Updated should be called when the proxier has successfully updated the service
|
||||
// rules to reflect the current state.
|
||||
Updated()
|
||||
}
|
||||
|
||||
// ProxierHealthServer returns 200 "OK" by default. Once timestamp has been
|
||||
// updated, it verifies we don't exceed max no respond duration since
|
||||
// last update.
|
||||
// ProxierHealthServer returns 200 "OK" by default. It verifies that the delay between
|
||||
// QueuedUpdate() calls and Updated() calls never exceeds healthTimeout.
|
||||
type ProxierHealthServer struct {
|
||||
listener listener
|
||||
httpFactory httpServerFactory
|
||||
@ -53,6 +58,7 @@ type ProxierHealthServer struct {
|
||||
nodeRef *v1.ObjectReference
|
||||
|
||||
lastUpdated atomic.Value
|
||||
lastQueued atomic.Value
|
||||
}
|
||||
|
||||
// NewProxierHealthServer returns a proxier health http server.
|
||||
@ -72,11 +78,16 @@ func newProxierHealthServer(listener listener, httpServerFactory httpServerFacto
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateTimestamp updates the lastUpdated timestamp.
|
||||
func (hs *ProxierHealthServer) UpdateTimestamp() {
|
||||
// Updated updates the lastUpdated timestamp.
|
||||
func (hs *ProxierHealthServer) Updated() {
|
||||
hs.lastUpdated.Store(hs.clock.Now())
|
||||
}
|
||||
|
||||
// QueuedUpdate updates the lastQueued timestamp.
|
||||
func (hs *ProxierHealthServer) QueuedUpdate() {
|
||||
hs.lastQueued.Store(hs.clock.Now())
|
||||
}
|
||||
|
||||
// Run starts the healthz http server and returns.
|
||||
func (hs *ProxierHealthServer) Run() {
|
||||
serveMux := http.NewServeMux()
|
||||
@ -109,18 +120,44 @@ type healthzHandler struct {
|
||||
}
|
||||
|
||||
func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
|
||||
lastUpdated := time.Time{}
|
||||
var lastQueued, lastUpdated time.Time
|
||||
if val := h.hs.lastQueued.Load(); val != nil {
|
||||
lastQueued = val.(time.Time)
|
||||
}
|
||||
if val := h.hs.lastUpdated.Load(); val != nil {
|
||||
lastUpdated = val.(time.Time)
|
||||
}
|
||||
currentTime := h.hs.clock.Now()
|
||||
|
||||
healthy := false
|
||||
switch {
|
||||
case lastUpdated.IsZero():
|
||||
// The proxy is healthy while it's starting up
|
||||
// TODO: this makes it useless as a readinessProbe. Consider changing
|
||||
// to only become healthy after the proxy is fully synced.
|
||||
healthy = true
|
||||
case lastUpdated.After(lastQueued):
|
||||
// We've processed all updates
|
||||
healthy = true
|
||||
case currentTime.Sub(lastQueued) < h.hs.healthTimeout:
|
||||
// There's an unprocessed update queued, but it's not late yet
|
||||
healthy = true
|
||||
}
|
||||
|
||||
resp.Header().Set("Content-Type", "application/json")
|
||||
resp.Header().Set("X-Content-Type-Options", "nosniff")
|
||||
if !lastUpdated.IsZero() && currentTime.After(lastUpdated.Add(h.hs.healthTimeout)) {
|
||||
if !healthy {
|
||||
resp.WriteHeader(http.StatusServiceUnavailable)
|
||||
} else {
|
||||
resp.WriteHeader(http.StatusOK)
|
||||
|
||||
// In older releases, the returned "lastUpdated" time indicated the last
|
||||
// time the proxier sync loop ran, even if nothing had changed. To
|
||||
// preserve compatibility, we use the same semantics: the returned
|
||||
// lastUpdated value is "recent" if the server is healthy. The kube-proxy
|
||||
// metrics provide more detailed information.
|
||||
lastUpdated = currentTime
|
||||
|
||||
}
|
||||
fmt.Fprintf(resp, fmt.Sprintf(`{"lastUpdated": %q,"currentTime": %q}`, lastUpdated, currentTime))
|
||||
}
|
||||
|
@ -462,6 +462,9 @@ func (proxier *Proxier) probability(n int) string {
|
||||
|
||||
// Sync is called to synchronize the proxier state to iptables as soon as possible.
|
||||
func (proxier *Proxier) Sync() {
|
||||
if proxier.healthzServer != nil {
|
||||
proxier.healthzServer.QueuedUpdate()
|
||||
}
|
||||
proxier.syncRunner.Run()
|
||||
}
|
||||
|
||||
@ -469,7 +472,7 @@ func (proxier *Proxier) Sync() {
|
||||
func (proxier *Proxier) SyncLoop() {
|
||||
// Update healthz timestamp at beginning in case Sync() never succeeds.
|
||||
if proxier.healthzServer != nil {
|
||||
proxier.healthzServer.UpdateTimestamp()
|
||||
proxier.healthzServer.Updated()
|
||||
}
|
||||
proxier.syncRunner.Loop(wait.NeverStop)
|
||||
}
|
||||
@ -496,7 +499,7 @@ func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
|
||||
// service object is observed.
|
||||
func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
|
||||
if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
|
||||
proxier.syncRunner.Run()
|
||||
proxier.Sync()
|
||||
}
|
||||
}
|
||||
|
||||
@ -1451,7 +1454,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
proxier.portsMap = replacementPortsMap
|
||||
|
||||
if proxier.healthzServer != nil {
|
||||
proxier.healthzServer.UpdateTimestamp()
|
||||
proxier.healthzServer.Updated()
|
||||
}
|
||||
metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
|
||||
|
||||
|
@ -777,6 +777,9 @@ func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset
|
||||
|
||||
// Sync is called to synchronize the proxier state to iptables and ipvs as soon as possible.
|
||||
func (proxier *Proxier) Sync() {
|
||||
if proxier.healthzServer != nil {
|
||||
proxier.healthzServer.QueuedUpdate()
|
||||
}
|
||||
proxier.syncRunner.Run()
|
||||
}
|
||||
|
||||
@ -784,7 +787,7 @@ func (proxier *Proxier) Sync() {
|
||||
func (proxier *Proxier) SyncLoop() {
|
||||
// Update healthz timestamp at beginning in case Sync() never succeeds.
|
||||
if proxier.healthzServer != nil {
|
||||
proxier.healthzServer.UpdateTimestamp()
|
||||
proxier.healthzServer.Updated()
|
||||
}
|
||||
proxier.syncRunner.Loop(wait.NeverStop)
|
||||
}
|
||||
@ -809,7 +812,7 @@ func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
|
||||
// OnServiceUpdate is called whenever modification of an existing service object is observed.
|
||||
func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
|
||||
if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
|
||||
proxier.syncRunner.Run()
|
||||
proxier.Sync()
|
||||
}
|
||||
}
|
||||
|
||||
@ -841,7 +844,7 @@ func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
|
||||
// OnEndpointsUpdate is called whenever modification of an existing endpoints object is observed.
|
||||
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
|
||||
if proxier.endpointsChanges.Update(oldEndpoints, endpoints) && proxier.isInitialized() {
|
||||
proxier.syncRunner.Run()
|
||||
proxier.Sync()
|
||||
}
|
||||
}
|
||||
|
||||
@ -1492,7 +1495,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices, legacyBindAddrs)
|
||||
|
||||
if proxier.healthzServer != nil {
|
||||
proxier.healthzServer.UpdateTimestamp()
|
||||
proxier.healthzServer.Updated()
|
||||
}
|
||||
metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
|
||||
|
||||
|
@ -726,6 +726,9 @@ func getHnsNetworkInfo(hnsNetworkName string) (*hnsNetworkInfo, error) {
|
||||
|
||||
// Sync is called to synchronize the proxier state to hns as soon as possible.
|
||||
func (proxier *Proxier) Sync() {
|
||||
if proxier.healthzServer != nil {
|
||||
proxier.healthzServer.QueuedUpdate()
|
||||
}
|
||||
proxier.syncRunner.Run()
|
||||
}
|
||||
|
||||
@ -733,7 +736,7 @@ func (proxier *Proxier) Sync() {
|
||||
func (proxier *Proxier) SyncLoop() {
|
||||
// Update healthz timestamp at beginning in case Sync() never succeeds.
|
||||
if proxier.healthzServer != nil {
|
||||
proxier.healthzServer.UpdateTimestamp()
|
||||
proxier.healthzServer.Updated()
|
||||
}
|
||||
proxier.syncRunner.Loop(wait.NeverStop)
|
||||
}
|
||||
@ -753,21 +756,21 @@ func (proxier *Proxier) isInitialized() bool {
|
||||
func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
|
||||
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
||||
if proxier.serviceChanges.update(&namespacedName, nil, service, proxier.hns) && proxier.isInitialized() {
|
||||
proxier.syncRunner.Run()
|
||||
proxier.Sync()
|
||||
}
|
||||
}
|
||||
|
||||
func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
|
||||
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
||||
if proxier.serviceChanges.update(&namespacedName, oldService, service, proxier.hns) && proxier.isInitialized() {
|
||||
proxier.syncRunner.Run()
|
||||
proxier.Sync()
|
||||
}
|
||||
}
|
||||
|
||||
func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
|
||||
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
||||
if proxier.serviceChanges.update(&namespacedName, service, nil, proxier.hns) && proxier.isInitialized() {
|
||||
proxier.syncRunner.Run()
|
||||
proxier.Sync()
|
||||
}
|
||||
}
|
||||
|
||||
@ -828,21 +831,21 @@ func (proxier *Proxier) updateServiceMap() (result updateServiceMapResult) {
|
||||
func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
|
||||
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
|
||||
if proxier.endpointsChanges.update(&namespacedName, nil, endpoints, proxier.hns) && proxier.isInitialized() {
|
||||
proxier.syncRunner.Run()
|
||||
proxier.Sync()
|
||||
}
|
||||
}
|
||||
|
||||
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
|
||||
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
|
||||
if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints, proxier.hns) && proxier.isInitialized() {
|
||||
proxier.syncRunner.Run()
|
||||
proxier.Sync()
|
||||
}
|
||||
}
|
||||
|
||||
func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
|
||||
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
|
||||
if proxier.endpointsChanges.update(&namespacedName, endpoints, nil, proxier.hns) && proxier.isInitialized() {
|
||||
proxier.syncRunner.Run()
|
||||
proxier.Sync()
|
||||
}
|
||||
}
|
||||
|
||||
@ -1278,7 +1281,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
|
||||
if proxier.healthzServer != nil {
|
||||
proxier.healthzServer.UpdateTimestamp()
|
||||
proxier.healthzServer.Updated()
|
||||
}
|
||||
SyncProxyRulesLastTimestamp.SetToCurrentTime()
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user