mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 06:27:05 +00:00
Merge pull request #100369 from wzshiming/fix/restart-dbus-for-graceful-node-shutdown
After DBus restarts, make GracefulNodeShutdown work again
This commit is contained in:
commit
4e7fc6df63
@ -24,7 +24,6 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/godbus/dbus/v5"
|
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/util/clock"
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
@ -40,14 +39,11 @@ const (
|
|||||||
nodeShutdownReason = "Shutdown"
|
nodeShutdownReason = "Shutdown"
|
||||||
nodeShutdownMessage = "Node is shutting, evicting pods"
|
nodeShutdownMessage = "Node is shutting, evicting pods"
|
||||||
nodeShutdownNotAdmitMessage = "Node is in progress of shutting down, not admitting any new pods"
|
nodeShutdownNotAdmitMessage = "Node is in progress of shutting down, not admitting any new pods"
|
||||||
|
dbusReconnectPeriod = 1 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
var systemDbus = func() (dbusInhibiter, error) {
|
var systemDbus = func() (dbusInhibiter, error) {
|
||||||
bus, err := dbus.SystemBus()
|
return systemd.NewDBusCon()
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &systemd.DBusCon{SystemBus: bus}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type dbusInhibiter interface {
|
type dbusInhibiter interface {
|
||||||
@ -109,55 +105,77 @@ func (m *Manager) Start() error {
|
|||||||
if !m.isFeatureEnabled() {
|
if !m.isFeatureEnabled() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
stop, err := m.start()
|
||||||
systemBus, err := systemDbus()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
if stop != nil {
|
||||||
|
<-stop
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(dbusReconnectPeriod)
|
||||||
|
klog.V(1).InfoS("Restarting watch for node shutdown events")
|
||||||
|
stop, err = m.start()
|
||||||
|
if err != nil {
|
||||||
|
klog.ErrorS(err, "Unable to watch the node for shutdown events")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) start() (chan struct{}, error) {
|
||||||
|
systemBus, err := systemDbus()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
m.dbusCon = systemBus
|
m.dbusCon = systemBus
|
||||||
|
|
||||||
currentInhibitDelay, err := m.dbusCon.CurrentInhibitDelay()
|
currentInhibitDelay, err := m.dbusCon.CurrentInhibitDelay()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the logind's InhibitDelayMaxUSec as configured in (logind.conf) is less than shutdownGracePeriodRequested, attempt to update the value to shutdownGracePeriodRequested.
|
// If the logind's InhibitDelayMaxUSec as configured in (logind.conf) is less than shutdownGracePeriodRequested, attempt to update the value to shutdownGracePeriodRequested.
|
||||||
if m.shutdownGracePeriodRequested > currentInhibitDelay {
|
if m.shutdownGracePeriodRequested > currentInhibitDelay {
|
||||||
err := m.dbusCon.OverrideInhibitDelay(m.shutdownGracePeriodRequested)
|
err := m.dbusCon.OverrideInhibitDelay(m.shutdownGracePeriodRequested)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to override inhibit delay by shutdown manager: %w", err)
|
return nil, fmt.Errorf("unable to override inhibit delay by shutdown manager: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = m.dbusCon.ReloadLogindConf()
|
err = m.dbusCon.ReloadLogindConf()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read the current inhibitDelay again, if the override was successful, currentInhibitDelay will be equal to shutdownGracePeriodRequested.
|
// Read the current inhibitDelay again, if the override was successful, currentInhibitDelay will be equal to shutdownGracePeriodRequested.
|
||||||
updatedInhibitDelay, err := m.dbusCon.CurrentInhibitDelay()
|
updatedInhibitDelay, err := m.dbusCon.CurrentInhibitDelay()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if updatedInhibitDelay != m.shutdownGracePeriodRequested {
|
if updatedInhibitDelay != m.shutdownGracePeriodRequested {
|
||||||
return fmt.Errorf("node shutdown manager was unable to update logind InhibitDelayMaxSec to %v (ShutdownGracePeriod), current value of InhibitDelayMaxSec (%v) is less than requested ShutdownGracePeriod", m.shutdownGracePeriodRequested, updatedInhibitDelay)
|
return nil, fmt.Errorf("node shutdown manager was unable to update logind InhibitDelayMaxSec to %v (ShutdownGracePeriod), current value of InhibitDelayMaxSec (%v) is less than requested ShutdownGracePeriod", m.shutdownGracePeriodRequested, updatedInhibitDelay)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = m.aquireInhibitLock()
|
err = m.aquireInhibitLock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
events, err := m.dbusCon.MonitorShutdown()
|
events, err := m.dbusCon.MonitorShutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
releaseErr := m.dbusCon.ReleaseInhibitLock(m.inhibitLock)
|
releaseErr := m.dbusCon.ReleaseInhibitLock(m.inhibitLock)
|
||||||
if releaseErr != nil {
|
if releaseErr != nil {
|
||||||
return fmt.Errorf("failed releasing inhibitLock: %v and failed monitoring shutdown: %w", releaseErr, err)
|
return nil, fmt.Errorf("failed releasing inhibitLock: %v and failed monitoring shutdown: %v", releaseErr, err)
|
||||||
}
|
}
|
||||||
return fmt.Errorf("failed to monitor shutdown: %w", err)
|
return nil, fmt.Errorf("failed to monitor shutdown: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stop := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
// Monitor for shutdown events. This follows the logind Inhibit Delay pattern described on https://www.freedesktop.org/wiki/Software/systemd/inhibit/
|
// Monitor for shutdown events. This follows the logind Inhibit Delay pattern described on https://www.freedesktop.org/wiki/Software/systemd/inhibit/
|
||||||
// 1. When shutdown manager starts, an inhibit lock is taken.
|
// 1. When shutdown manager starts, an inhibit lock is taken.
|
||||||
@ -165,7 +183,12 @@ func (m *Manager) Start() error {
|
|||||||
// 3. When shutdown(false) event is received, this indicates a previous shutdown was cancelled. In this case, acquire the inhibit lock again.
|
// 3. When shutdown(false) event is received, this indicates a previous shutdown was cancelled. In this case, acquire the inhibit lock again.
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case isShuttingDown := <-events:
|
case isShuttingDown, ok := <-events:
|
||||||
|
if !ok {
|
||||||
|
klog.ErrorS(err, "Ended to watching the node for shutdown events")
|
||||||
|
close(stop)
|
||||||
|
return
|
||||||
|
}
|
||||||
klog.V(1).InfoS("Shutdown manager detected new shutdown event, isNodeShuttingDownNow", "event", isShuttingDown)
|
klog.V(1).InfoS("Shutdown manager detected new shutdown event, isNodeShuttingDownNow", "event", isShuttingDown)
|
||||||
|
|
||||||
m.nodeShuttingDownMutex.Lock()
|
m.nodeShuttingDownMutex.Lock()
|
||||||
@ -183,7 +206,7 @@ func (m *Manager) Start() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
return nil
|
return stop, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) aquireInhibitLock() error {
|
func (m *Manager) aquireInhibitLock() error {
|
||||||
|
@ -93,6 +93,10 @@ func makePod(name string, criticalPod bool, terminationGracePeriod *int64) *v1.P
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestManager(t *testing.T) {
|
func TestManager(t *testing.T) {
|
||||||
|
systemDbusTmp := systemDbus
|
||||||
|
defer func() {
|
||||||
|
systemDbus = systemDbusTmp
|
||||||
|
}()
|
||||||
normalPodNoGracePeriod := makePod("normal-pod-nil-grace-period", false /* criticalPod */, nil /* terminationGracePeriod */)
|
normalPodNoGracePeriod := makePod("normal-pod-nil-grace-period", false /* criticalPod */, nil /* terminationGracePeriod */)
|
||||||
criticalPodNoGracePeriod := makePod("critical-pod-nil-grace-period", true /* criticalPod */, nil /* terminationGracePeriod */)
|
criticalPodNoGracePeriod := makePod("critical-pod-nil-grace-period", true /* criticalPod */, nil /* terminationGracePeriod */)
|
||||||
|
|
||||||
@ -305,3 +309,52 @@ func TestFeatureEnabled(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRestart(t *testing.T) {
|
||||||
|
systemDbusTmp := systemDbus
|
||||||
|
defer func() {
|
||||||
|
systemDbus = systemDbusTmp
|
||||||
|
}()
|
||||||
|
|
||||||
|
shutdownGracePeriodRequested := 30 * time.Second
|
||||||
|
shutdownGracePeriodCriticalPods := 10 * time.Second
|
||||||
|
systemInhibitDelay := 40 * time.Second
|
||||||
|
overrideSystemInhibitDelay := 40 * time.Second
|
||||||
|
activePodsFunc := func() []*v1.Pod {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
killPodsFunc := func(pod *v1.Pod, status v1.PodStatus, gracePeriodOverride *int64) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
syncNodeStatus := func() {}
|
||||||
|
|
||||||
|
var shutdownChan chan bool
|
||||||
|
var connChan = make(chan struct{}, 1)
|
||||||
|
|
||||||
|
systemDbus = func() (dbusInhibiter, error) {
|
||||||
|
defer func() {
|
||||||
|
connChan <- struct{}{}
|
||||||
|
}()
|
||||||
|
|
||||||
|
shutdownChan = make(chan bool)
|
||||||
|
dbus := &fakeDbus{currentInhibitDelay: systemInhibitDelay, shutdownChan: shutdownChan, overrideSystemInhibitDelay: overrideSystemInhibitDelay}
|
||||||
|
return dbus, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
manager, _ := NewManager(activePodsFunc, killPodsFunc, syncNodeStatus, shutdownGracePeriodRequested, shutdownGracePeriodCriticalPods)
|
||||||
|
err := manager.Start()
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i != 5; i++ {
|
||||||
|
select {
|
||||||
|
case <-time.After(dbusReconnectPeriod * 5):
|
||||||
|
t.Fatal("wait dbus connect timeout")
|
||||||
|
case <-connChan:
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
close(shutdownChan)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -47,6 +47,17 @@ type DBusCon struct {
|
|||||||
SystemBus dBusConnector
|
SystemBus dBusConnector
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewDBusCon() (*DBusCon, error) {
|
||||||
|
conn, err := dbus.SystemBus()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &DBusCon{
|
||||||
|
SystemBus: conn,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
// InhibitLock is a lock obtained after creating a systemd inhibitor by calling InhibitShutdown().
|
// InhibitLock is a lock obtained after creating a systemd inhibitor by calling InhibitShutdown().
|
||||||
type InhibitLock uint32
|
type InhibitLock uint32
|
||||||
|
|
||||||
@ -138,7 +149,11 @@ func (bus *DBusCon) MonitorShutdown() (<-chan bool, error) {
|
|||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
event := <-busChan
|
event, ok := <-busChan
|
||||||
|
if !ok {
|
||||||
|
close(shutdownChan)
|
||||||
|
return
|
||||||
|
}
|
||||||
if event == nil || len(event.Body) == 0 {
|
if event == nil || len(event.Body) == 0 {
|
||||||
klog.ErrorS(nil, "Failed obtaining shutdown event, PrepareForShutdown event was empty")
|
klog.ErrorS(nil, "Failed obtaining shutdown event, PrepareForShutdown event was empty")
|
||||||
continue
|
continue
|
||||||
@ -149,7 +164,6 @@ func (bus *DBusCon) MonitorShutdown() (<-chan bool, error) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
shutdownChan <- shutdownActive
|
shutdownChan <- shutdownActive
|
||||||
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -178,6 +178,24 @@ var _ = SIGDescribe("GracefulNodeShutdown [Serial] [NodeAlphaFeature:GracefulNod
|
|||||||
return nil
|
return nil
|
||||||
}, nodeStatusUpdateTimeout, pollInterval).Should(gomega.BeNil())
|
}, nodeStatusUpdateTimeout, pollInterval).Should(gomega.BeNil())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// After dbus has been restarted, an emitted PrepareForShutdown signal must
// still lead to the node no longer reporting Ready.
ginkgo.It("after restart dbus, should be able to gracefully shutdown", func() {
	ginkgo.By("Restart Dbus")
	framework.ExpectNoError(restartDbus())

	ginkgo.By("Emitting Shutdown signal")
	framework.ExpectNoError(emitSignalPrepareForShutdown(true))

	// Polled until it returns nil, i.e. until the node stops being Ready.
	nodeNotReady := func() error {
		if getNodeReadyStatus(f) {
			return fmt.Errorf("node did not become shutdown as expected")
		}
		return nil
	}
	gomega.Eventually(nodeNotReady, nodeStatusUpdateTimeout, pollInterval).Should(gomega.BeNil())
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -237,3 +255,9 @@ func getNodeReadyStatus(f *framework.Framework) bool {
|
|||||||
framework.ExpectEqual(len(nodeList.Items), 1)
|
framework.ExpectEqual(len(nodeList.Items), 1)
|
||||||
return isNodeReady(&nodeList.Items[0])
|
return isNodeReady(&nodeList.Items[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func restartDbus() error {
|
||||||
|
cmd := "systemctl restart dbus"
|
||||||
|
_, err := runCommand("sh", "-c", cmd)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user