Merge pull request #114136 from claudiubelu/kubelet-plugins

Updates Kubelet Plugin Registration process
This commit is contained in:
Kubernetes Prow Robot 2024-09-12 00:25:12 +01:00 committed by GitHub
commit d14b0b0cb1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 108 additions and 71 deletions

View File

@ -25,6 +25,7 @@ import (
"sync"
"time"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
)
@ -53,9 +54,15 @@ type ActualStateOfWorld interface {
// If a plugin does not exist with the given socket path, this is a no-op.
RemovePlugin(socketPath string)
// PluginExists checks if the given plugin exists in the current actual
// state of world cache with the correct timestamp
// PluginExistsWithCorrectTimestamp checks if the given plugin exists in the current actual
// state of world cache with the correct timestamp.
// Deprecated: please use `PluginExistsWithCorrectUUID` instead as it provides a better
// cross-platform support
PluginExistsWithCorrectTimestamp(pluginInfo PluginInfo) bool
// PluginExistsWithCorrectUUID checks if the given plugin exists in the current actual
// state of world cache with the correct UUID
PluginExistsWithCorrectUUID(pluginInfo PluginInfo) bool
}
// NewActualStateOfWorld returns a new instance of ActualStateOfWorld
@ -79,6 +86,7 @@ var _ ActualStateOfWorld = &actualStateOfWorld{}
type PluginInfo struct {
SocketPath string
Timestamp time.Time
UUID types.UID
Handler PluginHandler
Name string
}
@ -124,3 +132,13 @@ func (asw *actualStateOfWorld) PluginExistsWithCorrectTimestamp(pluginInfo Plugi
actualStatePlugin, exists := asw.socketFileToInfo[pluginInfo.SocketPath]
return exists && (actualStatePlugin.Timestamp == pluginInfo.Timestamp)
}
func (asw *actualStateOfWorld) PluginExistsWithCorrectUUID(pluginInfo PluginInfo) bool {
asw.RLock()
defer asw.RUnlock()
// We need to check both if the socket file path exists, and the UUID
// matches the given plugin (from the desired state cache) UUID
actualStatePlugin, exists := asw.socketFileToInfo[pluginInfo.SocketPath]
return exists && (actualStatePlugin.UUID == pluginInfo.UUID)
}

View File

