k8s.io/apiserver: refactor GenericAPIServer healthz code.

Signed-off-by: Siyuan Zhang <sizhang@google.com>
This commit is contained in:
Siyuan Zhang 2023-11-15 12:36:04 -08:00
parent bb6c9ecb1a
commit 12c9bfc21d
5 changed files with 71 additions and 65 deletions

View File

@ -799,15 +799,14 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
preShutdownHooks: map[string]preShutdownHookEntry{},
disabledPostStartHooks: c.DisabledPostStartHooks,
healthzChecks: c.HealthzChecks,
livezChecks: c.LivezChecks,
readyzChecks: c.ReadyzChecks,
healthzRegistry: healthCheckRegistry{path: "/healthz", checks: c.HealthzChecks},
livezRegistry: healthCheckRegistry{path: "/livez", checks: c.LivezChecks, clock: clock.RealClock{}},
readyzRegistry: healthCheckRegistry{path: "/readyz", checks: c.ReadyzChecks},
livezGracePeriod: c.LivezGracePeriod,
DiscoveryGroupManager: discovery.NewRootAPIsHandler(c.DiscoveryAddresses, c.Serializer),
maxRequestBodyBytes: c.MaxRequestBodyBytes,
livezClock: clock.RealClock{},
lifecycleSignals: c.lifecycleSignals,
ShutdownSendRetryAfter: c.ShutdownSendRetryAfter,

View File

