diff --git a/cluster/gce/gci/configure-helper.sh b/cluster/gce/gci/configure-helper.sh
index 2fe81511997..0784e35dd34 100755
--- a/cluster/gce/gci/configure-helper.sh
+++ b/cluster/gce/gci/configure-helper.sh
@@ -1658,6 +1658,7 @@ After=network-online.target
 [Service]
 Restart=always
 RestartSec=10
+WatchdogSec=30s
 EnvironmentFile=${kubelet_env_file}
 ExecStart=${kubelet_bin} \$KUBELET_OPTS
diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go
index 10ad47e4b1e..c8eaf462dbc 100644
--- a/pkg/features/kube_features.go
+++ b/pkg/features/kube_features.go
@@ -749,6 +749,16 @@ const (
 	//
 	// Enables the image volume source.
 	ImageVolume featuregate.Feature = "ImageVolume"
+
+	// owner: @zhifei92
+	// beta: v1.32
+	//
+	// Enables the systemd watchdog for the kubelet. When enabled, the kubelet
+	// periodically notifies the systemd watchdog that it is still alive, allowing
+	// systemd to detect and restart the kubelet if it becomes unresponsive.
+	// The feature gate is enabled by default, but should only be used if the system
+	// supports the systemd watchdog and has it configured properly.
+	SystemdWatchdog = featuregate.Feature("SystemdWatchdog")
 )
 
 func init() {
diff --git a/pkg/features/versioned_kube_features.go b/pkg/features/versioned_kube_features.go
index 6029bfe7a6a..b4862057785 100644
--- a/pkg/features/versioned_kube_features.go
+++ b/pkg/features/versioned_kube_features.go
@@ -696,6 +696,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
 		{Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Alpha},
 	},
 
+	SystemdWatchdog: {
+		{Version: version.MustParse("1.32"), Default: true, PreRelease: featuregate.Beta},
+	},
+
 	TopologyAwareHints: {
 		{Version: version.MustParse("1.21"), Default: false, PreRelease: featuregate.Alpha},
 		{Version: version.MustParse("1.23"), Default: false, PreRelease: featuregate.Beta},
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 01a723987f8..ad4a84ddd44 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -118,6 +118,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/util/queue"
 	"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
 	"k8s.io/kubernetes/pkg/kubelet/volumemanager"
+	"k8s.io/kubernetes/pkg/kubelet/watchdog"
 	httpprobe "k8s.io/kubernetes/pkg/probe/http"
 	"k8s.io/kubernetes/pkg/security/apparmor"
 	"k8s.io/kubernetes/pkg/util/oom"
@@ -957,6 +958,14 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 	// since this relies on the rest of the Kubelet having been constructed.
 	klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()
 
+	if utilfeature.DefaultFeatureGate.Enabled(features.SystemdWatchdog) {
+		// NewHealthChecker returns an error only if the systemd watchdog is enabled but
+		// misconfigured; in that case the kubelet will not be started.
+		klet.healthChecker, err = watchdog.NewHealthChecker(klet)
+		if err != nil {
+			return nil, fmt.Errorf("create health checker: %w", err)
+		}
+	}
 	return klet, nil
 }
 
@@ -1344,6 +1353,9 @@ type Kubelet struct {
 
 	// Track node startup latencies
 	nodeStartupLatencyTracker util.NodeStartupLatencyTracker
+
+	// healthChecker performs kubelet health checks for the systemd watchdog.
+	healthChecker watchdog.HealthChecker
 }
 
 // ListPodStats is delegated to StatsProvider, which implements stats.Provider interface
@@ -1698,6 +1710,10 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
 		kl.eventedPleg.Start()
 	}
 
+	if utilfeature.DefaultFeatureGate.Enabled(features.SystemdWatchdog) {
+		kl.healthChecker.Start()
+	}
+
 	kl.syncLoop(ctx, updates, kl)
 }
 
@@ -2876,6 +2892,20 @@ func (kl *Kubelet) LatestLoopEntryTime() time.Time {
 	return val.(time.Time)
 }
 
+// SyncLoopHealthCheck checks if kubelet's sync loop that updates containers is working.
+func (kl *Kubelet) SyncLoopHealthCheck(req *http.Request) error {
+	duration := kl.resyncInterval * 2
+	minDuration := time.Minute * 5
+	if duration < minDuration {
+		duration = minDuration
+	}
+	enterLoopTime := kl.LatestLoopEntryTime()
+	if !enterLoopTime.IsZero() && time.Now().After(enterLoopTime.Add(duration)) {
+		return fmt.Errorf("sync Loop took longer than expected")
+	}
+	return nil
+}
+
 // updateRuntimeUp calls the container runtime status callback, initializing
 // the runtime dependent modules when the container runtime first comes up,
 // and returns an error if the status check fails. If the status check is OK,
@@ -2935,11 +2965,6 @@ func (kl *Kubelet) BirthCry() {
 	kl.recorder.Eventf(kl.nodeRef, v1.EventTypeNormal, events.StartingKubelet, "Starting kubelet.")
 }
 
-// ResyncInterval returns the interval used for periodic syncs.
-func (kl *Kubelet) ResyncInterval() time.Duration {
-	return kl.resyncInterval
-}
-
 // ListenAndServe runs the kubelet HTTP server.
 func (kl *Kubelet) ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions, auth server.AuthInterface, tp trace.TracerProvider) {
diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go
index e81aa8c454e..f983e1d362b 100644
--- a/pkg/kubelet/server/server.go
+++ b/pkg/kubelet/server/server.go
@@ -265,9 +265,8 @@ type HostInterface interface {
 	CheckpointContainer(ctx context.Context, podUID types.UID, podFullName, containerName string, options *runtimeapi.CheckpointContainerRequest) error
 	GetKubeletContainerLogs(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error
 	ServeLogs(w http.ResponseWriter, req *http.Request)
-	ResyncInterval() time.Duration
 	GetHostname() string
-	LatestLoopEntryTime() time.Time
+	SyncLoopHealthCheck(req *http.Request) error
 	GetExec(ctx context.Context, podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommandserver.Options) (*url.URL, error)
 	GetAttach(ctx context.Context, podFullName string, podUID types.UID, containerName string, streamOpts remotecommandserver.Options) (*url.URL, error)
 	GetPortForward(ctx context.Context, podName, podNamespace string, podUID types.UID, portForwardOpts portforward.V4Options) (*url.URL, error)
@@ -396,7 +395,7 @@ func (s *Server) InstallDefaultHandlers() {
 	healthz.InstallHandler(s.restfulCont,
 		healthz.PingHealthz,
 		healthz.LogHealthz,
-		healthz.NamedCheck("syncloop", s.syncLoopHealthCheck),
+		healthz.NamedCheck("syncloop", s.host.SyncLoopHealthCheck),
 	)
 
 	slis.SLIMetricsWithReset{}.Install(s.restfulCont)
@@ -678,20 +677,6 @@ func (s *Server) InstallProfilingHandler(enableProfilingLogHandler bool, enableC
 	}
 }
 
-// Checks if kubelet's sync loop that updates containers is working.
-func (s *Server) syncLoopHealthCheck(req *http.Request) error {
-	duration := s.host.ResyncInterval() * 2
-	minDuration := time.Minute * 5
-	if duration < minDuration {
-		duration = minDuration
-	}
-	enterLoopTime := s.host.LatestLoopEntryTime()
-	if !enterLoopTime.IsZero() && time.Now().After(enterLoopTime.Add(duration)) {
-		return fmt.Errorf("sync Loop took longer than expected")
-	}
-	return nil
-}
-
 // getContainerLogs handles containerLogs request against the Kubelet
 func (s *Server) getContainerLogs(request *restful.Request, response *restful.Response) {
 	podNamespace := request.PathParameter("podNamespace")
diff --git a/pkg/kubelet/server/server_test.go b/pkg/kubelet/server/server_test.go
index d882ef1251e..26123e18b8c 100644
--- a/pkg/kubelet/server/server_test.go
+++ b/pkg/kubelet/server/server_test.go
@@ -91,10 +91,6 @@ type fakeKubelet struct {
 	streamingRuntime streaming.Server
 }
 
-func (fk *fakeKubelet) ResyncInterval() time.Duration {
-	return fk.resyncInterval
-}
-
 func (fk *fakeKubelet) LatestLoopEntryTime() time.Time {
 	return fk.loopEntryTime
 }
@@ -154,6 +150,19 @@ func (fk *fakeKubelet) ListPodSandboxMetrics(ctx context.Context) ([]*runtimeapi
 	return nil, nil
 }
 
+func (fk *fakeKubelet) SyncLoopHealthCheck(req *http.Request) error {
+	duration := fk.resyncInterval * 2
+	minDuration := time.Minute * 5
+	if duration < minDuration {
+		duration = minDuration
+	}
+	enterLoopTime := fk.LatestLoopEntryTime()
+	if !enterLoopTime.IsZero() && time.Now().After(enterLoopTime.Add(duration)) {
+		return fmt.Errorf("sync Loop took longer than expected")
+	}
+	return nil
+}
+
 type fakeRuntime struct {
 	execFunc   func(string, []string, io.Reader, io.WriteCloser, io.WriteCloser, bool, <-chan remotecommand.TerminalSize) error
 	attachFunc func(string, io.Reader, io.WriteCloser, io.WriteCloser, bool, <-chan remotecommand.TerminalSize) error
diff --git a/pkg/kubelet/watchdog/types.go b/pkg/kubelet/watchdog/types.go
new file mode 100644
index 00000000000..8f6d3838fde
--- /dev/null
+++ b/pkg/kubelet/watchdog/types.go
@@ -0,0 +1,29 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package watchdog
+
+import "net/http"
+
+// HealthChecker defines the interface of health checkers.
+type HealthChecker interface {
+	Start()
+}
+
+// syncLoopHealthChecker contains the health check method for syncLoop.
+type syncLoopHealthChecker interface {
+	SyncLoopHealthCheck(req *http.Request) error
+}
diff --git a/pkg/kubelet/watchdog/watchdog_linux.go b/pkg/kubelet/watchdog/watchdog_linux.go
new file mode 100644
index 00000000000..6a8342658c2
--- /dev/null
+++ b/pkg/kubelet/watchdog/watchdog_linux.go
@@ -0,0 +1,158 @@
+//go:build linux
+// +build linux
+
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package watchdog
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/coreos/go-systemd/v22/daemon"
+
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apiserver/pkg/server/healthz"
+	"k8s.io/klog/v2"
+)
+
+// WatchdogClient defines the interface for interacting with the systemd watchdog.
+type WatchdogClient interface {
+	SdWatchdogEnabled(unsetEnvironment bool) (time.Duration, error)
+	SdNotify(unsetEnvironment bool) (bool, error)
+}
+
+// DefaultWatchdogClient implements the WatchdogClient interface using the actual systemd daemon functions.
+type DefaultWatchdogClient struct{}
+
+var _ WatchdogClient = &DefaultWatchdogClient{}
+
+func (d *DefaultWatchdogClient) SdWatchdogEnabled(unsetEnvironment bool) (time.Duration, error) {
+	return daemon.SdWatchdogEnabled(unsetEnvironment)
+}
+
+func (d *DefaultWatchdogClient) SdNotify(unsetEnvironment bool) (bool, error) {
+	return daemon.SdNotify(unsetEnvironment, daemon.SdNotifyWatchdog)
+}
+
+// Option defines optional parameters for initializing the healthChecker
+// structure.
+type Option func(*healthChecker)
+
+func WithWatchdogClient(watchdog WatchdogClient) Option {
+	return func(hc *healthChecker) {
+		hc.watchdog = watchdog
+	}
+}
+
+type healthChecker struct {
+	checkers     []healthz.HealthChecker
+	retryBackoff wait.Backoff
+	interval     time.Duration
+	watchdog     WatchdogClient
+}
+
+var _ HealthChecker = &healthChecker{}
+
+const minimalNotifyInterval = time.Second
+
+// NewHealthChecker creates a new HealthChecker instance.
+// This function initializes the health checker and configures its behavior based on the status of the systemd watchdog.
+// If the watchdog is not enabled, a no-op health checker is returned; an error is returned only if the watchdog is misconfigured.
+func NewHealthChecker(syncLoop syncLoopHealthChecker, opts ...Option) (HealthChecker, error) {
+	hc := &healthChecker{
+		watchdog: &DefaultWatchdogClient{},
+	}
+	for _, o := range opts {
+		o(hc)
+	}
+
+	// get watchdog information
+	watchdogVal, err := hc.watchdog.SdWatchdogEnabled(false)
+	if err != nil {
+		// Failed to get watchdog configuration information.
+		// This occurs when we want to start the watchdog but the configuration is incorrect,
+		// for example, the time is not configured correctly.
+		return nil, fmt.Errorf("configure watchdog: %w", err)
+	}
+	if watchdogVal == 0 {
+		klog.InfoS("Systemd watchdog is not enabled")
+		return &healthChecker{}, nil
+	}
+	if watchdogVal <= minimalNotifyInterval {
+		return nil, fmt.Errorf("configure watchdog timeout too small: %v", watchdogVal)
+	}
+
+	// The health checks performed by checkers are the same as those for "/healthz".
+	checkers := []healthz.HealthChecker{
+		healthz.PingHealthz,
+		healthz.LogHealthz,
+		healthz.NamedCheck("syncloop", syncLoop.SyncLoopHealthCheck),
+	}
+	retryBackoff := wait.Backoff{
+		Duration: time.Second,
+		Factor:   2.0,
+		Jitter:   0.1,
+		Steps:    2,
+	}
+	hc.checkers = checkers
+	hc.retryBackoff = retryBackoff
+	hc.interval = watchdogVal / 2
+
+	return hc, nil
+}
+
+func (hc *healthChecker) Start() {
+	if hc.interval <= 0 {
+		klog.InfoS("Systemd watchdog is not enabled or the interval is invalid, so health checking will not be started.")
+		return
+	}
+	klog.InfoS("Starting systemd watchdog with interval", "interval", hc.interval)
+
+	go wait.Forever(func() {
+		if err := hc.doCheck(); err != nil {
+			klog.ErrorS(err, "Do not notify watchdog this iteration as the kubelet is reportedly not healthy")
+			return
+		}
+
+		err := wait.ExponentialBackoff(hc.retryBackoff, func() (bool, error) {
+			ack, err := hc.watchdog.SdNotify(false)
+			if err != nil {
+				klog.V(5).InfoS("Failed to notify systemd watchdog, retrying", "error", err)
+				return false, nil
+			}
+			if !ack {
+				return false, fmt.Errorf("failed to notify systemd watchdog, notification not supported - (i.e. NOTIFY_SOCKET is unset)")
+			}
+
+			klog.V(5).InfoS("Watchdog plugin notified", "acknowledgment", ack, "state", daemon.SdNotifyWatchdog)
+			return true, nil
+		})
+		if err != nil {
+			klog.ErrorS(err, "Failed to notify watchdog")
+		}
+	}, hc.interval)
+}
+
+func (hc *healthChecker) doCheck() error {
+	for _, hc := range hc.checkers {
+		if err := hc.Check(nil); err != nil {
+			return fmt.Errorf("checker %s failed: %w", hc.Name(), err)
+		}
+	}
+	return nil
+}
diff --git a/pkg/kubelet/watchdog/watchdog_linux_test.go b/pkg/kubelet/watchdog/watchdog_linux_test.go
new file mode 100644
index 00000000000..86cdd744cd4
--- /dev/null
+++ b/pkg/kubelet/watchdog/watchdog_linux_test.go
@@ -0,0 +1,180 @@
+//go:build linux
+// +build linux
+
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package watchdog
+
+import (
+	"bytes"
+	"errors"
+	"flag"
+	"net/http"
+	"strings"
+	"testing"
+	"time"
+
+	"k8s.io/klog/v2"
+)
+
+// Mock syncLoopHealthChecker
+type mockSyncLoopHealthChecker struct {
+	healthCheckErr error
+}
+
+func (m *mockSyncLoopHealthChecker) SyncLoopHealthCheck(req *http.Request) error {
+	return m.healthCheckErr
+}
+
+// Mock WatchdogClient
+type mockWatchdogClient struct {
+	enabledVal time.Duration
+	enabledErr error
+	notifyAck  bool
+	notifyErr  error
+}
+
+func (m *mockWatchdogClient) SdWatchdogEnabled(unsetEnvironment bool) (time.Duration, error) {
+	return m.enabledVal, m.enabledErr
+}
+
+func (m *mockWatchdogClient) SdNotify(unsetEnvironment bool) (bool, error) {
+	return m.notifyAck, m.notifyErr
+}
+
+const (
+	interval      = 4 * time.Second
+	intervalSmall = 1 * time.Second
+)
+
+// TestNewHealthChecker tests the NewHealthChecker function.
+func TestNewHealthChecker(t *testing.T) {
+	// Test cases
+	tests := []struct {
+		name        string
+		mockEnabled time.Duration
+		mockErr     error
+		wantErr     bool
+	}{
+		{"Watchdog enabled", interval, nil, false},
+		{"Watchdog not enabled", 0, nil, false},
+		{"Watchdog enabled with error", interval, errors.New("mock error"), true},
+		{"Watchdog timeout too small", intervalSmall, nil, true},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			mockClient := &mockWatchdogClient{
+				enabledVal: tt.mockEnabled,
+				enabledErr: tt.mockErr,
+			}
+
+			_, err := NewHealthChecker(&mockSyncLoopHealthChecker{}, WithWatchdogClient(mockClient))
+			if (err != nil) != tt.wantErr {
+				t.Errorf("NewHealthChecker() error = %v, wantErr %v", err, tt.wantErr)
+			}
+		})
+	}
+}
+
+// TestHealthCheckerStart tests the Start method of the healthChecker.
+func TestHealthCheckerStart(t *testing.T) {
+	// Test cases
+	tests := []struct {
+		name           string
+		enabledVal     time.Duration
+		healthCheckErr error
+		notifyAck      bool
+		notifyErr      error
+		expectedLogs   []string
+	}{
+		{
+			name:           "Watchdog enabled and notify succeeds",
+			enabledVal:     interval,
+			healthCheckErr: nil,
+			notifyAck:      true,
+			notifyErr:      nil,
+			expectedLogs:   []string{"Starting systemd watchdog with interval", "Watchdog plugin notified"},
+		},
+		{
+			name:           "Watchdog enabled and notify fails, notification not supported",
+			enabledVal:     interval,
+			healthCheckErr: nil,
+			notifyAck:      false,
+			notifyErr:      nil,
+			expectedLogs:   []string{"Starting systemd watchdog with interval", "Failed to notify watchdog", "notification not supported"},
+		},
+		{
+			name:           "Watchdog enabled and notify fails, transmission failed",
+			enabledVal:     interval,
+			healthCheckErr: nil,
+			notifyAck:      false,
+			notifyErr:      errors.New("mock notify error"),
+			expectedLogs:   []string{"Starting systemd watchdog with interval", "Failed to notify watchdog"},
+		},
+		{
+			name:           "Watchdog enabled and health check fails",
+			enabledVal:     interval,
+			healthCheckErr: errors.New("mock healthy error"),
+			notifyAck:      true,
+			notifyErr:      nil,
+			expectedLogs:   []string{"Starting systemd watchdog with interval", "Do not notify watchdog this iteration as the kubelet is reportedly not healthy"},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Capture logs
+			var logBuffer bytes.Buffer
+			flags := &flag.FlagSet{}
+			klog.InitFlags(flags)
+			if err := flags.Set("v", "5"); err != nil {
+				t.Fatal(err)
+			}
+			klog.LogToStderr(false)
+			klog.SetOutput(&logBuffer)
+
+			// Mock SdWatchdogEnabled to return a valid value
+			mockClient := &mockWatchdogClient{
+				enabledVal: tt.enabledVal,
+				notifyAck:  tt.notifyAck,
+				notifyErr:  tt.notifyErr,
+			}
+
+			// Create a healthChecker
+			hc, err := NewHealthChecker(&mockSyncLoopHealthChecker{healthCheckErr: tt.healthCheckErr}, WithWatchdogClient(mockClient))
+			if err != nil {
+				t.Fatalf("NewHealthChecker() failed: %v", err)
+			}
+
+			// Start the health checker
+			hc.Start()
+
+			// Wait for a short period to allow the health check to run
+			time.Sleep(2 * interval)
+
+			// Check logs to verify the health check ran
+			klog.Flush()
+			logs := logBuffer.String()
+			for _, expectedLog := range tt.expectedLogs {
+				if !strings.Contains(logs, expectedLog) {
+					t.Errorf("Expected log '%s' not found in logs: %s", expectedLog, logs)
+				}
+			}
+		})
+	}
+}
diff --git a/pkg/kubelet/watchdog/watchdog_unsupported.go b/pkg/kubelet/watchdog/watchdog_unsupported.go
new file mode 100644
index 00000000000..dae45894a16
--- /dev/null
+++ b/pkg/kubelet/watchdog/watchdog_unsupported.go
@@ -0,0 +1,33 @@
+//go:build !linux
+// +build !linux
+
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package watchdog
+
+type healthCheckerUnsupported struct{}
+
+var _ HealthChecker = &healthCheckerUnsupported{}
+
+// NewHealthChecker returns a no-op health checker on platforms without systemd watchdog support.
+func NewHealthChecker(_ syncLoopHealthChecker) (HealthChecker, error) {
+	return &healthCheckerUnsupported{}, nil
+}
+
+func (ow *healthCheckerUnsupported) Start() {
+	return
+}
diff --git a/test/featuregates_linter/test_data/versioned_feature_list.yaml b/test/featuregates_linter/test_data/versioned_feature_list.yaml
index c21e54f9fd7..ec895663cf9 100644
--- a/test/featuregates_linter/test_data/versioned_feature_list.yaml
+++ b/test/featuregates_linter/test_data/versioned_feature_list.yaml
@@ -1218,6 +1218,12 @@
     lockToDefault: false
     preRelease: Alpha
     version: "1.31"
+- name: SystemdWatchdog
+  versionedSpecs:
+  - default: true
+    lockToDefault: false
+    preRelease: Beta
+    version: "1.32"
 - name: TopologyAwareHints
   versionedSpecs:
   - default: false
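
For readers who want to exercise the notification path outside the kubelet, the sketch below (not part of the patch) drives the same go-systemd calls the new healthChecker relies on, daemon.SdWatchdogEnabled and daemon.SdNotify, at half of the configured timeout, mirroring hc.interval = watchdogVal / 2 above. It assumes it runs under a systemd unit with WatchdogSec= set, so that WATCHDOG_USEC and NOTIFY_SOCKET are present in the environment; unlike the kubelet, it skips the health checks and always notifies.

// Minimal standalone sketch, not part of the patch: feed the systemd watchdog
// the way healthChecker does, assuming the process runs under a unit with
// WatchdogSec= configured.
package main

import (
	"log"
	"time"

	"github.com/coreos/go-systemd/v22/daemon"
)

func main() {
	// Returns 0 with a nil error when no watchdog is configured for this unit.
	timeout, err := daemon.SdWatchdogEnabled(false)
	if err != nil {
		log.Fatalf("watchdog misconfigured: %v", err)
	}
	if timeout == 0 {
		log.Println("systemd watchdog not enabled; nothing to do")
		return
	}
	// Notify at half the WatchdogSec timeout, as the kubelet does.
	for range time.Tick(timeout / 2) {
		if ack, err := daemon.SdNotify(false, daemon.SdNotifyWatchdog); err != nil || !ack {
			log.Printf("failed to notify systemd watchdog: ack=%v err=%v", ack, err)
		}
	}
}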