diff --git a/pkg/kubelet/util/pluginwatcher/BUILD b/pkg/kubelet/util/pluginwatcher/BUILD index 5f4e61e5e54..2f37d2b4c08 100644 --- a/pkg/kubelet/util/pluginwatcher/BUILD +++ b/pkg/kubelet/util/pluginwatcher/BUILD @@ -29,6 +29,7 @@ go_test( embed = [":go_default_library"], deps = [ "//pkg/kubelet/apis/pluginregistration/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], diff --git a/pkg/kubelet/util/pluginwatcher/plugin_watcher.go b/pkg/kubelet/util/pluginwatcher/plugin_watcher.go index a9f8422edf0..8fed9fe156c 100644 --- a/pkg/kubelet/util/pluginwatcher/plugin_watcher.go +++ b/pkg/kubelet/util/pluginwatcher/plugin_watcher.go @@ -102,9 +102,24 @@ func (w *Watcher) Start() error { } w.fsWatcher = fsWatcher + // Traverse plugin dir and add filesystem watchers before starting the plugin processing goroutine. + if err := w.traversePluginDir(w.path); err != nil { + w.Stop() + return fmt.Errorf("failed to traverse plugin socket path %q, err: %v", w.path, err) + } + + // Traverse deprecated plugin dir, if specified. + if len(w.deprecatedPath) != 0 { + if err := w.traversePluginDir(w.deprecatedPath); err != nil { + w.Stop() + return fmt.Errorf("failed to traverse deprecated plugin socket path %q, err: %v", w.deprecatedPath, err) + } + } + w.wg.Add(1) go func(fsWatcher *fsnotify.Watcher) { defer w.wg.Done() + for { select { case event := <-fsWatcher.Events: @@ -139,20 +154,6 @@ func (w *Watcher) Start() error { } }(fsWatcher) - // Traverse plugin dir after starting the plugin processing goroutine - if err := w.traversePluginDir(w.path); err != nil { - w.Stop() - return fmt.Errorf("failed to traverse plugin socket path %q, err: %v", w.path, err) - } - - // Traverse deprecated plugin dir, if specified. - if len(w.deprecatedPath) != 0 { - if err := w.traversePluginDir(w.deprecatedPath); err != nil { - w.Stop() - return fmt.Errorf("failed to traverse deprecated plugin socket path %q, err: %v", w.deprecatedPath, err) - } - } - return nil } @@ -211,14 +212,14 @@ func (w *Watcher) traversePluginDir(dir string) error { return fmt.Errorf("failed to watch %s, err: %v", path, err) } case mode&os.ModeSocket != 0: - w.wg.Add(1) - go func() { - defer w.wg.Done() - w.fsWatcher.Events <- fsnotify.Event{ - Name: path, - Op: fsnotify.Create, - } - }() + event := fsnotify.Event{ + Name: path, + Op: fsnotify.Create, + } + //TODO: Handle errors by taking corrective measures + if err := w.handleCreateEvent(event); err != nil { + klog.Errorf("error %v when handling create event: %s", err, event) + } default: klog.V(5).Infof("Ignoring file %s with mode %v", path, mode) } diff --git a/pkg/kubelet/util/pluginwatcher/plugin_watcher_test.go b/pkg/kubelet/util/pluginwatcher/plugin_watcher_test.go index a7f449c141b..88fed3082b9 100644 --- a/pkg/kubelet/util/pluginwatcher/plugin_watcher_test.go +++ b/pkg/kubelet/util/pluginwatcher/plugin_watcher_test.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog" registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1" ) @@ -173,9 +174,6 @@ func TestPluginRegistrationAtKubeletStart(t *testing.T) { plugins[i] = p } - w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */) - defer func() { require.NoError(t, w.Stop()) }() - var wg sync.WaitGroup for i := 0; i < len(plugins); i++ { wg.Add(1) @@ -189,6 +187,9 @@ func TestPluginRegistrationAtKubeletStart(t *testing.T) { }(plugins[i]) } + w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */) + defer func() { require.NoError(t, w.Stop()) }() + c := make(chan struct{}) go func() { defer close(c) @@ -198,7 +199,7 @@ func TestPluginRegistrationAtKubeletStart(t *testing.T) { select { case <-c: return - case <-time.After(2 * time.Second): + case <-time.After(wait.ForeverTestTimeout): t.Fatalf("Timeout while waiting for the plugin registration status") } } @@ -238,11 +239,22 @@ func TestPlugiRegistrationFailureWithUnsupportedVersionAtKubeletStart(t *testing hdlr := NewExampleHandler(supportedVersions, false /* permitDeprecatedDir */) hdlr.AddPluginName(pluginName) + c := make(chan struct{}) + go func() { + defer close(c) + require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName))) + require.False(t, waitForPluginRegistrationStatus(t, p.registrationStatus)) + }() + w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */) defer func() { require.NoError(t, w.Stop()) }() - require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName))) - require.False(t, waitForPluginRegistrationStatus(t, p.registrationStatus)) + select { + case <-c: + return + case <-time.After(wait.ForeverTestTimeout): + t.Fatalf("Timeout while waiting for the plugin registration status") + } } func waitForPluginRegistrationStatus(t *testing.T, statusChan chan registerapi.RegistrationStatus) bool { @@ -259,7 +271,7 @@ func waitForEvent(t *testing.T, expected examplePluginEvent, eventChan chan exam select { case event := <-eventChan: return event == expected - case <-time.After(2 * time.Second): + case <-time.After(wait.ForeverTestTimeout): t.Fatalf("Timed out while waiting for registration status %v", expected) }