diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go
index 1ae502d6b75..3b42bf2f20e 100644
--- a/cmd/kube-proxy/app/server.go
+++ b/cmd/kube-proxy/app/server.go
@@ -578,7 +578,7 @@ type ProxyServer struct {
 	Broadcaster     events.EventBroadcaster
 	Recorder        events.EventRecorder
 	NodeRef         *v1.ObjectReference
-	HealthzServer   healthcheck.ProxierHealthUpdater
+	HealthzServer   *healthcheck.ProxierHealthServer
 	Hostname        string
 	PrimaryIPFamily v1.IPFamily
 	NodeIPs         map[v1.IPFamily]net.IP
@@ -626,7 +626,7 @@ func newProxyServer(config *kubeproxyconfig.KubeProxyConfiguration, master strin
 	}
 
 	if len(config.HealthzBindAddress) > 0 {
-		s.HealthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, s.Recorder, s.NodeRef)
+		s.HealthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration)
 	}
 
 	err = s.platformSetup()
@@ -787,7 +787,7 @@ func createClient(config componentbaseconfig.ClientConnectionConfiguration, mast
 	return client, nil
 }
 
-func serveHealthz(hz healthcheck.ProxierHealthUpdater, errCh chan error) {
+func serveHealthz(hz *healthcheck.ProxierHealthServer, errCh chan error) {
 	if hz == nil {
 		return
 	}
@@ -873,16 +873,17 @@ func (s *ProxyServer) Run() error {
 
 	// TODO(thockin): make it possible for healthz and metrics to be on the same port.
 
-	var errCh chan error
+	var healthzErrCh, metricsErrCh chan error
 	if s.Config.BindAddressHardFail {
-		errCh = make(chan error)
+		healthzErrCh = make(chan error)
+		metricsErrCh = make(chan error)
 	}
 
 	// Start up a healthz server if requested
-	serveHealthz(s.HealthzServer, errCh)
+	serveHealthz(s.HealthzServer, healthzErrCh)
 
 	// Start up a metrics server if requested
-	serveMetrics(s.Config.MetricsBindAddress, s.Config.Mode, s.Config.EnableProfiling, errCh)
+	serveMetrics(s.Config.MetricsBindAddress, s.Config.Mode, s.Config.EnableProfiling, metricsErrCh)
 
 	noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)
 	if err != nil {
@@ -947,7 +948,13 @@ func (s *ProxyServer) Run() error {
 
 	go s.Proxier.SyncLoop()
 
-	return <-errCh
+	select {
+	case err = <-healthzErrCh:
+		s.Recorder.Eventf(s.NodeRef, nil, api.EventTypeWarning, "FailedToStartProxierHealthcheck", "StartKubeProxy", err.Error())
+	case err = <-metricsErrCh:
+		s.Recorder.Eventf(s.NodeRef, nil, api.EventTypeWarning, "FailedToStartMetricServer", "StartKubeProxy", err.Error())
+	}
+	return err
 }
 
 func (s *ProxyServer) birthCry() {
diff --git a/cmd/kube-proxy/app/server_windows.go b/cmd/kube-proxy/app/server_windows.go
index 4c732cbc17c..33a1a15b2b3 100644
--- a/cmd/kube-proxy/app/server_windows.go
+++ b/cmd/kube-proxy/app/server_windows.go
@@ -103,6 +103,7 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio
 		)
 	} else {
 		proxier, err = winkernel.NewProxier(
+			s.PrimaryIPFamily,
 			config.IPTables.SyncPeriod.Duration,
 			config.IPTables.MinSyncPeriod.Duration,
 			config.ClusterCIDR,
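The server.go hunk above splits the single error channel in two so Run() can attribute a startup failure to the right component, now that eventing has moved out of the health server and back to the caller. A minimal sketch of that pattern follows; runServer and the printed "event:" lines are illustrative stand-ins, not code from this patch.

```go
package main

import (
	"errors"
	"fmt"
)

// runServer simulates serveHealthz/serveMetrics: a nil channel means
// BindAddressHardFail is disabled and failures are not fatal.
func runServer(name string, errCh chan error) {
	if errCh == nil {
		return
	}
	go func() {
		errCh <- errors.New(name + ": listen failed") // stand-in for a real bind error
	}()
}

func main() {
	healthzErrCh := make(chan error)
	metricsErrCh := make(chan error)

	runServer("healthz", healthzErrCh)
	runServer("metrics", metricsErrCh)

	// Like ProxyServer.Run(): block until either server fails, then emit a
	// component-specific event before returning the error.
	var err error
	select {
	case err = <-healthzErrCh:
		fmt.Println("event: FailedToStartProxierHealthcheck:", err)
	case err = <-metricsErrCh:
		fmt.Println("event: FailedToStartMetricServer:", err)
	}
	fmt.Println("Run() returns:", err)
}
```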
"127.0.0.1:10256", 10*time.Second) server := hs.httpFactory.New(hs.addr, healthzHandler{hs: hs}) hsTest := &serverTest{ @@ -480,26 +480,7 @@ func TestHealthzServer(t *testing.T) { tracking503: 0, } - // Should return 200 "OK" by default. - testHTTPHandler(hsTest, http.StatusOK, t) - - // Should return 200 "OK" after first update - hs.Updated() - testHTTPHandler(hsTest, http.StatusOK, t) - - // Should continue to return 200 "OK" as long as no further updates are queued - fakeClock.Step(25 * time.Second) - testHTTPHandler(hsTest, http.StatusOK, t) - - // Should return 503 "ServiceUnavailable" if exceed max update-processing time - hs.QueuedUpdate() - fakeClock.Step(25 * time.Second) - testHTTPHandler(hsTest, http.StatusServiceUnavailable, t) - - // Should return 200 "OK" after processing update - hs.Updated() - fakeClock.Step(5 * time.Second) - testHTTPHandler(hsTest, http.StatusOK, t) + testProxierHealthUpdater(hs, hsTest, fakeClock, t) // Should return 200 "OK" if we've synced a node, tainted in any other way hs.SyncNode(makeNode(tweakTainted("other"))) @@ -524,7 +505,7 @@ func TestLivezServer(t *testing.T) { httpFactory := newFakeHTTPServerFactory() fakeClock := testingclock.NewFakeClock(time.Now()) - hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second, nil, nil) + hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second) server := hs.httpFactory.New(hs.addr, livezHandler{hs: hs}) hsTest := &serverTest{ @@ -534,26 +515,7 @@ func TestLivezServer(t *testing.T) { tracking503: 0, } - // Should return 200 "OK" by default. - testHTTPHandler(hsTest, http.StatusOK, t) - - // Should return 200 "OK" after first update - hs.Updated() - testHTTPHandler(hsTest, http.StatusOK, t) - - // Should continue to return 200 "OK" as long as no further updates are queued - fakeClock.Step(25 * time.Second) - testHTTPHandler(hsTest, http.StatusOK, t) - - // Should return 503 "ServiceUnavailable" if exceed max update-processing time - hs.QueuedUpdate() - fakeClock.Step(25 * time.Second) - testHTTPHandler(hsTest, http.StatusServiceUnavailable, t) - - // Should return 200 "OK" after processing update - hs.Updated() - fakeClock.Step(5 * time.Second) - testHTTPHandler(hsTest, http.StatusOK, t) + testProxierHealthUpdater(hs, hsTest, fakeClock, t) // Should return 200 "OK" irrespective of node syncs hs.SyncNode(makeNode(tweakTainted("other"))) @@ -579,6 +541,77 @@ var ( livezURL url = "/livez" ) +func testProxierHealthUpdater(hs *ProxierHealthServer, hsTest *serverTest, fakeClock *testingclock.FakeClock, t *testing.T) { + // Should return 200 "OK" by default. + testHTTPHandler(hsTest, http.StatusOK, t) + + // Should return 200 "OK" after first update for both IPv4 and IPv6 proxiers. + hs.Updated(v1.IPv4Protocol) + hs.Updated(v1.IPv6Protocol) + testHTTPHandler(hsTest, http.StatusOK, t) + + // Should continue to return 200 "OK" as long as no further updates are queued for any proxier. + fakeClock.Step(25 * time.Second) + testHTTPHandler(hsTest, http.StatusOK, t) + + // Should return 503 "ServiceUnavailable" if IPv4 proxier exceed max update-processing time. + hs.QueuedUpdate(v1.IPv4Protocol) + fakeClock.Step(25 * time.Second) + testHTTPHandler(hsTest, http.StatusServiceUnavailable, t) + + // Should return 200 "OK" after processing update for both IPv4 and IPv6 proxiers. 
+	hs.Updated(v1.IPv4Protocol)
+	hs.Updated(v1.IPv6Protocol)
+	fakeClock.Step(5 * time.Second)
+	testHTTPHandler(hsTest, http.StatusOK, t)
+
+	// Should return 503 "ServiceUnavailable" if the IPv6 proxier exceeds the max update-processing time.
+	hs.QueuedUpdate(v1.IPv6Protocol)
+	fakeClock.Step(25 * time.Second)
+	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
+
+	// Should return 200 "OK" after processing updates for both IPv4 and IPv6 proxiers.
+	hs.Updated(v1.IPv4Protocol)
+	hs.Updated(v1.IPv6Protocol)
+	fakeClock.Step(5 * time.Second)
+	testHTTPHandler(hsTest, http.StatusOK, t)
+
+	// Should return 503 "ServiceUnavailable" if both IPv4 and IPv6 proxiers exceed the max update-processing time.
+	hs.QueuedUpdate(v1.IPv4Protocol)
+	hs.QueuedUpdate(v1.IPv6Protocol)
+	fakeClock.Step(25 * time.Second)
+	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
+
+	// Should return 200 "OK" after processing updates for both IPv4 and IPv6 proxiers.
+	hs.Updated(v1.IPv4Protocol)
+	hs.Updated(v1.IPv6Protocol)
+	fakeClock.Step(5 * time.Second)
+	testHTTPHandler(hsTest, http.StatusOK, t)
+
+	// If the IPv6 proxier is late for an update but the IPv4 proxier is not,
+	// then updating the IPv4 proxier should have no effect.
+	hs.QueuedUpdate(v1.IPv6Protocol)
+	fakeClock.Step(25 * time.Second)
+	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
+
+	hs.Updated(v1.IPv4Protocol)
+	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
+
+	hs.Updated(v1.IPv6Protocol)
+	testHTTPHandler(hsTest, http.StatusOK, t)
+
+	// If both IPv4 and IPv6 proxiers are late for an update, we shouldn't report
+	// 200 "OK" until after both of them update.
+	hs.QueuedUpdate(v1.IPv4Protocol)
+	hs.QueuedUpdate(v1.IPv6Protocol)
+	fakeClock.Step(25 * time.Second)
+	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
+
+	hs.Updated(v1.IPv4Protocol)
+	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
+
+	hs.Updated(v1.IPv6Protocol)
+	testHTTPHandler(hsTest, http.StatusOK, t)
+}
+
 func testHTTPHandler(hsTest *serverTest, status int, t *testing.T) {
 	handler := hsTest.server.(*fakeHTTPServer).handler
 	req, err := http.NewRequest("GET", string(hsTest.url), nil)
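The shared helper above exercises one timing model: with a 10-second healthTimeout, a queued update becomes "late" only once more than 10 seconds of fake time elapse before the matching Updated call for that IP family. A minimal standalone sketch of that model, using the same fake clock as the tests; the healthState type is illustrative, not the patch's ProxierHealthServer.

```go
package main

import (
	"fmt"
	"time"

	testingclock "k8s.io/utils/clock/testing"
)

type healthState struct {
	clock         *testingclock.FakeClock
	healthTimeout time.Duration
	queuedAt      map[string]time.Time // oldest pending update per IP family
}

// queued records the oldest pending update time, if not already set.
func (h *healthState) queued(family string) {
	if _, ok := h.queuedAt[family]; !ok {
		h.queuedAt[family] = h.clock.Now()
	}
}

// updated clears the pending-update marker for the family.
func (h *healthState) updated(family string) { delete(h.queuedAt, family) }

// healthy reports false if any family's pending update is older than the timeout.
func (h *healthState) healthy() bool {
	for _, t := range h.queuedAt {
		if h.clock.Now().Sub(t) >= h.healthTimeout {
			return false
		}
	}
	return true
}

func main() {
	fakeClock := testingclock.NewFakeClock(time.Now())
	h := &healthState{clock: fakeClock, healthTimeout: 10 * time.Second, queuedAt: map[string]time.Time{}}

	h.queued("IPv4")
	fakeClock.Step(25 * time.Second) // exceed the timeout without an update
	fmt.Println(h.healthy())         // false: the IPv4 update is late

	h.updated("IPv4")
	fmt.Println(h.healthy()) // true again
}
```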
diff --git a/pkg/proxy/healthcheck/proxier_health.go b/pkg/proxy/healthcheck/proxier_health.go
index 2ffdcc2b9d2..7b009fba135 100644
--- a/pkg/proxy/healthcheck/proxier_health.go
+++ b/pkg/proxy/healthcheck/proxier_health.go
@@ -19,13 +19,11 @@ package healthcheck
 import (
 	"fmt"
 	"net/http"
-	"sync/atomic"
+	"sync"
 	"time"
 
 	v1 "k8s.io/api/core/v1"
-	"k8s.io/client-go/tools/events"
 	"k8s.io/klog/v2"
-	api "k8s.io/kubernetes/pkg/apis/core"
 	"k8s.io/kubernetes/pkg/proxy/metrics"
 	"k8s.io/utils/clock"
 )
@@ -36,129 +34,135 @@ const (
 	ToBeDeletedTaint = "ToBeDeletedByClusterAutoscaler"
 )
 
-// ProxierHealthUpdater allows callers to update healthz timestamp only.
-type ProxierHealthUpdater interface {
-	// QueuedUpdate should be called when the proxier receives a Service or Endpoints
-	// event containing information that requires updating service rules.
-	QueuedUpdate()
-
-	// Updated should be called when the proxier has successfully updated the service
-	// rules to reflect the current state.
-	Updated()
-
-	// Run starts the healthz HTTP server and blocks until it exits.
-	Run() error
-
-	// Sync the node and determine if its eligible or not. Eligible is
-	// defined as being: not tainted by ToBeDeletedTaint and not deleted.
-	SyncNode(node *v1.Node)
-
-	proxierHealthChecker
-}
-
-var _ ProxierHealthUpdater = &proxierHealthServer{}
-var zeroTime = time.Time{}
-
-// proxierHealthServer returns 200 "OK" by default. It verifies that the delay between
-// QueuedUpdate() calls and Updated() calls never exceeds healthTimeout.
-type proxierHealthServer struct {
+// ProxierHealthServer allows callers to:
+//  1. run an HTTP server with /healthz and /livez endpoint handlers.
+//  2. update healthz timestamps before and after synchronizing the dataplane.
+//  3. sync node status, for reporting an unhealthy /healthz response
+//     if the node is marked for deletion by the cluster autoscaler.
+//  4. get the proxy's health by verifying that the delay between QueuedUpdate()
+//     calls and Updated() calls does not exceed healthTimeout.
+type ProxierHealthServer struct {
 	listener    listener
 	httpFactory httpServerFactory
 	clock       clock.Clock
 
 	addr          string
 	healthTimeout time.Duration
-	recorder      events.EventRecorder
-	nodeRef       *v1.ObjectReference
 
-	lastUpdated         atomic.Value
-	oldestPendingQueued atomic.Value
-	nodeEligible        atomic.Bool
+	lock                   sync.RWMutex
+	lastUpdatedMap         map[v1.IPFamily]time.Time
+	oldestPendingQueuedMap map[v1.IPFamily]time.Time
+	nodeEligible           bool
 }
 
 // NewProxierHealthServer returns a proxier health http server.
-func NewProxierHealthServer(addr string, healthTimeout time.Duration, recorder events.EventRecorder, nodeRef *v1.ObjectReference) ProxierHealthUpdater {
-	return newProxierHealthServer(stdNetListener{}, stdHTTPServerFactory{}, clock.RealClock{}, addr, healthTimeout, recorder, nodeRef)
+func NewProxierHealthServer(addr string, healthTimeout time.Duration) *ProxierHealthServer {
+	return newProxierHealthServer(stdNetListener{}, stdHTTPServerFactory{}, clock.RealClock{}, addr, healthTimeout)
 }
 
-func newProxierHealthServer(listener listener, httpServerFactory httpServerFactory, c clock.Clock, addr string, healthTimeout time.Duration, recorder events.EventRecorder, nodeRef *v1.ObjectReference) *proxierHealthServer {
-	hs := &proxierHealthServer{
+func newProxierHealthServer(listener listener, httpServerFactory httpServerFactory, c clock.Clock, addr string, healthTimeout time.Duration) *ProxierHealthServer {
+	return &ProxierHealthServer{
 		listener:      listener,
 		httpFactory:   httpServerFactory,
 		clock:         c,
 		addr:          addr,
 		healthTimeout: healthTimeout,
-		recorder:      recorder,
-		nodeRef:       nodeRef,
+
+		lastUpdatedMap:         make(map[v1.IPFamily]time.Time),
+		oldestPendingQueuedMap: make(map[v1.IPFamily]time.Time),
+		// The node is eligible (and thus the proxy healthy) while it's starting up
+		// and until we've processed the first node event that indicates the
+		// contrary.
+		nodeEligible: true,
 	}
-	// The node is eligible (and thus the proxy healthy) while it's starting up
-	// and until we've processed the first node event that indicates the
-	// contrary.
-	hs.nodeEligible.Store(true)
-	return hs
 }
 
-// Updated indicates that kube-proxy has successfully updated its backend, so it should
-// be considered healthy now.
-func (hs *proxierHealthServer) Updated() {
-	hs.oldestPendingQueued.Store(zeroTime)
-	hs.lastUpdated.Store(hs.clock.Now())
+// Updated should be called when the proxier of the given IP family has successfully
+// updated the service rules to reflect the current state; it should be considered
+// healthy now.
+func (hs *ProxierHealthServer) Updated(ipFamily v1.IPFamily) {
+	hs.lock.Lock()
+	defer hs.lock.Unlock()
+	delete(hs.oldestPendingQueuedMap, ipFamily)
+	hs.lastUpdatedMap[ipFamily] = hs.clock.Now()
 }
 
-// QueuedUpdate indicates that the proxy has received changes from the apiserver but
-// has not yet pushed them to its backend. If the proxy does not call Updated within the
+// QueuedUpdate should be called when the proxier receives a Service or Endpoints
+// event from the API server containing information that requires updating service
+// rules. It indicates that the proxier for the given IP family has received changes
+// but has not yet pushed them to its backend. If the proxier does not call Updated
+// within the
 // healthTimeout time then it will be considered unhealthy.
-func (hs *proxierHealthServer) QueuedUpdate() {
-	// Set oldestPendingQueued only if it's currently zero
-	hs.oldestPendingQueued.CompareAndSwap(zeroTime, hs.clock.Now())
+func (hs *ProxierHealthServer) QueuedUpdate(ipFamily v1.IPFamily) {
+	hs.lock.Lock()
+	defer hs.lock.Unlock()
+	// Set oldestPendingQueuedMap[ipFamily] only if it's currently unset
+	if _, set := hs.oldestPendingQueuedMap[ipFamily]; !set {
+		hs.oldestPendingQueuedMap[ipFamily] = hs.clock.Now()
+	}
 }
 
 // IsHealthy returns only the proxier's health state, following the same
 // definition the HTTP server defines, but ignoring the state of the Node.
-func (hs *proxierHealthServer) IsHealthy() bool {
-	isHealthy, _, _ := hs.isHealthy()
+func (hs *ProxierHealthServer) IsHealthy() bool {
+	isHealthy, _ := hs.isHealthy()
 	return isHealthy
 }
 
-func (hs *proxierHealthServer) isHealthy() (bool, time.Time, time.Time) {
-	var oldestPendingQueued, lastUpdated time.Time
-	if val := hs.oldestPendingQueued.Load(); val != nil {
-		oldestPendingQueued = val.(time.Time)
-	}
-	if val := hs.lastUpdated.Load(); val != nil {
-		lastUpdated = val.(time.Time)
-	}
+func (hs *ProxierHealthServer) isHealthy() (bool, time.Time) {
+	hs.lock.RLock()
+	defer hs.lock.RUnlock()
+
+	var lastUpdated time.Time
 	currentTime := hs.clock.Now()
 
-	healthy := false
-	switch {
-	case oldestPendingQueued.IsZero():
-		// The proxy is healthy while it's starting up
-		// or the proxy is fully synced.
-		healthy = true
-	case currentTime.Sub(oldestPendingQueued) < hs.healthTimeout:
-		// There's an unprocessed update queued, but it's not late yet
-		healthy = true
+	for ipFamily, proxierLastUpdated := range hs.lastUpdatedMap {
+		if proxierLastUpdated.After(lastUpdated) {
+			lastUpdated = proxierLastUpdated
+		}
+
+		if _, set := hs.oldestPendingQueuedMap[ipFamily]; !set {
+			// the proxier is healthy while it's starting up
+			// or the proxier is fully synced.
+			continue
+		}
+
+		if currentTime.Sub(hs.oldestPendingQueuedMap[ipFamily]) < hs.healthTimeout {
+			// there's an unprocessed update queued for this proxier, but it's not late yet.
+			continue
+		}
+		return false, proxierLastUpdated
 	}
-	return healthy, lastUpdated, currentTime
+	return true, lastUpdated
 }
 
-func (hs *proxierHealthServer) SyncNode(node *v1.Node) {
+// SyncNode syncs the node and determines if it is eligible or not. Eligible is
+// defined as being: not tainted by ToBeDeletedTaint and not deleted.
+func (hs *ProxierHealthServer) SyncNode(node *v1.Node) {
+	hs.lock.Lock()
+	defer hs.lock.Unlock()
+
 	if !node.DeletionTimestamp.IsZero() {
-		hs.nodeEligible.Store(false)
+		hs.nodeEligible = false
 		return
 	}
 	for _, taint := range node.Spec.Taints {
 		if taint.Key == ToBeDeletedTaint {
-			hs.nodeEligible.Store(false)
+			hs.nodeEligible = false
 			return
 		}
 	}
-	hs.nodeEligible.Store(true)
+	hs.nodeEligible = true
+}
+
+// NodeEligible returns the nodeEligible field of the ProxierHealthServer.
+func (hs *ProxierHealthServer) NodeEligible() bool {
+	hs.lock.RLock()
+	defer hs.lock.RUnlock()
+	return hs.nodeEligible
 }
 
 // Run starts the healthz HTTP server and blocks until it exits.
-func (hs *proxierHealthServer) Run() error {
+func (hs *ProxierHealthServer) Run() error {
 	serveMux := http.NewServeMux()
 	serveMux.Handle("/healthz", healthzHandler{hs: hs})
 	serveMux.Handle("/livez", livezHandler{hs: hs})
@@ -166,12 +170,7 @@ func (hs *proxierHealthServer) Run() error {
 
 	listener, err := hs.listener.Listen(hs.addr)
 	if err != nil {
-		msg := fmt.Sprintf("failed to start proxier healthz on %s: %v", hs.addr, err)
-		// TODO(thockin): move eventing back to caller
-		if hs.recorder != nil {
-			hs.recorder.Eventf(hs.nodeRef, nil, api.EventTypeWarning, "FailedToStartProxierHealthcheck", "StartKubeProxy", msg)
-		}
-		return fmt.Errorf("%v", msg)
+		return fmt.Errorf("failed to start proxier healthz on %s: %v", hs.addr, err)
 	}
 
 	klog.V(3).InfoS("Starting healthz HTTP server", "address", hs.addr)
@@ -183,12 +182,14 @@ func (hs *proxierHealthServer) Run() error {
 }
 
 type healthzHandler struct {
-	hs *proxierHealthServer
+	hs *ProxierHealthServer
 }
 
 func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
-	nodeEligible := h.hs.nodeEligible.Load()
-	healthy, lastUpdated, currentTime := h.hs.isHealthy()
+	nodeEligible := h.hs.NodeEligible()
+	healthy, lastUpdated := h.hs.isHealthy()
+	currentTime := h.hs.clock.Now()
+
 	healthy = healthy && nodeEligible
 	resp.Header().Set("Content-Type", "application/json")
 	resp.Header().Set("X-Content-Type-Options", "nosniff")
@@ -209,11 +210,12 @@ func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 }
 
 type livezHandler struct {
-	hs *proxierHealthServer
+	hs *ProxierHealthServer
 }
 
 func (h livezHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
-	healthy, lastUpdated, currentTime := h.hs.isHealthy()
+	healthy, lastUpdated := h.hs.isHealthy()
+	currentTime := h.hs.clock.Now()
 	resp.Header().Set("Content-Type", "application/json")
 	resp.Header().Set("X-Content-Type-Options", "nosniff")
 	if !healthy {
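With the interface collapsed into the concrete ProxierHealthServer, one server instance is shared by both single-stack proxiers in a dual-stack setup, each reporting under its own IP family; /healthz goes unhealthy if either family's oldest queued update is late. A hedged usage sketch, assuming the package layout in this patch with simplified wiring:

```go
package main

import (
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/proxy/healthcheck"
)

func main() {
	// Mirrors newProxyServer: timeout is 2x the sync period.
	hs := healthcheck.NewProxierHealthServer("0.0.0.0:10256", 2*30*time.Second)

	// Each proxier brackets its sync with QueuedUpdate/Updated for its family.
	hs.QueuedUpdate(v1.IPv4Protocol)
	hs.Updated(v1.IPv4Protocol) // IPv4 dataplane reprogrammed

	hs.QueuedUpdate(v1.IPv6Protocol)
	hs.Updated(v1.IPv6Protocol) // IPv6 dataplane reprogrammed

	go func() { _ = hs.Run() }() // serves /healthz and /livez
	_ = hs.IsHealthy()           // proxier health only, ignoring node eligibility
}
```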
diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go
index 95001d38c2a..c121687fd7d 100644
--- a/pkg/proxy/iptables/proxier.go
+++ b/pkg/proxy/iptables/proxier.go
@@ -139,6 +139,9 @@ func newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, svcPortName *proxy.Servic
 // Proxier is an iptables based proxy for connections between a localhost:lport
 // and services that provide the actual backends.
 type Proxier struct {
+	// ipFamily defines the IP family which this proxier is tracking.
+	ipFamily v1.IPFamily
+
 	// endpointsChanges and serviceChanges contains all changes to endpoints and
 	// services that happened since iptables was synced. For a single object,
 	// changes are accumulated, i.e. previous is state from before all of them,
@@ -172,7 +175,7 @@ type Proxier struct {
 	recorder events.EventRecorder
 
 	serviceHealthServer healthcheck.ServiceHealthServer
-	healthzServer       healthcheck.ProxierHealthUpdater
+	healthzServer       *healthcheck.ProxierHealthServer
 
 	// Since converting probabilities (floats) to strings is expensive
 	// and we are using only probabilities in the format of 1/n, we are
@@ -228,7 +231,7 @@ func NewProxier(ipFamily v1.IPFamily,
 	hostname string,
 	nodeIP net.IP,
 	recorder events.EventRecorder,
-	healthzServer healthcheck.ProxierHealthUpdater,
+	healthzServer *healthcheck.ProxierHealthServer,
 	nodePortAddressStrings []string,
 ) (*Proxier, error) {
 	nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nodePortAddressStrings)
@@ -262,6 +265,7 @@ func NewProxier(ipFamily v1.IPFamily,
 	serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
 
 	proxier := &Proxier{
+		ipFamily:       ipFamily,
 		svcPortMap:     make(proxy.ServicePortMap),
 		serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
 		endpointsMap:   make(proxy.EndpointsMap),
@@ -324,7 +328,7 @@ func NewDualStackProxier(
 	hostname string,
 	nodeIPs map[v1.IPFamily]net.IP,
 	recorder events.EventRecorder,
-	healthzServer healthcheck.ProxierHealthUpdater,
+	healthzServer *healthcheck.ProxierHealthServer,
 	nodePortAddresses []string,
 ) (proxy.Provider, error) {
 	// Create an ipv4 instance of the single-stack proxier
@@ -486,7 +490,7 @@ func (proxier *Proxier) probability(n int) string {
 // Sync is called to synchronize the proxier state to iptables as soon as possible.
 func (proxier *Proxier) Sync() {
 	if proxier.healthzServer != nil {
-		proxier.healthzServer.QueuedUpdate()
+		proxier.healthzServer.QueuedUpdate(proxier.ipFamily)
 	}
 	metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
 	proxier.syncRunner.Run()
@@ -496,7 +500,7 @@ func (proxier *Proxier) SyncLoop() {
 	// Update healthz timestamp at beginning in case Sync() never succeeds.
 	if proxier.healthzServer != nil {
-		proxier.healthzServer.Updated()
+		proxier.healthzServer.Updated(proxier.ipFamily)
 	}
 
 	// synthesize "last change queued" time as the informers are syncing.
@@ -1534,7 +1538,7 @@ func (proxier *Proxier) syncProxyRules() {
 	metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(serviceNoLocalEndpointsTotalInternal))
 	metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(serviceNoLocalEndpointsTotalExternal))
 	if proxier.healthzServer != nil {
-		proxier.healthzServer.Updated()
+		proxier.healthzServer.Updated(proxier.ipFamily)
 	}
 	metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
 
diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go
index 54f95a8e878..e687926d1a0 100644
--- a/pkg/proxy/ipvs/proxier.go
+++ b/pkg/proxy/ipvs/proxier.go
@@ -267,7 +267,7 @@ type Proxier struct {
 	recorder events.EventRecorder
 
 	serviceHealthServer healthcheck.ServiceHealthServer
-	healthzServer       healthcheck.ProxierHealthUpdater
+	healthzServer       *healthcheck.ProxierHealthServer
 
 	ipvsScheduler string
 	// The following buffers are used to reuse memory and avoid allocations
@@ -336,7 +336,7 @@ func NewProxier(ipFamily v1.IPFamily,
 	hostname string,
 	nodeIP net.IP,
 	recorder events.EventRecorder,
-	healthzServer healthcheck.ProxierHealthUpdater,
+	healthzServer *healthcheck.ProxierHealthServer,
 	scheduler string,
 	nodePortAddressStrings []string,
 	kernelHandler KernelHandler,
@@ -486,7 +486,7 @@ func NewDualStackProxier(
 	hostname string,
 	nodeIPs map[v1.IPFamily]net.IP,
 	recorder events.EventRecorder,
-	healthzServer healthcheck.ProxierHealthUpdater,
+	healthzServer *healthcheck.ProxierHealthServer,
 	scheduler string,
 	nodePortAddresses []string,
 	kernelHandler KernelHandler,
@@ -760,7 +760,7 @@ func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset
 
 // Sync is called to synchronize the proxier state to iptables and ipvs as soon as possible.
 func (proxier *Proxier) Sync() {
 	if proxier.healthzServer != nil {
-		proxier.healthzServer.QueuedUpdate()
+		proxier.healthzServer.QueuedUpdate(proxier.ipFamily)
 	}
 	metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
 	proxier.syncRunner.Run()
@@ -770,7 +770,7 @@ func (proxier *Proxier) SyncLoop() {
 	// Update healthz timestamp at beginning in case Sync() never succeeds.
 	if proxier.healthzServer != nil {
-		proxier.healthzServer.Updated()
+		proxier.healthzServer.Updated(proxier.ipFamily)
 	}
 	// synthesize "last change queued" time as the informers are syncing.
 	metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
@@ -1493,7 +1493,7 @@ func (proxier *Proxier) syncProxyRules() {
 	proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices)
 
 	if proxier.healthzServer != nil {
-		proxier.healthzServer.Updated()
+		proxier.healthzServer.Updated(proxier.ipFamily)
 	}
 	metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
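The iptables and ipvs hunks above (and the winkernel ones below) all follow the same bookkeeping contract: Sync() marks work as pending via QueuedUpdate, and syncProxyRules() reports success via Updated only after the dataplane is reprogrammed. A minimal sketch of that contract; the proxier type here is a stand-in, not any of the real backends.

```go
package main

import v1 "k8s.io/api/core/v1"

// healthz captures the two calls each backend makes, per this patch.
type healthz interface {
	QueuedUpdate(v1.IPFamily)
	Updated(v1.IPFamily)
}

type proxier struct {
	ipFamily      v1.IPFamily
	healthzServer healthz // nil when --healthz-bind-address is unset
}

// Sync marks an update as pending before scheduling a rule sync.
func (p *proxier) Sync() {
	if p.healthzServer != nil {
		p.healthzServer.QueuedUpdate(p.ipFamily)
	}
	// ...schedule syncProxyRules via the bounded-frequency runner...
}

// syncProxyRules reports success only after the rules are programmed.
func (p *proxier) syncProxyRules() {
	// ...program iptables/ipvs/hns rules...
	if p.healthzServer != nil {
		p.healthzServer.Updated(p.ipFamily)
	}
}

func main() {
	p := &proxier{ipFamily: v1.IPv4Protocol} // healthz disabled: calls are skipped
	p.Sync()
	p.syncProxyRules()
}
```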
diff --git a/pkg/proxy/node.go b/pkg/proxy/node.go
index 6ae87fdcac6..49df72cd1d7 100644
--- a/pkg/proxy/node.go
+++ b/pkg/proxy/node.go
@@ -90,7 +90,7 @@ func (n *NodePodCIDRHandler) OnNodeSynced() {}
 
 // NodeEligibleHandler handles the life cycle of the Node's eligibility, as
 // determined by the health server for directing load balancer traffic.
 type NodeEligibleHandler struct {
-	HealthServer healthcheck.ProxierHealthUpdater
+	HealthServer *healthcheck.ProxierHealthServer
 }
 
 var _ config.NodeHandler = &NodeEligibleHandler{}
diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go
index 522a15584a2..ef285e99fb8 100644
--- a/pkg/proxy/winkernel/proxier.go
+++ b/pkg/proxy/winkernel/proxier.go
@@ -594,6 +594,8 @@ type endPointsReferenceCountMap map[string]*uint16
 // Proxier is an hns based proxy for connections between a localhost:lport
 // and services that provide the actual backends.
 type Proxier struct {
+	// ipFamily defines the IP family which this proxier is tracking.
+	ipFamily v1.IPFamily
 	// TODO(imroc): implement node handler for winkernel proxier.
 	proxyconfig.NoopNodeHandler
 
@@ -612,7 +614,6 @@ type Proxier struct {
 	// with some partial data after kube-proxy restart.
 	endpointSlicesSynced bool
 	servicesSynced       bool
-	isIPv6Mode           bool
 	initialized          int32
 	syncRunner           *async.BoundedFrequencyRunner // governs calls to syncProxyRules
 	// These are effectively const and do not need the mutex to be held.
@@ -622,7 +623,7 @@ type Proxier struct {
 	recorder events.EventRecorder
 
 	serviceHealthServer healthcheck.ServiceHealthServer
-	healthzServer       healthcheck.ProxierHealthUpdater
+	healthzServer       *healthcheck.ProxierHealthServer
 
 	hns HostNetworkService
 	hcn HcnService
@@ -671,13 +672,14 @@ var _ proxy.Provider = &Proxier{}
 
 // NewProxier returns a new Proxier
 func NewProxier(
+	ipFamily v1.IPFamily,
 	syncPeriod time.Duration,
 	minSyncPeriod time.Duration,
 	clusterCIDR string,
 	hostname string,
 	nodeIP net.IP,
 	recorder events.EventRecorder,
-	healthzServer healthcheck.ProxierHealthUpdater,
+	healthzServer *healthcheck.ProxierHealthServer,
 	config config.KubeProxyWinkernelConfiguration,
 	healthzPort int,
 ) (*Proxier, error) {
@@ -690,12 +692,6 @@ func NewProxier(
 		klog.InfoS("ClusterCIDR not specified, unable to distinguish between internal and external traffic")
 	}
 
-	isIPv6 := netutils.IsIPv6(nodeIP)
-	ipFamily := v1.IPv4Protocol
-	if isIPv6 {
-		ipFamily = v1.IPv6Protocol
-	}
-
 	// windows listens to all node addresses
 	nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nil)
 	serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
@@ -778,6 +774,7 @@ func NewProxier(
 	}
 
 	proxier := &Proxier{
+		ipFamily:          ipFamily,
 		endPointsRefCount: make(endPointsReferenceCountMap),
 		svcPortMap:        make(proxy.ServicePortMap),
 		endpointsMap:      make(proxy.EndpointsMap),
@@ -794,7 +791,6 @@ func NewProxier(
 		hostMac:               hostMac,
 		isDSR:                 isDSR,
 		supportedFeatures:     supportedFeatures,
-		isIPv6Mode:            isIPv6,
 		healthzPort:           healthzPort,
 		rootHnsEndpointName:   config.RootHnsEndpointName,
 		forwardHealthCheckVip: config.ForwardHealthCheckVip,
@@ -819,13 +815,13 @@ func NewDualStackProxier(
 	hostname string,
 	nodeIPs map[v1.IPFamily]net.IP,
 	recorder events.EventRecorder,
-	healthzServer healthcheck.ProxierHealthUpdater,
+	healthzServer *healthcheck.ProxierHealthServer,
 	config config.KubeProxyWinkernelConfiguration,
 	healthzPort int,
 ) (proxy.Provider, error) {
 
 	// Create an ipv4 instance of the single-stack proxier
-	ipv4Proxier, err := NewProxier(syncPeriod, minSyncPeriod,
+	ipv4Proxier, err := NewProxier(v1.IPv4Protocol, syncPeriod, minSyncPeriod,
 		clusterCIDR, hostname, nodeIPs[v1.IPv4Protocol],
 		recorder, healthzServer, config, healthzPort)
 
 	if err != nil {
 		return nil, fmt.Errorf("unable to create ipv4 proxier: %v, hostname: %s, clusterCIDR : %s, nodeIP:%v", err, hostname, clusterCIDR, nodeIPs[v1.IPv4Protocol])
 	}
 
-	ipv6Proxier, err := NewProxier(syncPeriod, minSyncPeriod,
+	ipv6Proxier, err := NewProxier(v1.IPv6Protocol, syncPeriod, minSyncPeriod,
 		clusterCIDR, hostname, nodeIPs[v1.IPv6Protocol],
 		recorder, healthzServer, config, healthzPort)
 	if err != nil {
@@ -937,7 +933,7 @@ func (svcInfo *serviceInfo) deleteLoadBalancerPolicy(mapStaleLoadbalancer map[st
 
 // Sync is called to synchronize the proxier state to hns as soon as possible.
 func (proxier *Proxier) Sync() {
 	if proxier.healthzServer != nil {
-		proxier.healthzServer.QueuedUpdate()
+		proxier.healthzServer.QueuedUpdate(proxier.ipFamily)
 	}
 	metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
 	proxier.syncRunner.Run()
@@ -947,7 +943,7 @@ func (proxier *Proxier) SyncLoop() {
 	// Update healthz timestamp at beginning in case Sync() never succeeds.
 	if proxier.healthzServer != nil {
-		proxier.healthzServer.Updated()
+		proxier.healthzServer.Updated(proxier.ipFamily)
 	}
 	// synthesize "last change queued" time as the informers are syncing.
 	metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
@@ -1434,7 +1430,7 @@ func (proxier *Proxier) syncProxyRules() {
 			// Cluster IP LoadBalancer creation
 			hnsLoadBalancer, err := hns.getLoadBalancer(
 				clusterIPEndpoints,
-				loadBalancerFlags{isDSR: proxier.isDSR, isIPv6: proxier.isIPv6Mode, sessionAffinity: sessionAffinityClientIP},
+				loadBalancerFlags{isDSR: proxier.isDSR, isIPv6: proxier.ipFamily == v1.IPv6Protocol, sessionAffinity: sessionAffinityClientIP},
 				sourceVip,
 				svcInfo.ClusterIP().String(),
 				Enum(svcInfo.Protocol()),
@@ -1469,7 +1465,7 @@ func (proxier *Proxier) syncProxyRules() {
 				// If all endpoints are in terminating stage, then no need to create Node Port LoadBalancer
 				hnsLoadBalancer, err := hns.getLoadBalancer(
 					nodePortEndpoints,
-					loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode},
+					loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
 					sourceVip,
 					"",
 					Enum(svcInfo.Protocol()),
@@ -1504,7 +1500,7 @@ func (proxier *Proxier) syncProxyRules() {
 				// Try loading existing policies, if already available
 				hnsLoadBalancer, err = hns.getLoadBalancer(
 					externalIPEndpoints,
-					loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode},
+					loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
 					sourceVip,
 					externalIP.ip,
 					Enum(svcInfo.Protocol()),
@@ -1535,7 +1531,7 @@ func (proxier *Proxier) syncProxyRules() {
 			if len(lbIngressEndpoints) > 0 {
 				hnsLoadBalancer, err := hns.getLoadBalancer(
 					lbIngressEndpoints,
-					loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.preserveDIP || svcInfo.localTrafficDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode},
+					loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.preserveDIP || svcInfo.localTrafficDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
 					sourceVip,
 					lbIngressIP.ip,
 					Enum(svcInfo.Protocol()),
@@ -1587,7 +1583,7 @@ func (proxier *Proxier) syncProxyRules() {
 	}
 
 	if proxier.healthzServer != nil {
-		proxier.healthzServer.Updated()
+		proxier.healthzServer.Updated(proxier.ipFamily)
 	}
 	metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
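The winkernel changes above also explain why NewProxier now takes ipFamily explicitly: the old code inferred the family from nodeIP, which cannot distinguish the two instances in a dual-stack setup. A small sketch of old inference versus the new caller-supplied family; the names here are illustrative.

```go
package main

import (
	"fmt"
	"net"

	v1 "k8s.io/api/core/v1"
	netutils "k8s.io/utils/net"
)

// familyFromNodeIP reproduces the removed logic: infer the family from the
// node IP, a single-stack assumption.
func familyFromNodeIP(nodeIP net.IP) v1.IPFamily {
	if netutils.IsIPv6(nodeIP) {
		return v1.IPv6Protocol
	}
	return v1.IPv4Protocol
}

func main() {
	nodeIPs := map[v1.IPFamily]net.IP{
		v1.IPv4Protocol: net.ParseIP("10.0.0.1"),
		v1.IPv6Protocol: net.ParseIP("fd00::1"),
	}
	// New approach: the caller names the family for each proxier instance,
	// as NewDualStackProxier now does with v1.IPv4Protocol / v1.IPv6Protocol,
	// so per-family healthz reporting stays unambiguous.
	for family, ip := range nodeIPs {
		fmt.Printf("proxier family=%v nodeIP=%v (inferred: %v)\n",
			family, ip, familyFromNodeIP(ip))
	}
}
```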