mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-06 18:54:06 +00:00
feat: Integrate device plugin registration gRPC server health checks.
This commit is contained in:
parent
e5a31e8bbc
commit
1381e41f28
@ -28,6 +28,7 @@ import (
|
||||
|
||||
// TODO: Migrate kubelet to either use its own internal objects or client library.
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
internalapi "k8s.io/cri-api/pkg/apis"
|
||||
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
|
||||
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
|
||||
@ -122,6 +123,10 @@ type ContainerManager interface {
|
||||
// registration.
|
||||
GetPluginRegistrationHandlers() map[string]cache.PluginHandler
|
||||
|
||||
// GetHealthCheckers returns a set of health checkers for all plugins.
|
||||
// These checkers are integrated into the systemd watchdog to monitor the service's health.
|
||||
GetHealthCheckers() []healthz.HealthChecker
|
||||
|
||||
// ShouldResetExtendedResourceCapacity returns whether or not the extended resources should be zeroed,
|
||||
// due to node recreation.
|
||||
ShouldResetExtendedResourceCapacity() bool
|
||||
|
@ -41,6 +41,7 @@ import (
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/record"
|
||||
@ -661,6 +662,10 @@ func (cm *containerManagerImpl) GetPluginRegistrationHandlers() map[string]cache
|
||||
return res
|
||||
}
|
||||
|
||||
func (cm *containerManagerImpl) GetHealthCheckers() []healthz.HealthChecker {
|
||||
return []healthz.HealthChecker{cm.deviceManager.GetHealthChecker()}
|
||||
}
|
||||
|
||||
// TODO: move the GetResources logic to PodContainerManager.
|
||||
func (cm *containerManagerImpl) GetResources(ctx context.Context, pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {
|
||||
logger := klog.FromContext(ctx)
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
internalapi "k8s.io/cri-api/pkg/apis"
|
||||
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
|
||||
@ -95,6 +96,10 @@ func (cm *containerManagerStub) GetPluginRegistrationHandlers() map[string]cache
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cm *containerManagerStub) GetHealthCheckers() []healthz.HealthChecker {
|
||||
return []healthz.HealthChecker{}
|
||||
}
|
||||
|
||||
func (cm *containerManagerStub) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
|
||||
return cm.extendedPluginResources, cm.extendedPluginResources, []string{}
|
||||
}
|
||||
|
@ -36,6 +36,7 @@ import (
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/record"
|
||||
internalapi "k8s.io/cri-api/pkg/apis"
|
||||
@ -224,6 +225,10 @@ func (cm *containerManagerImpl) GetPluginRegistrationHandlers() map[string]cache
|
||||
return map[string]cache.PluginHandler{pluginwatcherapi.DevicePlugin: cm.deviceManager.GetWatcherHandler()}
|
||||
}
|
||||
|
||||
func (cm *containerManagerImpl) GetHealthCheckers() []healthz.HealthChecker {
|
||||
return []healthz.HealthChecker{cm.deviceManager.GetHealthChecker()}
|
||||
}
|
||||
|
||||
func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
|
||||
return cm.deviceManager.GetCapacity()
|
||||
}
|
||||
|
@ -34,6 +34,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
errorsutil "k8s.io/apimachinery/pkg/util/errors"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
@ -326,6 +327,11 @@ func (m *ManagerImpl) GetWatcherHandler() cache.PluginHandler {
|
||||
return m.server
|
||||
}
|
||||
|
||||
// GetHealthChecker returns the plugin handler
|
||||
func (m *ManagerImpl) GetHealthChecker() healthz.HealthChecker {
|
||||
return m.server
|
||||
}
|
||||
|
||||
// checkpointFile returns device plugin checkpoint file path.
|
||||
func (m *ManagerImpl) checkpointFile() string {
|
||||
return filepath.Join(m.checkpointdir, kubeletDeviceManagerCheckpoint)
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
@ -28,6 +29,7 @@ import (
|
||||
"google.golang.org/grpc"
|
||||
|
||||
core "k8s.io/api/core/v1"
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
"k8s.io/klog/v2"
|
||||
api "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
|
||||
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
|
||||
@ -39,6 +41,7 @@ import (
|
||||
// Server interface provides methods for Device plugin registration server.
|
||||
type Server interface {
|
||||
cache.PluginHandler
|
||||
healthz.HealthChecker
|
||||
Start() error
|
||||
Stop() error
|
||||
SocketPath() string
|
||||
@ -53,6 +56,9 @@ type server struct {
|
||||
rhandler RegistrationHandler
|
||||
chandler ClientHandler
|
||||
clients map[string]Client
|
||||
|
||||
// isStarted indicates whether the service has started successfully.
|
||||
isStarted bool
|
||||
}
|
||||
|
||||
// NewServer returns an initialized device plugin registration server.
|
||||
@ -109,7 +115,9 @@ func (s *server) Start() error {
|
||||
api.RegisterRegistrationServer(s.grpc, s)
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
s.setHealthy()
|
||||
if err = s.grpc.Serve(ln); err != nil {
|
||||
s.setUnhealthy()
|
||||
klog.ErrorS(err, "Error while serving device plugin registration grpc server")
|
||||
}
|
||||
}()
|
||||
@ -134,6 +142,9 @@ func (s *server) Stop() error {
|
||||
s.grpc.Stop()
|
||||
s.wg.Wait()
|
||||
s.grpc = nil
|
||||
// During kubelet termination, we do not need the registration server,
|
||||
// and we consider the kubelet to be healthy even when it is down.
|
||||
s.setHealthy()
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -190,3 +201,24 @@ func (s *server) visitClients(visit func(r string, c Client)) {
|
||||
}
|
||||
s.mutex.Unlock()
|
||||
}
|
||||
|
||||
func (s *server) Name() string {
|
||||
return "device-plugin"
|
||||
}
|
||||
|
||||
func (s *server) Check(_ *http.Request) error {
|
||||
if s.isStarted {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("device plugin registration gRPC server failed and no device plugins can register")
|
||||
}
|
||||
|
||||
// setHealthy sets the health status of the gRPC server.
|
||||
func (s *server) setHealthy() {
|
||||
s.isStarted = true
|
||||
}
|
||||
|
||||
// setUnhealthy sets the health status of the gRPC server to unhealthy.
|
||||
func (s *server) setUnhealthy() {
|
||||
s.isStarted = false
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/resourceupdates"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
|
||||
@ -62,6 +63,7 @@ type Manager interface {
|
||||
|
||||
// GetWatcherHandler returns the plugin handler for the device manager.
|
||||
GetWatcherHandler() cache.PluginHandler
|
||||
GetHealthChecker() healthz.HealthChecker
|
||||
|
||||
// GetDevices returns information about the devices assigned to pods and containers
|
||||
GetDevices(podUID, containerName string) ResourceDeviceInstances
|
||||
|
@ -24,6 +24,7 @@ import (
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
internalapi "k8s.io/cri-api/pkg/apis"
|
||||
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
|
||||
@ -131,6 +132,13 @@ func (cm *FakeContainerManager) GetPluginRegistrationHandlers() map[string]cache
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cm *FakeContainerManager) GetHealthCheckers() []healthz.HealthChecker {
|
||||
cm.Lock()
|
||||
defer cm.Unlock()
|
||||
cm.CalledFunctions = append(cm.CalledFunctions, "GetPluginRegistrationServerChecker")
|
||||
return []healthz.HealthChecker{}
|
||||
}
|
||||
|
||||
func (cm *FakeContainerManager) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
|
||||
cm.Lock()
|
||||
defer cm.Unlock()
|
||||
|
@ -34,6 +34,7 @@ import (
|
||||
"time"
|
||||
|
||||
cadvisorapi "github.com/google/cadvisor/info/v1"
|
||||
inuserns "github.com/moby/sys/userns"
|
||||
"github.com/opencontainers/selinux/go-selinux"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
@ -46,7 +47,6 @@ import (
|
||||
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
|
||||
netutils "k8s.io/utils/net"
|
||||
|
||||
inuserns "github.com/moby/sys/userns"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
@ -961,7 +961,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.SystemdWatchdog) {
|
||||
// NewHealthChecker returns an error indicating that the watchdog is configured but the configuration is incorrect,
|
||||
// the kubelet will not be started.
|
||||
klet.healthChecker, err = watchdog.NewHealthChecker(klet)
|
||||
checkers := klet.containerManager.GetHealthCheckers()
|
||||
klet.healthChecker, err = watchdog.NewHealthChecker(klet, watchdog.WithExtendedCheckers(checkers))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create health checker: %w", err)
|
||||
}
|
||||
@ -2919,12 +2920,12 @@ func (kl *Kubelet) BirthCry() {
|
||||
// ListenAndServe runs the kubelet HTTP server.
|
||||
func (kl *Kubelet) ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions,
|
||||
auth server.AuthInterface, tp trace.TracerProvider) {
|
||||
server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, kubeCfg, tlsOptions, auth, tp)
|
||||
server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, kl.containerManager.GetHealthCheckers(), kubeCfg, tlsOptions, auth, tp)
|
||||
}
|
||||
|
||||
// ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode.
|
||||
func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint, tp trace.TracerProvider) {
|
||||
server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port, tp)
|
||||
server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, kl.containerManager.GetHealthCheckers(), address, port, tp)
|
||||
}
|
||||
|
||||
// ListenAndServePodResources runs the kubelet podresources grpc service
|
||||
|
@ -114,6 +114,7 @@ type Server struct {
|
||||
metricsBuckets sets.Set[string]
|
||||
metricsMethodBuckets sets.Set[string]
|
||||
resourceAnalyzer stats.ResourceAnalyzer
|
||||
extendedCheckers []healthz.HealthChecker
|
||||
}
|
||||
|
||||
// TLSOptions holds the TLS options.
|
||||
@ -156,6 +157,7 @@ func (a *filteringContainer) RegisteredHandlePaths() []string {
|
||||
func ListenAndServeKubeletServer(
|
||||
host HostInterface,
|
||||
resourceAnalyzer stats.ResourceAnalyzer,
|
||||
checkers []healthz.HealthChecker,
|
||||
kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
||||
tlsOptions *TLSOptions,
|
||||
auth AuthInterface,
|
||||
@ -164,7 +166,7 @@ func ListenAndServeKubeletServer(
|
||||
address := netutils.ParseIPSloppy(kubeCfg.Address)
|
||||
port := uint(kubeCfg.Port)
|
||||
klog.InfoS("Starting to listen", "address", address, "port", port)
|
||||
handler := NewServer(host, resourceAnalyzer, auth, kubeCfg)
|
||||
handler := NewServer(host, resourceAnalyzer, checkers, auth, kubeCfg)
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) {
|
||||
handler.InstallTracingFilter(tp)
|
||||
@ -198,11 +200,12 @@ func ListenAndServeKubeletServer(
|
||||
func ListenAndServeKubeletReadOnlyServer(
|
||||
host HostInterface,
|
||||
resourceAnalyzer stats.ResourceAnalyzer,
|
||||
checkers []healthz.HealthChecker,
|
||||
address net.IP,
|
||||
port uint,
|
||||
tp oteltrace.TracerProvider) {
|
||||
klog.InfoS("Starting to listen read-only", "address", address, "port", port)
|
||||
s := NewServer(host, resourceAnalyzer, nil, nil)
|
||||
s := NewServer(host, resourceAnalyzer, checkers, nil, nil)
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) {
|
||||
s.InstallTracingFilter(tp, otelrestful.WithPublicEndpoint())
|
||||
@ -278,6 +281,7 @@ type HostInterface interface {
|
||||
func NewServer(
|
||||
host HostInterface,
|
||||
resourceAnalyzer stats.ResourceAnalyzer,
|
||||
checkers []healthz.HealthChecker,
|
||||
auth AuthInterface,
|
||||
kubeCfg *kubeletconfiginternal.KubeletConfiguration) Server {
|
||||
|
||||
@ -288,6 +292,7 @@ func NewServer(
|
||||
restfulCont: &filteringContainer{Container: restful.NewContainer()},
|
||||
metricsBuckets: sets.New[string](),
|
||||
metricsMethodBuckets: sets.New[string]("OPTIONS", "GET", "HEAD", "POST", "PUT", "DELETE", "TRACE", "CONNECT"),
|
||||
extendedCheckers: checkers,
|
||||
}
|
||||
if auth != nil {
|
||||
server.InstallAuthFilter()
|
||||
@ -392,11 +397,13 @@ func (s *Server) getMetricMethodBucket(method string) string {
|
||||
// patterns with the restful Container.
|
||||
func (s *Server) InstallDefaultHandlers() {
|
||||
s.addMetricsBucketMatcher("healthz")
|
||||
healthz.InstallHandler(s.restfulCont,
|
||||
checkers := []healthz.HealthChecker{
|
||||
healthz.PingHealthz,
|
||||
healthz.LogHealthz,
|
||||
healthz.NamedCheck("syncloop", s.host.SyncLoopHealthCheck),
|
||||
)
|
||||
}
|
||||
checkers = append(checkers, s.extendedCheckers...)
|
||||
healthz.InstallHandler(s.restfulCont, checkers...)
|
||||
|
||||
slis.SLIMetricsWithReset{}.Install(s.restfulCont)
|
||||
|
||||
|
@ -53,6 +53,7 @@ import (
|
||||
"k8s.io/utils/ptr"
|
||||
|
||||
// Do some initialization to decode the query parameters correctly.
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/kubelet/pkg/cri/streaming"
|
||||
@ -368,6 +369,7 @@ func newServerTestWithDebuggingHandlers(kubeCfg *kubeletconfiginternal.KubeletCo
|
||||
server := NewServer(
|
||||
fw.fakeKubelet,
|
||||
stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute, &record.FakeRecorder{}),
|
||||
[]healthz.HealthChecker{},
|
||||
fw.fakeAuth,
|
||||
kubeCfg,
|
||||
)
|
||||
@ -1650,8 +1652,8 @@ func TestNewServerRegistersMetricsSLIsEndpointTwice(t *testing.T) {
|
||||
}
|
||||
resourceAnalyzer := stats.NewResourceAnalyzer(nil, time.Minute, &record.FakeRecorder{})
|
||||
|
||||
server1 := NewServer(host, resourceAnalyzer, nil, nil)
|
||||
server2 := NewServer(host, resourceAnalyzer, nil, nil)
|
||||
server1 := NewServer(host, resourceAnalyzer, []healthz.HealthChecker{}, nil, nil)
|
||||
server2 := NewServer(host, resourceAnalyzer, []healthz.HealthChecker{}, nil, nil)
|
||||
|
||||
// Check if both servers registered the /metrics/slis endpoint
|
||||
assert.Contains(t, server1.restfulCont.RegisteredHandlePaths(), "/metrics/slis", "First server should register /metrics/slis")
|
||||
|
@ -59,6 +59,12 @@ func WithWatchdogClient(watchdog WatchdogClient) Option {
|
||||
}
|
||||
}
|
||||
|
||||
func WithExtendedCheckers(checkers []healthz.HealthChecker) Option {
|
||||
return func(hc *healthChecker) {
|
||||
hc.checkers = append(hc.checkers, checkers...)
|
||||
}
|
||||
}
|
||||
|
||||
type healthChecker struct {
|
||||
checkers []healthz.HealthChecker
|
||||
retryBackoff wait.Backoff
|
||||
@ -109,7 +115,7 @@ func NewHealthChecker(syncLoop syncLoopHealthChecker, opts ...Option) (HealthChe
|
||||
Jitter: 0.1,
|
||||
Steps: 2,
|
||||
}
|
||||
hc.checkers = checkers
|
||||
hc.checkers = append(hc.checkers, checkers...)
|
||||
hc.retryBackoff = retryBackoff
|
||||
hc.interval = watchdogVal / 2
|
||||
|
||||
|
@ -19,12 +19,20 @@ limitations under the License.
|
||||
|
||||
package watchdog
|
||||
|
||||
import "k8s.io/apiserver/pkg/server/healthz"
|
||||
|
||||
type healthCheckerUnsupported struct{}
|
||||
|
||||
var _ HealthChecker = &healthCheckerUnsupported{}
|
||||
|
||||
type Option func(*healthCheckerUnsupported)
|
||||
|
||||
func WithExtendedCheckers(checkers []healthz.HealthChecker) Option {
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewHealthChecker creates a fake one here
|
||||
func NewHealthChecker(_ syncLoopHealthChecker) (HealthChecker, error) {
|
||||
func NewHealthChecker(_ syncLoopHealthChecker, _ ...Option) (HealthChecker, error) {
|
||||
return &healthCheckerUnsupported{}, nil
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user