mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-01 07:47:56 +00:00
Process plugins found before the ones watched
Currently, the method `pluginwatcher.traversePluginDir` descends into a directory adding filesystem watchers and creating synthetic `create` events when it finds sockets files. However, a race condition might happen when a recently-added watcher observes a `delete` event in a socket file before `pluginwatcher.traversePlugindir` itself notices this file. This patch changes this behavior by registering watchers on directories, enqueueing and processing `create` events from sockets found, and only then processing the events from the registered watchers.
This commit is contained in:
parent
90fbbee129
commit
817c9ca953
@ -29,6 +29,7 @@ go_test(
|
|||||||
embed = [":go_default_library"],
|
embed = [":go_default_library"],
|
||||||
deps = [
|
deps = [
|
||||||
"//pkg/kubelet/apis/pluginregistration/v1:go_default_library",
|
"//pkg/kubelet/apis/pluginregistration/v1:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
"//vendor/github.com/stretchr/testify/require:go_default_library",
|
"//vendor/github.com/stretchr/testify/require:go_default_library",
|
||||||
"//vendor/k8s.io/klog:go_default_library",
|
"//vendor/k8s.io/klog:go_default_library",
|
||||||
],
|
],
|
||||||
|
@ -102,9 +102,24 @@ func (w *Watcher) Start() error {
|
|||||||
}
|
}
|
||||||
w.fsWatcher = fsWatcher
|
w.fsWatcher = fsWatcher
|
||||||
|
|
||||||
|
// Traverse plugin dir and add filesystem watchers before starting the plugin processing goroutine.
|
||||||
|
if err := w.traversePluginDir(w.path); err != nil {
|
||||||
|
w.Stop()
|
||||||
|
return fmt.Errorf("failed to traverse plugin socket path %q, err: %v", w.path, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Traverse deprecated plugin dir, if specified.
|
||||||
|
if len(w.deprecatedPath) != 0 {
|
||||||
|
if err := w.traversePluginDir(w.deprecatedPath); err != nil {
|
||||||
|
w.Stop()
|
||||||
|
return fmt.Errorf("failed to traverse deprecated plugin socket path %q, err: %v", w.deprecatedPath, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
w.wg.Add(1)
|
w.wg.Add(1)
|
||||||
go func(fsWatcher *fsnotify.Watcher) {
|
go func(fsWatcher *fsnotify.Watcher) {
|
||||||
defer w.wg.Done()
|
defer w.wg.Done()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case event := <-fsWatcher.Events:
|
case event := <-fsWatcher.Events:
|
||||||
@ -139,20 +154,6 @@ func (w *Watcher) Start() error {
|
|||||||
}
|
}
|
||||||
}(fsWatcher)
|
}(fsWatcher)
|
||||||
|
|
||||||
// Traverse plugin dir after starting the plugin processing goroutine
|
|
||||||
if err := w.traversePluginDir(w.path); err != nil {
|
|
||||||
w.Stop()
|
|
||||||
return fmt.Errorf("failed to traverse plugin socket path %q, err: %v", w.path, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Traverse deprecated plugin dir, if specified.
|
|
||||||
if len(w.deprecatedPath) != 0 {
|
|
||||||
if err := w.traversePluginDir(w.deprecatedPath); err != nil {
|
|
||||||
w.Stop()
|
|
||||||
return fmt.Errorf("failed to traverse deprecated plugin socket path %q, err: %v", w.deprecatedPath, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -211,14 +212,14 @@ func (w *Watcher) traversePluginDir(dir string) error {
|
|||||||
return fmt.Errorf("failed to watch %s, err: %v", path, err)
|
return fmt.Errorf("failed to watch %s, err: %v", path, err)
|
||||||
}
|
}
|
||||||
case mode&os.ModeSocket != 0:
|
case mode&os.ModeSocket != 0:
|
||||||
w.wg.Add(1)
|
event := fsnotify.Event{
|
||||||
go func() {
|
Name: path,
|
||||||
defer w.wg.Done()
|
Op: fsnotify.Create,
|
||||||
w.fsWatcher.Events <- fsnotify.Event{
|
}
|
||||||
Name: path,
|
//TODO: Handle errors by taking corrective measures
|
||||||
Op: fsnotify.Create,
|
if err := w.handleCreateEvent(event); err != nil {
|
||||||
}
|
klog.Errorf("error %v when handling create event: %s", err, event)
|
||||||
}()
|
}
|
||||||
default:
|
default:
|
||||||
klog.V(5).Infof("Ignoring file %s with mode %v", path, mode)
|
klog.V(5).Infof("Ignoring file %s with mode %v", path, mode)
|
||||||
}
|
}
|
||||||
|
@ -27,6 +27,7 @@ import (
|
|||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/klog"
|
"k8s.io/klog"
|
||||||
registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
|
registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
|
||||||
)
|
)
|
||||||
@ -173,9 +174,6 @@ func TestPluginRegistrationAtKubeletStart(t *testing.T) {
|
|||||||
plugins[i] = p
|
plugins[i] = p
|
||||||
}
|
}
|
||||||
|
|
||||||
w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */)
|
|
||||||
defer func() { require.NoError(t, w.Stop()) }()
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
for i := 0; i < len(plugins); i++ {
|
for i := 0; i < len(plugins); i++ {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
@ -189,6 +187,9 @@ func TestPluginRegistrationAtKubeletStart(t *testing.T) {
|
|||||||
}(plugins[i])
|
}(plugins[i])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */)
|
||||||
|
defer func() { require.NoError(t, w.Stop()) }()
|
||||||
|
|
||||||
c := make(chan struct{})
|
c := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
defer close(c)
|
defer close(c)
|
||||||
@ -198,7 +199,7 @@ func TestPluginRegistrationAtKubeletStart(t *testing.T) {
|
|||||||
select {
|
select {
|
||||||
case <-c:
|
case <-c:
|
||||||
return
|
return
|
||||||
case <-time.After(2 * time.Second):
|
case <-time.After(wait.ForeverTestTimeout):
|
||||||
t.Fatalf("Timeout while waiting for the plugin registration status")
|
t.Fatalf("Timeout while waiting for the plugin registration status")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -238,11 +239,22 @@ func TestPlugiRegistrationFailureWithUnsupportedVersionAtKubeletStart(t *testing
|
|||||||
hdlr := NewExampleHandler(supportedVersions, false /* permitDeprecatedDir */)
|
hdlr := NewExampleHandler(supportedVersions, false /* permitDeprecatedDir */)
|
||||||
hdlr.AddPluginName(pluginName)
|
hdlr.AddPluginName(pluginName)
|
||||||
|
|
||||||
|
c := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer close(c)
|
||||||
|
require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName)))
|
||||||
|
require.False(t, waitForPluginRegistrationStatus(t, p.registrationStatus))
|
||||||
|
}()
|
||||||
|
|
||||||
w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */)
|
w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */)
|
||||||
defer func() { require.NoError(t, w.Stop()) }()
|
defer func() { require.NoError(t, w.Stop()) }()
|
||||||
|
|
||||||
require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName)))
|
select {
|
||||||
require.False(t, waitForPluginRegistrationStatus(t, p.registrationStatus))
|
case <-c:
|
||||||
|
return
|
||||||
|
case <-time.After(wait.ForeverTestTimeout):
|
||||||
|
t.Fatalf("Timeout while waiting for the plugin registration status")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func waitForPluginRegistrationStatus(t *testing.T, statusChan chan registerapi.RegistrationStatus) bool {
|
func waitForPluginRegistrationStatus(t *testing.T, statusChan chan registerapi.RegistrationStatus) bool {
|
||||||
@ -259,7 +271,7 @@ func waitForEvent(t *testing.T, expected examplePluginEvent, eventChan chan exam
|
|||||||
select {
|
select {
|
||||||
case event := <-eventChan:
|
case event := <-eventChan:
|
||||||
return event == expected
|
return event == expected
|
||||||
case <-time.After(2 * time.Second):
|
case <-time.After(wait.ForeverTestTimeout):
|
||||||
t.Fatalf("Timed out while waiting for registration status %v", expected)
|
t.Fatalf("Timed out while waiting for registration status %v", expected)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user