Add systemd package to interface with dbus

* Add a new package under nodeshutdown "systemd"
  * Package uses dbus to interface with logind to manage shutdown
  inhibitors
* Make github.com/godbus/dbus a new explicit dependency
  * Update vendor and go modules
This commit is contained in:
David Porter 2020-11-02 23:18:17 +00:00
parent 6d5cb36d36
commit 2343689ce7
8 changed files with 465 additions and 0 deletions

1
go.mod
View File

@ -49,6 +49,7 @@ require (
github.com/go-openapi/strfmt v0.19.3
github.com/go-openapi/validate v0.19.5
github.com/go-ozzo/ozzo-validation v3.5.0+incompatible // indirect
github.com/godbus/dbus/v5 v5.0.3
github.com/gogo/protobuf v1.3.1
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7
github.com/golang/mock v1.3.1

View File

@ -308,6 +308,7 @@ filegroup(
"//pkg/kubelet/logs:all-srcs",
"//pkg/kubelet/metrics:all-srcs",
"//pkg/kubelet/network:all-srcs",
"//pkg/kubelet/nodeshutdown/systemd:all-srcs",
"//pkg/kubelet/nodestatus:all-srcs",
"//pkg/kubelet/oom:all-srcs",
"//pkg/kubelet/pleg:all-srcs",

View File

@ -0,0 +1,54 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"doc.go",
"inhibit_linux.go",
"inhibit_others.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/nodeshutdown/systemd",
visibility = ["//visibility:public"],
deps = select({
"@io_bazel_rules_go//go/platform:android": [
"//vendor/github.com/godbus/dbus/v5:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],
"@io_bazel_rules_go//go/platform:linux": [
"//vendor/github.com/godbus/dbus/v5:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],
"//conditions:default": [],
}),
)
go_test(
name = "go_default_test",
srcs = ["inhibit_linux_test.go"],
embed = [":go_default_library"],
deps = select({
"@io_bazel_rules_go//go/platform:android": [
"//vendor/github.com/godbus/dbus/v5:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
"@io_bazel_rules_go//go/platform:linux": [
"//vendor/github.com/godbus/dbus/v5:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
"//conditions:default": [],
}),
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,18 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package systemd provides utility functions for kubelet to perform systemd related operations.
package systemd

View File

@ -0,0 +1,186 @@
// +build linux
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package systemd
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"syscall"
"time"
"github.com/godbus/dbus/v5"
"k8s.io/klog/v2"
)
const (
logindService = "org.freedesktop.login1"
logindObject = dbus.ObjectPath("/org/freedesktop/login1")
logindInterface = "org.freedesktop.login1.Manager"
)
type dBusConnector interface {
Object(dest string, path dbus.ObjectPath) dbus.BusObject
AddMatchSignal(options ...dbus.MatchOption) error
Signal(ch chan<- *dbus.Signal)
}
// DBusCon has functions that can be used to interact with systemd and logind over dbus.
type DBusCon struct {
SystemBus dBusConnector
}
// InhibitLock is a lock obtained after creating an systemd inhibitor by calling InhibitShutdown().
type InhibitLock uint32
// CurrentInhibitDelay returns the current delay inhibitor timeout value as configured in logind.conf(5).
// see https://www.freedesktop.org/software/systemd/man/logind.conf.html for more details.
func (bus *DBusCon) CurrentInhibitDelay() (time.Duration, error) {
obj := bus.SystemBus.Object(logindService, logindObject)
res, err := obj.GetProperty(logindInterface + ".InhibitDelayMaxUSec")
if err != nil {
return 0, fmt.Errorf("failed reading InhibitDelayMaxUSec property from logind: %v", err)
}
delay, ok := res.Value().(uint64)
if !ok {
return 0, fmt.Errorf("InhibitDelayMaxUSec from logind is not a uint64 as expected")
}
// InhibitDelayMaxUSec is in microseconds
duration := time.Duration(delay) * time.Microsecond
return duration, nil
}
// InhibitShutdown creates an systemd inhibitor by calling logind's Inhibt() and returns the inhibitor lock
// see https://www.freedesktop.org/wiki/Software/systemd/inhibit/ for more details.
func (bus *DBusCon) InhibitShutdown() (InhibitLock, error) {
obj := bus.SystemBus.Object(logindService, logindObject)
what := "shutdown"
who := "kubelet"
why := "Kubelet needs time to handle node shutdown"
mode := "delay"
call := obj.Call("org.freedesktop.login1.Manager.Inhibit", 0, what, who, why, mode)
if call.Err != nil {
return InhibitLock(0), fmt.Errorf("failed creating systemd inhibitor: %v", call.Err)
}
var fd uint32
err := call.Store(&fd)
if err != nil {
return InhibitLock(0), fmt.Errorf("failed storing inhibit lock file descriptor: %v", err)
}
return InhibitLock(fd), nil
}
// ReleaseInhibitLock will release the underlying inhibit lock which will cause the shutdown to start.
func (bus *DBusCon) ReleaseInhibitLock(lock InhibitLock) error {
err := syscall.Close(int(lock))
if err != nil {
return fmt.Errorf("unable to close systemd inhibitor lock: %v", err)
}
return nil
}
// ReloadLogindConf uses dbus to send a SIGHUP to the systemd-logind service causing logind to reload it's configuration.
func (bus *DBusCon) ReloadLogindConf() error {
systemdService := "org.freedesktop.systemd1"
systemdObject := "/org/freedesktop/systemd1"
systemdInterface := "org.freedesktop.systemd1.Manager"
obj := bus.SystemBus.Object(systemdService, dbus.ObjectPath(systemdObject))
unit := "systemd-logind.service"
who := "all"
var signal int32 = 1 // SIGHUP
call := obj.Call(systemdInterface+".KillUnit", 0, unit, who, signal)
if call.Err != nil {
return fmt.Errorf("unable to reload logind conf: %v", call.Err)
}
return nil
}
// MonitorShutdown detects the a node shutdown by watching for "PrepareForShutdown" logind events.
// see https://www.freedesktop.org/wiki/Software/systemd/inhibit/ for more details.
func (bus *DBusCon) MonitorShutdown() (<-chan bool, error) {
err := bus.SystemBus.AddMatchSignal(dbus.WithMatchInterface(logindInterface), dbus.WithMatchMember("PrepareForShutdown"), dbus.WithMatchObjectPath("/org/freedesktop/login1"))
if err != nil {
return nil, err
}
busChan := make(chan *dbus.Signal, 1)
bus.SystemBus.Signal(busChan)
shutdownChan := make(chan bool, 1)
go func() {
for {
select {
case event := <-busChan:
if event == nil || len(event.Body) == 0 {
klog.Errorf("Failed obtaining shutdown event, PrepareForShutdown event was empty")
}
shutdownActive, ok := event.Body[0].(bool)
if !ok {
klog.Errorf("Failed obtaining shutdown event, PrepareForShutdown event was not bool type as expected")
return
}
shutdownChan <- shutdownActive
}
}
}()
return shutdownChan, nil
}
const (
logindConfigDirectory = "/etc/systemd/logind.conf.d/"
kubeletLogindConf = "99-kubelet.conf"
)
// OverrideInhibitDelay writes a config file to logind overriding InhibitDelayMaxSec to the value desired.
func (bus *DBusCon) OverrideInhibitDelay(inhibitDelayMax time.Duration) error {
err := os.MkdirAll(logindConfigDirectory, 0755)
if err != nil {
return fmt.Errorf("failed creating %v directory: %v", logindConfigDirectory, err)
}
// This attempts to set the `InhibitDelayMaxUSec` dbus property of logind which is MaxInhibitDelay measured in microseconds.
// The corresponding logind config file property is named `InhibitDelayMaxSec` and is measured in seconds which is set via logind.conf config.
// Refer to https://www.freedesktop.org/software/systemd/man/logind.conf.html for more details.
inhibitOverride := fmt.Sprintf(`# Kubelet logind override
[Login]
InhibitDelayMaxSec=%.0f
`, inhibitDelayMax.Seconds())
logindOverridePath := filepath.Join(logindConfigDirectory, kubeletLogindConf)
if err := ioutil.WriteFile(logindOverridePath, []byte(inhibitOverride), 0755); err != nil {
return fmt.Errorf("failed writing logind shutdown inhibit override file %v: %v", logindOverridePath, err)
}
return nil
}

View File

@ -0,0 +1,185 @@
// +build linux
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package systemd
import (
"context"
"fmt"
"testing"
"time"
"github.com/godbus/dbus/v5"
"github.com/stretchr/testify/assert"
)
type fakeDBusObject struct {
properties map[string]interface{}
bodyValue interface{}
}
func (obj *fakeDBusObject) Call(method string, flags dbus.Flags, args ...interface{}) *dbus.Call {
return &dbus.Call{Err: nil, Body: []interface{}{obj.bodyValue}}
}
func (obj *fakeDBusObject) CallWithContext(ctx context.Context, method string, flags dbus.Flags, args ...interface{}) *dbus.Call {
return nil
}
func (obj *fakeDBusObject) Go(method string, flags dbus.Flags, ch chan *dbus.Call, args ...interface{}) *dbus.Call {
return nil
}
func (obj *fakeDBusObject) GoWithContext(ctx context.Context, method string, flags dbus.Flags, ch chan *dbus.Call, args ...interface{}) *dbus.Call {
return nil
}
func (obj *fakeDBusObject) AddMatchSignal(iface, member string, options ...dbus.MatchOption) *dbus.Call {
return nil
}
func (obj *fakeDBusObject) RemoveMatchSignal(iface, member string, options ...dbus.MatchOption) *dbus.Call {
return nil
}
func (obj *fakeDBusObject) GetProperty(p string) (dbus.Variant, error) {
value, ok := obj.properties[p]
if !ok {
return dbus.Variant{}, fmt.Errorf("property %q does not exist in properties: %+v", p, obj.properties)
}
return dbus.MakeVariant(value), nil
}
func (obj *fakeDBusObject) SetProperty(p string, v interface{}) error {
return nil
}
func (obj *fakeDBusObject) Destination() string {
return ""
}
func (obj *fakeDBusObject) Path() dbus.ObjectPath {
return ""
}
type fakeSystemDBus struct {
fakeDBusObject *fakeDBusObject
signalChannel chan<- *dbus.Signal
}
func (f *fakeSystemDBus) Object(dest string, path dbus.ObjectPath) dbus.BusObject {
return f.fakeDBusObject
}
func (f *fakeSystemDBus) Signal(ch chan<- *dbus.Signal) {
f.signalChannel = ch
}
func (f *fakeSystemDBus) AddMatchSignal(options ...dbus.MatchOption) error {
return nil
}
func TestCurrentInhibitDelay(t *testing.T) {
thirtySeconds := time.Duration(30) * time.Second
bus := DBusCon{
SystemBus: &fakeSystemDBus{
fakeDBusObject: &fakeDBusObject{
properties: map[string]interface{}{
"org.freedesktop.login1.Manager.InhibitDelayMaxUSec": uint64(thirtySeconds / time.Microsecond),
},
},
},
}
delay, err := bus.CurrentInhibitDelay()
assert.NoError(t, err)
assert.Equal(t, thirtySeconds, delay)
}
func TestInhibitShutdown(t *testing.T) {
var fakeFd uint32 = 42
bus := DBusCon{
SystemBus: &fakeSystemDBus{
fakeDBusObject: &fakeDBusObject{
bodyValue: fakeFd,
},
},
}
fdLock, err := bus.InhibitShutdown()
assert.Equal(t, InhibitLock(fakeFd), fdLock)
assert.NoError(t, err)
}
func TestReloadLogindConf(t *testing.T) {
bus := DBusCon{
SystemBus: &fakeSystemDBus{
fakeDBusObject: &fakeDBusObject{},
},
}
assert.NoError(t, bus.ReloadLogindConf())
}
func TestMonitorShutdown(t *testing.T) {
var tests = []struct {
desc string
shutdownActive bool
}{
{
desc: "shutdown is active",
shutdownActive: true,
},
{
desc: "shutdown is not active",
shutdownActive: false,
},
}
for _, tc := range tests {
t.Run(tc.desc, func(t *testing.T) {
fakeSystemBus := &fakeSystemDBus{}
bus := DBusCon{
SystemBus: fakeSystemBus,
}
outChan, err := bus.MonitorShutdown()
assert.NoError(t, err)
done := make(chan bool)
go func() {
select {
case res := <-outChan:
assert.Equal(t, tc.shutdownActive, res)
done <- true
case <-time.After(5 * time.Second):
t.Errorf("Timed out waiting for shutdown message")
done <- true
}
}()
signal := &dbus.Signal{Body: []interface{}{tc.shutdownActive}}
fakeSystemBus.signalChannel <- signal
<-done
})
}
}

View File

@ -0,0 +1,19 @@
// +build !linux
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package systemd

1
vendor/modules.txt vendored
View File

@ -459,6 +459,7 @@ github.com/go-ozzo/ozzo-validation/is
github.com/go-stack/stack
# github.com/go-stack/stack => github.com/go-stack/stack v1.8.0
# github.com/godbus/dbus/v5 v5.0.3 => github.com/godbus/dbus/v5 v5.0.3
## explicit
github.com/godbus/dbus/v5
# github.com/godbus/dbus/v5 => github.com/godbus/dbus/v5 v5.0.3
# github.com/gogo/protobuf v1.3.1 => github.com/gogo/protobuf v1.3.1