diff --git a/pkg/kubelet/cm/container_manager.go b/pkg/kubelet/cm/container_manager.go index babeec5ab72..f8d69c0e49e 100644 --- a/pkg/kubelet/cm/container_manager.go +++ b/pkg/kubelet/cm/container_manager.go @@ -28,6 +28,7 @@ import ( // TODO: Migrate kubelet to either use its own internal objects or client library. v1 "k8s.io/api/core/v1" + "k8s.io/apiserver/pkg/server/healthz" internalapi "k8s.io/cri-api/pkg/apis" podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" @@ -122,6 +123,10 @@ type ContainerManager interface { // registration. GetPluginRegistrationHandlers() map[string]cache.PluginHandler + // GetHealthCheckers returns a set of health checkers for all plugins. + // These checkers are integrated into the systemd watchdog to monitor the service's health. + GetHealthCheckers() []healthz.HealthChecker + // ShouldResetExtendedResourceCapacity returns whether or not the extended resources should be zeroed, // due to node recreation. ShouldResetExtendedResourceCapacity() bool diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index 5cad2f75a7e..9d335791f4c 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -41,6 +41,7 @@ import ( utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/server/healthz" utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" @@ -661,6 +662,10 @@ func (cm *containerManagerImpl) GetPluginRegistrationHandlers() map[string]cache return res } +func (cm *containerManagerImpl) GetHealthCheckers() []healthz.HealthChecker { + return []healthz.HealthChecker{cm.deviceManager.GetHealthChecker()} +} + // TODO: move the GetResources logic to PodContainerManager. func (cm *containerManagerImpl) GetResources(ctx context.Context, pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) { logger := klog.FromContext(ctx) diff --git a/pkg/kubelet/cm/container_manager_stub.go b/pkg/kubelet/cm/container_manager_stub.go index e271366edf8..e117a7377d7 100644 --- a/pkg/kubelet/cm/container_manager_stub.go +++ b/pkg/kubelet/cm/container_manager_stub.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/types" + "k8s.io/apiserver/pkg/server/healthz" internalapi "k8s.io/cri-api/pkg/apis" podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" @@ -95,6 +96,10 @@ func (cm *containerManagerStub) GetPluginRegistrationHandlers() map[string]cache return nil } +func (cm *containerManagerStub) GetHealthCheckers() []healthz.HealthChecker { + return []healthz.HealthChecker{} +} + func (cm *containerManagerStub) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) { return cm.extendedPluginResources, cm.extendedPluginResources, []string{} } diff --git a/pkg/kubelet/cm/container_manager_windows.go b/pkg/kubelet/cm/container_manager_windows.go index c518f73de7d..5cbbc300218 100644 --- a/pkg/kubelet/cm/container_manager_windows.go +++ b/pkg/kubelet/cm/container_manager_windows.go @@ -36,6 +36,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/types" + "k8s.io/apiserver/pkg/server/healthz" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" internalapi "k8s.io/cri-api/pkg/apis" @@ -224,6 +225,10 @@ func (cm *containerManagerImpl) GetPluginRegistrationHandlers() map[string]cache return map[string]cache.PluginHandler{pluginwatcherapi.DevicePlugin: cm.deviceManager.GetWatcherHandler()} } +func (cm *containerManagerImpl) GetHealthCheckers() []healthz.HealthChecker { + return []healthz.HealthChecker{cm.deviceManager.GetHealthChecker()} +} + func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) { return cm.deviceManager.GetCapacity() } diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index c6ccb4e7f83..e13b03931da 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" errorsutil "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/server/healthz" utilfeature "k8s.io/apiserver/pkg/util/feature" pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" "k8s.io/kubernetes/pkg/features" @@ -326,6 +327,11 @@ func (m *ManagerImpl) GetWatcherHandler() cache.PluginHandler { return m.server } +// GetHealthChecker returns the plugin handler +func (m *ManagerImpl) GetHealthChecker() healthz.HealthChecker { + return m.server +} + // checkpointFile returns device plugin checkpoint file path. func (m *ManagerImpl) checkpointFile() string { return filepath.Join(m.checkpointdir, kubeletDeviceManagerCheckpoint) diff --git a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/server.go b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/server.go index 6aa6936bab9..312aa930a0f 100644 --- a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/server.go +++ b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/server.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "net" + "net/http" "os" "path/filepath" "sync" @@ -28,6 +29,7 @@ import ( "google.golang.org/grpc" core "k8s.io/api/core/v1" + "k8s.io/apiserver/pkg/server/healthz" "k8s.io/klog/v2" api "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" @@ -39,6 +41,7 @@ import ( // Server interface provides methods for Device plugin registration server. type Server interface { cache.PluginHandler + healthz.HealthChecker Start() error Stop() error SocketPath() string @@ -53,6 +56,9 @@ type server struct { rhandler RegistrationHandler chandler ClientHandler clients map[string]Client + + // isStarted indicates whether the service has started successfully. + isStarted bool } // NewServer returns an initialized device plugin registration server. @@ -109,7 +115,9 @@ func (s *server) Start() error { api.RegisterRegistrationServer(s.grpc, s) go func() { defer s.wg.Done() + s.setHealthy() if err = s.grpc.Serve(ln); err != nil { + s.setUnhealthy() klog.ErrorS(err, "Error while serving device plugin registration grpc server") } }() @@ -134,6 +142,9 @@ func (s *server) Stop() error { s.grpc.Stop() s.wg.Wait() s.grpc = nil + // During kubelet termination, we do not need the registration server, + // and we consider the kubelet to be healthy even when it is down. + s.setHealthy() return nil } @@ -190,3 +201,24 @@ func (s *server) visitClients(visit func(r string, c Client)) { } s.mutex.Unlock() } + +func (s *server) Name() string { + return "device-plugin" +} + +func (s *server) Check(_ *http.Request) error { + if s.isStarted { + return nil + } + return fmt.Errorf("device plugin registration gRPC server failed and no device plugins can register") +} + +// setHealthy sets the health status of the gRPC server. +func (s *server) setHealthy() { + s.isStarted = true +} + +// setUnhealthy sets the health status of the gRPC server to unhealthy. +func (s *server) setUnhealthy() { + s.isStarted = false +} diff --git a/pkg/kubelet/cm/devicemanager/types.go b/pkg/kubelet/cm/devicemanager/types.go index 097dea4ca2d..bdf0b27d673 100644 --- a/pkg/kubelet/cm/devicemanager/types.go +++ b/pkg/kubelet/cm/devicemanager/types.go @@ -21,6 +21,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/server/healthz" "k8s.io/kubernetes/pkg/kubelet/cm/containermap" "k8s.io/kubernetes/pkg/kubelet/cm/resourceupdates" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" @@ -62,6 +63,7 @@ type Manager interface { // GetWatcherHandler returns the plugin handler for the device manager. GetWatcherHandler() cache.PluginHandler + GetHealthChecker() healthz.HealthChecker // GetDevices returns information about the devices assigned to pods and containers GetDevices(podUID, containerName string) ResourceDeviceInstances diff --git a/pkg/kubelet/cm/fake_container_manager.go b/pkg/kubelet/cm/fake_container_manager.go index 6c575f6b7ff..63f974c10ce 100644 --- a/pkg/kubelet/cm/fake_container_manager.go +++ b/pkg/kubelet/cm/fake_container_manager.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/types" + "k8s.io/apiserver/pkg/server/healthz" internalapi "k8s.io/cri-api/pkg/apis" podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" @@ -131,6 +132,13 @@ func (cm *FakeContainerManager) GetPluginRegistrationHandlers() map[string]cache return nil } +func (cm *FakeContainerManager) GetHealthCheckers() []healthz.HealthChecker { + cm.Lock() + defer cm.Unlock() + cm.CalledFunctions = append(cm.CalledFunctions, "GetPluginRegistrationServerChecker") + return []healthz.HealthChecker{} +} + func (cm *FakeContainerManager) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) { cm.Lock() defer cm.Unlock() diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 8da0e4fb691..975c0770655 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -34,6 +34,7 @@ import ( "time" cadvisorapi "github.com/google/cadvisor/info/v1" + inuserns "github.com/moby/sys/userns" "github.com/opencontainers/selinux/go-selinux" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" @@ -46,7 +47,6 @@ import ( utilfs "k8s.io/kubernetes/pkg/util/filesystem" netutils "k8s.io/utils/net" - inuserns "github.com/moby/sys/userns" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -961,7 +961,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, if utilfeature.DefaultFeatureGate.Enabled(features.SystemdWatchdog) { // NewHealthChecker returns an error indicating that the watchdog is configured but the configuration is incorrect, // the kubelet will not be started. - klet.healthChecker, err = watchdog.NewHealthChecker(klet) + checkers := klet.containerManager.GetHealthCheckers() + klet.healthChecker, err = watchdog.NewHealthChecker(klet, watchdog.WithExtendedCheckers(checkers)) if err != nil { return nil, fmt.Errorf("create health checker: %w", err) } @@ -2919,12 +2920,12 @@ func (kl *Kubelet) BirthCry() { // ListenAndServe runs the kubelet HTTP server. func (kl *Kubelet) ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions, auth server.AuthInterface, tp trace.TracerProvider) { - server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, kubeCfg, tlsOptions, auth, tp) + server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, kl.containerManager.GetHealthCheckers(), kubeCfg, tlsOptions, auth, tp) } // ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode. func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint, tp trace.TracerProvider) { - server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port, tp) + server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, kl.containerManager.GetHealthCheckers(), address, port, tp) } // ListenAndServePodResources runs the kubelet podresources grpc service diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go index f983e1d362b..94b282f7a01 100644 --- a/pkg/kubelet/server/server.go +++ b/pkg/kubelet/server/server.go @@ -114,6 +114,7 @@ type Server struct { metricsBuckets sets.Set[string] metricsMethodBuckets sets.Set[string] resourceAnalyzer stats.ResourceAnalyzer + extendedCheckers []healthz.HealthChecker } // TLSOptions holds the TLS options. @@ -156,6 +157,7 @@ func (a *filteringContainer) RegisteredHandlePaths() []string { func ListenAndServeKubeletServer( host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, + checkers []healthz.HealthChecker, kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *TLSOptions, auth AuthInterface, @@ -164,7 +166,7 @@ func ListenAndServeKubeletServer( address := netutils.ParseIPSloppy(kubeCfg.Address) port := uint(kubeCfg.Port) klog.InfoS("Starting to listen", "address", address, "port", port) - handler := NewServer(host, resourceAnalyzer, auth, kubeCfg) + handler := NewServer(host, resourceAnalyzer, checkers, auth, kubeCfg) if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) { handler.InstallTracingFilter(tp) @@ -198,11 +200,12 @@ func ListenAndServeKubeletServer( func ListenAndServeKubeletReadOnlyServer( host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, + checkers []healthz.HealthChecker, address net.IP, port uint, tp oteltrace.TracerProvider) { klog.InfoS("Starting to listen read-only", "address", address, "port", port) - s := NewServer(host, resourceAnalyzer, nil, nil) + s := NewServer(host, resourceAnalyzer, checkers, nil, nil) if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) { s.InstallTracingFilter(tp, otelrestful.WithPublicEndpoint()) @@ -278,6 +281,7 @@ type HostInterface interface { func NewServer( host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, + checkers []healthz.HealthChecker, auth AuthInterface, kubeCfg *kubeletconfiginternal.KubeletConfiguration) Server { @@ -288,6 +292,7 @@ func NewServer( restfulCont: &filteringContainer{Container: restful.NewContainer()}, metricsBuckets: sets.New[string](), metricsMethodBuckets: sets.New[string]("OPTIONS", "GET", "HEAD", "POST", "PUT", "DELETE", "TRACE", "CONNECT"), + extendedCheckers: checkers, } if auth != nil { server.InstallAuthFilter() @@ -392,11 +397,13 @@ func (s *Server) getMetricMethodBucket(method string) string { // patterns with the restful Container. func (s *Server) InstallDefaultHandlers() { s.addMetricsBucketMatcher("healthz") - healthz.InstallHandler(s.restfulCont, + checkers := []healthz.HealthChecker{ healthz.PingHealthz, healthz.LogHealthz, healthz.NamedCheck("syncloop", s.host.SyncLoopHealthCheck), - ) + } + checkers = append(checkers, s.extendedCheckers...) + healthz.InstallHandler(s.restfulCont, checkers...) slis.SLIMetricsWithReset{}.Install(s.restfulCont) diff --git a/pkg/kubelet/server/server_test.go b/pkg/kubelet/server/server_test.go index 547ceb8084f..be1d9e922d4 100644 --- a/pkg/kubelet/server/server_test.go +++ b/pkg/kubelet/server/server_test.go @@ -53,6 +53,7 @@ import ( "k8s.io/utils/ptr" // Do some initialization to decode the query parameters correctly. + "k8s.io/apiserver/pkg/server/healthz" utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/kubelet/pkg/cri/streaming" @@ -368,6 +369,7 @@ func newServerTestWithDebuggingHandlers(kubeCfg *kubeletconfiginternal.KubeletCo server := NewServer( fw.fakeKubelet, stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute, &record.FakeRecorder{}), + []healthz.HealthChecker{}, fw.fakeAuth, kubeCfg, ) @@ -1650,8 +1652,8 @@ func TestNewServerRegistersMetricsSLIsEndpointTwice(t *testing.T) { } resourceAnalyzer := stats.NewResourceAnalyzer(nil, time.Minute, &record.FakeRecorder{}) - server1 := NewServer(host, resourceAnalyzer, nil, nil) - server2 := NewServer(host, resourceAnalyzer, nil, nil) + server1 := NewServer(host, resourceAnalyzer, []healthz.HealthChecker{}, nil, nil) + server2 := NewServer(host, resourceAnalyzer, []healthz.HealthChecker{}, nil, nil) // Check if both servers registered the /metrics/slis endpoint assert.Contains(t, server1.restfulCont.RegisteredHandlePaths(), "/metrics/slis", "First server should register /metrics/slis") diff --git a/pkg/kubelet/watchdog/watchdog_linux.go b/pkg/kubelet/watchdog/watchdog_linux.go index 6a8342658c2..e30275aaacd 100644 --- a/pkg/kubelet/watchdog/watchdog_linux.go +++ b/pkg/kubelet/watchdog/watchdog_linux.go @@ -59,6 +59,12 @@ func WithWatchdogClient(watchdog WatchdogClient) Option { } } +func WithExtendedCheckers(checkers []healthz.HealthChecker) Option { + return func(hc *healthChecker) { + hc.checkers = append(hc.checkers, checkers...) + } +} + type healthChecker struct { checkers []healthz.HealthChecker retryBackoff wait.Backoff @@ -109,7 +115,7 @@ func NewHealthChecker(syncLoop syncLoopHealthChecker, opts ...Option) (HealthChe Jitter: 0.1, Steps: 2, } - hc.checkers = checkers + hc.checkers = append(hc.checkers, checkers...) hc.retryBackoff = retryBackoff hc.interval = watchdogVal / 2 diff --git a/pkg/kubelet/watchdog/watchdog_unsupported.go b/pkg/kubelet/watchdog/watchdog_unsupported.go index dae45894a16..b536af21f2d 100644 --- a/pkg/kubelet/watchdog/watchdog_unsupported.go +++ b/pkg/kubelet/watchdog/watchdog_unsupported.go @@ -19,12 +19,20 @@ limitations under the License. package watchdog +import "k8s.io/apiserver/pkg/server/healthz" + type healthCheckerUnsupported struct{} var _ HealthChecker = &healthCheckerUnsupported{} +type Option func(*healthCheckerUnsupported) + +func WithExtendedCheckers(checkers []healthz.HealthChecker) Option { + return nil +} + // NewHealthChecker creates a fake one here -func NewHealthChecker(_ syncLoopHealthChecker) (HealthChecker, error) { +func NewHealthChecker(_ syncLoopHealthChecker, _ ...Option) (HealthChecker, error) { return &healthCheckerUnsupported{}, nil }