pkg/proxy: dual stack health checker

Signed-off-by: Daman Arora <aroradaman@gmail.com>
This commit is contained in:
Daman Arora 2023-10-15 22:37:13 +05:30
parent 4ea6ec738c
commit bfda244e54
7 changed files with 189 additions and 139 deletions

View File

@ -527,7 +527,7 @@ type ProxyServer struct {
Broadcaster events.EventBroadcaster Broadcaster events.EventBroadcaster
Recorder events.EventRecorder Recorder events.EventRecorder
NodeRef *v1.ObjectReference NodeRef *v1.ObjectReference
HealthzServer healthcheck.ProxierHealthUpdater HealthzServer *healthcheck.ProxierHealthServer
Hostname string Hostname string
PrimaryIPFamily v1.IPFamily PrimaryIPFamily v1.IPFamily
NodeIPs map[v1.IPFamily]net.IP NodeIPs map[v1.IPFamily]net.IP
@ -735,7 +735,7 @@ func createClient(config componentbaseconfig.ClientConnectionConfiguration, mast
return client, nil return client, nil
} }
func serveHealthz(hz healthcheck.ProxierHealthUpdater, errCh chan error) { func serveHealthz(hz *healthcheck.ProxierHealthServer, errCh chan error) {
if hz == nil { if hz == nil {
return return
} }

View File

@ -480,26 +480,7 @@ func TestHealthzServer(t *testing.T) {
tracking503: 0, tracking503: 0,
} }
// Should return 200 "OK" by default. testProxierHealthUpdater(hs, hsTest, fakeClock, t)
testHTTPHandler(hsTest, http.StatusOK, t)
// Should return 200 "OK" after first update
hs.Updated()
testHTTPHandler(hsTest, http.StatusOK, t)
// Should continue to return 200 "OK" as long as no further updates are queued
fakeClock.Step(25 * time.Second)
testHTTPHandler(hsTest, http.StatusOK, t)
// Should return 503 "ServiceUnavailable" if exceed max update-processing time
hs.QueuedUpdate()
fakeClock.Step(25 * time.Second)
testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
// Should return 200 "OK" after processing update
hs.Updated()
fakeClock.Step(5 * time.Second)
testHTTPHandler(hsTest, http.StatusOK, t)
// Should return 200 "OK" if we've synced a node, tainted in any other way // Should return 200 "OK" if we've synced a node, tainted in any other way
hs.SyncNode(makeNode(tweakTainted("other"))) hs.SyncNode(makeNode(tweakTainted("other")))
@ -534,26 +515,7 @@ func TestLivezServer(t *testing.T) {
tracking503: 0, tracking503: 0,
} }
// Should return 200 "OK" by default. testProxierHealthUpdater(hs, hsTest, fakeClock, t)
testHTTPHandler(hsTest, http.StatusOK, t)
// Should return 200 "OK" after first update
hs.Updated()
testHTTPHandler(hsTest, http.StatusOK, t)
// Should continue to return 200 "OK" as long as no further updates are queued
fakeClock.Step(25 * time.Second)
testHTTPHandler(hsTest, http.StatusOK, t)
// Should return 503 "ServiceUnavailable" if exceed max update-processing time
hs.QueuedUpdate()
fakeClock.Step(25 * time.Second)
testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
// Should return 200 "OK" after processing update
hs.Updated()
fakeClock.Step(5 * time.Second)
testHTTPHandler(hsTest, http.StatusOK, t)
// Should return 200 "OK" irrespective of node syncs // Should return 200 "OK" irrespective of node syncs
hs.SyncNode(makeNode(tweakTainted("other"))) hs.SyncNode(makeNode(tweakTainted("other")))
@ -579,6 +541,77 @@ var (
livezURL url = "/livez" livezURL url = "/livez"
) )
func testProxierHealthUpdater(hs *ProxierHealthServer, hsTest *serverTest, fakeClock *testingclock.FakeClock, t *testing.T) {
// Should return 200 "OK" by default.
testHTTPHandler(hsTest, http.StatusOK, t)
// Should return 200 "OK" after first update for both IPv4 and IPv6 proxiers.
hs.Updated(v1.IPv4Protocol)
hs.Updated(v1.IPv6Protocol)
testHTTPHandler(hsTest, http.StatusOK, t)
// Should continue to return 200 "OK" as long as no further updates are queued for any proxier.
fakeClock.Step(25 * time.Second)
testHTTPHandler(hsTest, http.StatusOK, t)
// Should return 503 "ServiceUnavailable" if IPv4 proxier exceed max update-processing time.
hs.QueuedUpdate(v1.IPv4Protocol)
fakeClock.Step(25 * time.Second)
testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
// Should return 200 "OK" after processing update for both IPv4 and IPv6 proxiers.
hs.Updated(v1.IPv4Protocol)
hs.Updated(v1.IPv6Protocol)
fakeClock.Step(5 * time.Second)
testHTTPHandler(hsTest, http.StatusOK, t)
// Should return 503 "ServiceUnavailable" if IPv6 proxier exceed max update-processing time.
hs.QueuedUpdate(v1.IPv6Protocol)
fakeClock.Step(25 * time.Second)
testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
// Should return 200 "OK" after processing update for both IPv4 and IPv6 proxiers.
hs.Updated(v1.IPv4Protocol)
hs.Updated(v1.IPv6Protocol)
fakeClock.Step(5 * time.Second)
testHTTPHandler(hsTest, http.StatusOK, t)
// Should return 503 "ServiceUnavailable" if both IPv4 and IPv6 proxiers exceed max update-processing time.
hs.QueuedUpdate(v1.IPv4Protocol)
hs.QueuedUpdate(v1.IPv6Protocol)
fakeClock.Step(25 * time.Second)
testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
// Should return 200 "OK" after processing update for both IPv4 and IPv6 proxiers.
hs.Updated(v1.IPv4Protocol)
hs.Updated(v1.IPv6Protocol)
fakeClock.Step(5 * time.Second)
testHTTPHandler(hsTest, http.StatusOK, t)
// If IPv6 proxier is late for an update but IPv4 proxier is not then updating IPv4 proxier should have no effect.
hs.QueuedUpdate(v1.IPv6Protocol)
fakeClock.Step(25 * time.Second)
testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
hs.Updated(v1.IPv4Protocol)
testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
hs.Updated(v1.IPv6Protocol)
testHTTPHandler(hsTest, http.StatusOK, t)
// If both IPv4 and IPv6 proxiers are late for an update, we shouldn't report 200 "OK" until after both of them update.
hs.QueuedUpdate(v1.IPv4Protocol)
hs.QueuedUpdate(v1.IPv6Protocol)
fakeClock.Step(25 * time.Second)
testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
hs.Updated(v1.IPv4Protocol)
testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
hs.Updated(v1.IPv6Protocol)
testHTTPHandler(hsTest, http.StatusOK, t)
}
func testHTTPHandler(hsTest *serverTest, status int, t *testing.T) { func testHTTPHandler(hsTest *serverTest, status int, t *testing.T) {
handler := hsTest.server.(*fakeHTTPServer).handler handler := hsTest.server.(*fakeHTTPServer).handler
req, err := http.NewRequest("GET", string(hsTest.url), nil) req, err := http.NewRequest("GET", string(hsTest.url), nil)

View File

@ -19,7 +19,7 @@ package healthcheck
import ( import (
"fmt" "fmt"
"net/http" "net/http"
"sync/atomic" "sync"
"time" "time"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
@ -34,32 +34,14 @@ const (
ToBeDeletedTaint = "ToBeDeletedByClusterAutoscaler" ToBeDeletedTaint = "ToBeDeletedByClusterAutoscaler"
) )
// ProxierHealthUpdater allows callers to update healthz timestamp only. // ProxierHealthServer allows callers to:
type ProxierHealthUpdater interface { // 1. run a http server with /healthz and /livez endpoint handlers.
// QueuedUpdate should be called when the proxier receives a Service or Endpoints // 2. update healthz timestamps before and after synchronizing dataplane.
// event containing information that requires updating service rules. // 3. sync node status, for reporting unhealthy /healthz response
QueuedUpdate() // if the node is marked for deletion by autoscaler.
// 4. get proxy health by verifying that the delay between QueuedUpdate()
// Updated should be called when the proxier has successfully updated the service // calls and Updated() calls exceeded healthTimeout or not.
// rules to reflect the current state. type ProxierHealthServer struct {
Updated()
// Run starts the healthz HTTP server and blocks until it exits.
Run() error
// Sync the node and determine if its eligible or not. Eligible is
// defined as being: not tainted by ToBeDeletedTaint and not deleted.
SyncNode(node *v1.Node)
proxierHealthChecker
}
var _ ProxierHealthUpdater = &proxierHealthServer{}
var zeroTime = time.Time{}
// proxierHealthServer returns 200 "OK" by default. It verifies that the delay between
// QueuedUpdate() calls and Updated() calls never exceeds healthTimeout.
type proxierHealthServer struct {
listener listener listener listener
httpFactory httpServerFactory httpFactory httpServerFactory
clock clock.Clock clock clock.Clock
@ -67,92 +49,120 @@ type proxierHealthServer struct {
addr string addr string
healthTimeout time.Duration healthTimeout time.Duration
lastUpdated atomic.Value lock sync.RWMutex
oldestPendingQueued atomic.Value lastUpdatedMap map[v1.IPFamily]time.Time
nodeEligible atomic.Bool oldestPendingQueuedMap map[v1.IPFamily]time.Time
nodeEligible bool
} }
// NewProxierHealthServer returns a proxier health http server. // NewProxierHealthServer returns a proxier health http server.
func NewProxierHealthServer(addr string, healthTimeout time.Duration) ProxierHealthUpdater { func NewProxierHealthServer(addr string, healthTimeout time.Duration) *ProxierHealthServer {
return newProxierHealthServer(stdNetListener{}, stdHTTPServerFactory{}, clock.RealClock{}, addr, healthTimeout) return newProxierHealthServer(stdNetListener{}, stdHTTPServerFactory{}, clock.RealClock{}, addr, healthTimeout)
} }
func newProxierHealthServer(listener listener, httpServerFactory httpServerFactory, c clock.Clock, addr string, healthTimeout time.Duration) *proxierHealthServer { func newProxierHealthServer(listener listener, httpServerFactory httpServerFactory, c clock.Clock, addr string, healthTimeout time.Duration) *ProxierHealthServer {
hs := &proxierHealthServer{ return &ProxierHealthServer{
listener: listener, listener: listener,
httpFactory: httpServerFactory, httpFactory: httpServerFactory,
clock: c, clock: c,
addr: addr, addr: addr,
healthTimeout: healthTimeout, healthTimeout: healthTimeout,
lastUpdatedMap: make(map[v1.IPFamily]time.Time),
oldestPendingQueuedMap: make(map[v1.IPFamily]time.Time),
// The node is eligible (and thus the proxy healthy) while it's starting up
// and until we've processed the first node event that indicates the
// contrary.
nodeEligible: true,
} }
// The node is eligible (and thus the proxy healthy) while it's starting up
// and until we've processed the first node event that indicates the
// contrary.
hs.nodeEligible.Store(true)
return hs
} }
// Updated indicates that kube-proxy has successfully updated its backend, so it should // Updated should be called when the proxier of the given IP family has successfully updated
// be considered healthy now. // the service rules to reflect the current state and should be considered healthy now.
func (hs *proxierHealthServer) Updated() { func (hs *ProxierHealthServer) Updated(ipFamily v1.IPFamily) {
hs.oldestPendingQueued.Store(zeroTime) hs.lock.Lock()
hs.lastUpdated.Store(hs.clock.Now()) defer hs.lock.Unlock()
delete(hs.oldestPendingQueuedMap, ipFamily)
hs.lastUpdatedMap[ipFamily] = hs.clock.Now()
} }
// QueuedUpdate indicates that the proxy has received changes from the apiserver but // QueuedUpdate should be called when the proxier receives a Service or Endpoints event
// has not yet pushed them to its backend. If the proxy does not call Updated within the // from API Server containing information that requires updating service rules. It
// indicates that the proxier for the given IP family has received changes but has not
// yet pushed them to its backend. If the proxier does not call Updated within the
// healthTimeout time then it will be considered unhealthy. // healthTimeout time then it will be considered unhealthy.
func (hs *proxierHealthServer) QueuedUpdate() { func (hs *ProxierHealthServer) QueuedUpdate(ipFamily v1.IPFamily) {
// Set oldestPendingQueued only if it's currently zero hs.lock.Lock()
hs.oldestPendingQueued.CompareAndSwap(zeroTime, hs.clock.Now()) defer hs.lock.Unlock()
// Set oldestPendingQueuedMap[ipFamily] only if it's currently unset
if _, set := hs.oldestPendingQueuedMap[ipFamily]; !set {
hs.oldestPendingQueuedMap[ipFamily] = hs.clock.Now()
}
} }
// IsHealthy returns only the proxier's health state, following the same // IsHealthy returns only the proxier's health state, following the same
// definition the HTTP server defines, but ignoring the state of the Node. // definition the HTTP server defines, but ignoring the state of the Node.
func (hs *proxierHealthServer) IsHealthy() bool { func (hs *ProxierHealthServer) IsHealthy() bool {
isHealthy, _, _ := hs.isHealthy() isHealthy, _ := hs.isHealthy()
return isHealthy return isHealthy
} }
func (hs *proxierHealthServer) isHealthy() (bool, time.Time, time.Time) { func (hs *ProxierHealthServer) isHealthy() (bool, time.Time) {
var oldestPendingQueued, lastUpdated time.Time hs.lock.RLock()
if val := hs.oldestPendingQueued.Load(); val != nil { defer hs.lock.RUnlock()
oldestPendingQueued = val.(time.Time)
} var lastUpdated time.Time
if val := hs.lastUpdated.Load(); val != nil {
lastUpdated = val.(time.Time)
}
currentTime := hs.clock.Now() currentTime := hs.clock.Now()
healthy := false for ipFamily, proxierLastUpdated := range hs.lastUpdatedMap {
switch {
case oldestPendingQueued.IsZero(): if proxierLastUpdated.After(lastUpdated) {
// The proxy is healthy while it's starting up lastUpdated = proxierLastUpdated
// or the proxy is fully synced. }
healthy = true
case currentTime.Sub(oldestPendingQueued) < hs.healthTimeout: if _, set := hs.oldestPendingQueuedMap[ipFamily]; !set {
// There's an unprocessed update queued, but it's not late yet // the proxier is healthy while it's starting up
healthy = true // or the proxier is fully synced.
continue
}
if currentTime.Sub(hs.oldestPendingQueuedMap[ipFamily]) < hs.healthTimeout {
// there's an unprocessed update queued for this proxier, but it's not late yet.
continue
}
return false, proxierLastUpdated
} }
return healthy, lastUpdated, currentTime return true, lastUpdated
} }
func (hs *proxierHealthServer) SyncNode(node *v1.Node) { // SyncNode syncs the node and determines if it is eligible or not. Eligible is
// defined as being: not tainted by ToBeDeletedTaint and not deleted.
func (hs *ProxierHealthServer) SyncNode(node *v1.Node) {
hs.lock.Lock()
defer hs.lock.Unlock()
if !node.DeletionTimestamp.IsZero() { if !node.DeletionTimestamp.IsZero() {
hs.nodeEligible.Store(false) hs.nodeEligible = false
return return
} }
for _, taint := range node.Spec.Taints { for _, taint := range node.Spec.Taints {
if taint.Key == ToBeDeletedTaint { if taint.Key == ToBeDeletedTaint {
hs.nodeEligible.Store(false) hs.nodeEligible = false
return return
} }
} }
hs.nodeEligible.Store(true) hs.nodeEligible = true
}
// NodeEligible returns nodeEligible field of ProxierHealthServer.
func (hs *ProxierHealthServer) NodeEligible() bool {
hs.lock.RLock()
defer hs.lock.RUnlock()
return hs.nodeEligible
} }
// Run starts the healthz HTTP server and blocks until it exits. // Run starts the healthz HTTP server and blocks until it exits.
func (hs *proxierHealthServer) Run() error { func (hs *ProxierHealthServer) Run() error {
serveMux := http.NewServeMux() serveMux := http.NewServeMux()
serveMux.Handle("/healthz", healthzHandler{hs: hs}) serveMux.Handle("/healthz", healthzHandler{hs: hs})
serveMux.Handle("/livez", livezHandler{hs: hs}) serveMux.Handle("/livez", livezHandler{hs: hs})
@ -172,12 +182,14 @@ func (hs *proxierHealthServer) Run() error {
} }
type healthzHandler struct { type healthzHandler struct {
hs *proxierHealthServer hs *ProxierHealthServer
} }
func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
nodeEligible := h.hs.nodeEligible.Load() nodeEligible := h.hs.NodeEligible()
healthy, lastUpdated, currentTime := h.hs.isHealthy() healthy, lastUpdated := h.hs.isHealthy()
currentTime := h.hs.clock.Now()
healthy = healthy && nodeEligible healthy = healthy && nodeEligible
resp.Header().Set("Content-Type", "application/json") resp.Header().Set("Content-Type", "application/json")
resp.Header().Set("X-Content-Type-Options", "nosniff") resp.Header().Set("X-Content-Type-Options", "nosniff")
@ -198,11 +210,12 @@ func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
} }
type livezHandler struct { type livezHandler struct {
hs *proxierHealthServer hs *ProxierHealthServer
} }
func (h livezHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { func (h livezHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
healthy, lastUpdated, currentTime := h.hs.isHealthy() healthy, lastUpdated := h.hs.isHealthy()
currentTime := h.hs.clock.Now()
resp.Header().Set("Content-Type", "application/json") resp.Header().Set("Content-Type", "application/json")
resp.Header().Set("X-Content-Type-Options", "nosniff") resp.Header().Set("X-Content-Type-Options", "nosniff")
if !healthy { if !healthy {

View File

@ -152,6 +152,9 @@ func (e *endpointsInfo) Equal(other proxy.Endpoint) bool {
// Proxier is an iptables based proxy for connections between a localhost:lport // Proxier is an iptables based proxy for connections between a localhost:lport
// and services that provide the actual backends. // and services that provide the actual backends.
type Proxier struct { type Proxier struct {
// ipFamily defines the IP family which this proxier is tracking.
ipFamily v1.IPFamily
// endpointsChanges and serviceChanges contains all changes to endpoints and // endpointsChanges and serviceChanges contains all changes to endpoints and
// services that happened since iptables was synced. For a single object, // services that happened since iptables was synced. For a single object,
// changes are accumulated, i.e. previous is state from before all of them, // changes are accumulated, i.e. previous is state from before all of them,
@ -185,7 +188,7 @@ type Proxier struct {
recorder events.EventRecorder recorder events.EventRecorder
serviceHealthServer healthcheck.ServiceHealthServer serviceHealthServer healthcheck.ServiceHealthServer
healthzServer healthcheck.ProxierHealthUpdater healthzServer *healthcheck.ProxierHealthServer
// Since converting probabilities (floats) to strings is expensive // Since converting probabilities (floats) to strings is expensive
// and we are using only probabilities in the format of 1/n, we are // and we are using only probabilities in the format of 1/n, we are
@ -237,7 +240,7 @@ func NewProxier(ipFamily v1.IPFamily,
hostname string, hostname string,
nodeIP net.IP, nodeIP net.IP,
recorder events.EventRecorder, recorder events.EventRecorder,
healthzServer healthcheck.ProxierHealthUpdater, healthzServer *healthcheck.ProxierHealthServer,
nodePortAddressStrings []string, nodePortAddressStrings []string,
) (*Proxier, error) { ) (*Proxier, error) {
nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nodePortAddressStrings) nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nodePortAddressStrings)
@ -269,6 +272,7 @@ func NewProxier(ipFamily v1.IPFamily,
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer) serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
proxier := &Proxier{ proxier := &Proxier{
ipFamily: ipFamily,
svcPortMap: make(proxy.ServicePortMap), svcPortMap: make(proxy.ServicePortMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
endpointsMap: make(proxy.EndpointsMap), endpointsMap: make(proxy.EndpointsMap),
@ -330,7 +334,7 @@ func NewDualStackProxier(
hostname string, hostname string,
nodeIPs map[v1.IPFamily]net.IP, nodeIPs map[v1.IPFamily]net.IP,
recorder events.EventRecorder, recorder events.EventRecorder,
healthzServer healthcheck.ProxierHealthUpdater, healthzServer *healthcheck.ProxierHealthServer,
nodePortAddresses []string, nodePortAddresses []string,
) (proxy.Provider, error) { ) (proxy.Provider, error) {
// Create an ipv4 instance of the single-stack proxier // Create an ipv4 instance of the single-stack proxier
@ -492,7 +496,7 @@ func (proxier *Proxier) probability(n int) string {
// Sync is called to synchronize the proxier state to iptables as soon as possible. // Sync is called to synchronize the proxier state to iptables as soon as possible.
func (proxier *Proxier) Sync() { func (proxier *Proxier) Sync() {
if proxier.healthzServer != nil { if proxier.healthzServer != nil {
proxier.healthzServer.QueuedUpdate() proxier.healthzServer.QueuedUpdate(proxier.ipFamily)
} }
metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime() metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
proxier.syncRunner.Run() proxier.syncRunner.Run()
@ -502,7 +506,7 @@ func (proxier *Proxier) Sync() {
func (proxier *Proxier) SyncLoop() { func (proxier *Proxier) SyncLoop() {
// Update healthz timestamp at beginning in case Sync() never succeeds. // Update healthz timestamp at beginning in case Sync() never succeeds.
if proxier.healthzServer != nil { if proxier.healthzServer != nil {
proxier.healthzServer.Updated() proxier.healthzServer.Updated(proxier.ipFamily)
} }
// synthesize "last change queued" time as the informers are syncing. // synthesize "last change queued" time as the informers are syncing.
@ -1537,7 +1541,7 @@ func (proxier *Proxier) syncProxyRules() {
metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(serviceNoLocalEndpointsTotalInternal)) metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(serviceNoLocalEndpointsTotalInternal))
metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(serviceNoLocalEndpointsTotalExternal)) metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(serviceNoLocalEndpointsTotalExternal))
if proxier.healthzServer != nil { if proxier.healthzServer != nil {
proxier.healthzServer.Updated() proxier.healthzServer.Updated(proxier.ipFamily)
} }
metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime() metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()

View File

@ -261,7 +261,7 @@ type Proxier struct {
recorder events.EventRecorder recorder events.EventRecorder
serviceHealthServer healthcheck.ServiceHealthServer serviceHealthServer healthcheck.ServiceHealthServer
healthzServer healthcheck.ProxierHealthUpdater healthzServer *healthcheck.ProxierHealthServer
ipvsScheduler string ipvsScheduler string
// The following buffers are used to reuse memory and avoid allocations // The following buffers are used to reuse memory and avoid allocations
@ -325,7 +325,7 @@ func NewProxier(ipFamily v1.IPFamily,
hostname string, hostname string,
nodeIP net.IP, nodeIP net.IP,
recorder events.EventRecorder, recorder events.EventRecorder,
healthzServer healthcheck.ProxierHealthUpdater, healthzServer *healthcheck.ProxierHealthServer,
scheduler string, scheduler string,
nodePortAddressStrings []string, nodePortAddressStrings []string,
kernelHandler KernelHandler, kernelHandler KernelHandler,
@ -482,7 +482,7 @@ func NewDualStackProxier(
hostname string, hostname string,
nodeIPs map[v1.IPFamily]net.IP, nodeIPs map[v1.IPFamily]net.IP,
recorder events.EventRecorder, recorder events.EventRecorder,
healthzServer healthcheck.ProxierHealthUpdater, healthzServer *healthcheck.ProxierHealthServer,
scheduler string, scheduler string,
nodePortAddresses []string, nodePortAddresses []string,
kernelHandler KernelHandler, kernelHandler KernelHandler,
@ -756,7 +756,7 @@ func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset
// Sync is called to synchronize the proxier state to iptables and ipvs as soon as possible. // Sync is called to synchronize the proxier state to iptables and ipvs as soon as possible.
func (proxier *Proxier) Sync() { func (proxier *Proxier) Sync() {
if proxier.healthzServer != nil { if proxier.healthzServer != nil {
proxier.healthzServer.QueuedUpdate() proxier.healthzServer.QueuedUpdate(proxier.ipFamily)
} }
metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime() metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
proxier.syncRunner.Run() proxier.syncRunner.Run()
@ -766,7 +766,7 @@ func (proxier *Proxier) Sync() {
func (proxier *Proxier) SyncLoop() { func (proxier *Proxier) SyncLoop() {
// Update healthz timestamp at beginning in case Sync() never succeeds. // Update healthz timestamp at beginning in case Sync() never succeeds.
if proxier.healthzServer != nil { if proxier.healthzServer != nil {
proxier.healthzServer.Updated() proxier.healthzServer.Updated(proxier.ipFamily)
} }
// synthesize "last change queued" time as the informers are syncing. // synthesize "last change queued" time as the informers are syncing.
metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime() metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
@ -1482,7 +1482,7 @@ func (proxier *Proxier) syncProxyRules() {
proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices) proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices)
if proxier.healthzServer != nil { if proxier.healthzServer != nil {
proxier.healthzServer.Updated() proxier.healthzServer.Updated(proxier.ipFamily)
} }
metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime() metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()

View File

@ -90,7 +90,7 @@ func (n *NodePodCIDRHandler) OnNodeSynced() {}
// NodeEligibleHandler handles the life cycle of the Node's eligibility, as // NodeEligibleHandler handles the life cycle of the Node's eligibility, as
// determined by the health server for directing load balancer traffic. // determined by the health server for directing load balancer traffic.
type NodeEligibleHandler struct { type NodeEligibleHandler struct {
HealthServer healthcheck.ProxierHealthUpdater HealthServer *healthcheck.ProxierHealthServer
} }
var _ config.NodeHandler = &NodeEligibleHandler{} var _ config.NodeHandler = &NodeEligibleHandler{}

View File

@ -619,7 +619,7 @@ type Proxier struct {
recorder events.EventRecorder recorder events.EventRecorder
serviceHealthServer healthcheck.ServiceHealthServer serviceHealthServer healthcheck.ServiceHealthServer
healthzServer healthcheck.ProxierHealthUpdater healthzServer *healthcheck.ProxierHealthServer
hns HostNetworkService hns HostNetworkService
network hnsNetworkInfo network hnsNetworkInfo
@ -674,7 +674,7 @@ func NewProxier(
hostname string, hostname string,
nodeIP net.IP, nodeIP net.IP,
recorder events.EventRecorder, recorder events.EventRecorder,
healthzServer healthcheck.ProxierHealthUpdater, healthzServer *healthcheck.ProxierHealthServer,
config config.KubeProxyWinkernelConfiguration, config config.KubeProxyWinkernelConfiguration,
healthzPort int, healthzPort int,
) (*Proxier, error) { ) (*Proxier, error) {
@ -807,7 +807,7 @@ func NewDualStackProxier(
hostname string, hostname string,
nodeIPs map[v1.IPFamily]net.IP, nodeIPs map[v1.IPFamily]net.IP,
recorder events.EventRecorder, recorder events.EventRecorder,
healthzServer healthcheck.ProxierHealthUpdater, healthzServer *healthcheck.ProxierHealthServer,
config config.KubeProxyWinkernelConfiguration, config config.KubeProxyWinkernelConfiguration,
healthzPort int, healthzPort int,
) (proxy.Provider, error) { ) (proxy.Provider, error) {
@ -954,7 +954,7 @@ func getHnsNetworkInfo(hnsNetworkName string) (*hnsNetworkInfo, error) {
// Sync is called to synchronize the proxier state to hns as soon as possible. // Sync is called to synchronize the proxier state to hns as soon as possible.
func (proxier *Proxier) Sync() { func (proxier *Proxier) Sync() {
if proxier.healthzServer != nil { if proxier.healthzServer != nil {
proxier.healthzServer.QueuedUpdate() proxier.healthzServer.QueuedUpdate(proxier.ipFamily)
} }
metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime() metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
proxier.syncRunner.Run() proxier.syncRunner.Run()
@ -964,7 +964,7 @@ func (proxier *Proxier) Sync() {
func (proxier *Proxier) SyncLoop() { func (proxier *Proxier) SyncLoop() {
// Update healthz timestamp at beginning in case Sync() never succeeds. // Update healthz timestamp at beginning in case Sync() never succeeds.
if proxier.healthzServer != nil { if proxier.healthzServer != nil {
proxier.healthzServer.Updated() proxier.healthzServer.Updated(proxier.ipFamily)
} }
// synthesize "last change queued" time as the informers are syncing. // synthesize "last change queued" time as the informers are syncing.
metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime() metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
@ -1604,7 +1604,7 @@ func (proxier *Proxier) syncProxyRules() {
} }
if proxier.healthzServer != nil { if proxier.healthzServer != nil {
proxier.healthzServer.Updated() proxier.healthzServer.Updated(proxier.ipFamily)
} }
metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime() metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()