From de46e81e74b6bb80a92d8f524ba2b0003b7e9196 Mon Sep 17 00:00:00 2001 From: knight42 Date: Thu, 6 Aug 2020 21:51:11 +0800 Subject: [PATCH] fix(pluginwatcher): watch socket dir before traversing it Signed-off-by: knight42 --- .../pluginmanager/plugin_manager_test.go | 64 ++++++++++++------- .../pluginwatcher/plugin_watcher.go | 24 ++++--- 2 files changed, 51 insertions(+), 37 deletions(-) diff --git a/pkg/kubelet/pluginmanager/plugin_manager_test.go b/pkg/kubelet/pluginmanager/plugin_manager_test.go index 52a42bb2a90..29c82edc7d9 100644 --- a/pkg/kubelet/pluginmanager/plugin_manager_test.go +++ b/pkg/kubelet/pluginmanager/plugin_manager_test.go @@ -20,6 +20,9 @@ import ( "fmt" "io/ioutil" "os" + "path/filepath" + "reflect" + "strconv" "sync" "testing" "time" @@ -29,6 +32,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/record" registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1" + "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher" ) @@ -39,25 +43,19 @@ var ( ) type fakePluginHandler struct { - validatePluginCalled bool - registerPluginCalled bool - deregisterPluginCalled bool + events []string sync.RWMutex } func newFakePluginHandler() *fakePluginHandler { - return &fakePluginHandler{ - validatePluginCalled: false, - registerPluginCalled: false, - deregisterPluginCalled: false, - } + return &fakePluginHandler{} } // ValidatePlugin is a fake method func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error { f.Lock() defer f.Unlock() - f.validatePluginCalled = true + f.events = append(f.events, "validate "+pluginName) return nil } @@ -65,7 +63,7 @@ func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, v func (f *fakePluginHandler) RegisterPlugin(pluginName, endpoint string, versions []string) error { f.Lock() defer f.Unlock() - f.registerPluginCalled = true + f.events = append(f.events, "register "+pluginName) return nil } @@ -73,7 +71,13 @@ func (f *fakePluginHandler) RegisterPlugin(pluginName, endpoint string, versions func (f *fakePluginHandler) DeRegisterPlugin(pluginName string) { f.Lock() defer f.Unlock() - f.deregisterPluginCalled = true + f.events = append(f.events, "deregister "+pluginName) +} + +func (f *fakePluginHandler) Reset() { + f.Lock() + defer f.Unlock() + f.events = nil } func init() { @@ -90,15 +94,17 @@ func cleanup(t *testing.T) { os.MkdirAll(socketDir, 0755) } -func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler) { +func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, pluginName string) { + expected := []string{"validate " + pluginName, "register " + pluginName} err := retryWithExponentialBackOff( - time.Duration(500*time.Millisecond), + 100*time.Millisecond, func() (bool, error) { fakePluginHandler.Lock() defer fakePluginHandler.Unlock() - if fakePluginHandler.validatePluginCalled && fakePluginHandler.registerPluginCalled { + if reflect.DeepEqual(fakePluginHandler.events, expected) { return true, nil } + t.Logf("expected %#v, got %#v, will retry", expected, fakePluginHandler.events) return false, nil }, ) @@ -134,19 +140,29 @@ func TestPluginRegistration(t *testing.T) { fakeHandler := newFakePluginHandler() pluginManager.AddHandler(registerapi.DevicePlugin, fakeHandler) - // Add a new plugin - socketPath := fmt.Sprintf("%s/plugin.sock", socketDir) - pluginName := "example-plugin" - p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...) - require.NoError(t, p.Serve("v1beta1", "v1beta2")) + const maxDepth = 3 + // Make sure the plugin manager is aware of the socket in subdirectories + for i := 0; i < maxDepth; i++ { + fakeHandler.Reset() + pluginDir := socketDir - // Verify that the plugin is registered - waitForRegistration(t, fakeHandler) + for j := 0; j < i; j++ { + pluginDir = filepath.Join(pluginDir, strconv.Itoa(j)) + } + require.NoError(t, os.MkdirAll(pluginDir, os.ModePerm)) + socketPath := filepath.Join(pluginDir, fmt.Sprintf("plugin-%d.sock", i)) + + // Add a new plugin + pluginName := fmt.Sprintf("example-plugin-%d", i) + p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...) + require.NoError(t, p.Serve("v1beta1", "v1beta2")) + + // Verify that the plugin is registered + waitForRegistration(t, fakeHandler, pluginName) + } } -func newTestPluginManager( - sockDir string) PluginManager { - +func newTestPluginManager(sockDir string) PluginManager { pm := NewPluginManager( sockDir, &record.FakeRecorder{}, diff --git a/pkg/kubelet/pluginmanager/pluginwatcher/plugin_watcher.go b/pkg/kubelet/pluginmanager/pluginwatcher/plugin_watcher.go index 7d4522b77b9..f4cacfe0316 100644 --- a/pkg/kubelet/pluginmanager/pluginwatcher/plugin_watcher.go +++ b/pkg/kubelet/pluginmanager/pluginwatcher/plugin_watcher.go @@ -21,7 +21,6 @@ import ( "os" "runtime" "strings" - "time" "github.com/fsnotify/fsnotify" "k8s.io/klog/v2" @@ -36,7 +35,6 @@ type Watcher struct { path string fs utilfs.Filesystem fsWatcher *fsnotify.Watcher - stopped chan struct{} desiredStateOfWorld cache.DesiredStateOfWorld } @@ -53,8 +51,6 @@ func NewWatcher(sockDir string, desiredStateOfWorld cache.DesiredStateOfWorld) * func (w *Watcher) Start(stopCh <-chan struct{}) error { klog.V(2).Infof("Plugin Watcher Start at %s", w.path) - w.stopped = make(chan struct{}) - // Creating the directory to be watched if it doesn't exist yet, // and walks through the directory to discover the existing plugins. if err := w.init(); err != nil { @@ -73,7 +69,6 @@ func (w *Watcher) Start(stopCh <-chan struct{}) error { } go func(fsWatcher *fsnotify.Watcher) { - defer close(w.stopped) for { select { case event := <-fsWatcher.Events: @@ -93,14 +88,6 @@ func (w *Watcher) Start(stopCh <-chan struct{}) error { } continue case <-stopCh: - // In case of plugin watcher being stopped by plugin manager, stop - // probing the creation/deletion of plugin sockets. - // Also give all pending go routines a chance to complete - select { - case <-w.stopped: - case <-time.After(11 * time.Second): - klog.Errorf("timeout on stopping watcher") - } w.fsWatcher.Close() return } @@ -123,6 +110,12 @@ func (w *Watcher) init() error { // Walks through the plugin directory discover any existing plugin sockets. // Ignore all errors except root dir not being walkable func (w *Watcher) traversePluginDir(dir string) error { + // watch the new dir + err := w.fsWatcher.Add(dir) + if err != nil { + return fmt.Errorf("failed to watch %s, err: %v", w.path, err) + } + // traverse existing children in the dir return w.fs.Walk(dir, func(path string, info os.FileInfo, err error) error { if err != nil { if path == dir { @@ -133,6 +126,11 @@ func (w *Watcher) traversePluginDir(dir string) error { return nil } + // do not call fsWatcher.Add twice on the root dir to avoid potential problems. + if path == dir { + return nil + } + switch mode := info.Mode(); { case mode.IsDir(): if err := w.fsWatcher.Add(path); err != nil {