integrate kubelet with the systemd watchdog

feat:  add unit test

feat:  add FeatureGate for SystemdWatchdog

fix:  linter and failed tests

feat:  add SystemdWatchdog to versioned feature list yaml
This commit is contained in:
zhifei92 2024-09-23 21:49:54 +08:00
parent e39571591d
commit dac7332ed2
11 changed files with 466 additions and 26 deletions

View File

@ -1658,6 +1658,7 @@ After=network-online.target
[Service]
Restart=always
RestartSec=10
WatchdogSec=30s
EnvironmentFile=${kubelet_env_file}
ExecStart=${kubelet_bin} \$KUBELET_OPTS

View File

@ -749,6 +749,16 @@ const (
//
// Enables the image volume source.
ImageVolume featuregate.Feature = "ImageVolume"
// owner: @zhifei92
// beta: v1.32
//
// Enables the systemd watchdog for the kubelet. When enabled, the kubelet will
// periodically notify the systemd watchdog to indicate that it is still alive.
// This can help prevent the system from restarting the kubelet if it becomes
// unresponsive. The feature gate is enabled by default, but should only be used
// if the system supports the systemd watchdog feature and has it configured properly.
SystemdWatchdog = featuregate.Feature("SystemdWatchdog")
)
func init() {

View File

@ -696,6 +696,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Alpha},
},
SystemdWatchdog: {
{Version: version.MustParse("1.32"), Default: true, PreRelease: featuregate.Beta},
},
TopologyAwareHints: {
{Version: version.MustParse("1.21"), Default: false, PreRelease: featuregate.Alpha},
{Version: version.MustParse("1.23"), Default: false, PreRelease: featuregate.Beta},

View File

@ -118,6 +118,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/util/queue"
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
"k8s.io/kubernetes/pkg/kubelet/volumemanager"
"k8s.io/kubernetes/pkg/kubelet/watchdog"
httpprobe "k8s.io/kubernetes/pkg/probe/http"
"k8s.io/kubernetes/pkg/security/apparmor"
"k8s.io/kubernetes/pkg/util/oom"
@ -957,6 +958,14 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
// since this relies on the rest of the Kubelet having been constructed.
klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()
if utilfeature.DefaultFeatureGate.Enabled(features.SystemdWatchdog) {
// NewHealthChecker returns an error indicating that the watchdog is configured but the configuration is incorrect,
// the kubelet will not be started.
klet.healthChecker, err = watchdog.NewHealthChecker(klet)
if err != nil {
return nil, fmt.Errorf("create health checker: %w", err)
}
}
return klet, nil
}
@ -1344,6 +1353,9 @@ type Kubelet struct {
// Track node startup latencies
nodeStartupLatencyTracker util.NodeStartupLatencyTracker
// Health check kubelet
healthChecker watchdog.HealthChecker
}
// ListPodStats is delegated to StatsProvider, which implements stats.Provider interface
@ -1698,6 +1710,10 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
kl.eventedPleg.Start()
}
if utilfeature.DefaultFeatureGate.Enabled(features.SystemdWatchdog) {
kl.healthChecker.Start()
}
kl.syncLoop(ctx, updates, kl)
}
@ -2876,6 +2892,20 @@ func (kl *Kubelet) LatestLoopEntryTime() time.Time {
return val.(time.Time)
}
// SyncLoopHealthCheck checks if kubelet's sync loop that updates containers is working.
func (kl *Kubelet) SyncLoopHealthCheck(req *http.Request) error {
duration := kl.resyncInterval * 2
minDuration := time.Minute * 5
if duration < minDuration {
duration = minDuration
}
enterLoopTime := kl.LatestLoopEntryTime()
if !enterLoopTime.IsZero() && time.Now().After(enterLoopTime.Add(duration)) {
return fmt.Errorf("sync Loop took longer than expected")
}
return nil
}
// updateRuntimeUp calls the container runtime status callback, initializing
// the runtime dependent modules when the container runtime first comes up,
// and returns an error if the status check fails. If the status check is OK,
@ -2935,11 +2965,6 @@ func (kl *Kubelet) BirthCry() {
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeNormal, events.StartingKubelet, "Starting kubelet.")
}
// ResyncInterval returns the interval used for periodic syncs.
func (kl *Kubelet) ResyncInterval() time.Duration {
return kl.resyncInterval
}
// ListenAndServe runs the kubelet HTTP server.
func (kl *Kubelet) ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions,
auth server.AuthInterface, tp trace.TracerProvider) {

View File

@ -265,9 +265,8 @@ type HostInterface interface {
CheckpointContainer(ctx context.Context, podUID types.UID, podFullName, containerName string, options *runtimeapi.CheckpointContainerRequest) error
GetKubeletContainerLogs(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error
ServeLogs(w http.ResponseWriter, req *http.Request)
ResyncInterval() time.Duration
GetHostname() string
LatestLoopEntryTime() time.Time
SyncLoopHealthCheck(req *http.Request) error
GetExec(ctx context.Context, podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommandserver.Options) (*url.URL, error)
GetAttach(ctx context.Context, podFullName string, podUID types.UID, containerName string, streamOpts remotecommandserver.Options) (*url.URL, error)
GetPortForward(ctx context.Context, podName, podNamespace string, podUID types.UID, portForwardOpts portforward.V4Options) (*url.URL, error)
@ -396,7 +395,7 @@ func (s *Server) InstallDefaultHandlers() {
healthz.InstallHandler(s.restfulCont,
healthz.PingHealthz,
healthz.LogHealthz,
healthz.NamedCheck("syncloop", s.syncLoopHealthCheck),
healthz.NamedCheck("syncloop", s.host.SyncLoopHealthCheck),
)
slis.SLIMetricsWithReset{}.Install(s.restfulCont)
@ -678,20 +677,6 @@ func (s *Server) InstallProfilingHandler(enableProfilingLogHandler bool, enableC
}
}
// Checks if kubelet's sync loop that updates containers is working.
func (s *Server) syncLoopHealthCheck(req *http.Request) error {
duration := s.host.ResyncInterval() * 2
minDuration := time.Minute * 5
if duration < minDuration {
duration = minDuration
}
enterLoopTime := s.host.LatestLoopEntryTime()
if !enterLoopTime.IsZero() && time.Now().After(enterLoopTime.Add(duration)) {
return fmt.Errorf("sync Loop took longer than expected")
}
return nil
}
// getContainerLogs handles containerLogs request against the Kubelet
func (s *Server) getContainerLogs(request *restful.Request, response *restful.Response) {
podNamespace := request.PathParameter("podNamespace")

View File

@ -91,10 +91,6 @@ type fakeKubelet struct {
streamingRuntime streaming.Server
}
func (fk *fakeKubelet) ResyncInterval() time.Duration {
return fk.resyncInterval
}
func (fk *fakeKubelet) LatestLoopEntryTime() time.Time {
return fk.loopEntryTime
}
@ -154,6 +150,19 @@ func (fk *fakeKubelet) ListPodSandboxMetrics(ctx context.Context) ([]*runtimeapi
return nil, nil
}
func (fk *fakeKubelet) SyncLoopHealthCheck(req *http.Request) error {
duration := fk.resyncInterval * 2
minDuration := time.Minute * 5
if duration < minDuration {
duration = minDuration
}
enterLoopTime := fk.LatestLoopEntryTime()
if !enterLoopTime.IsZero() && time.Now().After(enterLoopTime.Add(duration)) {
return fmt.Errorf("sync Loop took longer than expected")
}
return nil
}
type fakeRuntime struct {
execFunc func(string, []string, io.Reader, io.WriteCloser, io.WriteCloser, bool, <-chan remotecommand.TerminalSize) error
attachFunc func(string, io.Reader, io.WriteCloser, io.WriteCloser, bool, <-chan remotecommand.TerminalSize) error

View File

@ -0,0 +1,29 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package watchdog
import "net/http"
// HealthChecker defines the interface of health checkers.
type HealthChecker interface {
Start()
}
// syncLoopHealthChecker contains the health check method for syncLoop.
type syncLoopHealthChecker interface {
SyncLoopHealthCheck(req *http.Request) error
}

View File

@ -0,0 +1,158 @@
//go:build linux
// +build linux
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package watchdog
import (
"fmt"
"time"
"github.com/coreos/go-systemd/v22/daemon"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/klog/v2"
)
// WatchdogClient defines the interface for interacting with the systemd watchdog.
type WatchdogClient interface {
SdWatchdogEnabled(unsetEnvironment bool) (time.Duration, error)
SdNotify(unsetEnvironment bool) (bool, error)
}
// DefaultWatchdogClient implements the WatchdogClient interface using the actual systemd daemon functions.
type DefaultWatchdogClient struct{}
var _ WatchdogClient = &DefaultWatchdogClient{}
func (d *DefaultWatchdogClient) SdWatchdogEnabled(unsetEnvironment bool) (time.Duration, error) {
return daemon.SdWatchdogEnabled(unsetEnvironment)
}
func (d *DefaultWatchdogClient) SdNotify(unsetEnvironment bool) (bool, error) {
return daemon.SdNotify(unsetEnvironment, daemon.SdNotifyWatchdog)
}
// Option defines optional parameters for initializing the healthChecker
// structure.
type Option func(*healthChecker)
func WithWatchdogClient(watchdog WatchdogClient) Option {
return func(hc *healthChecker) {
hc.watchdog = watchdog
}
}
type healthChecker struct {
checkers []healthz.HealthChecker
retryBackoff wait.Backoff
interval time.Duration
watchdog WatchdogClient
}
var _ HealthChecker = &healthChecker{}
const minimalNotifyInterval = time.Second
// NewHealthChecker creates a new HealthChecker instance.
// This function initializes the health checker and configures its behavior based on the status of the systemd watchdog.
// If the watchdog is not enabled, the function returns an error.
func NewHealthChecker(syncLoop syncLoopHealthChecker, opts ...Option) (HealthChecker, error) {
hc := &healthChecker{
watchdog: &DefaultWatchdogClient{},
}
for _, o := range opts {
o(hc)
}
// get watchdog information
watchdogVal, err := hc.watchdog.SdWatchdogEnabled(false)
if err != nil {
// Failed to get watchdog configuration information.
// This occurs when we want to start the watchdog but the configuration is incorrect,
// for example, the time is not configured correctly.
return nil, fmt.Errorf("configure watchdog: %w", err)
}
if watchdogVal == 0 {
klog.InfoS("Systemd watchdog is not enabled")
return &healthChecker{}, nil
}
if watchdogVal <= minimalNotifyInterval {
return nil, fmt.Errorf("configure watchdog timeout too small: %v", watchdogVal)
}
// The health checks performed by checkers are the same as those for "/healthz".
checkers := []healthz.HealthChecker{
healthz.PingHealthz,
healthz.LogHealthz,
healthz.NamedCheck("syncloop", syncLoop.SyncLoopHealthCheck),
}
retryBackoff := wait.Backoff{
Duration: time.Second,
Factor: 2.0,
Jitter: 0.1,
Steps: 2,
}
hc.checkers = checkers
hc.retryBackoff = retryBackoff
hc.interval = watchdogVal / 2
return hc, nil
}
func (hc *healthChecker) Start() {
if hc.interval <= 0 {
klog.InfoS("Systemd watchdog is not enabled or the interval is invalid, so health checking will not be started.")
return
}
klog.InfoS("Starting systemd watchdog with interval", "interval", hc.interval)
go wait.Forever(func() {
if err := hc.doCheck(); err != nil {
klog.ErrorS(err, "Do not notify watchdog this iteration as the kubelet is reportedly not healthy")
return
}
err := wait.ExponentialBackoff(hc.retryBackoff, func() (bool, error) {
ack, err := hc.watchdog.SdNotify(false)
if err != nil {
klog.V(5).InfoS("Failed to notify systemd watchdog, retrying", "error", err)
return false, nil
}
if !ack {
return false, fmt.Errorf("failed to notify systemd watchdog, notification not supported - (i.e. NOTIFY_SOCKET is unset)")
}
klog.V(5).InfoS("Watchdog plugin notified", "acknowledgment", ack, "state", daemon.SdNotifyWatchdog)
return true, nil
})
if err != nil {
klog.ErrorS(err, "Failed to notify watchdog")
}
}, hc.interval)
}
func (hc *healthChecker) doCheck() error {
for _, hc := range hc.checkers {
if err := hc.Check(nil); err != nil {
return fmt.Errorf("checker %s failed: %w", hc.Name(), err)
}
}
return nil
}

View File

@ -0,0 +1,180 @@
//go:build linux
// +build linux
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package watchdog
import (
"bytes"
"errors"
"flag"
"net/http"
"strings"
"testing"
"time"
"k8s.io/klog/v2"
)
// Mock syncLoopHealthChecker
type mockSyncLoopHealthChecker struct {
healthCheckErr error
}
func (m *mockSyncLoopHealthChecker) SyncLoopHealthCheck(req *http.Request) error {
return m.healthCheckErr
}
// Mock WatchdogClient
type mockWatchdogClient struct {
enabledVal time.Duration
enabledErr error
notifyAck bool
notifyErr error
}
func (m *mockWatchdogClient) SdWatchdogEnabled(unsetEnvironment bool) (time.Duration, error) {
return m.enabledVal, m.enabledErr
}
func (m *mockWatchdogClient) SdNotify(unsetEnvironment bool) (bool, error) {
return m.notifyAck, m.notifyErr
}
const (
interval = 4 * time.Second
intervalSmall = 1 * time.Second
)
// TestNewHealthChecker tests the NewHealthChecker function.
func TestNewHealthChecker(t *testing.T) {
// Test cases
tests := []struct {
name string
mockEnabled time.Duration
mockErr error
wantErr bool
}{
{"Watchdog enabled", interval, nil, false},
{"Watchdog not enabled", 0, nil, false},
{"Watchdog enabled with error", interval, errors.New("mock error"), true},
{"Watchdog timeout too small", intervalSmall, nil, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mockClient := &mockWatchdogClient{
enabledVal: tt.mockEnabled,
enabledErr: tt.mockErr,
}
_, err := NewHealthChecker(&mockSyncLoopHealthChecker{}, WithWatchdogClient(mockClient))
if (err != nil) != tt.wantErr {
t.Errorf("NewHealthChecker() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
// TestHealthCheckerStart tests the Start method of the healthChecker.
func TestHealthCheckerStart(t *testing.T) {
// Test cases
tests := []struct {
name string
enabledVal time.Duration
healthCheckErr error
notifyAck bool
notifyErr error
expectedLogs []string
}{
{
name: "Watchdog enabled and notify succeeds",
enabledVal: interval,
healthCheckErr: nil,
notifyAck: true,
notifyErr: nil,
expectedLogs: []string{"Starting systemd watchdog with interval", "Watchdog plugin notified"},
},
{
name: "Watchdog enabled and notify fails, notification not supported",
enabledVal: interval,
healthCheckErr: nil,
notifyAck: false,
notifyErr: nil,
expectedLogs: []string{"Starting systemd watchdog with interval", "Failed to notify watchdog", "notification not supported"},
},
{
name: "Watchdog enabled and notify fails, transmission failed",
enabledVal: interval,
healthCheckErr: nil,
notifyAck: false,
notifyErr: errors.New("mock notify error"),
expectedLogs: []string{"Starting systemd watchdog with interval", "Failed to notify watchdog"},
},
{
name: "Watchdog enabled and health check fails",
enabledVal: interval,
healthCheckErr: errors.New("mock healthy error"),
notifyAck: true,
notifyErr: nil,
expectedLogs: []string{"Starting systemd watchdog with interval", "Do not notify watchdog this iteration as the kubelet is reportedly not healthy"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Capture logs
var logBuffer bytes.Buffer
flags := &flag.FlagSet{}
klog.InitFlags(flags)
if err := flags.Set("v", "5"); err != nil {
t.Fatal(err)
}
klog.LogToStderr(false)
klog.SetOutput(&logBuffer)
// Mock SdWatchdogEnabled to return a valid value
mockClient := &mockWatchdogClient{
enabledVal: tt.enabledVal,
notifyAck: tt.notifyAck,
notifyErr: tt.notifyErr,
}
// Create a healthChecker
hc, err := NewHealthChecker(&mockSyncLoopHealthChecker{healthCheckErr: tt.healthCheckErr}, WithWatchdogClient(mockClient))
if err != nil {
t.Fatalf("NewHealthChecker() failed: %v", err)
}
// Start the health checker
hc.Start()
// Wait for a short period to allow the health check to run
time.Sleep(2 * interval)
// Check logs to verify the health check ran
klog.Flush()
logs := logBuffer.String()
for _, expectedLog := range tt.expectedLogs {
if !strings.Contains(logs, expectedLog) {
t.Errorf("Expected log '%s' not found in logs: %s", expectedLog, logs)
}
}
})
}
}

View File

@ -0,0 +1,33 @@
//go:build !linux
// +build !linux
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package watchdog
type healthCheckerUnsupported struct{}
var _ HealthChecker = &healthCheckerUnsupported{}
// NewHealthChecker creates a fake one here
func NewHealthChecker(_ syncLoopHealthChecker) (HealthChecker, error) {
return &healthCheckerUnsupported{}, nil
}
func (ow *healthCheckerUnsupported) Start() {
return
}

View File

@ -1218,6 +1218,12 @@
lockToDefault: false
preRelease: Alpha
version: "1.31"
- name: SystemdWatchdog
versionedSpecs:
- default: true
lockToDefault: false
preRelease: Beta
version: "1.32"
- name: TopologyAwareHints
versionedSpecs:
- default: false