mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
Merge pull request #93622 from knight42/test/plugin-register-timeout
Fix race condition in pluginWatcher
This commit is contained in:
commit
fd74333a97
@ -20,6 +20,9 @@ import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
@ -29,6 +32,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/tools/record"
|
||||
registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
|
||||
|
||||
"k8s.io/kubernetes/pkg/kubelet/config"
|
||||
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher"
|
||||
)
|
||||
@ -39,25 +43,19 @@ var (
|
||||
)
|
||||
|
||||
type fakePluginHandler struct {
|
||||
validatePluginCalled bool
|
||||
registerPluginCalled bool
|
||||
deregisterPluginCalled bool
|
||||
events []string
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
func newFakePluginHandler() *fakePluginHandler {
|
||||
return &fakePluginHandler{
|
||||
validatePluginCalled: false,
|
||||
registerPluginCalled: false,
|
||||
deregisterPluginCalled: false,
|
||||
}
|
||||
return &fakePluginHandler{}
|
||||
}
|
||||
|
||||
// ValidatePlugin is a fake method
|
||||
func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
f.validatePluginCalled = true
|
||||
f.events = append(f.events, "validate "+pluginName)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -65,7 +63,7 @@ func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, v
|
||||
func (f *fakePluginHandler) RegisterPlugin(pluginName, endpoint string, versions []string) error {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
f.registerPluginCalled = true
|
||||
f.events = append(f.events, "register "+pluginName)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -73,7 +71,13 @@ func (f *fakePluginHandler) RegisterPlugin(pluginName, endpoint string, versions
|
||||
func (f *fakePluginHandler) DeRegisterPlugin(pluginName string) {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
f.deregisterPluginCalled = true
|
||||
f.events = append(f.events, "deregister "+pluginName)
|
||||
}
|
||||
|
||||
func (f *fakePluginHandler) Reset() {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
f.events = nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
@ -90,15 +94,17 @@ func cleanup(t *testing.T) {
|
||||
os.MkdirAll(socketDir, 0755)
|
||||
}
|
||||
|
||||
func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler) {
|
||||
func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, pluginName string) {
|
||||
expected := []string{"validate " + pluginName, "register " + pluginName}
|
||||
err := retryWithExponentialBackOff(
|
||||
time.Duration(500*time.Millisecond),
|
||||
100*time.Millisecond,
|
||||
func() (bool, error) {
|
||||
fakePluginHandler.Lock()
|
||||
defer fakePluginHandler.Unlock()
|
||||
if fakePluginHandler.validatePluginCalled && fakePluginHandler.registerPluginCalled {
|
||||
if reflect.DeepEqual(fakePluginHandler.events, expected) {
|
||||
return true, nil
|
||||
}
|
||||
t.Logf("expected %#v, got %#v, will retry", expected, fakePluginHandler.events)
|
||||
return false, nil
|
||||
},
|
||||
)
|
||||
@ -134,19 +140,29 @@ func TestPluginRegistration(t *testing.T) {
|
||||
fakeHandler := newFakePluginHandler()
|
||||
pluginManager.AddHandler(registerapi.DevicePlugin, fakeHandler)
|
||||
|
||||
// Add a new plugin
|
||||
socketPath := fmt.Sprintf("%s/plugin.sock", socketDir)
|
||||
pluginName := "example-plugin"
|
||||
p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
|
||||
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
|
||||
const maxDepth = 3
|
||||
// Make sure the plugin manager is aware of the socket in subdirectories
|
||||
for i := 0; i < maxDepth; i++ {
|
||||
fakeHandler.Reset()
|
||||
pluginDir := socketDir
|
||||
|
||||
// Verify that the plugin is registered
|
||||
waitForRegistration(t, fakeHandler)
|
||||
for j := 0; j < i; j++ {
|
||||
pluginDir = filepath.Join(pluginDir, strconv.Itoa(j))
|
||||
}
|
||||
require.NoError(t, os.MkdirAll(pluginDir, os.ModePerm))
|
||||
socketPath := filepath.Join(pluginDir, fmt.Sprintf("plugin-%d.sock", i))
|
||||
|
||||
// Add a new plugin
|
||||
pluginName := fmt.Sprintf("example-plugin-%d", i)
|
||||
p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
|
||||
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
|
||||
|
||||
// Verify that the plugin is registered
|
||||
waitForRegistration(t, fakeHandler, pluginName)
|
||||
}
|
||||
}
|
||||
|
||||
func newTestPluginManager(
|
||||
sockDir string) PluginManager {
|
||||
|
||||
func newTestPluginManager(sockDir string) PluginManager {
|
||||
pm := NewPluginManager(
|
||||
sockDir,
|
||||
&record.FakeRecorder{},
|
||||
|
@ -21,7 +21,6 @@ import (
|
||||
"os"
|
||||
"runtime"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"k8s.io/klog/v2"
|
||||
@ -36,7 +35,6 @@ type Watcher struct {
|
||||
path string
|
||||
fs utilfs.Filesystem
|
||||
fsWatcher *fsnotify.Watcher
|
||||
stopped chan struct{}
|
||||
desiredStateOfWorld cache.DesiredStateOfWorld
|
||||
}
|
||||
|
||||
@ -53,8 +51,6 @@ func NewWatcher(sockDir string, desiredStateOfWorld cache.DesiredStateOfWorld) *
|
||||
func (w *Watcher) Start(stopCh <-chan struct{}) error {
|
||||
klog.V(2).Infof("Plugin Watcher Start at %s", w.path)
|
||||
|
||||
w.stopped = make(chan struct{})
|
||||
|
||||
// Creating the directory to be watched if it doesn't exist yet,
|
||||
// and walks through the directory to discover the existing plugins.
|
||||
if err := w.init(); err != nil {
|
||||
@ -73,7 +69,6 @@ func (w *Watcher) Start(stopCh <-chan struct{}) error {
|
||||
}
|
||||
|
||||
go func(fsWatcher *fsnotify.Watcher) {
|
||||
defer close(w.stopped)
|
||||
for {
|
||||
select {
|
||||
case event := <-fsWatcher.Events:
|
||||
@ -93,14 +88,6 @@ func (w *Watcher) Start(stopCh <-chan struct{}) error {
|
||||
}
|
||||
continue
|
||||
case <-stopCh:
|
||||
// In case of plugin watcher being stopped by plugin manager, stop
|
||||
// probing the creation/deletion of plugin sockets.
|
||||
// Also give all pending go routines a chance to complete
|
||||
select {
|
||||
case <-w.stopped:
|
||||
case <-time.After(11 * time.Second):
|
||||
klog.Errorf("timeout on stopping watcher")
|
||||
}
|
||||
w.fsWatcher.Close()
|
||||
return
|
||||
}
|
||||
@ -123,6 +110,12 @@ func (w *Watcher) init() error {
|
||||
// Walks through the plugin directory discover any existing plugin sockets.
|
||||
// Ignore all errors except root dir not being walkable
|
||||
func (w *Watcher) traversePluginDir(dir string) error {
|
||||
// watch the new dir
|
||||
err := w.fsWatcher.Add(dir)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to watch %s, err: %v", w.path, err)
|
||||
}
|
||||
// traverse existing children in the dir
|
||||
return w.fs.Walk(dir, func(path string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
if path == dir {
|
||||
@ -133,6 +126,11 @@ func (w *Watcher) traversePluginDir(dir string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// do not call fsWatcher.Add twice on the root dir to avoid potential problems.
|
||||
if path == dir {
|
||||
return nil
|
||||
}
|
||||
|
||||
switch mode := info.Mode(); {
|
||||
case mode.IsDir():
|
||||
if err := w.fsWatcher.Add(path); err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user