diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 54fe0681242..0d94a88f00f 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -482,7 +482,7 @@ type ProxyServer struct { UseEndpointSlices bool OOMScoreAdj *int32 ConfigSyncPeriod time.Duration - HealthzServer *healthcheck.HealthzServer + HealthzServer *healthcheck.ProxierHealthServer } // createClients creates a kube client and an event client from the given config and masterOverride. diff --git a/cmd/kube-proxy/app/server_others.go b/cmd/kube-proxy/app/server_others.go index af17e7ec1af..0a4239ffd17 100644 --- a/cmd/kube-proxy/app/server_others.go +++ b/cmd/kube-proxy/app/server_others.go @@ -125,11 +125,9 @@ func newProxyServer( Namespace: "", } - var healthzServer *healthcheck.HealthzServer - var healthzUpdater healthcheck.HealthzUpdater + var healthzServer *healthcheck.ProxierHealthServer if len(config.HealthzBindAddress) > 0 { - healthzServer = healthcheck.NewDefaultHealthzServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef) - healthzUpdater = healthzServer + healthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef) } var proxier proxy.Provider @@ -162,7 +160,7 @@ func newProxyServer( hostname, nodeIP, recorder, - healthzUpdater, + healthzServer, config.NodePortAddresses, ) if err != nil { diff --git a/cmd/kube-proxy/app/server_windows.go b/cmd/kube-proxy/app/server_windows.go index bef3f3be017..04b0993973c 100644 --- a/cmd/kube-proxy/app/server_windows.go +++ b/cmd/kube-proxy/app/server_windows.go @@ -87,11 +87,9 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi Namespace: "", } - var healthzServer *healthcheck.HealthzServer - var healthzUpdater healthcheck.HealthzUpdater + var healthzServer *healthcheck.ProxierHealthServer if len(config.HealthzBindAddress) > 0 { - healthzServer = healthcheck.NewDefaultHealthzServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef) - healthzUpdater = healthzServer + healthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef) } var proxier proxy.Provider @@ -108,7 +106,7 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi hostname, utilnode.GetNodeIP(client, hostname), recorder, - healthzUpdater, + healthzServer, config.Winkernel, ) if err != nil { diff --git a/pkg/proxy/healthcheck/BUILD b/pkg/proxy/healthcheck/BUILD index d136039b211..d99782630f7 100644 --- a/pkg/proxy/healthcheck/BUILD +++ b/pkg/proxy/healthcheck/BUILD @@ -9,8 +9,10 @@ load( go_library( name = "go_default_library", srcs = [ + "common.go", "doc.go", - "healthcheck.go", + "proxier_health.go", + "service_health.go", ], importpath = "k8s.io/kubernetes/pkg/proxy/healthcheck", deps = [ diff --git a/pkg/proxy/healthcheck/common.go b/pkg/proxy/healthcheck/common.go new file mode 100644 index 00000000000..f65cf1666e8 --- /dev/null +++ b/pkg/proxy/healthcheck/common.go @@ -0,0 +1,63 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthcheck + +import ( + "net" + "net/http" +) + +// listener allows for testing of ServiceHealthServer and ProxierHealthServer. +type listener interface { + // Listen is very much like net.Listen, except the first arg (network) is + // fixed to be "tcp". + Listen(addr string) (net.Listener, error) +} + +// httpServerFactory allows for testing of ServiceHealthServer and ProxierHealthServer. +type httpServerFactory interface { + // New creates an instance of a type satisfying HTTPServer. This is + // designed to include http.Server. + New(addr string, handler http.Handler) httpServer +} + +// httpServer allows for testing of ServiceHealthServer and ProxierHealthServer. +// It is designed so that http.Server satisfies this interface, +type httpServer interface { + Serve(listener net.Listener) error +} + +// Implement listener in terms of net.Listen. +type stdNetListener struct{} + +func (stdNetListener) Listen(addr string) (net.Listener, error) { + return net.Listen("tcp", addr) +} + +var _ listener = stdNetListener{} + +// Implement httpServerFactory in terms of http.Server. +type stdHTTPServerFactory struct{} + +func (stdHTTPServerFactory) New(addr string, handler http.Handler) httpServer { + return &http.Server{ + Addr: addr, + Handler: handler, + } +} + +var _ httpServerFactory = stdHTTPServerFactory{} diff --git a/pkg/proxy/healthcheck/healthcheck_test.go b/pkg/proxy/healthcheck/healthcheck_test.go index 0a7da2f2768..f756cb85b74 100644 --- a/pkg/proxy/healthcheck/healthcheck_test.go +++ b/pkg/proxy/healthcheck/healthcheck_test.go @@ -79,7 +79,7 @@ func newFakeHTTPServerFactory() *fakeHTTPServerFactory { return &fakeHTTPServerFactory{} } -func (fake *fakeHTTPServerFactory) New(addr string, handler http.Handler) HTTPServer { +func (fake *fakeHTTPServerFactory) New(addr string, handler http.Handler) httpServer { return &fakeHTTPServer{ addr: addr, handler: handler, @@ -119,7 +119,7 @@ func TestServer(t *testing.T) { listener := newFakeListener() httpFactory := newFakeHTTPServerFactory() - hcsi := NewServer("hostname", nil, listener, httpFactory) + hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory) hcs := hcsi.(*server) if len(hcs.services) != 0 { t.Errorf("expected 0 services, got %d", len(hcs.services)) @@ -368,24 +368,32 @@ func TestHealthzServer(t *testing.T) { httpFactory := newFakeHTTPServerFactory() fakeClock := clock.NewFakeClock(time.Now()) - hs := newHealthzServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second, nil, nil) + hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second, nil, nil) server := hs.httpFactory.New(hs.addr, healthzHandler{hs: hs}) // Should return 200 "OK" by default. testHealthzHandler(server, http.StatusOK, t) - // Should return 503 "ServiceUnavailable" if exceed max no respond duration. - hs.UpdateTimestamp() + // Should return 200 "OK" after first update + hs.Updated() + testHealthzHandler(server, http.StatusOK, t) + + // Should continue to return 200 "OK" as long as no further updates are queued + fakeClock.Step(25 * time.Second) + testHealthzHandler(server, http.StatusOK, t) + + // Should return 503 "ServiceUnavailable" if exceed max update-processing time + hs.QueuedUpdate() fakeClock.Step(25 * time.Second) testHealthzHandler(server, http.StatusServiceUnavailable, t) - // Should return 200 "OK" if timestamp is valid. - hs.UpdateTimestamp() + // Should return 200 "OK" after processing update + hs.Updated() fakeClock.Step(5 * time.Second) testHealthzHandler(server, http.StatusOK, t) } -func testHealthzHandler(server HTTPServer, status int, t *testing.T) { +func testHealthzHandler(server httpServer, status int, t *testing.T) { handler := server.(*fakeHTTPServer).handler req, err := http.NewRequest("GET", "/healthz", nil) if err != nil { diff --git a/pkg/proxy/healthcheck/proxier_health.go b/pkg/proxy/healthcheck/proxier_health.go new file mode 100644 index 00000000000..44dc873fac3 --- /dev/null +++ b/pkg/proxy/healthcheck/proxier_health.go @@ -0,0 +1,163 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthcheck + +import ( + "fmt" + "net/http" + "sync/atomic" + "time" + + "k8s.io/klog" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/record" + api "k8s.io/kubernetes/pkg/apis/core" +) + +var proxierHealthzRetryInterval = 60 * time.Second + +// ProxierHealthUpdater allows callers to update healthz timestamp only. +type ProxierHealthUpdater interface { + // QueuedUpdate should be called when the proxier receives a Service or Endpoints + // event containing information that requires updating service rules. + QueuedUpdate() + + // Updated should be called when the proxier has successfully updated the service + // rules to reflect the current state. + Updated() +} + +// ProxierHealthServer returns 200 "OK" by default. It verifies that the delay between +// QueuedUpdate() calls and Updated() calls never exceeds healthTimeout. +type ProxierHealthServer struct { + listener listener + httpFactory httpServerFactory + clock clock.Clock + + addr string + port int32 + healthTimeout time.Duration + recorder record.EventRecorder + nodeRef *v1.ObjectReference + + lastUpdated atomic.Value + lastQueued atomic.Value +} + +// NewProxierHealthServer returns a proxier health http server. +func NewProxierHealthServer(addr string, healthTimeout time.Duration, recorder record.EventRecorder, nodeRef *v1.ObjectReference) *ProxierHealthServer { + return newProxierHealthServer(stdNetListener{}, stdHTTPServerFactory{}, clock.RealClock{}, addr, healthTimeout, recorder, nodeRef) +} + +func newProxierHealthServer(listener listener, httpServerFactory httpServerFactory, c clock.Clock, addr string, healthTimeout time.Duration, recorder record.EventRecorder, nodeRef *v1.ObjectReference) *ProxierHealthServer { + return &ProxierHealthServer{ + listener: listener, + httpFactory: httpServerFactory, + clock: c, + addr: addr, + healthTimeout: healthTimeout, + recorder: recorder, + nodeRef: nodeRef, + } +} + +// Updated updates the lastUpdated timestamp. +func (hs *ProxierHealthServer) Updated() { + hs.lastUpdated.Store(hs.clock.Now()) +} + +// QueuedUpdate updates the lastQueued timestamp. +func (hs *ProxierHealthServer) QueuedUpdate() { + hs.lastQueued.Store(hs.clock.Now()) +} + +// Run starts the healthz http server and returns. +func (hs *ProxierHealthServer) Run() { + serveMux := http.NewServeMux() + serveMux.Handle("/healthz", healthzHandler{hs: hs}) + server := hs.httpFactory.New(hs.addr, serveMux) + + go wait.Until(func() { + klog.V(3).Infof("Starting goroutine for proxier healthz on %s", hs.addr) + + listener, err := hs.listener.Listen(hs.addr) + if err != nil { + msg := fmt.Sprintf("Failed to start proxier healthz on %s: %v", hs.addr, err) + if hs.recorder != nil { + hs.recorder.Eventf(hs.nodeRef, api.EventTypeWarning, "FailedToStartProxierHealthcheck", msg) + } + klog.Error(msg) + return + } + + if err := server.Serve(listener); err != nil { + klog.Errorf("Proxier healthz closed with error: %v", err) + return + } + klog.Error("Unexpected proxier healthz closed.") + }, proxierHealthzRetryInterval, wait.NeverStop) +} + +type healthzHandler struct { + hs *ProxierHealthServer +} + +func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + var lastQueued, lastUpdated time.Time + if val := h.hs.lastQueued.Load(); val != nil { + lastQueued = val.(time.Time) + } + if val := h.hs.lastUpdated.Load(); val != nil { + lastUpdated = val.(time.Time) + } + currentTime := h.hs.clock.Now() + + healthy := false + switch { + case lastUpdated.IsZero(): + // The proxy is healthy while it's starting up + // TODO: this makes it useless as a readinessProbe. Consider changing + // to only become healthy after the proxy is fully synced. + healthy = true + case lastUpdated.After(lastQueued): + // We've processed all updates + healthy = true + case currentTime.Sub(lastQueued) < h.hs.healthTimeout: + // There's an unprocessed update queued, but it's not late yet + healthy = true + } + + resp.Header().Set("Content-Type", "application/json") + resp.Header().Set("X-Content-Type-Options", "nosniff") + if !healthy { + resp.WriteHeader(http.StatusServiceUnavailable) + } else { + resp.WriteHeader(http.StatusOK) + + // In older releases, the returned "lastUpdated" time indicated the last + // time the proxier sync loop ran, even if nothing had changed. To + // preserve compatibility, we use the same semantics: the returned + // lastUpdated value is "recent" if the server is healthy. The kube-proxy + // metrics provide more detailed information. + lastUpdated = currentTime + + } + fmt.Fprintf(resp, fmt.Sprintf(`{"lastUpdated": %q,"currentTime": %q}`, lastUpdated, currentTime)) +} diff --git a/pkg/proxy/healthcheck/healthcheck.go b/pkg/proxy/healthcheck/service_health.go similarity index 51% rename from pkg/proxy/healthcheck/healthcheck.go rename to pkg/proxy/healthcheck/service_health.go index 99d9904b5c7..0dcb61adbd0 100644 --- a/pkg/proxy/healthcheck/healthcheck.go +++ b/pkg/proxy/healthcheck/service_health.go @@ -22,27 +22,21 @@ import ( "net/http" "strings" "sync" - "sync/atomic" - "time" "github.com/lithammer/dedent" "k8s.io/klog" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/clock" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/record" api "k8s.io/kubernetes/pkg/apis/core" ) -var nodeHealthzRetryInterval = 60 * time.Second - -// Server serves HTTP endpoints for each service name, with results +// ServiceHealthServer serves HTTP endpoints for each service name, with results // based on the endpoints. If there are 0 endpoints for a service, it returns a // 503 "Service Unavailable" error (telling LBs not to use this node). If there // are 1 or more endpoints, it returns a 200 "OK". -type Server interface { +type ServiceHealthServer interface { // Make the new set of services be active. Services that were open before // will be closed. Services that are new will be opened. Service that // existed and are in the new set will be left alone. The value of the map @@ -54,73 +48,26 @@ type Server interface { SyncEndpoints(newEndpoints map[types.NamespacedName]int) error } -// Listener allows for testing of Server. If the Listener argument -// to NewServer() is nil, the real net.Listen function will be used. -type Listener interface { - // Listen is very much like net.Listen, except the first arg (network) is - // fixed to be "tcp". - Listen(addr string) (net.Listener, error) -} - -// HTTPServerFactory allows for testing of Server. If the -// HTTPServerFactory argument to NewServer() is nil, the real -// http.Server type will be used. -type HTTPServerFactory interface { - // New creates an instance of a type satisfying HTTPServer. This is - // designed to include http.Server. - New(addr string, handler http.Handler) HTTPServer -} - -// HTTPServer allows for testing of Server. -type HTTPServer interface { - // Server is designed so that http.Server satisfies this interface, - Serve(listener net.Listener) error -} - -// NewServer allocates a new healthcheck server manager. If either -// of the injected arguments are nil, defaults will be used. -func NewServer(hostname string, recorder record.EventRecorder, listener Listener, httpServerFactory HTTPServerFactory) Server { - if listener == nil { - listener = stdNetListener{} - } - if httpServerFactory == nil { - httpServerFactory = stdHTTPServerFactory{} - } +func newServiceHealthServer(hostname string, recorder record.EventRecorder, listener listener, factory httpServerFactory) ServiceHealthServer { return &server{ hostname: hostname, recorder: recorder, listener: listener, - httpFactory: httpServerFactory, + httpFactory: factory, services: map[types.NamespacedName]*hcInstance{}, } } -// Implement Listener in terms of net.Listen. -type stdNetListener struct{} - -func (stdNetListener) Listen(addr string) (net.Listener, error) { - return net.Listen("tcp", addr) +// NewServiceHealthServer allocates a new service healthcheck server manager +func NewServiceHealthServer(hostname string, recorder record.EventRecorder) ServiceHealthServer { + return newServiceHealthServer(hostname, recorder, stdNetListener{}, stdHTTPServerFactory{}) } -var _ Listener = stdNetListener{} - -// Implement HTTPServerFactory in terms of http.Server. -type stdHTTPServerFactory struct{} - -func (stdHTTPServerFactory) New(addr string, handler http.Handler) HTTPServer { - return &http.Server{ - Addr: addr, - Handler: handler, - } -} - -var _ HTTPServerFactory = stdHTTPServerFactory{} - type server struct { hostname string recorder record.EventRecorder // can be nil - listener Listener - httpFactory HTTPServerFactory + listener listener + httpFactory httpServerFactory lock sync.RWMutex services map[types.NamespacedName]*hcInstance @@ -187,7 +134,7 @@ func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) err type hcInstance struct { port uint16 listener net.Listener - server HTTPServer + server httpServer endpoints int // number of local endpoints for a service } @@ -247,103 +194,20 @@ func (hcs *server) SyncEndpoints(newEndpoints map[types.NamespacedName]int) erro return nil } -// HealthzUpdater allows callers to update healthz timestamp only. -type HealthzUpdater interface { - UpdateTimestamp() +// FakeServiceHealthServer is a fake ServiceHealthServer for test programs +type FakeServiceHealthServer struct{} + +// NewFakeServiceHealthServer allocates a new fake service healthcheck server manager +func NewFakeServiceHealthServer() ServiceHealthServer { + return FakeServiceHealthServer{} } -// HealthzServer returns 200 "OK" by default. Once timestamp has been -// updated, it verifies we don't exceed max no respond duration since -// last update. -type HealthzServer struct { - listener Listener - httpFactory HTTPServerFactory - clock clock.Clock - - addr string - port int32 - healthTimeout time.Duration - recorder record.EventRecorder - nodeRef *v1.ObjectReference - - lastUpdated atomic.Value +// SyncServices is part of ServiceHealthServer +func (fake FakeServiceHealthServer) SyncServices(_ map[types.NamespacedName]uint16) error { + return nil } -// NewDefaultHealthzServer returns a default healthz http server. -func NewDefaultHealthzServer(addr string, healthTimeout time.Duration, recorder record.EventRecorder, nodeRef *v1.ObjectReference) *HealthzServer { - return newHealthzServer(nil, nil, nil, addr, healthTimeout, recorder, nodeRef) -} - -func newHealthzServer(listener Listener, httpServerFactory HTTPServerFactory, c clock.Clock, addr string, healthTimeout time.Duration, recorder record.EventRecorder, nodeRef *v1.ObjectReference) *HealthzServer { - if listener == nil { - listener = stdNetListener{} - } - if httpServerFactory == nil { - httpServerFactory = stdHTTPServerFactory{} - } - if c == nil { - c = clock.RealClock{} - } - return &HealthzServer{ - listener: listener, - httpFactory: httpServerFactory, - clock: c, - addr: addr, - healthTimeout: healthTimeout, - recorder: recorder, - nodeRef: nodeRef, - } -} - -// UpdateTimestamp updates the lastUpdated timestamp. -func (hs *HealthzServer) UpdateTimestamp() { - hs.lastUpdated.Store(hs.clock.Now()) -} - -// Run starts the healthz http server and returns. -func (hs *HealthzServer) Run() { - serveMux := http.NewServeMux() - serveMux.Handle("/healthz", healthzHandler{hs: hs}) - server := hs.httpFactory.New(hs.addr, serveMux) - - go wait.Until(func() { - klog.V(3).Infof("Starting goroutine for healthz on %s", hs.addr) - - listener, err := hs.listener.Listen(hs.addr) - if err != nil { - msg := fmt.Sprintf("Failed to start node healthz on %s: %v", hs.addr, err) - if hs.recorder != nil { - hs.recorder.Eventf(hs.nodeRef, api.EventTypeWarning, "FailedToStartNodeHealthcheck", msg) - } - klog.Error(msg) - return - } - - if err := server.Serve(listener); err != nil { - klog.Errorf("Healthz closed with error: %v", err) - return - } - klog.Error("Unexpected healthz closed.") - }, nodeHealthzRetryInterval, wait.NeverStop) -} - -type healthzHandler struct { - hs *HealthzServer -} - -func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { - lastUpdated := time.Time{} - if val := h.hs.lastUpdated.Load(); val != nil { - lastUpdated = val.(time.Time) - } - currentTime := h.hs.clock.Now() - - resp.Header().Set("Content-Type", "application/json") - resp.Header().Set("X-Content-Type-Options", "nosniff") - if !lastUpdated.IsZero() && currentTime.After(lastUpdated.Add(h.hs.healthTimeout)) { - resp.WriteHeader(http.StatusServiceUnavailable) - } else { - resp.WriteHeader(http.StatusOK) - } - fmt.Fprintf(resp, fmt.Sprintf(`{"lastUpdated": %q,"currentTime": %q}`, lastUpdated, currentTime)) +// SyncEndpoints is part of ServiceHealthServer +func (fake FakeServiceHealthServer) SyncEndpoints(_ map[types.NamespacedName]int) error { + return nil } diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD index b45e983765f..3ba4b3ff738 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -38,6 +38,7 @@ go_test( embed = [":go_default_library"], deps = [ "//pkg/proxy:go_default_library", + "//pkg/proxy/healthcheck:go_default_library", "//pkg/proxy/util:go_default_library", "//pkg/proxy/util/testing:go_default_library", "//pkg/util/async:go_default_library", diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index d69c11e9238..2dd0389dc55 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -200,8 +200,9 @@ type Proxier struct { nodeIP net.IP portMapper utilproxy.PortOpener recorder record.EventRecorder - healthChecker healthcheck.Server - healthzServer healthcheck.HealthzUpdater + + serviceHealthServer healthcheck.ServiceHealthServer + healthzServer healthcheck.ProxierHealthUpdater // Since converting probabilities (floats) to strings is expensive // and we are using only probabilities in the format of 1/n, we are @@ -257,7 +258,7 @@ func NewProxier(ipt utiliptables.Interface, hostname string, nodeIP net.IP, recorder record.EventRecorder, - healthzServer healthcheck.HealthzUpdater, + healthzServer healthcheck.ProxierHealthUpdater, nodePortAddresses []string, ) (*Proxier, error) { // Set the route_localnet sysctl we need for @@ -291,7 +292,7 @@ func NewProxier(ipt utiliptables.Interface, endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) - healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps + serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder) isIPv6 := ipt.IsIpv6() proxier := &Proxier{ @@ -309,7 +310,7 @@ func NewProxier(ipt utiliptables.Interface, nodeIP: nodeIP, portMapper: &listenPortOpener{}, recorder: recorder, - healthChecker: healthChecker, + serviceHealthServer: serviceHealthServer, healthzServer: healthzServer, precomputedProbabilities: make([]string, 0, 1001), iptablesData: bytes.NewBuffer(nil), @@ -461,6 +462,9 @@ func (proxier *Proxier) probability(n int) string { // Sync is called to synchronize the proxier state to iptables as soon as possible. func (proxier *Proxier) Sync() { + if proxier.healthzServer != nil { + proxier.healthzServer.QueuedUpdate() + } proxier.syncRunner.Run() } @@ -468,7 +472,7 @@ func (proxier *Proxier) Sync() { func (proxier *Proxier) SyncLoop() { // Update healthz timestamp at beginning in case Sync() never succeeds. if proxier.healthzServer != nil { - proxier.healthzServer.UpdateTimestamp() + proxier.healthzServer.Updated() } proxier.syncRunner.Loop(wait.NeverStop) } @@ -495,7 +499,7 @@ func (proxier *Proxier) OnServiceAdd(service *v1.Service) { // service object is observed. func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) { if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() { - proxier.syncRunner.Run() + proxier.Sync() } } @@ -1449,19 +1453,18 @@ func (proxier *Proxier) syncProxyRules() { } proxier.portsMap = replacementPortsMap - // Update healthz timestamp. if proxier.healthzServer != nil { - proxier.healthzServer.UpdateTimestamp() + proxier.healthzServer.Updated() } metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime() - // Update healthchecks. The endpoints list might include services that are - // not "OnlyLocal", but the services list will not, and the healthChecker + // Update service healthchecks. The endpoints list might include services that are + // not "OnlyLocal", but the services list will not, and the serviceHealthServer // will just drop those endpoints. - if err := proxier.healthChecker.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil { + if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil { klog.Errorf("Error syncing healthcheck services: %v", err) } - if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil { + if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil { klog.Errorf("Error syncing healthcheck endpoints: %v", err) } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 5180c158d0d..af5e40d5981 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -35,6 +35,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/kubernetes/pkg/proxy" + "k8s.io/kubernetes/pkg/proxy/healthcheck" utilproxy "k8s.io/kubernetes/pkg/proxy/util" utilproxytest "k8s.io/kubernetes/pkg/proxy/util/testing" "k8s.io/kubernetes/pkg/util/async" @@ -342,28 +343,6 @@ func (f *fakePortOpener) OpenLocalPort(lp *utilproxy.LocalPort) (utilproxy.Close return nil, nil } -type fakeHealthChecker struct { - services map[types.NamespacedName]uint16 - endpoints map[types.NamespacedName]int -} - -func newFakeHealthChecker() *fakeHealthChecker { - return &fakeHealthChecker{ - services: map[types.NamespacedName]uint16{}, - endpoints: map[types.NamespacedName]int{}, - } -} - -func (fake *fakeHealthChecker) SyncServices(newServices map[types.NamespacedName]uint16) error { - fake.services = newServices - return nil -} - -func (fake *fakeHealthChecker) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error { - fake.endpoints = newEndpoints - return nil -} - const testHostname = "test-hostname" func NewFakeProxier(ipt utiliptables.Interface, endpointSlicesEnabled bool) *Proxier { @@ -380,7 +359,7 @@ func NewFakeProxier(ipt utiliptables.Interface, endpointSlicesEnabled bool) *Pro hostname: testHostname, portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}}, - healthChecker: newFakeHealthChecker(), + serviceHealthServer: healthcheck.NewFakeServiceHealthServer(), precomputedProbabilities: make([]string, 0, 1001), iptablesData: bytes.NewBuffer(nil), existingFilterChainsData: bytes.NewBuffer(nil), diff --git a/pkg/proxy/ipvs/BUILD b/pkg/proxy/ipvs/BUILD index ff2c1ab0c16..4c734059b66 100644 --- a/pkg/proxy/ipvs/BUILD +++ b/pkg/proxy/ipvs/BUILD @@ -16,6 +16,7 @@ go_test( embed = [":go_default_library"], deps = [ "//pkg/proxy:go_default_library", + "//pkg/proxy/healthcheck:go_default_library", "//pkg/proxy/ipvs/testing:go_default_library", "//pkg/proxy/util:go_default_library", "//pkg/proxy/util/testing:go_default_library", diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index e58782e4b82..605c79856e3 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -227,9 +227,11 @@ type Proxier struct { nodeIP net.IP portMapper utilproxy.PortOpener recorder record.EventRecorder - healthChecker healthcheck.Server - healthzServer healthcheck.HealthzUpdater - ipvsScheduler string + + serviceHealthServer healthcheck.ServiceHealthServer + healthzServer healthcheck.ProxierHealthUpdater + + ipvsScheduler string // Added as a member to the struct to allow injection for testing. ipGetter IPGetter // The following buffers are used to reuse memory and avoid allocations @@ -328,7 +330,7 @@ func NewProxier(ipt utiliptables.Interface, hostname string, nodeIP net.IP, recorder record.EventRecorder, - healthzServer healthcheck.HealthzUpdater, + healthzServer healthcheck.ProxierHealthUpdater, scheduler string, nodePortAddresses []string, ) (*Proxier, error) { @@ -421,7 +423,7 @@ func NewProxier(ipt utiliptables.Interface, scheduler = DefaultScheduler } - healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps + serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder) endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) @@ -443,7 +445,7 @@ func NewProxier(ipt utiliptables.Interface, nodeIP: nodeIP, portMapper: &listenPortOpener{}, recorder: recorder, - healthChecker: healthChecker, + serviceHealthServer: serviceHealthServer, healthzServer: healthzServer, ipvs: ipvs, ipvsScheduler: scheduler, @@ -489,7 +491,7 @@ func NewDualStackProxier( hostname string, nodeIP [2]net.IP, recorder record.EventRecorder, - healthzServer healthcheck.HealthzUpdater, + healthzServer healthcheck.ProxierHealthUpdater, scheduler string, nodePortAddresses []string, ) (proxy.Provider, error) { @@ -775,6 +777,9 @@ func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset // Sync is called to synchronize the proxier state to iptables and ipvs as soon as possible. func (proxier *Proxier) Sync() { + if proxier.healthzServer != nil { + proxier.healthzServer.QueuedUpdate() + } proxier.syncRunner.Run() } @@ -782,7 +787,7 @@ func (proxier *Proxier) Sync() { func (proxier *Proxier) SyncLoop() { // Update healthz timestamp at beginning in case Sync() never succeeds. if proxier.healthzServer != nil { - proxier.healthzServer.UpdateTimestamp() + proxier.healthzServer.Updated() } proxier.syncRunner.Loop(wait.NeverStop) } @@ -807,7 +812,7 @@ func (proxier *Proxier) OnServiceAdd(service *v1.Service) { // OnServiceUpdate is called whenever modification of an existing service object is observed. func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) { if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() { - proxier.syncRunner.Run() + proxier.Sync() } } @@ -839,7 +844,7 @@ func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) { // OnEndpointsUpdate is called whenever modification of an existing endpoints object is observed. func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) { if proxier.endpointsChanges.Update(oldEndpoints, endpoints) && proxier.isInitialized() { - proxier.syncRunner.Run() + proxier.Sync() } } @@ -1489,19 +1494,18 @@ func (proxier *Proxier) syncProxyRules() { } proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices, legacyBindAddrs) - // Update healthz timestamp if proxier.healthzServer != nil { - proxier.healthzServer.UpdateTimestamp() + proxier.healthzServer.Updated() } metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime() - // Update healthchecks. The endpoints list might include services that are - // not "OnlyLocal", but the services list will not, and the healthChecker + // Update service healthchecks. The endpoints list might include services that are + // not "OnlyLocal", but the services list will not, and the serviceHealthServer // will just drop those endpoints. - if err := proxier.healthChecker.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil { + if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil { klog.Errorf("Error syncing healthcheck services: %v", err) } - if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil { + if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil { klog.Errorf("Error syncing healthcheck endpoints: %v", err) } diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 0bf1471b2b1..b6268eddd0a 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/proxy" + "k8s.io/kubernetes/pkg/proxy/healthcheck" netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing" utilproxy "k8s.io/kubernetes/pkg/proxy/util" proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing" @@ -59,18 +60,6 @@ func (f *fakeIPGetter) NodeIPs() ([]net.IP, error) { return f.nodeIPs, nil } -type fakeHealthChecker struct { - services map[types.NamespacedName]uint16 - Endpoints map[types.NamespacedName]int -} - -func newFakeHealthChecker() *fakeHealthChecker { - return &fakeHealthChecker{ - services: map[types.NamespacedName]uint16{}, - Endpoints: map[types.NamespacedName]int{}, - } -} - // fakePortOpener implements portOpener. type fakePortOpener struct { openPorts []*utilproxy.LocalPort @@ -83,16 +72,6 @@ func (f *fakePortOpener) OpenLocalPort(lp *utilproxy.LocalPort) (utilproxy.Close return nil, nil } -func (fake *fakeHealthChecker) SyncServices(newServices map[types.NamespacedName]uint16) error { - fake.services = newServices - return nil -} - -func (fake *fakeHealthChecker) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error { - fake.Endpoints = newEndpoints - return nil -} - // fakeKernelHandler implements KernelHandler. type fakeKernelHandler struct { modules []string @@ -151,7 +130,7 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u hostname: testHostname, portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}}, - healthChecker: newFakeHealthChecker(), + serviceHealthServer: healthcheck.NewFakeServiceHealthServer(), ipvsScheduler: DefaultScheduler, ipGetter: &fakeIPGetter{nodeIPs: nodeIPs}, iptablesData: bytes.NewBuffer(nil), diff --git a/pkg/proxy/winkernel/BUILD b/pkg/proxy/winkernel/BUILD index 127d854b88c..d78cffbc651 100644 --- a/pkg/proxy/winkernel/BUILD +++ b/pkg/proxy/winkernel/BUILD @@ -62,6 +62,7 @@ go_test( deps = select({ "@io_bazel_rules_go//go/platform:windows": [ "//pkg/proxy:go_default_library", + "//pkg/proxy/healthcheck:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go index 5ef3daf2de7..eb481a81e6b 100644 --- a/pkg/proxy/winkernel/proxier.go +++ b/pkg/proxy/winkernel/proxier.go @@ -471,8 +471,9 @@ type Proxier struct { hostname string nodeIP net.IP recorder record.EventRecorder - healthChecker healthcheck.Server - healthzServer healthcheck.HealthzUpdater + + serviceHealthServer healthcheck.ServiceHealthServer + healthzServer healthcheck.ProxierHealthUpdater // Since converting probabilities (floats) to strings is expensive // and we are using only probabilities in the format of 1/n, we are @@ -527,7 +528,7 @@ func NewProxier( hostname string, nodeIP net.IP, recorder record.EventRecorder, - healthzServer healthcheck.HealthzUpdater, + healthzServer healthcheck.ProxierHealthUpdater, config config.KubeProxyWinkernelConfiguration, ) (*Proxier, error) { masqueradeValue := 1 << uint(masqueradeBit) @@ -542,7 +543,7 @@ func NewProxier( klog.Warningf("clusterCIDR not specified, unable to distinguish between internal and external traffic") } - healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps + serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder) var hns HostNetworkService hns = hnsV1{} supportedFeatures := hcn.GetSupportedFeatures() @@ -622,24 +623,24 @@ func NewProxier( } proxier := &Proxier{ - portsMap: make(map[localPort]closeable), - serviceMap: make(proxyServiceMap), - serviceChanges: newServiceChangeMap(), - endpointsMap: make(proxyEndpointsMap), - endpointsChanges: newEndpointsChangeMap(hostname), - masqueradeAll: masqueradeAll, - masqueradeMark: masqueradeMark, - clusterCIDR: clusterCIDR, - hostname: hostname, - nodeIP: nodeIP, - recorder: recorder, - healthChecker: healthChecker, - healthzServer: healthzServer, - hns: hns, - network: *hnsNetworkInfo, - sourceVip: sourceVip, - hostMac: hostMac, - isDSR: isDSR, + portsMap: make(map[localPort]closeable), + serviceMap: make(proxyServiceMap), + serviceChanges: newServiceChangeMap(), + endpointsMap: make(proxyEndpointsMap), + endpointsChanges: newEndpointsChangeMap(hostname), + masqueradeAll: masqueradeAll, + masqueradeMark: masqueradeMark, + clusterCIDR: clusterCIDR, + hostname: hostname, + nodeIP: nodeIP, + recorder: recorder, + serviceHealthServer: serviceHealthServer, + healthzServer: healthzServer, + hns: hns, + network: *hnsNetworkInfo, + sourceVip: sourceVip, + hostMac: hostMac, + isDSR: isDSR, } burstSyncs := 2 @@ -725,6 +726,9 @@ func getHnsNetworkInfo(hnsNetworkName string) (*hnsNetworkInfo, error) { // Sync is called to synchronize the proxier state to hns as soon as possible. func (proxier *Proxier) Sync() { + if proxier.healthzServer != nil { + proxier.healthzServer.QueuedUpdate() + } proxier.syncRunner.Run() } @@ -732,7 +736,7 @@ func (proxier *Proxier) Sync() { func (proxier *Proxier) SyncLoop() { // Update healthz timestamp at beginning in case Sync() never succeeds. if proxier.healthzServer != nil { - proxier.healthzServer.UpdateTimestamp() + proxier.healthzServer.Updated() } proxier.syncRunner.Loop(wait.NeverStop) } @@ -752,21 +756,21 @@ func (proxier *Proxier) isInitialized() bool { func (proxier *Proxier) OnServiceAdd(service *v1.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} if proxier.serviceChanges.update(&namespacedName, nil, service, proxier.hns) && proxier.isInitialized() { - proxier.syncRunner.Run() + proxier.Sync() } } func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} if proxier.serviceChanges.update(&namespacedName, oldService, service, proxier.hns) && proxier.isInitialized() { - proxier.syncRunner.Run() + proxier.Sync() } } func (proxier *Proxier) OnServiceDelete(service *v1.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} if proxier.serviceChanges.update(&namespacedName, service, nil, proxier.hns) && proxier.isInitialized() { - proxier.syncRunner.Run() + proxier.Sync() } } @@ -827,21 +831,21 @@ func (proxier *Proxier) updateServiceMap() (result updateServiceMapResult) { func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} if proxier.endpointsChanges.update(&namespacedName, nil, endpoints, proxier.hns) && proxier.isInitialized() { - proxier.syncRunner.Run() + proxier.Sync() } } func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints, proxier.hns) && proxier.isInitialized() { - proxier.syncRunner.Run() + proxier.Sync() } } func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} if proxier.endpointsChanges.update(&namespacedName, endpoints, nil, proxier.hns) && proxier.isInitialized() { - proxier.syncRunner.Run() + proxier.Sync() } } @@ -1276,19 +1280,18 @@ func (proxier *Proxier) syncProxyRules() { Log(svcInfo, "+++Policy Successfully applied for service +++", 2) } - // Update healthz timestamp. if proxier.healthzServer != nil { - proxier.healthzServer.UpdateTimestamp() + proxier.healthzServer.Updated() } SyncProxyRulesLastTimestamp.SetToCurrentTime() - // Update healthchecks. The endpoints list might include services that are - // not "OnlyLocal", but the services list will not, and the healthChecker + // Update service healthchecks. The endpoints list might include services that are + // not "OnlyLocal", but the services list will not, and the serviceHealthServer // will just drop those endpoints. - if err := proxier.healthChecker.SyncServices(serviceUpdateResult.hcServices); err != nil { + if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.hcServices); err != nil { klog.Errorf("Error syncing healthcheck services: %v", err) } - if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.hcEndpoints); err != nil { + if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.hcEndpoints); err != nil { klog.Errorf("Error syncing healthcheck endpoints: %v", err) } diff --git a/pkg/proxy/winkernel/proxier_test.go b/pkg/proxy/winkernel/proxier_test.go index c55618b9ed1..1d2c09dc370 100644 --- a/pkg/proxy/winkernel/proxier_test.go +++ b/pkg/proxy/winkernel/proxier_test.go @@ -23,6 +23,7 @@ import ( discovery "k8s.io/api/discovery/v1alpha1" "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/proxy" + "k8s.io/kubernetes/pkg/proxy/healthcheck" "net" "strings" @@ -39,27 +40,6 @@ const destinationPrefix = "192.168.2.0/24" const providerAddress = "10.0.0.3" const guid = "123ABC" -type fakeHealthChecker struct { - services map[types.NamespacedName]uint16 - endpoints map[types.NamespacedName]int -} - -func newFakeHealthChecker() *fakeHealthChecker { - return &fakeHealthChecker{ - services: map[types.NamespacedName]uint16{}, - endpoints: map[types.NamespacedName]int{}, - } -} -func (fake *fakeHealthChecker) SyncServices(newServices map[types.NamespacedName]uint16) error { - fake.services = newServices - return nil -} - -func (fake *fakeHealthChecker) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error { - fake.endpoints = newEndpoints - return nil -} - type fakeHNS struct{} func newFakeHNS() *fakeHNS { @@ -126,20 +106,20 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clust networkType: networkType, } proxier := &Proxier{ - portsMap: make(map[localPort]closeable), - serviceMap: make(proxyServiceMap), - serviceChanges: newServiceChangeMap(), - endpointsMap: make(proxyEndpointsMap), - endpointsChanges: newEndpointsChangeMap(hostname), - clusterCIDR: clusterCIDR, - hostname: testHostName, - nodeIP: nodeIP, - healthChecker: newFakeHealthChecker(), - network: *hnsNetworkInfo, - sourceVip: sourceVip, - hostMac: macAddress, - isDSR: false, - hns: newFakeHNS(), + portsMap: make(map[localPort]closeable), + serviceMap: make(proxyServiceMap), + serviceChanges: newServiceChangeMap(), + endpointsMap: make(proxyEndpointsMap), + endpointsChanges: newEndpointsChangeMap(hostname), + clusterCIDR: clusterCIDR, + hostname: testHostName, + nodeIP: nodeIP, + serviceHealthServer: healthcheck.NewFakeServiceHealthServer(), + network: *hnsNetworkInfo, + sourceVip: sourceVip, + hostMac: macAddress, + isDSR: false, + hns: newFakeHNS(), } return proxier }