change health checkport to listen to node port addresses

This commit is contained in:
Khaled (Kal) Henidak 2021-09-02 23:24:33 +00:00
parent dcfe8f5d5c
commit 784c31cca1
2 changed files with 191 additions and 56 deletions

View File

@ -58,6 +58,15 @@ type fakeNetListener struct {
addr string
}
type fakeAddr struct {
}
func (fa fakeAddr) Network() string {
return "tcp"
}
func (fa fakeAddr) String() string {
return "<test>"
}
func (fake *fakeNetListener) Accept() (net.Conn, error) {
// Not implemented
return nil, nil
@ -70,7 +79,7 @@ func (fake *fakeNetListener) Close() error {
func (fake *fakeNetListener) Addr() net.Addr {
// Not implemented
return nil
return fakeAddr{}
}
type fakeHTTPServerFactory struct{}
@ -119,7 +128,7 @@ func TestServer(t *testing.T) {
listener := newFakeListener()
httpFactory := newFakeHTTPServerFactory()
hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory)
hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory, []string{})
hcs := hcsi.(*server)
if len(hcs.services) != 0 {
t.Errorf("expected 0 services, got %d", len(hcs.services))
@ -339,27 +348,31 @@ func TestServer(t *testing.T) {
}
func testHandler(hcs *server, nsn types.NamespacedName, status int, endpoints int, t *testing.T) {
handler := hcs.services[nsn].server.(*fakeHTTPServer).handler
req, err := http.NewRequest("GET", "/healthz", nil)
if err != nil {
t.Fatal(err)
}
resp := httptest.NewRecorder()
instance := hcs.services[nsn]
for _, h := range instance.httpServers {
handler := h.(*fakeHTTPServer).handler
handler.ServeHTTP(resp, req)
req, err := http.NewRequest("GET", "/healthz", nil)
if err != nil {
t.Fatal(err)
}
resp := httptest.NewRecorder()
if resp.Code != status {
t.Errorf("expected status code %v, got %v", status, resp.Code)
}
var payload hcPayload
if err := json.Unmarshal(resp.Body.Bytes(), &payload); err != nil {
t.Fatal(err)
}
if payload.Service.Name != nsn.Name || payload.Service.Namespace != nsn.Namespace {
t.Errorf("expected payload name %q, got %v", nsn.String(), payload.Service)
}
if payload.LocalEndpoints != endpoints {
t.Errorf("expected %d endpoints, got %d", endpoints, payload.LocalEndpoints)
handler.ServeHTTP(resp, req)
if resp.Code != status {
t.Errorf("expected status code %v, got %v", status, resp.Code)
}
var payload hcPayload
if err := json.Unmarshal(resp.Body.Bytes(), &payload); err != nil {
t.Fatal(err)
}
if payload.Service.Name != nsn.Name || payload.Service.Namespace != nsn.Namespace {
t.Errorf("expected payload name %q, got %v", nsn.String(), payload.Service)
}
if payload.LocalEndpoints != endpoints {
t.Errorf("expected %d endpoints, got %d", endpoints, payload.LocalEndpoints)
}
}
}
@ -411,3 +424,51 @@ func testHealthzHandler(server httpServer, status int, t *testing.T) {
t.Fatal(err)
}
}
func TestServerWithSelectiveListeningAddress(t *testing.T) {
listener := newFakeListener()
httpFactory := newFakeHTTPServerFactory()
// limiting addresses to loop back. We don't want any cleverness here around getting IP for
// machine nor testing ipv6 || ipv4. using loop back guarantees the test will work on any machine
hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory, []string{"127.0.0.0/8"})
hcs := hcsi.(*server)
if len(hcs.services) != 0 {
t.Errorf("expected 0 services, got %d", len(hcs.services))
}
// sync nothing
hcs.SyncServices(nil)
if len(hcs.services) != 0 {
t.Errorf("expected 0 services, got %d", len(hcs.services))
}
hcs.SyncEndpoints(nil)
if len(hcs.services) != 0 {
t.Errorf("expected 0 services, got %d", len(hcs.services))
}
// sync unknown endpoints, should be dropped
hcs.SyncEndpoints(map[types.NamespacedName]int{mknsn("a", "b"): 93})
if len(hcs.services) != 0 {
t.Errorf("expected 0 services, got %d", len(hcs.services))
}
// sync a real service
nsn := mknsn("a", "b")
hcs.SyncServices(map[types.NamespacedName]uint16{nsn: 9376})
if len(hcs.services) != 1 {
t.Errorf("expected 1 service, got %d", len(hcs.services))
}
if hcs.services[nsn].endpoints != 0 {
t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn].endpoints)
}
if len(listener.openPorts) != 1 {
t.Errorf("expected 1 open port, got %d\n%s", len(listener.openPorts), spew.Sdump(listener.openPorts))
}
if !listener.hasPort("127.0.0.1:9376") {
t.Errorf("expected port :9376 to be open\n%s", spew.Sdump(listener.openPorts))
}
// test the handler
testHandler(hcs, nsn, http.StatusServiceUnavailable, 0, t)
}

