Merge pull request #128495 from olyazavr/refresh-probed-plugins

refresh probed plugins on init to avoid probe race/erroneous unmounts
This commit is contained in:
Kubernetes Prow Robot 2024-11-07 14:57:37 +00:00 committed by GitHub
commit e30492f77a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 66 additions and 0 deletions

View File

@ -608,6 +608,7 @@ func (pm *VolumePluginMgr) InitPlugins(plugins []VolumePlugin, prober DynamicPlu
pm.plugins[name] = plugin
klog.V(1).InfoS("Loaded volume plugin", "pluginName", name)
}
pm.refreshProbedPlugins()
return utilerrors.NewAggregate(allErrs)
}

View File

@ -17,11 +17,16 @@ limitations under the License.
package volume
import (
"sync"
"sync/atomic"
"testing"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const testPluginName = "kubernetes.io/testPlugin"
@ -165,3 +170,63 @@ func Test_ValidatePodTemplate(t *testing.T) {
t.Errorf("isPodTemplateValid(%v) returned (%v), want (%v)", pod.String(), got, "Error: pod specification does not contain any volume(s).")
}
}
// TestVolumePluginMultiThreaded tests FindPluginByName/FindPluginBySpec in a multi-threaded environment.
// If these are called by different threads at the same time, they should still be able to reconcile the plugins
// and return the same results (no missing plugin)
func TestVolumePluginMultiThreaded(t *testing.T) {
vpm := VolumePluginMgr{}
var prober DynamicPluginProber = &fakeProber{events: []ProbeEvent{{PluginName: testPluginName, Op: ProbeAddOrUpdate, Plugin: &testPlugins{}}}}
err := vpm.InitPlugins([]VolumePlugin{}, prober, nil)
require.NoError(t, err)
volumeSpec := &Spec{}
totalErrors := atomic.Int32{}
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_, err := vpm.FindPluginByName(testPluginName)
if err != nil {
totalErrors.Add(1)
}
}()
}
wg.Wait()
assert.Equal(t, int32(0), totalErrors.Load())
totalErrors.Store(0)
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_, err := vpm.FindPluginBySpec(volumeSpec)
if err != nil {
totalErrors.Add(1)
}
}()
}
wg.Wait()
assert.Equal(t, int32(0), totalErrors.Load())
}
type fakeProber struct {
events []ProbeEvent
firstExecution atomic.Bool
}
func (prober *fakeProber) Init() error {
prober.firstExecution.Store(true)
return nil
}
func (prober *fakeProber) Probe() (events []ProbeEvent, err error) {
if prober.firstExecution.CompareAndSwap(true, false) {
return prober.events, nil
}
return []ProbeEvent{}, nil
}