fix(pluginwatcher): watch socket dir before traversing it

Signed-off-by: knight42 <anonymousknight96@gmail.com>
This commit is contained in:
knight42 2020-08-06 21:51:11 +08:00
parent a5090a8ff2
commit de46e81e74
No known key found for this signature in database
GPG Key ID: 1040B69865E7D86C
2 changed files with 51 additions and 37 deletions

View File

@ -20,6 +20,9 @@ import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"reflect"
"strconv"
"sync"
"testing"
"time"
@ -29,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher"
)
@ -39,25 +43,19 @@ var (
)
type fakePluginHandler struct {
validatePluginCalled bool
registerPluginCalled bool
deregisterPluginCalled bool
events []string
sync.RWMutex
}
func newFakePluginHandler() *fakePluginHandler {
return &fakePluginHandler{
validatePluginCalled: false,
registerPluginCalled: false,
deregisterPluginCalled: false,
}
return &fakePluginHandler{}
}
// ValidatePlugin is a fake method
func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
f.Lock()
defer f.Unlock()
f.validatePluginCalled = true
f.events = append(f.events, "validate "+pluginName)
return nil
}
@ -65,7 +63,7 @@ func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, v
func (f *fakePluginHandler) RegisterPlugin(pluginName, endpoint string, versions []string) error {
f.Lock()
defer f.Unlock()
f.registerPluginCalled = true
f.events = append(f.events, "register "+pluginName)
return nil
}
@ -73,7 +71,13 @@ func (f *fakePluginHandler) RegisterPlugin(pluginName, endpoint string, versions
func (f *fakePluginHandler) DeRegisterPlugin(pluginName string) {
f.Lock()
defer f.Unlock()
f.deregisterPluginCalled = true
f.events = append(f.events, "deregister "+pluginName)
}
func (f *fakePluginHandler) Reset() {
f.Lock()
defer f.Unlock()
f.events = nil
}
func init() {
@ -90,15 +94,17 @@ func cleanup(t *testing.T) {
os.MkdirAll(socketDir, 0755)
}
func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler) {
func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, pluginName string) {
expected := []string{"validate " + pluginName, "register " + pluginName}
err := retryWithExponentialBackOff(
time.Duration(500*time.Millisecond),
100*time.Millisecond,
func() (bool, error) {
fakePluginHandler.Lock()
defer fakePluginHandler.Unlock()
if fakePluginHandler.validatePluginCalled && fakePluginHandler.registerPluginCalled {
if reflect.DeepEqual(fakePluginHandler.events, expected) {
return true, nil
}
t.Logf("expected %#v, got %#v, will retry", expected, fakePluginHandler.events)
return false, nil
},
)
@ -134,19 +140,29 @@ func TestPluginRegistration(t *testing.T) {
fakeHandler := newFakePluginHandler()
pluginManager.AddHandler(registerapi.DevicePlugin, fakeHandler)
// Add a new plugin
socketPath := fmt.Sprintf("%s/plugin.sock", socketDir)
pluginName := "example-plugin"
p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
const maxDepth = 3
// Make sure the plugin manager is aware of the socket in subdirectories
for i := 0; i < maxDepth; i++ {
fakeHandler.Reset()
pluginDir := socketDir
// Verify that the plugin is registered
waitForRegistration(t, fakeHandler)
for j := 0; j < i; j++ {
pluginDir = filepath.Join(pluginDir, strconv.Itoa(j))
}
require.NoError(t, os.MkdirAll(pluginDir, os.ModePerm))
socketPath := filepath.Join(pluginDir, fmt.Sprintf("plugin-%d.sock", i))
// Add a new plugin
pluginName := fmt.Sprintf("example-plugin-%d", i)
p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
// Verify that the plugin is registered
waitForRegistration(t, fakeHandler, pluginName)
}
}
func newTestPluginManager(
sockDir string) PluginManager {
func newTestPluginManager(sockDir string) PluginManager {
pm := NewPluginManager(
sockDir,
&record.FakeRecorder{},

View File

@ -21,7 +21,6 @@ import (
"os"
"runtime"
"strings"
"time"
"github.com/fsnotify/fsnotify"
"k8s.io/klog/v2"
@ -36,7 +35,6 @@ type Watcher struct {
path string
fs utilfs.Filesystem
fsWatcher *fsnotify.Watcher
stopped chan struct{}
desiredStateOfWorld cache.DesiredStateOfWorld
}
@ -53,8 +51,6 @@ func NewWatcher(sockDir string, desiredStateOfWorld cache.DesiredStateOfWorld) *
func (w *Watcher) Start(stopCh <-chan struct{}) error {
klog.V(2).Infof("Plugin Watcher Start at %s", w.path)
w.stopped = make(chan struct{})
// Creating the directory to be watched if it doesn't exist yet,
// and walks through the directory to discover the existing plugins.
if err := w.init(); err != nil {
@ -73,7 +69,6 @@ func (w *Watcher) Start(stopCh <-chan struct{}) error {
}
go func(fsWatcher *fsnotify.Watcher) {
defer close(w.stopped)
for {
select {
case event := <-fsWatcher.Events:
@ -93,14 +88,6 @@ func (w *Watcher) Start(stopCh <-chan struct{}) error {
}
continue
case <-stopCh:
// In case of plugin watcher being stopped by plugin manager, stop
// probing the creation/deletion of plugin sockets.
// Also give all pending go routines a chance to complete
select {
case <-w.stopped:
case <-time.After(11 * time.Second):
klog.Errorf("timeout on stopping watcher")
}
w.fsWatcher.Close()
return
}
@ -123,6 +110,12 @@ func (w *Watcher) init() error {
// Walks through the plugin directory discover any existing plugin sockets.
// Ignore all errors except root dir not being walkable
func (w *Watcher) traversePluginDir(dir string) error {
// watch the new dir
err := w.fsWatcher.Add(dir)
if err != nil {
return fmt.Errorf("failed to watch %s, err: %v", w.path, err)
}
// traverse existing children in the dir
return w.fs.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
if path == dir {
@ -133,6 +126,11 @@ func (w *Watcher) traversePluginDir(dir string) error {
return nil
}
// do not call fsWatcher.Add twice on the root dir to avoid potential problems.
if path == dir {
return nil
}
switch mode := info.Mode(); {
case mode.IsDir():
if err := w.fsWatcher.Add(path); err != nil {