Merge pull request #104742 from khenidak/health-check-port

change health-check port to listen to node port addresses
This commit is contained in:
Kubernetes Prow Robot 2021-09-13 15:43:52 -07:00 committed by GitHub
commit 648559b63e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 194 additions and 59 deletions

View File

@ -58,6 +58,15 @@ type fakeNetListener struct {
addr string addr string
} }
type fakeAddr struct {
}
func (fa fakeAddr) Network() string {
return "tcp"
}
func (fa fakeAddr) String() string {
return "<test>"
}
func (fake *fakeNetListener) Accept() (net.Conn, error) { func (fake *fakeNetListener) Accept() (net.Conn, error) {
// Not implemented // Not implemented
return nil, nil return nil, nil
@ -70,7 +79,7 @@ func (fake *fakeNetListener) Close() error {
func (fake *fakeNetListener) Addr() net.Addr { func (fake *fakeNetListener) Addr() net.Addr {
// Not implemented // Not implemented
return nil return fakeAddr{}
} }
type fakeHTTPServerFactory struct{} type fakeHTTPServerFactory struct{}
@ -119,7 +128,7 @@ func TestServer(t *testing.T) {
listener := newFakeListener() listener := newFakeListener()
httpFactory := newFakeHTTPServerFactory() httpFactory := newFakeHTTPServerFactory()
hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory) hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory, []string{})
hcs := hcsi.(*server) hcs := hcsi.(*server)
if len(hcs.services) != 0 { if len(hcs.services) != 0 {
t.Errorf("expected 0 services, got %d", len(hcs.services)) t.Errorf("expected 0 services, got %d", len(hcs.services))
@ -339,27 +348,31 @@ func TestServer(t *testing.T) {
} }
func testHandler(hcs *server, nsn types.NamespacedName, status int, endpoints int, t *testing.T) { func testHandler(hcs *server, nsn types.NamespacedName, status int, endpoints int, t *testing.T) {
handler := hcs.services[nsn].server.(*fakeHTTPServer).handler instance := hcs.services[nsn]
req, err := http.NewRequest("GET", "/healthz", nil) for _, h := range instance.httpServers {
if err != nil { handler := h.(*fakeHTTPServer).handler
t.Fatal(err)
}
resp := httptest.NewRecorder()
handler.ServeHTTP(resp, req) req, err := http.NewRequest("GET", "/healthz", nil)
if err != nil {
t.Fatal(err)
}
resp := httptest.NewRecorder()
if resp.Code != status { handler.ServeHTTP(resp, req)
t.Errorf("expected status code %v, got %v", status, resp.Code)
} if resp.Code != status {
var payload hcPayload t.Errorf("expected status code %v, got %v", status, resp.Code)
if err := json.Unmarshal(resp.Body.Bytes(), &payload); err != nil { }
t.Fatal(err) var payload hcPayload
} if err := json.Unmarshal(resp.Body.Bytes(), &payload); err != nil {
if payload.Service.Name != nsn.Name || payload.Service.Namespace != nsn.Namespace { t.Fatal(err)
t.Errorf("expected payload name %q, got %v", nsn.String(), payload.Service) }
} if payload.Service.Name != nsn.Name || payload.Service.Namespace != nsn.Namespace {
if payload.LocalEndpoints != endpoints { t.Errorf("expected payload name %q, got %v", nsn.String(), payload.Service)
t.Errorf("expected %d endpoints, got %d", endpoints, payload.LocalEndpoints) }
if payload.LocalEndpoints != endpoints {
t.Errorf("expected %d endpoints, got %d", endpoints, payload.LocalEndpoints)
}
} }
} }
@ -411,3 +424,51 @@ func testHealthzHandler(server httpServer, status int, t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
} }
// TestServerWithSelectiveListeningAddress checks that a health server created
// with only the loopback CIDR as its node port addresses opens exactly one
// health check port, on 127.0.0.1, for a synced service.
func TestServerWithSelectiveListeningAddress(t *testing.T) {
	listener := newFakeListener()
	httpFactory := newFakeHTTPServerFactory()

	// Limit the addresses to loopback. We don't want any cleverness here around
	// discovering the machine's IPs or choosing between IPv4 and IPv6; using
	// loopback guarantees the test works on any machine.
	hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory, []string{"127.0.0.0/8"})
	hcs := hcsi.(*server)
	if len(hcs.services) != 0 {
		t.Errorf("expected 0 services, got %d", len(hcs.services))
	}

	// sync nothing
	hcs.SyncServices(nil)
	if len(hcs.services) != 0 {
		t.Errorf("expected 0 services, got %d", len(hcs.services))
	}
	hcs.SyncEndpoints(nil)
	if len(hcs.services) != 0 {
		t.Errorf("expected 0 services, got %d", len(hcs.services))
	}

	// sync unknown endpoints, should be dropped
	hcs.SyncEndpoints(map[types.NamespacedName]int{mknsn("a", "b"): 93})
	if len(hcs.services) != 0 {
		t.Errorf("expected 0 services, got %d", len(hcs.services))
	}

	// sync a real service
	nsn := mknsn("a", "b")
	hcs.SyncServices(map[types.NamespacedName]uint16{nsn: 9376})
	// Fatal rather than Error: everything below dereferences hcs.services[nsn],
	// which is a nil *hcInstance if the service was not registered.
	if len(hcs.services) != 1 {
		t.Fatalf("expected 1 service, got %d", len(hcs.services))
	}
	if hcs.services[nsn] == nil {
		t.Fatalf("expected service %q to be registered", nsn.String())
	}
	if hcs.services[nsn].endpoints != 0 {
		t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn].endpoints)
	}
	if len(listener.openPorts) != 1 {
		t.Errorf("expected 1 open port, got %d\n%s", len(listener.openPorts), spew.Sdump(listener.openPorts))
	}
	if !listener.hasPort("127.0.0.1:9376") {
		t.Errorf("expected port 127.0.0.1:9376 to be open\n%s", spew.Sdump(listener.openPorts))
	}

	// test the handler
	testHandler(hcs, nsn, http.StatusServiceUnavailable, 0, t)
}

