diff --git a/pkg/kubelet/util/pluginwatcher/BUILD b/pkg/kubelet/util/pluginwatcher/BUILD index 5f4e61e5e54..2f37d2b4c08 100644 --- a/pkg/kubelet/util/pluginwatcher/BUILD +++ b/pkg/kubelet/util/pluginwatcher/BUILD @@ -29,6 +29,7 @@ go_test( embed = [":go_default_library"], deps = [ "//pkg/kubelet/apis/pluginregistration/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], diff --git a/pkg/kubelet/util/pluginwatcher/plugin_watcher.go b/pkg/kubelet/util/pluginwatcher/plugin_watcher.go index fbe8aa69d4a..4f05f2cf23a 100644 --- a/pkg/kubelet/util/pluginwatcher/plugin_watcher.go +++ b/pkg/kubelet/util/pluginwatcher/plugin_watcher.go @@ -102,44 +102,7 @@ func (w *Watcher) Start() error { } w.fsWatcher = fsWatcher - w.wg.Add(1) - go func(fsWatcher *fsnotify.Watcher) { - defer w.wg.Done() - for { - select { - case event := <-fsWatcher.Events: - //TODO: Handle errors by taking corrective measures - - w.wg.Add(1) - func() { - defer w.wg.Done() - - if event.Op&fsnotify.Create == fsnotify.Create { - err := w.handleCreateEvent(event) - if err != nil { - klog.Errorf("error %v when handling create event: %s", err, event) - } - } else if event.Op&fsnotify.Remove == fsnotify.Remove { - err := w.handleDeleteEvent(event) - if err != nil { - klog.Errorf("error %v when handling delete event: %s", err, event) - } - } - return - }() - continue - case err := <-fsWatcher.Errors: - if err != nil { - klog.Errorf("fsWatcher received error: %v", err) - } - continue - case <-w.stopCh: - return - } - } - }(fsWatcher) - - // Traverse plugin dir after starting the plugin processing goroutine + // Traverse plugin dir and add filesystem watchers before starting the plugin processing goroutine. if err := w.traversePluginDir(w.path); err != nil { w.Stop() return fmt.Errorf("failed to traverse plugin socket path %q, err: %v", w.path, err) @@ -153,6 +116,37 @@ func (w *Watcher) Start() error { } } + w.wg.Add(1) + go func(fsWatcher *fsnotify.Watcher) { + defer w.wg.Done() + + for { + select { + case event := <-fsWatcher.Events: + //TODO: Handle errors by taking corrective measures + if event.Op&fsnotify.Create == fsnotify.Create { + err := w.handleCreateEvent(event) + if err != nil { + klog.Errorf("error %v when handling create event: %s", err, event) + } + } else if event.Op&fsnotify.Remove == fsnotify.Remove { + err := w.handleDeleteEvent(event) + if err != nil { + klog.Errorf("error %v when handling delete event: %s", err, event) + } + } + continue + case err := <-fsWatcher.Errors: + if err != nil { + klog.Errorf("fsWatcher received error: %v", err) + } + continue + case <-w.stopCh: + return + } + } + }(fsWatcher) + return nil } @@ -211,14 +205,14 @@ func (w *Watcher) traversePluginDir(dir string) error { return fmt.Errorf("failed to watch %s, err: %v", path, err) } case mode&os.ModeSocket != 0: - w.wg.Add(1) - go func() { - defer w.wg.Done() - w.fsWatcher.Events <- fsnotify.Event{ - Name: path, - Op: fsnotify.Create, - } - }() + event := fsnotify.Event{ + Name: path, + Op: fsnotify.Create, + } + //TODO: Handle errors by taking corrective measures + if err := w.handleCreateEvent(event); err != nil { + klog.Errorf("error %v when handling create event: %s", err, event) + } default: klog.V(5).Infof("Ignoring file %s with mode %v", path, mode) } diff --git a/pkg/kubelet/util/pluginwatcher/plugin_watcher_test.go b/pkg/kubelet/util/pluginwatcher/plugin_watcher_test.go index a7f449c141b..88fed3082b9 100644 --- a/pkg/kubelet/util/pluginwatcher/plugin_watcher_test.go +++ b/pkg/kubelet/util/pluginwatcher/plugin_watcher_test.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog" registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1" ) @@ -173,9 +174,6 @@ func TestPluginRegistrationAtKubeletStart(t *testing.T) { plugins[i] = p } - w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */) - defer func() { require.NoError(t, w.Stop()) }() - var wg sync.WaitGroup for i := 0; i < len(plugins); i++ { wg.Add(1) @@ -189,6 +187,9 @@ func TestPluginRegistrationAtKubeletStart(t *testing.T) { }(plugins[i]) } + w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */) + defer func() { require.NoError(t, w.Stop()) }() + c := make(chan struct{}) go func() { defer close(c) @@ -198,7 +199,7 @@ func TestPluginRegistrationAtKubeletStart(t *testing.T) { select { case <-c: return - case <-time.After(2 * time.Second): + case <-time.After(wait.ForeverTestTimeout): t.Fatalf("Timeout while waiting for the plugin registration status") } } @@ -238,11 +239,22 @@ func TestPlugiRegistrationFailureWithUnsupportedVersionAtKubeletStart(t *testing hdlr := NewExampleHandler(supportedVersions, false /* permitDeprecatedDir */) hdlr.AddPluginName(pluginName) + c := make(chan struct{}) + go func() { + defer close(c) + require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName))) + require.False(t, waitForPluginRegistrationStatus(t, p.registrationStatus)) + }() + w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */) defer func() { require.NoError(t, w.Stop()) }() - require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName))) - require.False(t, waitForPluginRegistrationStatus(t, p.registrationStatus)) + select { + case <-c: + return + case <-time.After(wait.ForeverTestTimeout): + t.Fatalf("Timeout while waiting for the plugin registration status") + } } func waitForPluginRegistrationStatus(t *testing.T, statusChan chan registerapi.RegistrationStatus) bool { @@ -259,7 +271,7 @@ func waitForEvent(t *testing.T, expected examplePluginEvent, eventChan chan exam select { case event := <-eventChan: return event == expected - case <-time.After(2 * time.Second): + case <-time.After(wait.ForeverTestTimeout): t.Fatalf("Timed out while waiting for registration status %v", expected) }