View File

@ -30,6 +30,10 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/events"
api "k8s.io/kubernetes/pkg/apis/core"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
)
// ServiceHealthServer serves HTTP endpoints for each service name, with results
@ -48,26 +52,46 @@ type ServiceHealthServer interface {
SyncEndpoints(newEndpoints map[types.NamespacedName]int) error
}
func newServiceHealthServer(hostname string, recorder events.EventRecorder, listener listener, factory httpServerFactory) ServiceHealthServer {
func newServiceHealthServer(hostname string, recorder events.EventRecorder, listener listener, factory httpServerFactory, nodePortAddresses []string) ServiceHealthServer {
nodeAddresses, err := utilproxy.GetNodeAddresses(nodePortAddresses, utilproxy.RealNetwork{})
if err != nil || nodeAddresses.Len() == 0 {
klog.ErrorS(err, "Health Check Port:Failed to get node ip address matching node port addresses. Health check port will listen to all node addresses", nodePortAddresses)
nodeAddresses = sets.NewString()
nodeAddresses.Insert(utilproxy.IPv4ZeroCIDR)
}
// if any of the addresses is zero cidr then we listen
// to old style :<port>
for _, addr := range nodeAddresses.List() {
if utilproxy.IsZeroCIDR(addr) {
nodeAddresses = sets.NewString("")
break
}
}
return &server{
hostname: hostname,
recorder: recorder,
listener: listener,
httpFactory: factory,
services: map[types.NamespacedName]*hcInstance{},
hostname: hostname,
recorder: recorder,
listener: listener,
httpFactory: factory,
services: map[types.NamespacedName]*hcInstance{},
nodeAddresses: nodeAddresses,
}
}
// NewServiceHealthServer allocates a new service healthcheck server manager
func NewServiceHealthServer(hostname string, recorder events.EventRecorder) ServiceHealthServer {
return newServiceHealthServer(hostname, recorder, stdNetListener{}, stdHTTPServerFactory{})
func NewServiceHealthServer(hostname string, recorder events.EventRecorder, nodePortAddresses []string) ServiceHealthServer {
return newServiceHealthServer(hostname, recorder, stdNetListener{}, stdHTTPServerFactory{}, nodePortAddresses)
}
type server struct {
hostname string
recorder events.EventRecorder // can be nil
listener listener
httpFactory httpServerFactory
hostname string
// node addresses where health check port will listen on
nodeAddresses sets.String
recorder events.EventRecorder // can be nil
listener listener
httpFactory httpServerFactory
lock sync.RWMutex
services map[types.NamespacedName]*hcInstance
@ -80,10 +104,11 @@ func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) err
// Remove any that are not needed any more.
for nsn, svc := range hcs.services {
if port, found := newServices[nsn]; !found || port != svc.port {
klog.V(2).Infof("Closing healthcheck %q on port %d", nsn.String(), svc.port)
if err := svc.listener.Close(); err != nil {
klog.Errorf("Close(%v): %v", svc.listener.Addr(), err)
}
klog.V(2).Infof("Closing healthcheck %v on port %d", nsn.String(), svc.port)
// errors are loged in closeAll()
_ = svc.closeAll()
delete(hcs.services, nsn)
}
}
@ -95,12 +120,11 @@ func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) err
continue
}
klog.V(2).Infof("Opening healthcheck %q on port %d", nsn.String(), port)
svc := &hcInstance{port: port}
addr := fmt.Sprintf(":%d", port)
svc.server = hcs.httpFactory.New(addr, hcHandler{name: nsn, hcs: hcs})
var err error
svc.listener, err = hcs.listener.Listen(addr)
klog.V(2).Infof("Opening healthcheck %s on port %v", nsn.String(), port)
svc := &hcInstance{nsn: nsn, port: port}
err := svc.listenAndServeAll(hcs)
if err != nil {
msg := fmt.Sprintf("node %s failed to start healthcheck %q on port %d: %v", hcs.hostname, nsn.String(), port, err)
@ -117,27 +141,77 @@ func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) err
continue
}
hcs.services[nsn] = svc
go func(nsn types.NamespacedName, svc *hcInstance) {
// Serve() will exit when the listener is closed.
klog.V(3).Infof("Starting goroutine for healthcheck %q on port %d", nsn.String(), svc.port)
if err := svc.server.Serve(svc.listener); err != nil {
klog.V(3).Infof("Healthcheck %q closed: %v", nsn.String(), err)
return
}
klog.V(3).Infof("Healthcheck %q closed", nsn.String())
}(nsn, svc)
}
return nil
}
type hcInstance struct {
port uint16
listener net.Listener
server httpServer
nsn types.NamespacedName
port uint16
listeners []net.Listener
httpServers []httpServer
endpoints int // number of local endpoints for a service
}
// listenAll opens health check port on all the addresses provided
func (hcI *hcInstance) listenAndServeAll(hcs *server) error {
var err error
var listener net.Listener
addresses := hcs.nodeAddresses.List()
hcI.listeners = make([]net.Listener, 0, len(addresses))
hcI.httpServers = make([]httpServer, 0, len(addresses))
// for each of the node addresses start listening and serving
for _, address := range addresses {
addr := net.JoinHostPort(address, fmt.Sprint(hcI.port))
// create http server
httpSrv := hcs.httpFactory.New(addr, hcHandler{name: hcI.nsn, hcs: hcs})
// start listener
listener, err = hcs.listener.Listen(addr)
if err != nil {
// must close whatever have been previously opened
// to allow a retry/or port ownership change as needed
_ = hcI.closeAll()
return err
}
// start serving
go func(hcI *hcInstance, listener net.Listener, httpSrv httpServer) {
// Serve() will exit when the listener is closed.
klog.V(3).Infof("Starting goroutine for healthcheck %q on %s", hcI.nsn.String(), listener.Addr().String())
if err := httpSrv.Serve(listener); err != nil {
klog.V(3).Infof("Healthcheck %q closed: %v", hcI.nsn.String(), err)
return
}
klog.V(3).Infof("Healthcheck %q on %s closed", hcI.nsn.String(), listener.Addr().String())
}(hcI, listener, httpSrv)
hcI.listeners = append(hcI.listeners, listener)
hcI.httpServers = append(hcI.httpServers, httpSrv)
}
return nil
}
func (hcI *hcInstance) closeAll() error {
errors := []error{}
for _, listener := range hcI.listeners {
if err := listener.Close(); err != nil {
klog.Errorf("Service %q -- CloseListener(%v) error:%v", hcI.nsn, listener.Addr(), err)
errors = append(errors, err)
}
}
if len(errors) > 0 {
return utilerrors.NewAggregate(errors)
}
return nil
}
type hcHandler struct {
name types.NamespacedName
hcs *server