diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index 4dfb5e658d5..20d35dad89c 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -608,6 +608,7 @@ func (pm *VolumePluginMgr) InitPlugins(plugins []VolumePlugin, prober DynamicPlu pm.plugins[name] = plugin klog.V(1).InfoS("Loaded volume plugin", "pluginName", name) } + pm.refreshProbedPlugins() return utilerrors.NewAggregate(allErrs) } diff --git a/pkg/volume/plugins_test.go b/pkg/volume/plugins_test.go index fa638ff138a..781d218e2d8 100644 --- a/pkg/volume/plugins_test.go +++ b/pkg/volume/plugins_test.go @@ -17,11 +17,16 @@ limitations under the License. package volume import ( + "sync" + "sync/atomic" "testing" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) const testPluginName = "kubernetes.io/testPlugin" @@ -165,3 +170,63 @@ func Test_ValidatePodTemplate(t *testing.T) { t.Errorf("isPodTemplateValid(%v) returned (%v), want (%v)", pod.String(), got, "Error: pod specification does not contain any volume(s).") } } + +// TestVolumePluginMultiThreaded tests FindPluginByName/FindPluginBySpec in a multi-threaded environment. +// If these are called by different threads at the same time, they should still be able to reconcile the plugins +// and return the same results (no missing plugin) +func TestVolumePluginMultiThreaded(t *testing.T) { + vpm := VolumePluginMgr{} + var prober DynamicPluginProber = &fakeProber{events: []ProbeEvent{{PluginName: testPluginName, Op: ProbeAddOrUpdate, Plugin: &testPlugins{}}}} + err := vpm.InitPlugins([]VolumePlugin{}, prober, nil) + require.NoError(t, err) + + volumeSpec := &Spec{} + totalErrors := atomic.Int32{} + var wg sync.WaitGroup + + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _, err := vpm.FindPluginByName(testPluginName) + if err != nil { + totalErrors.Add(1) + } + }() + } + wg.Wait() + + assert.Equal(t, int32(0), totalErrors.Load()) + totalErrors.Store(0) + + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _, err := vpm.FindPluginBySpec(volumeSpec) + if err != nil { + totalErrors.Add(1) + } + }() + } + wg.Wait() + + assert.Equal(t, int32(0), totalErrors.Load()) +} + +type fakeProber struct { + events []ProbeEvent + firstExecution atomic.Bool +} + +func (prober *fakeProber) Init() error { + prober.firstExecution.Store(true) + return nil +} + +func (prober *fakeProber) Probe() (events []ProbeEvent, err error) { + if prober.firstExecution.CompareAndSwap(true, false) { + return prober.events, nil + } + return []ProbeEvent{}, nil +}