From 4e10ff91c5bb2d4ad87bb00ab281cc3ee95a258b Mon Sep 17 00:00:00 2001
From: Daman Arora
Date: Sun, 23 Jul 2023 15:44:55 +0530
Subject: [PATCH 1/3] pkg/proxy: move proxier health eventing to cmd/kube-proxy

Signed-off-by: Daman Arora
---
 cmd/kube-proxy/app/server.go              | 19 +++++++++++++------
 pkg/proxy/healthcheck/healthcheck_test.go |  4 ++--
 pkg/proxy/healthcheck/proxier_health.go   | 19 ++++---------------
 3 files changed, 19 insertions(+), 23 deletions(-)

diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go
index d019212db28..987395ee4d8 100644
--- a/cmd/kube-proxy/app/server.go
+++ b/cmd/kube-proxy/app/server.go
@@ -574,7 +574,7 @@ func newProxyServer(config *kubeproxyconfig.KubeProxyConfiguration, master strin
 	}
 
 	if len(config.HealthzBindAddress) > 0 {
-		s.HealthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, s.Recorder, s.NodeRef)
+		s.HealthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration)
 	}
 
 	err = s.platformSetup()
@@ -822,16 +822,17 @@ func (s *ProxyServer) Run() error {
 
 	// TODO(thockin): make it possible for healthz and metrics to be on the same port.
 
-	var errCh chan error
+	var healthzErrCh, metricsErrCh chan error
 	if s.Config.BindAddressHardFail {
-		errCh = make(chan error)
+		healthzErrCh = make(chan error)
+		metricsErrCh = make(chan error)
 	}
 
 	// Start up a healthz server if requested
-	serveHealthz(s.HealthzServer, errCh)
+	serveHealthz(s.HealthzServer, healthzErrCh)
 
 	// Start up a metrics server if requested
-	serveMetrics(s.Config.MetricsBindAddress, s.Config.Mode, s.Config.EnableProfiling, errCh)
+	serveMetrics(s.Config.MetricsBindAddress, s.Config.Mode, s.Config.EnableProfiling, metricsErrCh)
 
 	noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)
 	if err != nil {
@@ -896,7 +897,13 @@ func (s *ProxyServer) Run() error {
 
 	go s.Proxier.SyncLoop()
 
-	return <-errCh
+	select {
+	case err = <-healthzErrCh:
+		s.Recorder.Eventf(s.NodeRef, nil, api.EventTypeWarning, "FailedToStartProxierHealthcheck", "StartKubeProxy", err.Error())
+	case err = <-metricsErrCh:
+		s.Recorder.Eventf(s.NodeRef, nil, api.EventTypeWarning, "FailedToStartMetricServer", "StartKubeProxy", err.Error())
+	}
+	return err
 }
 
 func (s *ProxyServer) birthCry() {
diff --git a/pkg/proxy/healthcheck/healthcheck_test.go b/pkg/proxy/healthcheck/healthcheck_test.go
index d77a561c870..3be8feac77e 100644
--- a/pkg/proxy/healthcheck/healthcheck_test.go
+++ b/pkg/proxy/healthcheck/healthcheck_test.go
@@ -470,7 +470,7 @@ func TestHealthzServer(t *testing.T) {
 	httpFactory := newFakeHTTPServerFactory()
 	fakeClock := testingclock.NewFakeClock(time.Now())
 
-	hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second, nil, nil)
+	hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second)
 	server := hs.httpFactory.New(hs.addr, healthzHandler{hs: hs})
 
 	hsTest := &serverTest{
@@ -524,7 +524,7 @@ func TestLivezServer(t *testing.T) {
 	httpFactory := newFakeHTTPServerFactory()
 	fakeClock := testingclock.NewFakeClock(time.Now())
 
-	hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second, nil, nil)
+	hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second)
 	server := hs.httpFactory.New(hs.addr, livezHandler{hs: hs})
 
 	hsTest := &serverTest{
diff --git a/pkg/proxy/healthcheck/proxier_health.go b/pkg/proxy/healthcheck/proxier_health.go
index 2ffdcc2b9d2..6f5e90f5be4 100644
--- a/pkg/proxy/healthcheck/proxier_health.go
+++ b/pkg/proxy/healthcheck/proxier_health.go
@@ -23,9 +23,7 @@ import (
 	"time"
 
 	v1 "k8s.io/api/core/v1"
-	"k8s.io/client-go/tools/events"
 	"k8s.io/klog/v2"
-	api "k8s.io/kubernetes/pkg/apis/core"
 	"k8s.io/kubernetes/pkg/proxy/metrics"
 	"k8s.io/utils/clock"
 )
@@ -68,8 +66,6 @@ type proxierHealthServer struct {
 	addr          string
 	healthTimeout time.Duration
-	recorder      events.EventRecorder
-	nodeRef       *v1.ObjectReference
 
 	lastUpdated         atomic.Value
 	oldestPendingQueued atomic.Value
 }
 
@@ -77,19 +73,17 @@
 // NewProxierHealthServer returns a proxier health http server.
-func NewProxierHealthServer(addr string, healthTimeout time.Duration, recorder events.EventRecorder, nodeRef *v1.ObjectReference) ProxierHealthUpdater {
-	return newProxierHealthServer(stdNetListener{}, stdHTTPServerFactory{}, clock.RealClock{}, addr, healthTimeout, recorder, nodeRef)
+func NewProxierHealthServer(addr string, healthTimeout time.Duration) ProxierHealthUpdater {
+	return newProxierHealthServer(stdNetListener{}, stdHTTPServerFactory{}, clock.RealClock{}, addr, healthTimeout)
 }
 
-func newProxierHealthServer(listener listener, httpServerFactory httpServerFactory, c clock.Clock, addr string, healthTimeout time.Duration, recorder events.EventRecorder, nodeRef *v1.ObjectReference) *proxierHealthServer {
+func newProxierHealthServer(listener listener, httpServerFactory httpServerFactory, c clock.Clock, addr string, healthTimeout time.Duration) *proxierHealthServer {
 	hs := &proxierHealthServer{
 		listener:      listener,
 		httpFactory:   httpServerFactory,
 		clock:         c,
 		addr:          addr,
 		healthTimeout: healthTimeout,
-		recorder:      recorder,
-		nodeRef:       nodeRef,
 	}
 	// The node is eligible (and thus the proxy healthy) while it's starting up
 	// and until we've processed the first node event that indicates the
 	// contrary.
@@ -166,12 +160,7 @@ func (hs *proxierHealthServer) Run() error {
 	listener, err := hs.listener.Listen(hs.addr)
 	if err != nil {
-		msg := fmt.Sprintf("failed to start proxier healthz on %s: %v", hs.addr, err)
-		// TODO(thockin): move eventing back to caller
-		if hs.recorder != nil {
-			hs.recorder.Eventf(hs.nodeRef, nil, api.EventTypeWarning, "FailedToStartProxierHealthcheck", "StartKubeProxy", msg)
-		}
-		return fmt.Errorf("%v", msg)
+		return fmt.Errorf("failed to start proxier healthz on %s: %v", hs.addr, err)
 	}
 
 	klog.V(3).InfoS("Starting healthz HTTP server", "address", hs.addr)
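
The shape patch 1 converges on — one error channel per server, with event recording owned by the caller rather than by the health server — reduces to the standalone sketch below. It is illustrative only: runServer and the printed event reasons stand in for serveHealthz/serveMetrics and the recorder calls in cmd/kube-proxy.

package main

import (
	"errors"
	"fmt"
)

// runServer starts a fake server; on failure it reports on errCh when the
// caller provided one (hard-fail mode), otherwise it only logs.
func runServer(name string, errCh chan error, fail bool) {
	go func() {
		if !fail {
			return
		}
		err := errors.New(name + ": listen failed")
		if errCh != nil {
			errCh <- err
			return
		}
		fmt.Println("soft-fail:", err)
	}()
}

func main() {
	healthzErrCh := make(chan error)
	metricsErrCh := make(chan error)
	runServer("healthz", healthzErrCh, true)
	runServer("metrics", metricsErrCh, false)

	// With separate channels the caller knows which server failed and can
	// record the matching event, which is the point of the patch.
	select {
	case err := <-healthzErrCh:
		fmt.Println("would record FailedToStartProxierHealthcheck:", err)
	case err := <-metricsErrCh:
		fmt.Println("would record FailedToStartMetricServer:", err)
	}
}
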
From 4ea6ec738cf32992f6d423c90af97b891fe1e62e Mon Sep 17 00:00:00 2001
From: Daman Arora
Date: Sun, 15 Oct 2023 19:51:03 +0530
Subject: [PATCH 2/3] pkg/proxy: add an ipFamily field to the winkernel proxier

Signed-off-by: Daman Arora
---
 cmd/kube-proxy/app/server_windows.go |  1 +
 pkg/proxy/winkernel/proxier.go       | 24 ++++++++++--------------
 2 files changed, 11 insertions(+), 14 deletions(-)

diff --git a/cmd/kube-proxy/app/server_windows.go b/cmd/kube-proxy/app/server_windows.go
index 62adf4a41d1..1b9f9011398 100644
--- a/cmd/kube-proxy/app/server_windows.go
+++ b/cmd/kube-proxy/app/server_windows.go
@@ -96,6 +96,7 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio
 		)
 	} else {
 		proxier, err = winkernel.NewProxier(
+			s.PrimaryIPFamily,
 			config.IPTables.SyncPeriod.Duration,
 			config.IPTables.MinSyncPeriod.Duration,
 			config.ClusterCIDR,
diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go
index 75011f94566..fe93d446110 100644
--- a/pkg/proxy/winkernel/proxier.go
+++ b/pkg/proxy/winkernel/proxier.go
@@ -590,6 +590,8 @@ type endPointsReferenceCountMap map[string]*uint16
 // Proxier is an hns based proxy for connections between a localhost:lport
 // and services that provide the actual backends.
 type Proxier struct {
+	// ipFamily defines the IP family which this proxier is tracking.
+	ipFamily v1.IPFamily
 	// TODO(imroc): implement node handler for winkernel proxier.
 	proxyconfig.NoopNodeHandler
@@ -608,7 +610,6 @@ type Proxier struct {
 	// with some partial data after kube-proxy restart.
 	endpointSlicesSynced bool
 	servicesSynced       bool
-	isIPv6Mode           bool
 	initialized          int32
 	syncRunner           *async.BoundedFrequencyRunner // governs calls to syncProxyRules
 	// These are effectively const and do not need the mutex to be held.
@@ -666,6 +667,7 @@ var _ proxy.Provider = &Proxier{}
 
 // NewProxier returns a new Proxier
 func NewProxier(
+	ipFamily v1.IPFamily,
 	syncPeriod time.Duration,
 	minSyncPeriod time.Duration,
 	clusterCIDR string,
@@ -685,12 +687,6 @@ func NewProxier(
 		klog.InfoS("ClusterCIDR not specified, unable to distinguish between internal and external traffic")
 	}
 
-	isIPv6 := netutils.IsIPv6(nodeIP)
-	ipFamily := v1.IPv4Protocol
-	if isIPv6 {
-		ipFamily = v1.IPv6Protocol
-	}
-
 	// windows listens to all node addresses
 	nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nil)
 	serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
@@ -771,6 +767,7 @@ func NewProxier(
 	}
 
 	proxier := &Proxier{
+		ipFamily:              ipFamily,
 		endPointsRefCount:     make(endPointsReferenceCountMap),
 		svcPortMap:            make(proxy.ServicePortMap),
 		endpointsMap:          make(proxy.EndpointsMap),
@@ -786,7 +783,6 @@ func NewProxier(
 		hostMac:               hostMac,
 		isDSR:                 isDSR,
 		supportedFeatures:     supportedFeatures,
-		isIPv6Mode:            isIPv6,
 		healthzPort:           healthzPort,
 		rootHnsEndpointName:   config.RootHnsEndpointName,
 		forwardHealthCheckVip: config.ForwardHealthCheckVip,
@@ -817,7 +813,7 @@ func NewDualStackProxier(
 ) (proxy.Provider, error) {
 
 	// Create an ipv4 instance of the single-stack proxier
-	ipv4Proxier, err := NewProxier(syncPeriod, minSyncPeriod,
+	ipv4Proxier, err := NewProxier(v1.IPv4Protocol, syncPeriod, minSyncPeriod,
 		clusterCIDR, hostname, nodeIPs[v1.IPv4Protocol],
 		recorder, healthzServer, config, healthzPort)
 
@@ -825,7 +821,7 @@ func NewDualStackProxier(
 		return nil, fmt.Errorf("unable to create ipv4 proxier: %v, hostname: %s, clusterCIDR : %s, nodeIP:%v", err, hostname, clusterCIDR, nodeIPs[v1.IPv4Protocol])
 	}
 
-	ipv6Proxier, err := NewProxier(syncPeriod, minSyncPeriod,
+	ipv6Proxier, err := NewProxier(v1.IPv6Protocol, syncPeriod, minSyncPeriod,
 		clusterCIDR, hostname, nodeIPs[v1.IPv6Protocol],
 		recorder, healthzServer, config, healthzPort)
 	if err != nil {
@@ -1455,7 +1451,7 @@ func (proxier *Proxier) syncProxyRules() {
 			// Cluster IP LoadBalancer creation
 			hnsLoadBalancer, err := hns.getLoadBalancer(
 				clusterIPEndpoints,
-				loadBalancerFlags{isDSR: proxier.isDSR, isIPv6: proxier.isIPv6Mode, sessionAffinity: sessionAffinityClientIP},
+				loadBalancerFlags{isDSR: proxier.isDSR, isIPv6: proxier.ipFamily == v1.IPv6Protocol, sessionAffinity: sessionAffinityClientIP},
 				sourceVip,
 				svcInfo.ClusterIP().String(),
 				Enum(svcInfo.Protocol()),
@@ -1490,7 +1486,7 @@ func (proxier *Proxier) syncProxyRules() {
 			// If all endpoints are in terminating stage, then no need to create Node Port LoadBalancer
 			hnsLoadBalancer, err := hns.getLoadBalancer(
 				nodePortEndpoints,
-				loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode},
+				loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
 				sourceVip,
 				"",
 				Enum(svcInfo.Protocol()),
@@ -1525,7 +1521,7 @@ func (proxier *Proxier) syncProxyRules() {
 				// Try loading existing policies, if already available
 				hnsLoadBalancer, err = hns.getLoadBalancer(
 					externalIPEndpoints,
-					loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode},
+					loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
 					sourceVip,
 					externalIP.ip,
 					Enum(svcInfo.Protocol()),
@@ -1556,7 +1552,7 @@ func (proxier *Proxier) syncProxyRules() {
 			if len(lbIngressEndpoints) > 0 {
 				hnsLoadBalancer, err := hns.getLoadBalancer(
 					lbIngressEndpoints,
-					loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.preserveDIP || svcInfo.localTrafficDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode},
+					loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.preserveDIP || svcInfo.localTrafficDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
 					sourceVip,
 					lbIngressIP.ip,
 					Enum(svcInfo.Protocol()),
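
Patch 2's refactor is easiest to see in isolation: the winkernel proxier stops deriving "IPv4 or IPv6?" from the node IP and instead takes the family as its first constructor argument, mirroring the iptables and ipvs proxiers. A minimal sketch under local stand-ins (IPFamily here plays the role of v1.IPFamily, and ipFamilyOf the role of the removed netutils.IsIPv6 logic):

package main

import (
	"fmt"
	"net"
)

type IPFamily string

const (
	IPv4Protocol IPFamily = "IPv4"
	IPv6Protocol IPFamily = "IPv6"
)

// ipFamilyOf mirrors what NewProxier used to compute internally from nodeIP.
func ipFamilyOf(ip net.IP) IPFamily {
	if ip != nil && ip.To4() == nil {
		return IPv6Protocol
	}
	return IPv4Protocol
}

// Proxier keeps the family itself instead of an isIPv6Mode bool, so checks
// become comparisons like p.ipFamily == IPv6Protocol.
type Proxier struct {
	ipFamily IPFamily
}

func main() {
	// The caller decides the family once and passes it down explicitly.
	family := ipFamilyOf(net.ParseIP("2001:db8::1"))
	p := &Proxier{ipFamily: family}
	fmt.Println(p.ipFamily, p.ipFamily == IPv6Protocol)
}
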
From bfda244e54e48919c82842b2bd4332368cfd334a Mon Sep 17 00:00:00 2001
From: Daman Arora
Date: Sun, 15 Oct 2023 22:37:13 +0530
Subject: [PATCH 3/3] pkg/proxy: dual stack health checker

Signed-off-by: Daman Arora
---
 cmd/kube-proxy/app/server.go              |   4 +-
 pkg/proxy/healthcheck/healthcheck_test.go | 113 ++++++++++-----
 pkg/proxy/healthcheck/proxier_health.go   | 169 ++++++++++++----------
 pkg/proxy/iptables/proxier.go             |  16 +-
 pkg/proxy/ipvs/proxier.go                 |  12 +-
 pkg/proxy/node.go                         |   2 +-
 pkg/proxy/winkernel/proxier.go            |  12 +-
 7 files changed, 189 insertions(+), 139 deletions(-)

diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go
index 987395ee4d8..0f23dc666ea 100644
--- a/cmd/kube-proxy/app/server.go
+++ b/cmd/kube-proxy/app/server.go
@@ -527,7 +527,7 @@ type ProxyServer struct {
 	Broadcaster     events.EventBroadcaster
 	Recorder        events.EventRecorder
 	NodeRef         *v1.ObjectReference
-	HealthzServer   healthcheck.ProxierHealthUpdater
+	HealthzServer   *healthcheck.ProxierHealthServer
 	Hostname        string
 	PrimaryIPFamily v1.IPFamily
 	NodeIPs         map[v1.IPFamily]net.IP
@@ -735,7 +735,7 @@ func createClient(config componentbaseconfig.ClientConnectionConfiguration, mast
 	return client, nil
 }
 
-func serveHealthz(hz healthcheck.ProxierHealthUpdater, errCh chan error) {
+func serveHealthz(hz *healthcheck.ProxierHealthServer, errCh chan error) {
 	if hz == nil {
 		return
 	}
diff --git a/pkg/proxy/healthcheck/healthcheck_test.go b/pkg/proxy/healthcheck/healthcheck_test.go
index 3be8feac77e..c1c451ef8c2 100644
--- a/pkg/proxy/healthcheck/healthcheck_test.go
+++ b/pkg/proxy/healthcheck/healthcheck_test.go
@@ -480,26 +480,7 @@ func TestHealthzServer(t *testing.T) {
 		tracking503: 0,
 	}
 
-	// Should return 200 "OK" by default.
-	testHTTPHandler(hsTest, http.StatusOK, t)
-
-	// Should return 200 "OK" after first update
-	hs.Updated()
-	testHTTPHandler(hsTest, http.StatusOK, t)
-
-	// Should continue to return 200 "OK" as long as no further updates are queued
-	fakeClock.Step(25 * time.Second)
-	testHTTPHandler(hsTest, http.StatusOK, t)
-
-	// Should return 503 "ServiceUnavailable" if exceed max update-processing time
-	hs.QueuedUpdate()
-	fakeClock.Step(25 * time.Second)
-	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
-
-	// Should return 200 "OK" after processing update
-	hs.Updated()
-	fakeClock.Step(5 * time.Second)
-	testHTTPHandler(hsTest, http.StatusOK, t)
+	testProxierHealthUpdater(hs, hsTest, fakeClock, t)
 
 	// Should return 200 "OK" if we've synced a node, tainted in any other way
 	hs.SyncNode(makeNode(tweakTainted("other")))
@@ -534,26 +515,7 @@ func TestLivezServer(t *testing.T) {
 		tracking503: 0,
 	}
 
-	// Should return 200 "OK" by default.
-	testHTTPHandler(hsTest, http.StatusOK, t)
-
-	// Should return 200 "OK" after first update
-	hs.Updated()
-	testHTTPHandler(hsTest, http.StatusOK, t)
-
-	// Should continue to return 200 "OK" as long as no further updates are queued
-	fakeClock.Step(25 * time.Second)
-	testHTTPHandler(hsTest, http.StatusOK, t)
-
-	// Should return 503 "ServiceUnavailable" if exceed max update-processing time
-	hs.QueuedUpdate()
-	fakeClock.Step(25 * time.Second)
-	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
-
-	// Should return 200 "OK" after processing update
-	hs.Updated()
-	fakeClock.Step(5 * time.Second)
-	testHTTPHandler(hsTest, http.StatusOK, t)
+	testProxierHealthUpdater(hs, hsTest, fakeClock, t)
 
 	// Should return 200 "OK" irrespective of node syncs
 	hs.SyncNode(makeNode(tweakTainted("other")))
@@ -579,6 +541,77 @@ var (
 	livezURL url = "/livez"
 )
 
+func testProxierHealthUpdater(hs *ProxierHealthServer, hsTest *serverTest, fakeClock *testingclock.FakeClock, t *testing.T) {
+	// Should return 200 "OK" by default.
+	testHTTPHandler(hsTest, http.StatusOK, t)
+
+	// Should return 200 "OK" after first update for both IPv4 and IPv6 proxiers.
+	hs.Updated(v1.IPv4Protocol)
+	hs.Updated(v1.IPv6Protocol)
+	testHTTPHandler(hsTest, http.StatusOK, t)
+
+	// Should continue to return 200 "OK" as long as no further updates are queued for any proxier.
+	fakeClock.Step(25 * time.Second)
+	testHTTPHandler(hsTest, http.StatusOK, t)
+
+	// Should return 503 "ServiceUnavailable" if IPv4 proxier exceeds max update-processing time.
+	hs.QueuedUpdate(v1.IPv4Protocol)
+	fakeClock.Step(25 * time.Second)
+	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
+
+	// Should return 200 "OK" after processing update for both IPv4 and IPv6 proxiers.
+	hs.Updated(v1.IPv4Protocol)
+	hs.Updated(v1.IPv6Protocol)
+	fakeClock.Step(5 * time.Second)
+	testHTTPHandler(hsTest, http.StatusOK, t)
+
+	// Should return 503 "ServiceUnavailable" if IPv6 proxier exceeds max update-processing time.
+	hs.QueuedUpdate(v1.IPv6Protocol)
+	fakeClock.Step(25 * time.Second)
+	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
+
+	// Should return 200 "OK" after processing update for both IPv4 and IPv6 proxiers.
+	hs.Updated(v1.IPv4Protocol)
+	hs.Updated(v1.IPv6Protocol)
+	fakeClock.Step(5 * time.Second)
+	testHTTPHandler(hsTest, http.StatusOK, t)
+
+	// Should return 503 "ServiceUnavailable" if both IPv4 and IPv6 proxiers exceed max update-processing time.
+	hs.QueuedUpdate(v1.IPv4Protocol)
+	hs.QueuedUpdate(v1.IPv6Protocol)
+	fakeClock.Step(25 * time.Second)
+	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
+
+	// Should return 200 "OK" after processing update for both IPv4 and IPv6 proxiers.
+	hs.Updated(v1.IPv4Protocol)
+	hs.Updated(v1.IPv6Protocol)
+	fakeClock.Step(5 * time.Second)
+	testHTTPHandler(hsTest, http.StatusOK, t)
+
+	// If IPv6 proxier is late for an update but IPv4 proxier is not, then updating IPv4 proxier should have no effect.
+	hs.QueuedUpdate(v1.IPv6Protocol)
+	fakeClock.Step(25 * time.Second)
+	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
+
+	hs.Updated(v1.IPv4Protocol)
+	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
+
+	hs.Updated(v1.IPv6Protocol)
+	testHTTPHandler(hsTest, http.StatusOK, t)
+
+	// If both IPv4 and IPv6 proxiers are late for an update, we shouldn't report 200 "OK" until after both of them update.
+	hs.QueuedUpdate(v1.IPv4Protocol)
+	hs.QueuedUpdate(v1.IPv6Protocol)
+	fakeClock.Step(25 * time.Second)
+	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
+
+	hs.Updated(v1.IPv4Protocol)
+	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
+
+	hs.Updated(v1.IPv6Protocol)
+	testHTTPHandler(hsTest, http.StatusOK, t)
+}
+
 func testHTTPHandler(hsTest *serverTest, status int, t *testing.T) {
 	handler := hsTest.server.(*fakeHTTPServer).handler
 	req, err := http.NewRequest("GET", string(hsTest.url), nil)
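
The behavior these tests pin down is a single predicate: the proxy is healthy only when no IP family has a queued update older than the health timeout, and one late family makes the whole dual-stack proxy unhealthy. A simplified, standalone version of that check (not the real method below, which also tracks per-family lastUpdated times under a lock):

package main

import (
	"fmt"
	"time"
)

type IPFamily string

// healthy reports whether every family's oldest pending update, if any,
// is still younger than timeout.
func healthy(now time.Time, timeout time.Duration, oldestPendingQueued map[IPFamily]time.Time) bool {
	for _, queued := range oldestPendingQueued {
		if now.Sub(queued) >= timeout {
			return false // one late family fails the whole proxy
		}
	}
	return true
}

func main() {
	now := time.Now()
	pending := map[IPFamily]time.Time{
		"IPv4": now.Add(-5 * time.Second),  // queued, not yet late
		"IPv6": now.Add(-25 * time.Second), // late
	}
	fmt.Println(healthy(now, 10*time.Second, pending)) // false
	delete(pending, "IPv6")                            // IPv6 "calls Updated"
	fmt.Println(healthy(now, 10*time.Second, pending)) // true
}
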
diff --git a/pkg/proxy/healthcheck/proxier_health.go b/pkg/proxy/healthcheck/proxier_health.go
index 6f5e90f5be4..7b009fba135 100644
--- a/pkg/proxy/healthcheck/proxier_health.go
+++ b/pkg/proxy/healthcheck/proxier_health.go
@@ -19,7 +19,7 @@ package healthcheck
 import (
 	"fmt"
 	"net/http"
-	"sync/atomic"
+	"sync"
 	"time"
 
 	v1 "k8s.io/api/core/v1"
@@ -34,32 +34,14 @@ const (
 	ToBeDeletedTaint = "ToBeDeletedByClusterAutoscaler"
 )
 
-// ProxierHealthUpdater allows callers to update healthz timestamp only.
-type ProxierHealthUpdater interface {
-	// QueuedUpdate should be called when the proxier receives a Service or Endpoints
-	// event containing information that requires updating service rules.
-	QueuedUpdate()
-
-	// Updated should be called when the proxier has successfully updated the service
-	// rules to reflect the current state.
-	Updated()
-
-	// Run starts the healthz HTTP server and blocks until it exits.
-	Run() error
-
-	// Sync the node and determine if its eligible or not. Eligible is
-	// defined as being: not tainted by ToBeDeletedTaint and not deleted.
-	SyncNode(node *v1.Node)
-
-	proxierHealthChecker
-}
-
-var _ ProxierHealthUpdater = &proxierHealthServer{}
-var zeroTime = time.Time{}
-
-// proxierHealthServer returns 200 "OK" by default. It verifies that the delay between
-// QueuedUpdate() calls and Updated() calls never exceeds healthTimeout.
-type proxierHealthServer struct {
+// ProxierHealthServer allows callers to:
+// 1. run an http server with /healthz and /livez endpoint handlers.
+// 2. update healthz timestamps before and after synchronizing the dataplane.
+// 3. sync node status, for reporting an unhealthy /healthz response
+// if the node is marked for deletion by the cluster autoscaler.
+// 4. get proxy health by verifying that the delay between QueuedUpdate()
+// calls and Updated() calls never exceeds healthTimeout.
+type ProxierHealthServer struct {
 	listener    listener
 	httpFactory httpServerFactory
 	clock       clock.Clock
@@ -67,92 +49,120 @@ type proxierHealthServer struct {
 	addr          string
 	healthTimeout time.Duration
 
-	lastUpdated         atomic.Value
-	oldestPendingQueued atomic.Value
-	nodeEligible        atomic.Bool
+	lock                   sync.RWMutex
+	lastUpdatedMap         map[v1.IPFamily]time.Time
+	oldestPendingQueuedMap map[v1.IPFamily]time.Time
+	nodeEligible           bool
 }
 
 // NewProxierHealthServer returns a proxier health http server.
-func NewProxierHealthServer(addr string, healthTimeout time.Duration) ProxierHealthUpdater {
+func NewProxierHealthServer(addr string, healthTimeout time.Duration) *ProxierHealthServer {
 	return newProxierHealthServer(stdNetListener{}, stdHTTPServerFactory{}, clock.RealClock{}, addr, healthTimeout)
 }
 
-func newProxierHealthServer(listener listener, httpServerFactory httpServerFactory, c clock.Clock, addr string, healthTimeout time.Duration) *proxierHealthServer {
-	hs := &proxierHealthServer{
+func newProxierHealthServer(listener listener, httpServerFactory httpServerFactory, c clock.Clock, addr string, healthTimeout time.Duration) *ProxierHealthServer {
+	return &ProxierHealthServer{
 		listener:      listener,
 		httpFactory:   httpServerFactory,
 		clock:         c,
 		addr:          addr,
 		healthTimeout: healthTimeout,
+
+		lastUpdatedMap:         make(map[v1.IPFamily]time.Time),
+		oldestPendingQueuedMap: make(map[v1.IPFamily]time.Time),
+		// The node is eligible (and thus the proxy healthy) while it's starting up
+		// and until we've processed the first node event that indicates the
+		// contrary.
+		nodeEligible: true,
 	}
-	// The node is eligible (and thus the proxy healthy) while it's starting up
-	// and until we've processed the first node event that indicates the
-	// contrary.
-	hs.nodeEligible.Store(true)
-	return hs
 }
 
-// Updated indicates that kube-proxy has successfully updated its backend, so it should
-// be considered healthy now.
-func (hs *proxierHealthServer) Updated() {
-	hs.oldestPendingQueued.Store(zeroTime)
-	hs.lastUpdated.Store(hs.clock.Now())
+// Updated should be called when the proxier of the given IP family has successfully updated
+// the service rules to reflect the current state and should be considered healthy now.
+func (hs *ProxierHealthServer) Updated(ipFamily v1.IPFamily) {
+	hs.lock.Lock()
+	defer hs.lock.Unlock()
+	delete(hs.oldestPendingQueuedMap, ipFamily)
+	hs.lastUpdatedMap[ipFamily] = hs.clock.Now()
 }
 
-// QueuedUpdate indicates that the proxy has received changes from the apiserver but
-// has not yet pushed them to its backend. If the proxy does not call Updated within the
+// QueuedUpdate should be called when the proxier receives a Service or Endpoints event
+// from the API server containing information that requires updating service rules. It
+// indicates that the proxier for the given IP family has received changes but has not
+// yet pushed them to its backend. If the proxier does not call Updated within the
 // healthTimeout time then it will be considered unhealthy.
-func (hs *proxierHealthServer) QueuedUpdate() {
-	// Set oldestPendingQueued only if it's currently zero
-	hs.oldestPendingQueued.CompareAndSwap(zeroTime, hs.clock.Now())
+func (hs *ProxierHealthServer) QueuedUpdate(ipFamily v1.IPFamily) {
+	hs.lock.Lock()
+	defer hs.lock.Unlock()
+	// Set oldestPendingQueuedMap[ipFamily] only if it's currently unset
+	if _, set := hs.oldestPendingQueuedMap[ipFamily]; !set {
+		hs.oldestPendingQueuedMap[ipFamily] = hs.clock.Now()
+	}
 }
 
 // IsHealthy returns only the proxier's health state, following the same
 // definition the HTTP server defines, but ignoring the state of the Node.
-func (hs *proxierHealthServer) IsHealthy() bool {
-	isHealthy, _, _ := hs.isHealthy()
+func (hs *ProxierHealthServer) IsHealthy() bool {
+	isHealthy, _ := hs.isHealthy()
 	return isHealthy
 }
 
-func (hs *proxierHealthServer) isHealthy() (bool, time.Time, time.Time) {
-	var oldestPendingQueued, lastUpdated time.Time
-	if val := hs.oldestPendingQueued.Load(); val != nil {
-		oldestPendingQueued = val.(time.Time)
-	}
-	if val := hs.lastUpdated.Load(); val != nil {
-		lastUpdated = val.(time.Time)
-	}
+func (hs *ProxierHealthServer) isHealthy() (bool, time.Time) {
+	hs.lock.RLock()
+	defer hs.lock.RUnlock()
+
+	var lastUpdated time.Time
 	currentTime := hs.clock.Now()
 
-	healthy := false
-	switch {
-	case oldestPendingQueued.IsZero():
-		// The proxy is healthy while it's starting up
-		// or the proxy is fully synced.
-		healthy = true
-	case currentTime.Sub(oldestPendingQueued) < hs.healthTimeout:
-		// There's an unprocessed update queued, but it's not late yet
-		healthy = true
+	for ipFamily, proxierLastUpdated := range hs.lastUpdatedMap {
+
+		if proxierLastUpdated.After(lastUpdated) {
+			lastUpdated = proxierLastUpdated
+		}
+
+		if _, set := hs.oldestPendingQueuedMap[ipFamily]; !set {
+			// the proxier is healthy while it's starting up
+			// or the proxier is fully synced.
+			continue
+		}
+
+		if currentTime.Sub(hs.oldestPendingQueuedMap[ipFamily]) < hs.healthTimeout {
+			// there's an unprocessed update queued for this proxier, but it's not late yet.
+			continue
+		}
+		return false, proxierLastUpdated
 	}
-	return healthy, lastUpdated, currentTime
+	return true, lastUpdated
 }
 
-func (hs *proxierHealthServer) SyncNode(node *v1.Node) {
+// SyncNode syncs the node and determines if it is eligible or not. Eligible is
+// defined as being: not tainted by ToBeDeletedTaint and not deleted.
+func (hs *ProxierHealthServer) SyncNode(node *v1.Node) {
+	hs.lock.Lock()
+	defer hs.lock.Unlock()
+
 	if !node.DeletionTimestamp.IsZero() {
-		hs.nodeEligible.Store(false)
+		hs.nodeEligible = false
 		return
 	}
 	for _, taint := range node.Spec.Taints {
 		if taint.Key == ToBeDeletedTaint {
-			hs.nodeEligible.Store(false)
+			hs.nodeEligible = false
 			return
 		}
 	}
-	hs.nodeEligible.Store(true)
+	hs.nodeEligible = true
+}
+
+// NodeEligible returns the nodeEligible field of the ProxierHealthServer.
+func (hs *ProxierHealthServer) NodeEligible() bool {
+	hs.lock.RLock()
+	defer hs.lock.RUnlock()
+	return hs.nodeEligible
 }
 
 // Run starts the healthz HTTP server and blocks until it exits.
-func (hs *proxierHealthServer) Run() error {
+func (hs *ProxierHealthServer) Run() error {
 	serveMux := http.NewServeMux()
 	serveMux.Handle("/healthz", healthzHandler{hs: hs})
 	serveMux.Handle("/livez", livezHandler{hs: hs})
@@ -172,12 +182,14 @@ func (hs *ProxierHealthServer) Run() error {
 }
 
 type healthzHandler struct {
-	hs *proxierHealthServer
+	hs *ProxierHealthServer
 }
 
 func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
-	nodeEligible := h.hs.nodeEligible.Load()
-	healthy, lastUpdated, currentTime := h.hs.isHealthy()
+	nodeEligible := h.hs.NodeEligible()
+	healthy, lastUpdated := h.hs.isHealthy()
+	currentTime := h.hs.clock.Now()
+
 	healthy = healthy && nodeEligible
 	resp.Header().Set("Content-Type", "application/json")
 	resp.Header().Set("X-Content-Type-Options", "nosniff")
@@ -198,11 +210,12 @@ func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 }
 
 type livezHandler struct {
-	hs *proxierHealthServer
+	hs *ProxierHealthServer
 }
 
 func (h livezHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
-	healthy, lastUpdated, currentTime := h.hs.isHealthy()
+	healthy, lastUpdated := h.hs.isHealthy()
+	currentTime := h.hs.clock.Now()
 	resp.Header().Set("Content-Type", "application/json")
 	resp.Header().Set("X-Content-Type-Options", "nosniff")
 	if !healthy {
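
The proxier-side half of the contract — stamp the shared server with your own family when work is queued and again once the dataplane is synced — is what the iptables, ipvs, and winkernel diffs below repeat. Distilled into a runnable stub (HealthServer and proxier here are stand-ins for healthcheck.ProxierHealthServer and the real proxiers):

package main

import "fmt"

type IPFamily string

type HealthServer interface {
	QueuedUpdate(IPFamily)
	Updated(IPFamily)
}

type logServer struct{}

func (logServer) QueuedUpdate(f IPFamily) { fmt.Println("queued:", f) }
func (logServer) Updated(f IPFamily)      { fmt.Println("synced:", f) }

type proxier struct {
	ipFamily IPFamily
	healthz  HealthServer
}

// Sync marks work as pending before the (elided) dataplane write...
func (p *proxier) Sync() {
	if p.healthz != nil {
		p.healthz.QueuedUpdate(p.ipFamily)
	}
	p.syncProxyRules()
}

// ...and syncProxyRules marks it done afterwards.
func (p *proxier) syncProxyRules() {
	if p.healthz != nil {
		p.healthz.Updated(p.ipFamily)
	}
}

func main() {
	hs := logServer{} // one shared server for both families
	for _, f := range []IPFamily{"IPv4", "IPv6"} {
		p := &proxier{ipFamily: f, healthz: hs}
		p.Sync()
	}
}
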
diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go
index 9a31a57f7c2..106101d7437 100644
--- a/pkg/proxy/iptables/proxier.go
+++ b/pkg/proxy/iptables/proxier.go
@@ -152,6 +152,9 @@ func (e *endpointsInfo) Equal(other proxy.Endpoint) bool {
 // Proxier is an iptables based proxy for connections between a localhost:lport
 // and services that provide the actual backends.
 type Proxier struct {
+	// ipFamily defines the IP family which this proxier is tracking.
+	ipFamily v1.IPFamily
+
 	// endpointsChanges and serviceChanges contains all changes to endpoints and
 	// services that happened since iptables was synced. For a single object,
 	// changes are accumulated, i.e. previous is state from before all of them,
@@ -185,7 +188,7 @@ type Proxier struct {
 	recorder events.EventRecorder
 
 	serviceHealthServer healthcheck.ServiceHealthServer
-	healthzServer       healthcheck.ProxierHealthUpdater
+	healthzServer       *healthcheck.ProxierHealthServer
 
 	// Since converting probabilities (floats) to strings is expensive
 	// and we are using only probabilities in the format of 1/n, we are
@@ -237,7 +240,7 @@ func NewProxier(ipFamily v1.IPFamily,
 	hostname string,
 	nodeIP net.IP,
 	recorder events.EventRecorder,
-	healthzServer healthcheck.ProxierHealthUpdater,
+	healthzServer *healthcheck.ProxierHealthServer,
 	nodePortAddressStrings []string,
 ) (*Proxier, error) {
 	nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nodePortAddressStrings)
@@ -269,6 +272,7 @@ func NewProxier(ipFamily v1.IPFamily,
 	serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
 
 	proxier := &Proxier{
+		ipFamily:       ipFamily,
 		svcPortMap:     make(proxy.ServicePortMap),
 		serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
 		endpointsMap:   make(proxy.EndpointsMap),
@@ -330,7 +334,7 @@ func NewDualStackProxier(
 	hostname string,
 	nodeIPs map[v1.IPFamily]net.IP,
 	recorder events.EventRecorder,
-	healthzServer healthcheck.ProxierHealthUpdater,
+	healthzServer *healthcheck.ProxierHealthServer,
 	nodePortAddresses []string,
 ) (proxy.Provider, error) {
 	// Create an ipv4 instance of the single-stack proxier
@@ -492,7 +496,7 @@ func (proxier *Proxier) probability(n int) string {
 // Sync is called to synchronize the proxier state to iptables as soon as possible.
 func (proxier *Proxier) Sync() {
 	if proxier.healthzServer != nil {
-		proxier.healthzServer.QueuedUpdate()
+		proxier.healthzServer.QueuedUpdate(proxier.ipFamily)
 	}
 	metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
 	proxier.syncRunner.Run()
@@ -502,7 +506,7 @@ func (proxier *Proxier) Sync() {
 func (proxier *Proxier) SyncLoop() {
 	// Update healthz timestamp at beginning in case Sync() never succeeds.
 	if proxier.healthzServer != nil {
-		proxier.healthzServer.Updated()
+		proxier.healthzServer.Updated(proxier.ipFamily)
 	}
 
 	// synthesize "last change queued" time as the informers are syncing.
@@ -1537,7 +1541,7 @@ func (proxier *Proxier) syncProxyRules() {
 	metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(serviceNoLocalEndpointsTotalInternal))
 	metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(serviceNoLocalEndpointsTotalExternal))
 	if proxier.healthzServer != nil {
-		proxier.healthzServer.Updated()
+		proxier.healthzServer.Updated(proxier.ipFamily)
 	}
 	metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
 
diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go
index 19c354e9ac1..3f6b66306fc 100644
--- a/pkg/proxy/ipvs/proxier.go
+++ b/pkg/proxy/ipvs/proxier.go
@@ -261,7 +261,7 @@ type Proxier struct {
 	recorder events.EventRecorder
 
 	serviceHealthServer healthcheck.ServiceHealthServer
-	healthzServer       healthcheck.ProxierHealthUpdater
+	healthzServer       *healthcheck.ProxierHealthServer
 
 	ipvsScheduler string
 	// The following buffers are used to reuse memory and avoid allocations
@@ -325,7 +325,7 @@ func NewProxier(ipFamily v1.IPFamily,
 	hostname string,
 	nodeIP net.IP,
 	recorder events.EventRecorder,
-	healthzServer healthcheck.ProxierHealthUpdater,
+	healthzServer *healthcheck.ProxierHealthServer,
 	scheduler string,
 	nodePortAddressStrings []string,
 	kernelHandler KernelHandler,
@@ -482,7 +482,7 @@ func NewDualStackProxier(
 	hostname string,
 	nodeIPs map[v1.IPFamily]net.IP,
 	recorder events.EventRecorder,
-	healthzServer healthcheck.ProxierHealthUpdater,
+	healthzServer *healthcheck.ProxierHealthServer,
 	scheduler string,
 	nodePortAddresses []string,
 	kernelHandler KernelHandler,
@@ -756,7 +756,7 @@ func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset
 // Sync is called to synchronize the proxier state to iptables and ipvs as soon as possible.
 func (proxier *Proxier) Sync() {
 	if proxier.healthzServer != nil {
-		proxier.healthzServer.QueuedUpdate()
+		proxier.healthzServer.QueuedUpdate(proxier.ipFamily)
 	}
 	metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
 	proxier.syncRunner.Run()
@@ -766,7 +766,7 @@ func (proxier *Proxier) Sync() {
 func (proxier *Proxier) SyncLoop() {
 	// Update healthz timestamp at beginning in case Sync() never succeeds.
 	if proxier.healthzServer != nil {
-		proxier.healthzServer.Updated()
+		proxier.healthzServer.Updated(proxier.ipFamily)
 	}
 	// synthesize "last change queued" time as the informers are syncing.
 	metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
@@ -1482,7 +1482,7 @@ func (proxier *Proxier) syncProxyRules() {
 	proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices)
 
 	if proxier.healthzServer != nil {
-		proxier.healthzServer.Updated()
+		proxier.healthzServer.Updated(proxier.ipFamily)
 	}
 	metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
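
The eligibility rule that node.go's NodeEligibleHandler feeds into SyncNode is small enough to state as a standalone predicate. This sketch mirrors the semantics with a stub Node in place of v1.Node:

package main

import (
	"fmt"
	"time"
)

const ToBeDeletedTaint = "ToBeDeletedByClusterAutoscaler"

type Taint struct{ Key string }

type Node struct {
	DeletionTimestamp *time.Time
	Taints            []Taint
}

// eligible mirrors SyncNode: a node is eligible unless it is being deleted
// or carries the cluster-autoscaler deletion taint.
func eligible(n *Node) bool {
	if n.DeletionTimestamp != nil {
		return false
	}
	for _, t := range n.Taints {
		if t.Key == ToBeDeletedTaint {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(eligible(&Node{}))                                         // true
	fmt.Println(eligible(&Node{Taints: []Taint{{Key: ToBeDeletedTaint}}})) // false
}
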
diff --git a/pkg/proxy/node.go b/pkg/proxy/node.go
index 7cd24f6d4fc..15f6c6aabba 100644
--- a/pkg/proxy/node.go
+++ b/pkg/proxy/node.go
@@ -90,7 +90,7 @@ func (n *NodePodCIDRHandler) OnNodeSynced() {}
 // NodeEligibleHandler handles the life cycle of the Node's eligibility, as
 // determined by the health server for directing load balancer traffic.
 type NodeEligibleHandler struct {
-	HealthServer healthcheck.ProxierHealthUpdater
+	HealthServer *healthcheck.ProxierHealthServer
 }
 
 var _ config.NodeHandler = &NodeEligibleHandler{}
diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go
index fe93d446110..cd0044d6838 100644
--- a/pkg/proxy/winkernel/proxier.go
+++ b/pkg/proxy/winkernel/proxier.go
@@ -619,7 +619,7 @@ type Proxier struct {
 	recorder events.EventRecorder
 
 	serviceHealthServer healthcheck.ServiceHealthServer
-	healthzServer       healthcheck.ProxierHealthUpdater
+	healthzServer       *healthcheck.ProxierHealthServer
 
 	hns     HostNetworkService
 	network hnsNetworkInfo
@@ -674,7 +674,7 @@ func NewProxier(
 	hostname string,
 	nodeIP net.IP,
 	recorder events.EventRecorder,
-	healthzServer healthcheck.ProxierHealthUpdater,
+	healthzServer *healthcheck.ProxierHealthServer,
 	config config.KubeProxyWinkernelConfiguration,
 	healthzPort int,
 ) (*Proxier, error) {
@@ -807,7 +807,7 @@ func NewDualStackProxier(
 	hostname string,
 	nodeIPs map[v1.IPFamily]net.IP,
 	recorder events.EventRecorder,
-	healthzServer healthcheck.ProxierHealthUpdater,
+	healthzServer *healthcheck.ProxierHealthServer,
 	config config.KubeProxyWinkernelConfiguration,
 	healthzPort int,
 ) (proxy.Provider, error) {
@@ -954,7 +954,7 @@ func getHnsNetworkInfo(hnsNetworkName string) (*hnsNetworkInfo, error) {
 // Sync is called to synchronize the proxier state to hns as soon as possible.
 func (proxier *Proxier) Sync() {
 	if proxier.healthzServer != nil {
-		proxier.healthzServer.QueuedUpdate()
+		proxier.healthzServer.QueuedUpdate(proxier.ipFamily)
 	}
 	metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
 	proxier.syncRunner.Run()
@@ -964,7 +964,7 @@ func (proxier *Proxier) Sync() {
 func (proxier *Proxier) SyncLoop() {
 	// Update healthz timestamp at beginning in case Sync() never succeeds.
 	if proxier.healthzServer != nil {
-		proxier.healthzServer.Updated()
+		proxier.healthzServer.Updated(proxier.ipFamily)
 	}
 	// synthesize "last change queued" time as the informers are syncing.
 	metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
@@ -1604,7 +1604,7 @@ func (proxier *Proxier) syncProxyRules() {
 	}
 
 	if proxier.healthzServer != nil {
-		proxier.healthzServer.Updated()
+		proxier.healthzServer.Updated(proxier.ipFamily)
 	}
 	metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
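
Taken together, the series leaves kube-proxy with one health server shared by both per-family proxiers. A usage sketch against the final API (the address and timeout are arbitrary, and the imports assume a kubernetes/kubernetes source tree):

package main

import (
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/proxy/healthcheck"
)

func main() {
	// One server; both proxiers report into it keyed by family.
	hs := healthcheck.NewProxierHealthServer("0.0.0.0:10256", time.Minute)

	hs.QueuedUpdate(v1.IPv4Protocol) // IPv4 proxier received an event
	hs.Updated(v1.IPv4Protocol)      // ...and synced its rules
	hs.QueuedUpdate(v1.IPv6Protocol)
	hs.Updated(v1.IPv6Protocol)

	// /healthz and /livez now report the combined dual-stack state.
	if err := hs.Run(); err != nil {
		panic(err)
	}
}
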