diff --git a/pkg/proxy/healthcheck/healthcheck_test.go b/pkg/proxy/healthcheck/healthcheck_test.go index f756cb85b74..7609c2e2c3d 100644 --- a/pkg/proxy/healthcheck/healthcheck_test.go +++ b/pkg/proxy/healthcheck/healthcheck_test.go @@ -58,6 +58,15 @@ type fakeNetListener struct { addr string } +type fakeAddr struct { +} + +func (fa fakeAddr) Network() string { + return "tcp" +} +func (fa fakeAddr) String() string { + return "" +} func (fake *fakeNetListener) Accept() (net.Conn, error) { // Not implemented return nil, nil @@ -70,7 +79,7 @@ func (fake *fakeNetListener) Close() error { func (fake *fakeNetListener) Addr() net.Addr { // Not implemented - return nil + return fakeAddr{} } type fakeHTTPServerFactory struct{} @@ -119,7 +128,7 @@ func TestServer(t *testing.T) { listener := newFakeListener() httpFactory := newFakeHTTPServerFactory() - hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory) + hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory, []string{}) hcs := hcsi.(*server) if len(hcs.services) != 0 { t.Errorf("expected 0 services, got %d", len(hcs.services)) @@ -339,27 +348,31 @@ func TestServer(t *testing.T) { } func testHandler(hcs *server, nsn types.NamespacedName, status int, endpoints int, t *testing.T) { - handler := hcs.services[nsn].server.(*fakeHTTPServer).handler - req, err := http.NewRequest("GET", "/healthz", nil) - if err != nil { - t.Fatal(err) - } - resp := httptest.NewRecorder() + instance := hcs.services[nsn] + for _, h := range instance.httpServers { + handler := h.(*fakeHTTPServer).handler - handler.ServeHTTP(resp, req) + req, err := http.NewRequest("GET", "/healthz", nil) + if err != nil { + t.Fatal(err) + } + resp := httptest.NewRecorder() - if resp.Code != status { - t.Errorf("expected status code %v, got %v", status, resp.Code) - } - var payload hcPayload - if err := json.Unmarshal(resp.Body.Bytes(), &payload); err != nil { - t.Fatal(err) - } - if payload.Service.Name != nsn.Name || 
payload.Service.Namespace != nsn.Namespace { - t.Errorf("expected payload name %q, got %v", nsn.String(), payload.Service) - } - if payload.LocalEndpoints != endpoints { - t.Errorf("expected %d endpoints, got %d", endpoints, payload.LocalEndpoints) + handler.ServeHTTP(resp, req) + + if resp.Code != status { + t.Errorf("expected status code %v, got %v", status, resp.Code) + } + var payload hcPayload + if err := json.Unmarshal(resp.Body.Bytes(), &payload); err != nil { + t.Fatal(err) + } + if payload.Service.Name != nsn.Name || payload.Service.Namespace != nsn.Namespace { + t.Errorf("expected payload name %q, got %v", nsn.String(), payload.Service) + } + if payload.LocalEndpoints != endpoints { + t.Errorf("expected %d endpoints, got %d", endpoints, payload.LocalEndpoints) + } } } @@ -411,3 +424,51 @@ func testHealthzHandler(server httpServer, status int, t *testing.T) { t.Fatal(err) } } + +func TestServerWithSelectiveListeningAddress(t *testing.T) { + listener := newFakeListener() + httpFactory := newFakeHTTPServerFactory() + + // limiting addresses to loop back. We don't want any cleverness here around getting IP for + // machine nor testing ipv6 || ipv4. 
using loop back guarantees the test will work on any machine + + hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory, []string{"127.0.0.0/8"}) + hcs := hcsi.(*server) + if len(hcs.services) != 0 { + t.Errorf("expected 0 services, got %d", len(hcs.services)) + } + + // sync nothing + hcs.SyncServices(nil) + if len(hcs.services) != 0 { + t.Errorf("expected 0 services, got %d", len(hcs.services)) + } + hcs.SyncEndpoints(nil) + if len(hcs.services) != 0 { + t.Errorf("expected 0 services, got %d", len(hcs.services)) + } + + // sync unknown endpoints, should be dropped + hcs.SyncEndpoints(map[types.NamespacedName]int{mknsn("a", "b"): 93}) + if len(hcs.services) != 0 { + t.Errorf("expected 0 services, got %d", len(hcs.services)) + } + + // sync a real service + nsn := mknsn("a", "b") + hcs.SyncServices(map[types.NamespacedName]uint16{nsn: 9376}) + if len(hcs.services) != 1 { + t.Errorf("expected 1 service, got %d", len(hcs.services)) + } + if hcs.services[nsn].endpoints != 0 { + t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn].endpoints) + } + if len(listener.openPorts) != 1 { + t.Errorf("expected 1 open port, got %d\n%s", len(listener.openPorts), spew.Sdump(listener.openPorts)) + } + if !listener.hasPort("127.0.0.1:9376") { + t.Errorf("expected port :9376 to be open\n%s", spew.Sdump(listener.openPorts)) + } + // test the handler + testHandler(hcs, nsn, http.StatusServiceUnavailable, 0, t) +} diff --git a/pkg/proxy/healthcheck/service_health.go b/pkg/proxy/healthcheck/service_health.go index 62da1ef1e30..daac5b019d3 100644 --- a/pkg/proxy/healthcheck/service_health.go +++ b/pkg/proxy/healthcheck/service_health.go @@ -30,6 +30,10 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/events" api "k8s.io/kubernetes/pkg/apis/core" + + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/sets" + utilproxy "k8s.io/kubernetes/pkg/proxy/util" ) // ServiceHealthServer serves HTTP endpoints for each service name, 
with results @@ -48,26 +52,46 @@ type ServiceHealthServer interface { SyncEndpoints(newEndpoints map[types.NamespacedName]int) error } -func newServiceHealthServer(hostname string, recorder events.EventRecorder, listener listener, factory httpServerFactory) ServiceHealthServer { +func newServiceHealthServer(hostname string, recorder events.EventRecorder, listener listener, factory httpServerFactory, nodePortAddresses []string) ServiceHealthServer { + + nodeAddresses, err := utilproxy.GetNodeAddresses(nodePortAddresses, utilproxy.RealNetwork{}) + if err != nil || nodeAddresses.Len() == 0 { + klog.ErrorS(err, "Health Check Port:Failed to get node ip address matching node port addresses. Health check port will listen to all node addresses", nodePortAddresses) + nodeAddresses = sets.NewString() + nodeAddresses.Insert(utilproxy.IPv4ZeroCIDR) + } + + // if any of the addresses is zero cidr then we listen + // to old style : + for _, addr := range nodeAddresses.List() { + if utilproxy.IsZeroCIDR(addr) { + nodeAddresses = sets.NewString("") + break + } + } + return &server{ - hostname: hostname, - recorder: recorder, - listener: listener, - httpFactory: factory, - services: map[types.NamespacedName]*hcInstance{}, + hostname: hostname, + recorder: recorder, + listener: listener, + httpFactory: factory, + services: map[types.NamespacedName]*hcInstance{}, + nodeAddresses: nodeAddresses, } } // NewServiceHealthServer allocates a new service healthcheck server manager -func NewServiceHealthServer(hostname string, recorder events.EventRecorder) ServiceHealthServer { - return newServiceHealthServer(hostname, recorder, stdNetListener{}, stdHTTPServerFactory{}) +func NewServiceHealthServer(hostname string, recorder events.EventRecorder, nodePortAddresses []string) ServiceHealthServer { + return newServiceHealthServer(hostname, recorder, stdNetListener{}, stdHTTPServerFactory{}, nodePortAddresses) } type server struct { - hostname string - recorder events.EventRecorder // can be nil 
- listener listener - httpFactory httpServerFactory + hostname string + // node addresses where health check port will listen on + nodeAddresses sets.String + recorder events.EventRecorder // can be nil + listener listener + httpFactory httpServerFactory lock sync.RWMutex services map[types.NamespacedName]*hcInstance @@ -80,10 +104,11 @@ func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) err // Remove any that are not needed any more. for nsn, svc := range hcs.services { if port, found := newServices[nsn]; !found || port != svc.port { - klog.V(2).Infof("Closing healthcheck %q on port %d", nsn.String(), svc.port) - if err := svc.listener.Close(); err != nil { - klog.Errorf("Close(%v): %v", svc.listener.Addr(), err) - } + klog.V(2).Infof("Closing healthcheck %v on port %d", nsn.String(), svc.port) + + // errors are logged in closeAll() + _ = svc.closeAll() + delete(hcs.services, nsn) } } @@ -95,12 +120,11 @@ func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) err continue } - klog.V(2).Infof("Opening healthcheck %q on port %d", nsn.String(), port) - svc := &hcInstance{port: port} - addr := fmt.Sprintf(":%d", port) - svc.server = hcs.httpFactory.New(addr, hcHandler{name: nsn, hcs: hcs}) - var err error - svc.listener, err = hcs.listener.Listen(addr) + klog.V(2).Infof("Opening healthcheck %s on port %v", nsn.String(), port) + + svc := &hcInstance{nsn: nsn, port: port} + err := svc.listenAndServeAll(hcs) + if err != nil { msg := fmt.Sprintf("node %s failed to start healthcheck %q on port %d: %v", hcs.hostname, nsn.String(), port, err) @@ -117,27 +141,77 @@ func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) err continue } hcs.services[nsn] = svc - - go func(nsn types.NamespacedName, svc *hcInstance) { - // Serve() will exit when the listener is closed.
- klog.V(3).Infof("Starting goroutine for healthcheck %q on port %d", nsn.String(), svc.port) - if err := svc.server.Serve(svc.listener); err != nil { - klog.V(3).Infof("Healthcheck %q closed: %v", nsn.String(), err) - return - } - klog.V(3).Infof("Healthcheck %q closed", nsn.String()) - }(nsn, svc) } return nil } type hcInstance struct { - port uint16 - listener net.Listener - server httpServer + nsn types.NamespacedName + port uint16 + + listeners []net.Listener + httpServers []httpServer + endpoints int // number of local endpoints for a service } +// listenAndServeAll opens the health check port on all the addresses provided +func (hcI *hcInstance) listenAndServeAll(hcs *server) error { + var err error + var listener net.Listener + + addresses := hcs.nodeAddresses.List() + hcI.listeners = make([]net.Listener, 0, len(addresses)) + hcI.httpServers = make([]httpServer, 0, len(addresses)) + + // for each of the node addresses start listening and serving + for _, address := range addresses { + addr := net.JoinHostPort(address, fmt.Sprint(hcI.port)) + // create http server + httpSrv := hcs.httpFactory.New(addr, hcHandler{name: hcI.nsn, hcs: hcs}) + // start listener + listener, err = hcs.listener.Listen(addr) + if err != nil { + // must close whatever has been previously opened + // to allow a retry or port ownership change as needed + _ = hcI.closeAll() + return err + } + + // start serving + go func(hcI *hcInstance, listener net.Listener, httpSrv httpServer) { + // Serve() will exit when the listener is closed.
+ klog.V(3).Infof("Starting goroutine for healthcheck %q on %s", hcI.nsn.String(), listener.Addr().String()) + if err := httpSrv.Serve(listener); err != nil { + klog.V(3).Infof("Healthcheck %q closed: %v", hcI.nsn.String(), err) + return + } + klog.V(3).Infof("Healthcheck %q on %s closed", hcI.nsn.String(), listener.Addr().String()) + }(hcI, listener, httpSrv) + + hcI.listeners = append(hcI.listeners, listener) + hcI.httpServers = append(hcI.httpServers, httpSrv) + } + + return nil +} + +func (hcI *hcInstance) closeAll() error { + errors := []error{} + for _, listener := range hcI.listeners { + if err := listener.Close(); err != nil { + klog.Errorf("Service %q -- CloseListener(%v) error:%v", hcI.nsn, listener.Addr(), err) + errors = append(errors, err) + } + } + + if len(errors) > 0 { + return utilerrors.NewAggregate(errors) + } + + return nil +} + type hcHandler struct { name types.NamespacedName hcs *server diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index c49f42f89b1..99b2e01f23b 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -280,7 +280,7 @@ func NewProxier(ipt utiliptables.Interface, masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue) klog.V(2).InfoS("Using iptables mark for masquerade", "ipFamily", ipt.Protocol(), "mark", masqueradeMark) - serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder) + serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses) ipFamily := v1.IPv4Protocol if ipt.IsIPv6() { diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 630a8dadeb7..4fcb362ae48 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -441,7 +441,7 @@ func NewProxier(ipt utiliptables.Interface, scheduler = DefaultScheduler } - serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder) + serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, 
nodePortAddresses) ipFamilyMap := utilproxy.MapCIDRsByIPFamily(nodePortAddresses) nodePortAddresses = ipFamilyMap[ipFamily] diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go index a5aa6b65f76..9cb9e8716c1 100644 --- a/pkg/proxy/winkernel/proxier.go +++ b/pkg/proxy/winkernel/proxier.go @@ -642,7 +642,7 @@ func NewProxier( klog.InfoS("clusterCIDR not specified, unable to distinguish between internal and external traffic") } - serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder) + serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, []string{} /* windows listen to all node addresses */) hns, supportedFeatures := newHostNetworkService() hnsNetworkName, err := getNetworkName(config.NetworkName) if err != nil {