View File

@ -30,6 +30,10 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/events" "k8s.io/client-go/tools/events"
api "k8s.io/kubernetes/pkg/apis/core" api "k8s.io/kubernetes/pkg/apis/core"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
) )
// ServiceHealthServer serves HTTP endpoints for each service name, with results // ServiceHealthServer serves HTTP endpoints for each service name, with results
@ -48,26 +52,46 @@ type ServiceHealthServer interface {
SyncEndpoints(newEndpoints map[types.NamespacedName]int) error SyncEndpoints(newEndpoints map[types.NamespacedName]int) error
} }
func newServiceHealthServer(hostname string, recorder events.EventRecorder, listener listener, factory httpServerFactory) ServiceHealthServer { func newServiceHealthServer(hostname string, recorder events.EventRecorder, listener listener, factory httpServerFactory, nodePortAddresses []string) ServiceHealthServer {
nodeAddresses, err := utilproxy.GetNodeAddresses(nodePortAddresses, utilproxy.RealNetwork{})
if err != nil || nodeAddresses.Len() == 0 {
klog.ErrorS(err, "Health Check Port:Failed to get node ip address matching node port addresses. Health check port will listen to all node addresses", nodePortAddresses)
nodeAddresses = sets.NewString()
nodeAddresses.Insert(utilproxy.IPv4ZeroCIDR)
}
// If any of the addresses is the zero CIDR, listen on the old-style ":<port>"
// address instead of on the individual node addresses.
for _, addr := range nodeAddresses.List() {
if utilproxy.IsZeroCIDR(addr) {
nodeAddresses = sets.NewString("")
break
}
}
return &server{ return &server{
hostname: hostname, hostname: hostname,
recorder: recorder, recorder: recorder,
listener: listener, listener: listener,
httpFactory: factory, httpFactory: factory,
services: map[types.NamespacedName]*hcInstance{}, services: map[types.NamespacedName]*hcInstance{},
nodeAddresses: nodeAddresses,
} }
} }
// NewServiceHealthServer allocates a new service healthcheck server manager // NewServiceHealthServer allocates a new service healthcheck server manager
func NewServiceHealthServer(hostname string, recorder events.EventRecorder) ServiceHealthServer { func NewServiceHealthServer(hostname string, recorder events.EventRecorder, nodePortAddresses []string) ServiceHealthServer {
return newServiceHealthServer(hostname, recorder, stdNetListener{}, stdHTTPServerFactory{}) return newServiceHealthServer(hostname, recorder, stdNetListener{}, stdHTTPServerFactory{}, nodePortAddresses)
} }
type server struct { type server struct {
hostname string hostname string
recorder events.EventRecorder // can be nil // node addresses where health check port will listen on
listener listener nodeAddresses sets.String
httpFactory httpServerFactory recorder events.EventRecorder // can be nil
listener listener
httpFactory httpServerFactory
lock sync.RWMutex lock sync.RWMutex
services map[types.NamespacedName]*hcInstance services map[types.NamespacedName]*hcInstance
@ -80,10 +104,11 @@ func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) err
// Remove any that are not needed any more. // Remove any that are not needed any more.
for nsn, svc := range hcs.services { for nsn, svc := range hcs.services {
if port, found := newServices[nsn]; !found || port != svc.port { if port, found := newServices[nsn]; !found || port != svc.port {
klog.V(2).Infof("Closing healthcheck %q on port %d", nsn.String(), svc.port) klog.V(2).Infof("Closing healthcheck %v on port %d", nsn.String(), svc.port)
if err := svc.listener.Close(); err != nil {
klog.Errorf("Close(%v): %v", svc.listener.Addr(), err) // errors are logged in closeAll()
} _ = svc.closeAll()
delete(hcs.services, nsn) delete(hcs.services, nsn)
} }
} }
@ -95,12 +120,11 @@ func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) err
continue continue
} }
klog.V(2).Infof("Opening healthcheck %q on port %d", nsn.String(), port) klog.V(2).Infof("Opening healthcheck %s on port %v", nsn.String(), port)
svc := &hcInstance{port: port}
addr := fmt.Sprintf(":%d", port) svc := &hcInstance{nsn: nsn, port: port}
svc.server = hcs.httpFactory.New(addr, hcHandler{name: nsn, hcs: hcs}) err := svc.listenAndServeAll(hcs)
var err error
svc.listener, err = hcs.listener.Listen(addr)
if err != nil { if err != nil {
msg := fmt.Sprintf("node %s failed to start healthcheck %q on port %d: %v", hcs.hostname, nsn.String(), port, err) msg := fmt.Sprintf("node %s failed to start healthcheck %q on port %d: %v", hcs.hostname, nsn.String(), port, err)
@ -117,27 +141,77 @@ func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) err
continue continue
} }
hcs.services[nsn] = svc hcs.services[nsn] = svc
go func(nsn types.NamespacedName, svc *hcInstance) {
// Serve() will exit when the listener is closed.
klog.V(3).Infof("Starting goroutine for healthcheck %q on port %d", nsn.String(), svc.port)
if err := svc.server.Serve(svc.listener); err != nil {
klog.V(3).Infof("Healthcheck %q closed: %v", nsn.String(), err)
return
}
klog.V(3).Infof("Healthcheck %q closed", nsn.String())
}(nsn, svc)
} }
return nil return nil
} }
type hcInstance struct { type hcInstance struct {
port uint16 nsn types.NamespacedName
listener net.Listener port uint16
server httpServer
listeners []net.Listener
httpServers []httpServer
endpoints int // number of local endpoints for a service endpoints int // number of local endpoints for a service
} }
// listenAndServeAll opens the health check port on every address in
// hcs.nodeAddresses and starts an HTTP server for each one in its own
// goroutine. The opened listeners and servers are recorded on hcI.
//
// If listening on any address fails, the listeners opened so far are closed
// and the listen error is returned.
func (hcI *hcInstance) listenAndServeAll(hcs *server) error {
	nodeAddrs := hcs.nodeAddresses.List()
	hcI.listeners = make([]net.Listener, 0, len(nodeAddrs))
	hcI.httpServers = make([]httpServer, 0, len(nodeAddrs))

	// serve blocks in Serve(), which will exit when the listener is closed.
	serve := func(ln net.Listener, srv httpServer) {
		klog.V(3).Infof("Starting goroutine for healthcheck %q on %s", hcI.nsn.String(), ln.Addr().String())
		if serveErr := srv.Serve(ln); serveErr != nil {
			klog.V(3).Infof("Healthcheck %q closed: %v", hcI.nsn.String(), serveErr)
			return
		}
		klog.V(3).Infof("Healthcheck %q on %s closed", hcI.nsn.String(), ln.Addr().String())
	}

	for _, host := range nodeAddrs {
		hostPort := net.JoinHostPort(host, fmt.Sprint(hcI.port))

		// Build the HTTP server first, then open the listener it will use.
		srv := hcs.httpFactory.New(hostPort, hcHandler{name: hcI.nsn, hcs: hcs})
		ln, listenErr := hcs.listener.Listen(hostPort)
		if listenErr != nil {
			// Release whatever was opened on earlier iterations so a retry
			// (or a change of port ownership) is possible.
			_ = hcI.closeAll()
			return listenErr
		}

		go serve(ln, srv)

		hcI.listeners = append(hcI.listeners, ln)
		hcI.httpServers = append(hcI.httpServers, srv)
	}

	return nil
}
// closeAll closes every listener recorded on this instance. Each Close failure
// is logged and collected; the collected failures are returned as a single
// aggregate error. It returns nil when no Close call fails.
func (hcI *hcInstance) closeAll() error {
	var failures []error
	for i := range hcI.listeners {
		ln := hcI.listeners[i]
		if closeErr := ln.Close(); closeErr != nil {
			klog.Errorf("Service %q -- CloseListener(%v) error:%v", hcI.nsn, ln.Addr(), closeErr)
			failures = append(failures, closeErr)
		}
	}

	if len(failures) == 0 {
		return nil
	}
	return utilerrors.NewAggregate(failures)
}
type hcHandler struct { type hcHandler struct {
name types.NamespacedName name types.NamespacedName
hcs *server hcs *server

View File

@ -280,7 +280,7 @@ func NewProxier(ipt utiliptables.Interface,
masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue) masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)
klog.V(2).InfoS("Using iptables mark for masquerade", "ipFamily", ipt.Protocol(), "mark", masqueradeMark) klog.V(2).InfoS("Using iptables mark for masquerade", "ipFamily", ipt.Protocol(), "mark", masqueradeMark)
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder) serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses)
ipFamily := v1.IPv4Protocol ipFamily := v1.IPv4Protocol
if ipt.IsIPv6() { if ipt.IsIPv6() {

View File

@ -441,7 +441,7 @@ func NewProxier(ipt utiliptables.Interface,
scheduler = DefaultScheduler scheduler = DefaultScheduler
} }
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder) serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses)
ipFamilyMap := utilproxy.MapCIDRsByIPFamily(nodePortAddresses) ipFamilyMap := utilproxy.MapCIDRsByIPFamily(nodePortAddresses)
nodePortAddresses = ipFamilyMap[ipFamily] nodePortAddresses = ipFamilyMap[ipFamily]

View File

@ -642,7 +642,7 @@ func NewProxier(
klog.InfoS("clusterCIDR not specified, unable to distinguish between internal and external traffic") klog.InfoS("clusterCIDR not specified, unable to distinguish between internal and external traffic")
} }
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder) serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, []string{} /* windows listen to all node addresses */)
hns, supportedFeatures := newHostNetworkService() hns, supportedFeatures := newHostNetworkService()
hnsNetworkName, err := getNetworkName(config.NetworkName) hnsNetworkName, err := getNetworkName(config.NetworkName)
if err != nil { if err != nil {