@ -17,20 +17,23 @@ limitations under the License.
package cache
import (
"runtime"
goruntime "runtime"
"testing"
"time"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/uuid"
)
// Calls AddPlugin() to add a plugin
// Verifies newly added plugin exists in GetRegisteredPlugins()
// Verifies PluginExistsWithCorrectTimestamp returns true for the plugin
// Verifies PluginExistsWithCorrectUUID returns true for the plugin
// Verifies PluginExistsWithCorrectTimestamp returns true for the plugin (excluded on Windows)
func Test_ASW_AddPlugin_Positive_NewPlugin(t *testing.T) {
pluginInfo := PluginInfo{
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
Timestamp: time.Now(),
UUID: uuid.NewUUID(),
Handler: nil,
Name: "test",
}
@ -50,20 +53,29 @@ func Test_ASW_AddPlugin_Positive_NewPlugin(t *testing.T) {
t.Fatalf("Expected\n%v\nin actual state of world, but got\n%v\n", pluginInfo, aswPlugins[0])
}
// Check PluginExistsWithCorrectUUID returns true
if !asw.PluginExistsWithCorrectUUID(pluginInfo) {
t.Fatalf("PluginExistsWithCorrectUUID returns false for plugin that should be registered")
}
// Check PluginExistsWithCorrectTimestamp returns true
if !asw.PluginExistsWithCorrectTimestamp(pluginInfo) {
// Skipped on Windows. Time measurements are not as fine-grained on Windows and can lead to
// 2 consecutive time.Now() calls to be return identical timestamps.
if goruntime.GOOS != "windows" && !asw.PluginExistsWithCorrectTimestamp(pluginInfo) {
t.Fatalf("PluginExistsWithCorrectTimestamp returns false for plugin that should be registered")
}
}
// Calls AddPlugin() to add an empty string for socket path
// Verifies the plugin does not exist in GetRegisteredPlugins()
// Verifies PluginExistsWithCorrectTimestamp returns false
// Verifies PluginExistsWithCorrectUUID returns false
// Verifies PluginExistsWithCorrectTimestamp returns false (excluded on Windows)
func Test_ASW_AddPlugin_Negative_EmptySocketPath(t *testing.T) {
asw := NewActualStateOfWorld()
pluginInfo := PluginInfo{
SocketPath: "",
Timestamp: time.Now(),
UUID: uuid.NewUUID(),
Handler: nil,
Name: "test",
}
@ -76,21 +88,30 @@ func Test_ASW_AddPlugin_Negative_EmptySocketPath(t *testing.T) {
t.Fatalf("Actual state of world length should be zero but it's %d", len(aswPlugins))
}
// Check PluginExistsWithCorrectUUID returns false
if asw.PluginExistsWithCorrectUUID(pluginInfo) {
t.Fatalf("PluginExistsWithCorrectUUID returns true for plugin that's not registered")
}
// Check PluginExistsWithCorrectTimestamp returns false
if asw.PluginExistsWithCorrectTimestamp(pluginInfo) {
// Skipped on Windows. Time measurements are not as fine-grained on Windows and can lead to
// 2 consecutive time.Now() calls to be return identical timestamps.
if goruntime.GOOS != "windows" && asw.PluginExistsWithCorrectTimestamp(pluginInfo) {
t.Fatalf("PluginExistsWithCorrectTimestamp returns true for plugin that's not registered")
}
}
// Calls RemovePlugin() to remove a plugin
// Verifies newly removed plugin no longer exists in GetRegisteredPlugins()
// Verifies PluginExistsWithCorrectTimestamp returns false
// Verifies PluginExistsWithCorrectUUID returns false
// Verifies PluginExistsWithCorrectTimestamp returns false (excluded on Windows)
func Test_ASW_RemovePlugin_Positive(t *testing.T) {
// First, add a plugin
asw := NewActualStateOfWorld()
pluginInfo := PluginInfo{
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
Timestamp: time.Now(),
UUID: uuid.NewUUID(),
Handler: nil,
Name: "test",
}
@ -109,25 +130,28 @@ func Test_ASW_RemovePlugin_Positive(t *testing.T) {
t.Fatalf("Actual state of world length should be zero but it's %d", len(aswPlugins))
}
// Check PluginExistsWithCorrectUUID returns false
if asw.PluginExistsWithCorrectUUID(pluginInfo) {
t.Fatalf("PluginExistsWithCorrectUUID returns true for the removed plugin")
}
// Check PluginExistsWithCorrectTimestamp returns false
if asw.PluginExistsWithCorrectTimestamp(pluginInfo) {
// Skipped on Windows. Time measurements are not as fine-grained on Windows and can lead to
// 2 consecutive time.Now() calls to be return identical timestamps.
if goruntime.GOOS != "windows" && asw.PluginExistsWithCorrectTimestamp(pluginInfo) {
t.Fatalf("PluginExistsWithCorrectTimestamp returns true for the removed plugin")
}
}
// Verifies PluginExistsWithCorrectTimestamp returns false for an existing
// plugin with the wrong timestamp
func Test_ASW_PluginExistsWithCorrectTimestamp_Negative_WrongTimestamp(t *testing.T) {
// Skip tests that fail on Windows, as discussed during the SIG Testing meeting from January 10, 2023
if runtime.GOOS == "windows" {
t.Skip("Skipping test that fails on Windows")
}
// Verifies PluginExistsWithCorrectUUID returns false for an existing
// plugin with the wrong UUID
func Test_ASW_PluginExistsWithCorrectUUID_Negative_WrongUUID(t *testing.T) {
// First, add a plugin
asw := NewActualStateOfWorld()
pluginInfo := PluginInfo{
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
Timestamp: time.Now(),
UUID: uuid.NewUUID(),
Handler: nil,
Name: "test",
}
@ -140,9 +164,10 @@ func Test_ASW_PluginExistsWithCorrectTimestamp_Negative_WrongTimestamp(t *testin
newerPlugin := PluginInfo{
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
Timestamp: time.Now(),
UUID: uuid.NewUUID(),
}
// Check PluginExistsWithCorrectTimestamp returns false
if asw.PluginExistsWithCorrectTimestamp(newerPlugin) {
t.Fatalf("PluginExistsWithCorrectTimestamp returns true for a plugin with newer timestamp")
// Check PluginExistsWithCorrectUUID returns false
if asw.PluginExistsWithCorrectUUID(newerPlugin) {
t.Fatalf("PluginExistsWithCorrectUUID returns true for a plugin with a different UUID")
}
}

View File

@ -25,6 +25,7 @@ import (
"sync"
"time"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/klog/v2"
)
@ -132,12 +133,13 @@ func (dsw *desiredStateOfWorld) AddOrUpdatePlugin(socketPath string) error {
}
// Update the PluginInfo object.
// Note that we only update the timestamp in the desired state of world, not the actual state of world
// Note that we only update the timestamp and UUID in the desired state of world, not the actual state of world
// because in the reconciler, we need to check if the plugin in the actual state of world is the same
// version as the plugin in the desired state of world
dsw.socketFileToInfo[socketPath] = PluginInfo{
SocketPath: socketPath,
Timestamp: time.Now(),
UUID: uuid.NewUUID(),
}
return nil
}

View File

@ -17,7 +17,6 @@ limitations under the License.
package cache
import (
"runtime"
"testing"
"github.com/stretchr/testify/require"
@ -54,11 +53,6 @@ func Test_DSW_AddOrUpdatePlugin_Positive_NewPlugin(t *testing.T) {
// Verifies the timestamp the existing plugin is updated
// Verifies newly added plugin returns true for PluginExists()
func Test_DSW_AddOrUpdatePlugin_Positive_ExistingPlugin(t *testing.T) {
// Skip tests that fail on Windows, as discussed during the SIG Testing meeting from January 10, 2023
if runtime.GOOS == "windows" {
t.Skip("Skipping test that fails on Windows")
}
dsw := NewDesiredStateOfWorld()
socketPath := "/var/lib/kubelet/device-plugins/test-plugin.sock"
// Adding the plugin for the first time
@ -75,7 +69,7 @@ func Test_DSW_AddOrUpdatePlugin_Positive_ExistingPlugin(t *testing.T) {
if dswPlugins[0].SocketPath != socketPath {
t.Fatalf("Expected\n%s\nin desired state of world, but got\n%v\n", socketPath, dswPlugins[0])
}
oldTimestamp := dswPlugins[0].Timestamp
oldUUID := dswPlugins[0].UUID
// Adding the plugin again so that the timestamp will be updated
err = dsw.AddOrUpdatePlugin(socketPath)
@ -90,9 +84,9 @@ func Test_DSW_AddOrUpdatePlugin_Positive_ExistingPlugin(t *testing.T) {
t.Fatalf("Expected\n%s\nin desired state of world, but got\n%v\n", socketPath, newDswPlugins[0])
}
// Verify that the new timestamp is newer than the old timestamp
if !newDswPlugins[0].Timestamp.After(oldTimestamp) {
t.Fatal("New timestamp is not newer than the old timestamp", newDswPlugins[0].Timestamp, oldTimestamp)
// Verify that the new UUID is different from the old UUID
if newDswPlugins[0].UUID == oldUUID {
t.Fatal("New UUID is not different from the old UUID", newDswPlugins[0].UUID, oldUUID)
}
}

View File

@ -21,7 +21,7 @@ limitations under the License.
package operationexecutor
import (
"time"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
"k8s.io/kubernetes/pkg/util/goroutinemap"
@ -45,7 +45,7 @@ import (
type OperationExecutor interface {
// RegisterPlugin registers the given plugin using a handler in the plugin handler map.
// It then updates the actual state of the world to reflect that.
RegisterPlugin(socketPath string, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorld ActualStateOfWorldUpdater) error
RegisterPlugin(socketPath string, UUID types.UID, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorld ActualStateOfWorldUpdater) error
// UnregisterPlugin deregisters the given plugin using a handler in the given plugin handler map.
// It then updates the actual state of the world to reflect that.
@ -94,11 +94,11 @@ func (oe *operationExecutor) IsOperationPending(socketPath string) bool {
func (oe *operationExecutor) RegisterPlugin(
socketPath string,
timestamp time.Time,
pluginUUID types.UID,
pluginHandlers map[string]cache.PluginHandler,
actualStateOfWorld ActualStateOfWorldUpdater) error {
generatedOperation :=
oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, timestamp, pluginHandlers, actualStateOfWorld)
oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, pluginUUID, pluginHandlers, actualStateOfWorld)
return oe.pendingOperations.Run(
socketPath, generatedOperation)

View File

@ -23,6 +23,9 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
)
@ -46,7 +49,8 @@ func TestOperationExecutor_RegisterPlugin_ConcurrentRegisterPlugin(t *testing.T)
ch, quit, oe := setup()
for i := 0; i < numPluginsToRegister; i++ {
socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i)
oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, nil /* actual state of the world updator */)
err := oe.RegisterPlugin(socketPath, uuid.NewUUID(), nil /* plugin handlers */, nil /* actual state of the world updator */)
assert.NoError(t, err)
}
if !isOperationRunConcurrently(ch, quit, numPluginsToRegister) {
t.Fatalf("Unable to start register operations in Concurrent for plugins")
@ -56,8 +60,16 @@ func TestOperationExecutor_RegisterPlugin_ConcurrentRegisterPlugin(t *testing.T)
func TestOperationExecutor_RegisterPlugin_SerialRegisterPlugin(t *testing.T) {
ch, quit, oe := setup()
socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir)
for i := 0; i < numPluginsToRegister; i++ {
oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, nil /* actual state of the world updator */)
// First registration should not fail.
err := oe.RegisterPlugin(socketPath, uuid.NewUUID(), nil /* plugin handlers */, nil /* actual state of the world updator */)
assert.NoError(t, err)
for i := 1; i < numPluginsToRegister; i++ {
err := oe.RegisterPlugin(socketPath, uuid.NewUUID(), nil /* plugin handlers */, nil /* actual state of the world updator */)
if err == nil {
t.Fatalf("RegisterPlugin did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name is already executing.> Actual: <no error>", socketPath)
}
}
if !isOperationRunSerially(ch, quit) {
@ -105,7 +117,7 @@ func newFakeOperationGenerator(ch chan interface{}, quit chan interface{}) Opera
func (fopg *fakeOperationGenerator) GenerateRegisterPluginFunc(
socketPath string,
timestamp time.Time,
pluginUUID types.UID,
pluginHandlers map[string]cache.PluginHandler,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {

View File

@ -27,6 +27,7 @@ import (
"net"
"time"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"google.golang.org/grpc"
@ -62,7 +63,7 @@ type OperationGenerator interface {
// Generates the RegisterPlugin function needed to perform the registration of a plugin
GenerateRegisterPluginFunc(
socketPath string,
timestamp time.Time,
UUID types.UID,
pluginHandlers map[string]cache.PluginHandler,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error
@ -74,7 +75,7 @@ type OperationGenerator interface {
func (og *operationGenerator) GenerateRegisterPluginFunc(
socketPath string,
timestamp time.Time,
pluginUUID types.UID,
pluginHandlers map[string]cache.PluginHandler,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
@ -114,7 +115,7 @@ func (og *operationGenerator) GenerateRegisterPluginFunc(
// so that if we receive a delete event during Register Plugin, we can process it as a DeRegister call.
err = actualStateOfWorldUpdater.AddPlugin(cache.PluginInfo{
SocketPath: socketPath,
Timestamp: timestamp,
UUID: pluginUUID,
Handler: handler,
Name: infoResp.Name,
})

View File

@ -122,7 +122,7 @@ func (rc *reconciler) reconcile() {
// Iterate through desired state of world plugins and see if there's any plugin
// with the same socket path but different timestamp.
for _, dswPlugin := range rc.desiredStateOfWorld.GetPluginsToRegister() {
if dswPlugin.SocketPath == registeredPlugin.SocketPath && dswPlugin.Timestamp != registeredPlugin.Timestamp {
if dswPlugin.SocketPath == registeredPlugin.SocketPath && dswPlugin.UUID != registeredPlugin.UUID {
klog.V(5).InfoS("An updated version of plugin has been found, unregistering the plugin first before reregistering", "plugin", registeredPlugin)
unregisterPlugin = true
break
@ -148,9 +148,9 @@ func (rc *reconciler) reconcile() {
// Ensure plugins that should be registered are registered
for _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() {
if !rc.actualStateOfWorld.PluginExistsWithCorrectTimestamp(pluginToRegister) {
if !rc.actualStateOfWorld.PluginExistsWithCorrectUUID(pluginToRegister) {
klog.V(5).InfoS("Starting operationExecutor.RegisterPlugin", "plugin", pluginToRegister)
err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.Timestamp, rc.getHandlers(), rc.actualStateOfWorld)
err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.UUID, rc.getHandlers(), rc.actualStateOfWorld)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {

View File

@ -20,11 +20,11 @@ import (
"fmt"
"os"
"path/filepath"
"runtime"
"testing"
"time"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
@ -64,14 +64,14 @@ func runReconciler(reconciler Reconciler) {
func waitForRegistration(
t *testing.T,
socketPath string,
previousTimestamp time.Time,
expectedUUID types.UID,
asw cache.ActualStateOfWorld) {
err := retryWithExponentialBackOff(
time.Duration(500*time.Millisecond),
func() (bool, error) {
registeredPlugins := asw.GetRegisteredPlugins()
for _, plugin := range registeredPlugins {
if plugin.SocketPath == socketPath && plugin.Timestamp.After(previousTimestamp) {
if plugin.SocketPath == socketPath && plugin.UUID == expectedUUID {
return true, nil
}
}
@ -168,11 +168,6 @@ func Test_Run_Positive_DoNothing(t *testing.T) {
// Calls Run()
// Verifies the actual state of world contains that plugin
func Test_Run_Positive_Register(t *testing.T) {
// Skip tests that fail on Windows, as discussed during the SIG Testing meeting from January 10, 2023
if runtime.GOOS == "windows" {
t.Skip("Skipping test that fails on Windows")
}
defer cleanup(t)
dsw := cache.NewDesiredStateOfWorld()
@ -201,9 +196,9 @@ func Test_Run_Positive_Register(t *testing.T) {
defer func() {
require.NoError(t, p.Stop())
}()
timestampBeforeRegistration := time.Now()
dsw.AddOrUpdatePlugin(socketPath)
waitForRegistration(t, socketPath, timestampBeforeRegistration, asw)
plugins := dsw.GetPluginsToRegister()
waitForRegistration(t, socketPath, plugins[0].UUID, asw)
// Get asw plugins; it should contain the added plugin
aswPlugins := asw.GetRegisteredPlugins()
@ -221,11 +216,6 @@ func Test_Run_Positive_Register(t *testing.T) {
// Deletes plugin from desired state of world.
// Verifies that plugin no longer exists in actual state of world.
func Test_Run_Positive_RegisterThenUnregister(t *testing.T) {
// Skip tests that fail on Windows, as discussed during the SIG Testing meeting from January 10, 2023
if runtime.GOOS == "windows" {
t.Skip("Skipping test that fails on Windows")
}
defer cleanup(t)
dsw := cache.NewDesiredStateOfWorld()
@ -252,9 +242,9 @@ func Test_Run_Positive_RegisterThenUnregister(t *testing.T) {
pluginName := fmt.Sprintf("example-plugin")
p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
timestampBeforeRegistration := time.Now()
dsw.AddOrUpdatePlugin(socketPath)
waitForRegistration(t, socketPath, timestampBeforeRegistration, asw)
plugins := dsw.GetPluginsToRegister()
waitForRegistration(t, socketPath, plugins[0].UUID, asw)
// Get asw plugins; it should contain the added plugin
aswPlugins := asw.GetRegisteredPlugins()
@ -282,11 +272,6 @@ func Test_Run_Positive_RegisterThenUnregister(t *testing.T) {
// Verifies that the plugin is reregistered.
// Verifies the plugin with updated timestamp now in actual state of world.
func Test_Run_Positive_ReRegister(t *testing.T) {
// Skip tests that fail on Windows, as discussed during the SIG Testing meeting from January 10, 2023
if runtime.GOOS == "windows" {
t.Skip("Skipping test that fails on Windows")
}
defer cleanup(t)
dsw := cache.NewDesiredStateOfWorld()
@ -313,18 +298,18 @@ func Test_Run_Positive_ReRegister(t *testing.T) {
pluginName := fmt.Sprintf("example-plugin2")
p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
timestampBeforeRegistration := time.Now()
dsw.AddOrUpdatePlugin(socketPath)
waitForRegistration(t, socketPath, timestampBeforeRegistration, asw)
plugins := dsw.GetPluginsToRegister()
waitForRegistration(t, socketPath, plugins[0].UUID, asw)
timeStampBeforeReRegistration := time.Now()
// Add the plugin again to update the timestamp
dsw.AddOrUpdatePlugin(socketPath)
// This should trigger a deregistration and a regitration
// The process of unregistration and reregistration can happen so fast that
// we are not able to catch it with waitForUnregistration, so here we are checking
// the plugin has an updated timestamp.
waitForRegistration(t, socketPath, timeStampBeforeReRegistration, asw)
plugins = dsw.GetPluginsToRegister()
waitForRegistration(t, socketPath, plugins[0].UUID, asw)
// Get asw plugins; it should contain the added plugin
aswPlugins := asw.GetRegisteredPlugins()