@ -60,7 +60,6 @@ import (
"k8s.io/kube-openapi/pkg/handler3"
openapiutil "k8s.io/kube-openapi/pkg/util"
"k8s.io/kube-openapi/pkg/validation/spec"
"k8s.io/utils/clock"
)
// Info about an API group.
@ -191,19 +190,11 @@ type GenericAPIServer struct {
preShutdownHooksCalled bool
// healthz checks
healthzLock sync.Mutex
healthzChecks []healthz.HealthChecker
healthzChecksInstalled bool
// livez checks
livezLock sync.Mutex
livezChecks []healthz.HealthChecker
livezChecksInstalled bool
// readyz checks
readyzLock sync.Mutex
readyzChecks []healthz.HealthChecker
readyzChecksInstalled bool
livezGracePeriod time.Duration
livezClock clock.Clock
healthzRegistry healthCheckRegistry
readyzRegistry healthCheckRegistry
livezRegistry healthCheckRegistry
livezGracePeriod time.Duration
// auditing. The backend is started before the server starts listening.
AuditBackend audit.Backend
@ -330,7 +321,7 @@ func (s *GenericAPIServer) PreShutdownHooks() map[string]preShutdownHookEntry {
return s.preShutdownHooks
}
func (s *GenericAPIServer) HealthzChecks() []healthz.HealthChecker {
return s.healthzChecks
return s.healthzRegistry.checks
}
func (s *GenericAPIServer) ListedPaths() []string {
return s.listedPathProvider.ListedPaths()

View File

@ -19,12 +19,60 @@ package server
import (
"fmt"
"net/http"
"sync"
"time"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/utils/clock"
)
// healthMux is an interface describing the methods InstallHandler requires.
type healthMux interface {
Handle(pattern string, handler http.Handler)
}
type healthCheckRegistry struct {
path string
lock sync.Mutex
checks []healthz.HealthChecker
checksInstalled bool
clock clock.Clock
}
func (reg *healthCheckRegistry) addHealthChecks(checks ...healthz.HealthChecker) error {
return reg.addDelayedHealthChecks(0, checks...)
}
func (reg *healthCheckRegistry) addDelayedHealthChecks(delay time.Duration, checks ...healthz.HealthChecker) error {
if delay > 0 && reg.clock == nil {
return fmt.Errorf("nil clock in healthCheckRegistry for %s endpoint", reg.path)
}
reg.lock.Lock()
defer reg.lock.Unlock()
if reg.checksInstalled {
return fmt.Errorf("unable to add because the %s endpoint has already been created", reg.path)
}
if delay > 0 {
for _, check := range checks {
reg.checks = append(reg.checks, delayedHealthCheck(check, reg.clock, delay))
}
} else {
reg.checks = append(reg.checks, checks...)
}
return nil
}
func (reg *healthCheckRegistry) installHandler(mux healthMux) {
reg.installHandlerWithHealthyFunc(mux, nil)
}
func (reg *healthCheckRegistry) installHandlerWithHealthyFunc(mux healthMux, firstTimeHealthy func()) {
reg.lock.Lock()
defer reg.lock.Unlock()
reg.checksInstalled = true
healthz.InstallPathHandlerWithHealthyFunc(mux, reg.path, firstTimeHealthy, reg.checks...)
}
// AddHealthChecks adds HealthCheck(s) to health endpoints (healthz, livez, readyz) but
// configures the liveness grace period to be zero, which means we expect this health check
// to immediately indicate that the apiserver is unhealthy.
@ -47,40 +95,23 @@ func (s *GenericAPIServer) AddBootSequenceHealthChecks(checks ...healthz.HealthC
// addHealthChecks adds health checks to healthz, livez, and readyz. The delay passed in will set
// a corresponding grace period on livez.
func (s *GenericAPIServer) addHealthChecks(livezGracePeriod time.Duration, checks ...healthz.HealthChecker) error {
s.healthzLock.Lock()
defer s.healthzLock.Unlock()
if s.healthzChecksInstalled {
return fmt.Errorf("unable to add because the healthz endpoint has already been created")
}
s.healthzChecks = append(s.healthzChecks, checks...)
if err := s.AddLivezChecks(livezGracePeriod, checks...); err != nil {
if err := s.healthzRegistry.addHealthChecks(checks...); err != nil {
return err
}
return s.AddReadyzChecks(checks...)
if err := s.livezRegistry.addDelayedHealthChecks(livezGracePeriod, checks...); err != nil {
return err
}
return s.readyzRegistry.addHealthChecks(checks...)
}
// AddReadyzChecks allows you to add a HealthCheck to readyz.
func (s *GenericAPIServer) AddReadyzChecks(checks ...healthz.HealthChecker) error {
s.readyzLock.Lock()
defer s.readyzLock.Unlock()
if s.readyzChecksInstalled {
return fmt.Errorf("unable to add because the readyz endpoint has already been created")
}
s.readyzChecks = append(s.readyzChecks, checks...)
return nil
return s.readyzRegistry.addHealthChecks(checks...)
}
// AddLivezChecks allows you to add a HealthCheck to livez.
func (s *GenericAPIServer) AddLivezChecks(delay time.Duration, checks ...healthz.HealthChecker) error {
s.livezLock.Lock()
defer s.livezLock.Unlock()
if s.livezChecksInstalled {
return fmt.Errorf("unable to add because the livez endpoint has already been created")
}
for _, check := range checks {
s.livezChecks = append(s.livezChecks, delayedHealthCheck(check, s.livezClock, delay))
}
return nil
return s.livezRegistry.addDelayedHealthChecks(delay, checks...)
}
// addReadyzShutdownCheck is a convenience function for adding a readyz shutdown check, so
@ -92,29 +123,20 @@ func (s *GenericAPIServer) addReadyzShutdownCheck(stopCh <-chan struct{}) error
// installHealthz creates the healthz endpoint for this server
func (s *GenericAPIServer) installHealthz() {
s.healthzLock.Lock()
defer s.healthzLock.Unlock()
s.healthzChecksInstalled = true
healthz.InstallHandler(s.Handler.NonGoRestfulMux, s.healthzChecks...)
s.healthzRegistry.installHandler(s.Handler.NonGoRestfulMux)
}
// installReadyz creates the readyz endpoint for this server.
func (s *GenericAPIServer) installReadyz() {
s.readyzLock.Lock()
defer s.readyzLock.Unlock()
s.readyzChecksInstalled = true
healthz.InstallReadyzHandlerWithHealthyFunc(s.Handler.NonGoRestfulMux, func() {
// note: InstallReadyzHandlerWithHealthyFunc guarantees that this is called only once
s.readyzRegistry.installHandlerWithHealthyFunc(s.Handler.NonGoRestfulMux, func() {
// note: installHandlerWithHealthyFunc guarantees that this is called only once
s.lifecycleSignals.HasBeenReady.Signal()
}, s.readyzChecks...)
})
}
// installLivez creates the livez endpoint for this server.
func (s *GenericAPIServer) installLivez() {
s.livezLock.Lock()
defer s.livezLock.Unlock()
s.livezChecksInstalled = true
healthz.InstallLivezHandler(s.Handler.NonGoRestfulMux, s.livezChecks...)
s.livezRegistry.installHandler(s.Handler.NonGoRestfulMux)
}
// shutdownCheck fails if the embedded channel is closed. This is intended to allow for graceful shutdown sequences

View File

@ -142,12 +142,6 @@ func InstallReadyzHandler(mux mux, checks ...HealthChecker) {
InstallPathHandler(mux, "/readyz", checks...)
}
// InstallReadyzHandlerWithHealthyFunc is like InstallReadyzHandler, but in addition call firstTimeReady
// the first time /readyz succeeds.
func InstallReadyzHandlerWithHealthyFunc(mux mux, firstTimeReady func(), checks ...HealthChecker) {
InstallPathHandlerWithHealthyFunc(mux, "/readyz", firstTimeReady, checks...)
}
// InstallLivezHandler registers handlers for liveness checking on the path
// "/livez" to mux. *All handlers* for mux must be specified in
// exactly one call to InstallHandler. Calling InstallHandler more

View File

@ -313,7 +313,7 @@ func (s cacheSyncWaiterStub) WaitForCacheSync(_ <-chan struct{}) map[reflect.Typ
return s.startedByInformerType
}
func TestInstallReadyzHandlerWithHealthyFunc(t *testing.T) {
func TestInstallPathHandlerWithHealthyFunc(t *testing.T) {
mux := http.NewServeMux()
readyzCh := make(chan struct{})
@ -321,7 +321,7 @@ func TestInstallReadyzHandlerWithHealthyFunc(t *testing.T) {
hasBeenReadyFn := func() {
hasBeenReadyCounter++
}
InstallReadyzHandlerWithHealthyFunc(mux, hasBeenReadyFn, readyOnChanClose{readyzCh})
InstallPathHandlerWithHealthyFunc(mux, "/readyz", hasBeenReadyFn, readyOnChanClose{readyzCh})
// scenario 1: expect the check to fail since the channel hasn't been closed
req, err := http.NewRequest("GET", fmt.Sprintf("http://example.com%s", "/readyz"), nil)