Merge pull request #111661 from alexanderConstantinescu/etp-local-svc-hc-kube-proxy

[Proxy]: add `healthz` verification when determining HC response for eTP:Local
This commit is contained in:
Kubernetes Prow Robot 2023-03-07 05:34:36 -08:00 committed by GitHub
commit 86bf570711
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 93 additions and 34 deletions

View File

@ -121,7 +121,8 @@ type hcPayload struct {
Namespace string
Name string
}
LocalEndpoints int
LocalEndpoints int
ServiceProxyHealthy bool
}
type healthzPayload struct {
@ -129,12 +130,21 @@ type healthzPayload struct {
CurrentTime string
}
type fakeProxierHealthChecker struct {
healthy bool
}
func (fake fakeProxierHealthChecker) IsHealthy() bool {
return fake.healthy
}
func TestServer(t *testing.T) {
listener := newFakeListener()
httpFactory := newFakeHTTPServerFactory()
nodePortAddresses := utilproxy.NewNodePortAddresses([]string{})
proxyChecker := &fakeProxierHealthChecker{true}
hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory, nodePortAddresses)
hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory, nodePortAddresses, proxyChecker)
hcs := hcsi.(*server)
if len(hcs.services) != 0 {
t.Errorf("expected 0 services, got %d", len(hcs.services))
@ -351,9 +361,29 @@ func TestServer(t *testing.T) {
testHandler(hcs, nsn2, http.StatusServiceUnavailable, 0, t)
testHandler(hcs, nsn3, http.StatusOK, 7, t)
testHandler(hcs, nsn4, http.StatusOK, 6, t)
// fake a temporary unhealthy proxy
proxyChecker.healthy = false
testHandlerWithHealth(hcs, nsn2, http.StatusServiceUnavailable, 0, false, t)
testHandlerWithHealth(hcs, nsn3, http.StatusServiceUnavailable, 7, false, t)
testHandlerWithHealth(hcs, nsn4, http.StatusServiceUnavailable, 6, false, t)
// fake a healthy proxy
proxyChecker.healthy = true
testHandlerWithHealth(hcs, nsn2, http.StatusServiceUnavailable, 0, true, t)
testHandlerWithHealth(hcs, nsn3, http.StatusOK, 7, true, t)
testHandlerWithHealth(hcs, nsn4, http.StatusOK, 6, true, t)
}
func testHandler(hcs *server, nsn types.NamespacedName, status int, endpoints int, t *testing.T) {
tHandler(hcs, nsn, status, endpoints, true, t)
}
func testHandlerWithHealth(hcs *server, nsn types.NamespacedName, status int, endpoints int, kubeProxyHealthy bool, t *testing.T) {
tHandler(hcs, nsn, status, endpoints, kubeProxyHealthy, t)
}
func tHandler(hcs *server, nsn types.NamespacedName, status int, endpoints int, kubeProxyHealthy bool, t *testing.T) {
instance := hcs.services[nsn]
for _, h := range instance.httpServers {
handler := h.(*fakeHTTPServer).handler
@ -379,6 +409,9 @@ func testHandler(hcs *server, nsn types.NamespacedName, status int, endpoints in
if payload.LocalEndpoints != endpoints {
t.Errorf("expected %d endpoints, got %d", endpoints, payload.LocalEndpoints)
}
if payload.ServiceProxyHealthy != kubeProxyHealthy {
t.Errorf("expected %v kubeProxyHealthy, got %v", kubeProxyHealthy, payload.ServiceProxyHealthy)
}
}
}
@ -434,12 +467,13 @@ func testHealthzHandler(server httpServer, status int, t *testing.T) {
func TestServerWithSelectiveListeningAddress(t *testing.T) {
listener := newFakeListener()
httpFactory := newFakeHTTPServerFactory()
proxyChecker := &fakeProxierHealthChecker{true}
// limiting addresses to loop back. We don't want any cleverness here around getting IP for
// machine nor testing ipv6 || ipv4. using loop back guarantees the test will work on any machine
nodePortAddresses := utilproxy.NewNodePortAddresses([]string{"127.0.0.0/8"})
hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory, nodePortAddresses)
hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory, nodePortAddresses, proxyChecker)
hcs := hcsi.(*server)
if len(hcs.services) != 0 {
t.Errorf("expected 0 services, got %d", len(hcs.services))

View File

@ -41,6 +41,8 @@ type ProxierHealthUpdater interface {
// Run starts the healthz HTTP server and blocks until it exits.
Run() error
proxierHealthChecker
}
var _ ProxierHealthUpdater = &proxierHealthServer{}
@ -94,6 +96,37 @@ func (hs *proxierHealthServer) QueuedUpdate() {
hs.oldestPendingQueued.CompareAndSwap(zeroTime, hs.clock.Now())
}
// IsHealthy returns the proxier's health state, following the same definition
// the HTTP server defines.
func (hs *proxierHealthServer) IsHealthy() bool {
isHealthy, _, _ := hs.isHealthy()
return isHealthy
}
func (hs *proxierHealthServer) isHealthy() (bool, time.Time, time.Time) {
var oldestPendingQueued, lastUpdated time.Time
if val := hs.oldestPendingQueued.Load(); val != nil {
oldestPendingQueued = val.(time.Time)
}
if val := hs.lastUpdated.Load(); val != nil {
lastUpdated = val.(time.Time)
}
currentTime := hs.clock.Now()
healthy := false
switch {
case oldestPendingQueued.IsZero():
// The proxy is healthy while it's starting up
// or the proxy is fully synced.
healthy = true
case currentTime.Sub(oldestPendingQueued) < hs.healthTimeout:
// There's an unprocessed update queued, but it's not late yet
healthy = true
}
return healthy, lastUpdated, currentTime
}
// Run starts the healthz HTTP server and blocks until it exits.
func (hs *proxierHealthServer) Run() error {
serveMux := http.NewServeMux()
@ -123,26 +156,7 @@ type healthzHandler struct {
}
func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
var oldestPendingQueued, lastUpdated time.Time
if val := h.hs.oldestPendingQueued.Load(); val != nil {
oldestPendingQueued = val.(time.Time)
}
if val := h.hs.lastUpdated.Load(); val != nil {
lastUpdated = val.(time.Time)
}
currentTime := h.hs.clock.Now()
healthy := false
switch {
case oldestPendingQueued.IsZero():
// The proxy is healthy while it's starting up
// or the proxy is fully synced.
healthy = true
case currentTime.Sub(oldestPendingQueued) < h.hs.healthTimeout:
// There's an unprocessed update queued, but it's not late yet
healthy = true
}
healthy, lastUpdated, currentTime := h.hs.isHealthy()
resp.Header().Set("Content-Type", "application/json")
resp.Header().Set("X-Content-Type-Options", "nosniff")
if !healthy {

View File

@ -52,7 +52,13 @@ type ServiceHealthServer interface {
SyncEndpoints(newEndpoints map[types.NamespacedName]int) error
}
func newServiceHealthServer(hostname string, recorder events.EventRecorder, listener listener, factory httpServerFactory, nodePortAddresses *utilproxy.NodePortAddresses) ServiceHealthServer {
type proxierHealthChecker interface {
// IsHealthy returns the proxier's health state, following the same
// definition the HTTP server defines.
IsHealthy() bool
}
func newServiceHealthServer(hostname string, recorder events.EventRecorder, listener listener, factory httpServerFactory, nodePortAddresses *utilproxy.NodePortAddresses, healthzServer proxierHealthChecker) ServiceHealthServer {
nodeAddresses, err := nodePortAddresses.GetNodeAddresses(utilproxy.RealNetwork{})
if err != nil || nodeAddresses.Len() == 0 {
@ -75,14 +81,15 @@ func newServiceHealthServer(hostname string, recorder events.EventRecorder, list
recorder: recorder,
listener: listener,
httpFactory: factory,
healthzServer: healthzServer,
services: map[types.NamespacedName]*hcInstance{},
nodeAddresses: nodeAddresses,
}
}
// NewServiceHealthServer allocates a new service healthcheck server manager
func NewServiceHealthServer(hostname string, recorder events.EventRecorder, nodePortAddresses *utilproxy.NodePortAddresses) ServiceHealthServer {
return newServiceHealthServer(hostname, recorder, stdNetListener{}, stdHTTPServerFactory{}, nodePortAddresses)
func NewServiceHealthServer(hostname string, recorder events.EventRecorder, nodePortAddresses *utilproxy.NodePortAddresses, healthzServer proxierHealthChecker) ServiceHealthServer {
return newServiceHealthServer(hostname, recorder, stdNetListener{}, stdHTTPServerFactory{}, nodePortAddresses, healthzServer)
}
type server struct {
@ -93,6 +100,8 @@ type server struct {
listener listener
httpFactory httpServerFactory
healthzServer proxierHealthChecker
lock sync.RWMutex
services map[types.NamespacedName]*hcInstance
}
@ -226,14 +235,15 @@ func (h hcHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
return
}
count := svc.endpoints
kubeProxyHealthy := h.hcs.healthzServer.IsHealthy()
h.hcs.lock.RUnlock()
resp.Header().Set("Content-Type", "application/json")
resp.Header().Set("X-Content-Type-Options", "nosniff")
if count == 0 {
resp.WriteHeader(http.StatusServiceUnavailable)
} else {
if count != 0 && kubeProxyHealthy {
resp.WriteHeader(http.StatusOK)
} else {
resp.WriteHeader(http.StatusServiceUnavailable)
}
fmt.Fprint(resp, strings.Trim(dedent.Dedent(fmt.Sprintf(`
{
@ -241,9 +251,10 @@ func (h hcHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
"namespace": %q,
"name": %q
},
"localEndpoints": %d
"localEndpoints": %d,
"serviceProxyHealthy": %v
}
`, h.name.Namespace, h.name.Name, count)), "\n"))
`, h.name.Namespace, h.name.Name, count, kubeProxyHealthy)), "\n"))
}
func (hcs *server) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error {

View File

@ -268,7 +268,7 @@ func NewProxier(ipFamily v1.IPFamily,
masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)
klog.V(2).InfoS("Using iptables mark for masquerade", "ipFamily", ipt.Protocol(), "mark", masqueradeMark)
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses)
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
proxier := &Proxier{
svcPortMap: make(proxy.ServicePortMap),

View File

@ -411,7 +411,7 @@ func NewProxier(ipFamily v1.IPFamily,
nodePortAddresses := utilproxy.NewNodePortAddresses(nodePortAddressStrings)
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses)
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
// excludeCIDRs has been validated before, here we just parse it to IPNet list
parsedExcludeCIDRs, _ := netutils.ParseCIDRs(excludeCIDRs)

View File

@ -701,7 +701,7 @@ func NewProxier(
// windows listens to all node addresses
nodePortAddresses := utilproxy.NewNodePortAddresses(nil)
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses)
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
hns, supportedFeatures := newHostNetworkService()
hnsNetworkName, err := getNetworkName(config.NetworkName)