diff --git a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go index bbd869b7281..7edfd5061a3 100644 --- a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go +++ b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go @@ -24,7 +24,6 @@ import ( "sync" "time" - "github.com/godbus/dbus/v5" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/clock" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -40,14 +39,11 @@ const ( nodeShutdownReason = "Shutdown" nodeShutdownMessage = "Node is shutting, evicting pods" nodeShutdownNotAdmitMessage = "Node is in progress of shutting down, not admitting any new pods" + dbusReconnectPeriod = 1 * time.Second ) var systemDbus = func() (dbusInhibiter, error) { - bus, err := dbus.SystemBus() - if err != nil { - return nil, err - } - return &systemd.DBusCon{SystemBus: bus}, nil + return systemd.NewDBusCon() } type dbusInhibiter interface { @@ -109,55 +105,77 @@ func (m *Manager) Start() error { if !m.isFeatureEnabled() { return nil } - - systemBus, err := systemDbus() + stop, err := m.start() if err != nil { return err } + go func() { + for { + if stop != nil { + <-stop + } + + time.Sleep(dbusReconnectPeriod) + klog.V(1).InfoS("Restarting watch for node shutdown events") + stop, err = m.start() + if err != nil { + klog.ErrorS(err, "Unable to watch the node for shutdown events") + } + } + }() + return nil +} + +func (m *Manager) start() (chan struct{}, error) { + systemBus, err := systemDbus() + if err != nil { + return nil, err + } m.dbusCon = systemBus currentInhibitDelay, err := m.dbusCon.CurrentInhibitDelay() if err != nil { - return err + return nil, err } // If the logind's InhibitDelayMaxUSec as configured in (logind.conf) is less than shutdownGracePeriodRequested, attempt to update the value to shutdownGracePeriodRequested. if m.shutdownGracePeriodRequested > currentInhibitDelay { err := m.dbusCon.OverrideInhibitDelay(m.shutdownGracePeriodRequested) if err != nil { - return fmt.Errorf("unable to override inhibit delay by shutdown manager: %w", err) + return nil, fmt.Errorf("unable to override inhibit delay by shutdown manager: %v", err) } err = m.dbusCon.ReloadLogindConf() if err != nil { - return err + return nil, err } // Read the current inhibitDelay again, if the override was successful, currentInhibitDelay will be equal to shutdownGracePeriodRequested. updatedInhibitDelay, err := m.dbusCon.CurrentInhibitDelay() if err != nil { - return err + return nil, err } if updatedInhibitDelay != m.shutdownGracePeriodRequested { - return fmt.Errorf("node shutdown manager was unable to update logind InhibitDelayMaxSec to %v (ShutdownGracePeriod), current value of InhibitDelayMaxSec (%v) is less than requested ShutdownGracePeriod", m.shutdownGracePeriodRequested, updatedInhibitDelay) + return nil, fmt.Errorf("node shutdown manager was unable to update logind InhibitDelayMaxSec to %v (ShutdownGracePeriod), current value of InhibitDelayMaxSec (%v) is less than requested ShutdownGracePeriod", m.shutdownGracePeriodRequested, updatedInhibitDelay) } } err = m.aquireInhibitLock() if err != nil { - return err + return nil, err } events, err := m.dbusCon.MonitorShutdown() if err != nil { releaseErr := m.dbusCon.ReleaseInhibitLock(m.inhibitLock) if releaseErr != nil { - return fmt.Errorf("failed releasing inhibitLock: %v and failed monitoring shutdown: %w", releaseErr, err) + return nil, fmt.Errorf("failed releasing inhibitLock: %v and failed monitoring shutdown: %v", releaseErr, err) } - return fmt.Errorf("failed to monitor shutdown: %w", err) + return nil, fmt.Errorf("failed to monitor shutdown: %v", err) } + stop := make(chan struct{}) go func() { // Monitor for shutdown events. This follows the logind Inhibit Delay pattern described on https://www.freedesktop.org/wiki/Software/systemd/inhibit/ // 1. When shutdown manager starts, an inhibit lock is taken. @@ -165,7 +183,12 @@ func (m *Manager) Start() error { // 3. When shutdown(false) event is received, this indicates a previous shutdown was cancelled. In this case, acquire the inhibit lock again. for { select { - case isShuttingDown := <-events: + case isShuttingDown, ok := <-events: + if !ok { + klog.ErrorS(err, "Ended to watching the node for shutdown events") + close(stop) + return + } klog.V(1).InfoS("Shutdown manager detected new shutdown event, isNodeShuttingDownNow", "event", isShuttingDown) m.nodeShuttingDownMutex.Lock() @@ -183,7 +206,7 @@ func (m *Manager) Start() error { } } }() - return nil + return stop, nil } func (m *Manager) aquireInhibitLock() error { diff --git a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go index cc611f1fc54..cd4ea731c1d 100644 --- a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go +++ b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go @@ -93,6 +93,10 @@ func makePod(name string, criticalPod bool, terminationGracePeriod *int64) *v1.P } func TestManager(t *testing.T) { + systemDbusTmp := systemDbus + defer func() { + systemDbus = systemDbusTmp + }() normalPodNoGracePeriod := makePod("normal-pod-nil-grace-period", false /* criticalPod */, nil /* terminationGracePeriod */) criticalPodNoGracePeriod := makePod("critical-pod-nil-grace-period", true /* criticalPod */, nil /* terminationGracePeriod */) @@ -305,3 +309,52 @@ func TestFeatureEnabled(t *testing.T) { }) } } + +func TestRestart(t *testing.T) { + systemDbusTmp := systemDbus + defer func() { + systemDbus = systemDbusTmp + }() + + shutdownGracePeriodRequested := 30 * time.Second + shutdownGracePeriodCriticalPods := 10 * time.Second + systemInhibitDelay := 40 * time.Second + overrideSystemInhibitDelay := 40 * time.Second + activePodsFunc := func() []*v1.Pod { + return nil + } + killPodsFunc := func(pod *v1.Pod, status v1.PodStatus, gracePeriodOverride *int64) error { + return nil + } + syncNodeStatus := func() {} + + var shutdownChan chan bool + var connChan = make(chan struct{}, 1) + + systemDbus = func() (dbusInhibiter, error) { + defer func() { + connChan <- struct{}{} + }() + + shutdownChan = make(chan bool) + dbus := &fakeDbus{currentInhibitDelay: systemInhibitDelay, shutdownChan: shutdownChan, overrideSystemInhibitDelay: overrideSystemInhibitDelay} + return dbus, nil + } + + manager, _ := NewManager(activePodsFunc, killPodsFunc, syncNodeStatus, shutdownGracePeriodRequested, shutdownGracePeriodCriticalPods) + err := manager.Start() + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + for i := 0; i != 5; i++ { + select { + case <-time.After(dbusReconnectPeriod * 5): + t.Fatal("wait dbus connect timeout") + case <-connChan: + } + + time.Sleep(time.Second) + close(shutdownChan) + } +} diff --git a/pkg/kubelet/nodeshutdown/systemd/inhibit_linux.go b/pkg/kubelet/nodeshutdown/systemd/inhibit_linux.go index d3c09d316f4..8d3e36a4297 100644 --- a/pkg/kubelet/nodeshutdown/systemd/inhibit_linux.go +++ b/pkg/kubelet/nodeshutdown/systemd/inhibit_linux.go @@ -47,6 +47,17 @@ type DBusCon struct { SystemBus dBusConnector } +func NewDBusCon() (*DBusCon, error) { + conn, err := dbus.SystemBus() + if err != nil { + return nil, err + } + + return &DBusCon{ + SystemBus: conn, + }, nil +} + // InhibitLock is a lock obtained after creating an systemd inhibitor by calling InhibitShutdown(). type InhibitLock uint32 @@ -138,7 +149,11 @@ func (bus *DBusCon) MonitorShutdown() (<-chan bool, error) { go func() { for { - event := <-busChan + event, ok := <-busChan + if !ok { + close(shutdownChan) + return + } if event == nil || len(event.Body) == 0 { klog.ErrorS(nil, "Failed obtaining shutdown event, PrepareForShutdown event was empty") continue @@ -149,7 +164,6 @@ func (bus *DBusCon) MonitorShutdown() (<-chan bool, error) { continue } shutdownChan <- shutdownActive - } }() diff --git a/test/e2e_node/node_shutdown_linux_test.go b/test/e2e_node/node_shutdown_linux_test.go index 1ab5666b36f..d12b2f405b1 100644 --- a/test/e2e_node/node_shutdown_linux_test.go +++ b/test/e2e_node/node_shutdown_linux_test.go @@ -178,6 +178,24 @@ var _ = SIGDescribe("GracefulNodeShutdown [Serial] [NodeAlphaFeature:GracefulNod return nil }, nodeStatusUpdateTimeout, pollInterval).Should(gomega.BeNil()) }) + + ginkgo.It("after restart dbus, should be able to gracefully shutdown", func() { + ginkgo.By("Restart Dbus") + err := restartDbus() + framework.ExpectNoError(err) + + ginkgo.By("Emitting Shutdown signal") + err = emitSignalPrepareForShutdown(true) + framework.ExpectNoError(err) + + gomega.Eventually(func() error { + isReady := getNodeReadyStatus(f) + if isReady { + return fmt.Errorf("node did not become shutdown as expected") + } + return nil + }, nodeStatusUpdateTimeout, pollInterval).Should(gomega.BeNil()) + }) }) }) @@ -237,3 +255,9 @@ func getNodeReadyStatus(f *framework.Framework) bool { framework.ExpectEqual(len(nodeList.Items), 1) return isNodeReady(&nodeList.Items[0]) } + +func restartDbus() error { + cmd := "systemctl restart dbus" + _, err := runCommand("sh", "-c", cmd) + return err +}