fix plugin probe init race causing erroneous volume unmounts

Signed-off-by: Olga Shestopalova <oshestopalova1@gmail.com>
This commit is contained in:
Olga Shestopalova 2024-09-26 11:27:28 -04:00
parent 4b33029691
commit 0746672679
No known key found for this signature in database
4 changed files with 127 additions and 6 deletions

View File

@ -33,7 +33,7 @@ import (
) )
type flexVolumeProber struct { type flexVolumeProber struct {
mutex sync.Mutex mutex sync.RWMutex
pluginDir string // Flexvolume driver directory pluginDir string // Flexvolume driver directory
runner exec.Interface // Interface to use for execing flex calls runner exec.Interface // Interface to use for execing flex calls
watcher utilfs.FSWatcher watcher utilfs.FSWatcher
@ -71,11 +71,20 @@ func (prober *flexVolumeProber) Init() error {
// If probeAllNeeded is true, probe all pluginDir // If probeAllNeeded is true, probe all pluginDir
// else probe events in eventsMap // else probe events in eventsMap
func (prober *flexVolumeProber) Probe() (events []volume.ProbeEvent, err error) { func (prober *flexVolumeProber) Probe() (events []volume.ProbeEvent, err error) {
prober.mutex.RLock()
if prober.probeAllNeeded { if prober.probeAllNeeded {
prober.testAndSetProbeAllNeeded(false) prober.mutex.RUnlock()
return prober.probeAll() prober.mutex.Lock()
// check again, if multiple readers got through the first if, only one should probeAll
if prober.probeAllNeeded {
events, err = prober.probeAll()
prober.probeAllNeeded = false
prober.mutex.Unlock()
return
}
prober.mutex.Unlock()
} }
prober.mutex.RUnlock()
return prober.probeMap() return prober.probeMap()
} }

View File

@ -21,6 +21,8 @@ import (
"path/filepath" "path/filepath"
goruntime "runtime" goruntime "runtime"
"strings" "strings"
"sync"
"sync/atomic"
"testing" "testing"
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
@ -327,6 +329,47 @@ func TestProberSuccessAndError(t *testing.T) {
assert.Error(t, err) assert.Error(t, err)
} }
// TestProberMultiThreaded tests the code path of many callers calling FindPluginBySpec/FindPluginByName
// which then calls refreshProbedPlugins which then calls prober.Probe() and ensures that the prober is thread safe
func TestProberMultiThreaded(t *testing.T) {
// Arrange
_, _, _, prober := initTestEnvironment(t)
totalEvents := atomic.Int32{}
totalErrors := atomic.Int32{}
pluginNameMutex := sync.RWMutex{}
var pluginName string
var wg sync.WaitGroup
// Act
for i := 0; i < 100; i++ {
go func() {
defer wg.Done()
events, err := prober.Probe()
for _, event := range events {
if event.Op == volume.ProbeAddOrUpdate {
pluginNameMutex.Lock()
pluginName = event.Plugin.GetPluginName()
pluginNameMutex.Unlock()
}
}
// this fails if ProbeAll is not complete before the next call comes in but we have assumed that it has
pluginNameMutex.RLock()
assert.Equal(t, "fake-driver", pluginName)
pluginNameMutex.RUnlock()
totalEvents.Add(int32(len(events)))
if err != nil {
totalErrors.Add(1)
}
}()
wg.Add(1)
}
wg.Wait()
// Assert
assert.Equal(t, int32(1), totalEvents.Load())
assert.Equal(t, int32(0), totalErrors.Load())
}
// Installs a mock driver (an empty file) in the mock fs. // Installs a mock driver (an empty file) in the mock fs.
func installDriver(driverName string, fs utilfs.Filesystem) { func installDriver(driverName string, fs utilfs.Filesystem) {
driverPath := filepath.Join(pluginDir, driverName) driverPath := filepath.Join(pluginDir, driverName)

View File

@ -627,6 +627,7 @@ func (pm *VolumePluginMgr) initProbedPlugin(probedPlugin VolumePlugin) error {
// specification. If no plugins can support or more than one plugin can // specification. If no plugins can support or more than one plugin can
// support it, return error. // support it, return error.
func (pm *VolumePluginMgr) FindPluginBySpec(spec *Spec) (VolumePlugin, error) { func (pm *VolumePluginMgr) FindPluginBySpec(spec *Spec) (VolumePlugin, error) {
pm.refreshProbedPlugins()
pm.mutex.RLock() pm.mutex.RLock()
defer pm.mutex.RUnlock() defer pm.mutex.RUnlock()
@ -643,7 +644,6 @@ func (pm *VolumePluginMgr) FindPluginBySpec(spec *Spec) (VolumePlugin, error) {
} }
} }
pm.refreshProbedPlugins()
for _, plugin := range pm.probedPlugins { for _, plugin := range pm.probedPlugins {
if plugin.CanSupport(spec) { if plugin.CanSupport(spec) {
match = plugin match = plugin
@ -663,6 +663,7 @@ func (pm *VolumePluginMgr) FindPluginBySpec(spec *Spec) (VolumePlugin, error) {
// FindPluginByName fetches a plugin by name. If no plugin is found, returns error. // FindPluginByName fetches a plugin by name. If no plugin is found, returns error.
func (pm *VolumePluginMgr) FindPluginByName(name string) (VolumePlugin, error) { func (pm *VolumePluginMgr) FindPluginByName(name string) (VolumePlugin, error) {
pm.refreshProbedPlugins()
pm.mutex.RLock() pm.mutex.RLock()
defer pm.mutex.RUnlock() defer pm.mutex.RUnlock()
@ -671,7 +672,6 @@ func (pm *VolumePluginMgr) FindPluginByName(name string) (VolumePlugin, error) {
match = v match = v
} }
pm.refreshProbedPlugins()
if plugin, found := pm.probedPlugins[name]; found { if plugin, found := pm.probedPlugins[name]; found {
if match != nil { if match != nil {
return nil, fmt.Errorf("multiple volume plugins matched: %s and %s", match.GetPluginName(), plugin.GetPluginName()) return nil, fmt.Errorf("multiple volume plugins matched: %s and %s", match.GetPluginName(), plugin.GetPluginName())
@ -694,6 +694,12 @@ func (pm *VolumePluginMgr) refreshProbedPlugins() {
klog.ErrorS(err, "Error dynamically probing plugins") klog.ErrorS(err, "Error dynamically probing plugins")
} }
if len(events) == 0 {
return
}
pm.mutex.Lock()
defer pm.mutex.Unlock()
// because the probe function can return a list of valid plugins // because the probe function can return a list of valid plugins
// even when an error is present we still must add the plugins // even when an error is present we still must add the plugins
// or they will be skipped because each event only fires once // or they will be skipped because each event only fires once

View File

@ -17,6 +17,10 @@ limitations under the License.
package volume package volume
import ( import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"sync"
"sync/atomic"
"testing" "testing"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
@ -165,3 +169,62 @@ func Test_ValidatePodTemplate(t *testing.T) {
t.Errorf("isPodTemplateValid(%v) returned (%v), want (%v)", pod.String(), got, "Error: pod specification does not contain any volume(s).") t.Errorf("isPodTemplateValid(%v) returned (%v), want (%v)", pod.String(), got, "Error: pod specification does not contain any volume(s).")
} }
} }
// TestVolumePluginMultiThreaded tests FindPluginByName/FindPluginBySpec in a multi-threaded environment.
// If these are called by different threads at the same time, they should still be able to reconcile the plugins
// and return the same results (no missing plugin)
func TestVolumePluginMultiThreaded(t *testing.T) {
vpm := VolumePluginMgr{}
var prober DynamicPluginProber = &fakeProber{events: []ProbeEvent{{PluginName: testPluginName, Op: ProbeAddOrUpdate, Plugin: &testPlugins{}}}}
err := vpm.InitPlugins([]VolumePlugin{}, prober, nil)
require.NoError(t, err)
volumeSpec := &Spec{}
totalErrors := atomic.Int32{}
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
go func() {
defer wg.Done()
_, err := vpm.FindPluginByName(testPluginName)
if err != nil {
totalErrors.Add(1)
}
}()
wg.Add(1)
}
wg.Wait()
assert.Equal(t, int32(0), totalErrors.Load())
for i := 0; i < 100; i++ {
go func() {
defer wg.Done()
_, err := vpm.FindPluginBySpec(volumeSpec)
if err != nil {
totalErrors.Add(1)
}
}()
wg.Add(1)
}
wg.Wait()
assert.Equal(t, int32(0), totalErrors.Load())
}
type fakeProber struct {
events []ProbeEvent
firstExecution atomic.Bool
}
func (prober *fakeProber) Init() error {
prober.firstExecution.Store(true)
return nil
}
func (prober *fakeProber) Probe() (events []ProbeEvent, err error) {
if prober.firstExecution.CompareAndSwap(true, false) {
return prober.events, nil
}
return []ProbeEvent{}, nil
}