mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 12:43:23 +00:00
automatic plugin discovery should trigger plugin init only for the relevant plugin
This commit is contained in:
parent
23881a9055
commit
128c07fb1e
@ -25,31 +25,24 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/fsnotify/fsnotify"
|
"github.com/fsnotify/fsnotify"
|
||||||
"k8s.io/apimachinery/pkg/util/errors"
|
"k8s.io/apimachinery/pkg/util/errors"
|
||||||
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
|
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
|
||||||
|
utilstrings "k8s.io/kubernetes/pkg/util/strings"
|
||||||
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
type flexVolumeProber struct {
|
type flexVolumeProber struct {
|
||||||
mutex sync.Mutex
|
mutex sync.Mutex
|
||||||
pluginDir string // Flexvolume driver directory
|
pluginDir string // Flexvolume driver directory
|
||||||
watcher utilfs.FSWatcher
|
watcher utilfs.FSWatcher
|
||||||
probeNeeded bool // Must only read and write this through testAndSetProbeNeeded.
|
factory PluginFactory
|
||||||
lastUpdated time.Time // Last time probeNeeded was updated.
|
fs utilfs.Filesystem
|
||||||
watchEventCount int
|
probeAllNeeded bool
|
||||||
factory PluginFactory
|
eventsMap map[string]volume.ProbeOperation // the key is the driver directory path, the value is the coresponding operation
|
||||||
fs utilfs.Filesystem
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
|
||||||
// TODO (cxing) Tune these params based on test results.
|
|
||||||
// watchEventLimit is the max allowable number of processed watches within watchEventInterval.
|
|
||||||
watchEventInterval = 5 * time.Second
|
|
||||||
watchEventLimit = 20
|
|
||||||
)
|
|
||||||
|
|
||||||
func GetDynamicPluginProber(pluginDir string) volume.DynamicPluginProber {
|
func GetDynamicPluginProber(pluginDir string) volume.DynamicPluginProber {
|
||||||
return &flexVolumeProber{
|
return &flexVolumeProber{
|
||||||
pluginDir: pluginDir,
|
pluginDir: pluginDir,
|
||||||
@ -60,8 +53,8 @@ func GetDynamicPluginProber(pluginDir string) volume.DynamicPluginProber {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (prober *flexVolumeProber) Init() error {
|
func (prober *flexVolumeProber) Init() error {
|
||||||
prober.testAndSetProbeNeeded(true)
|
prober.testAndSetProbeAllNeeded(true)
|
||||||
prober.lastUpdated = time.Now()
|
prober.eventsMap = map[string]volume.ProbeOperation{}
|
||||||
|
|
||||||
if err := prober.createPluginDir(); err != nil {
|
if err := prober.createPluginDir(); err != nil {
|
||||||
return err
|
return err
|
||||||
@ -73,26 +66,44 @@ func (prober *flexVolumeProber) Init() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Probes for Flexvolume drivers.
|
// If probeAllNeeded is true, probe all pluginDir
|
||||||
// If a filesystem update has occurred since the last probe, updated = true
|
// else probe events in eventsMap
|
||||||
// and the list of probed plugins is returned.
|
func (prober *flexVolumeProber) Probe() (events []volume.ProbeEvent, err error) {
|
||||||
// Otherwise, update = false and probedPlugins = nil.
|
if prober.probeAllNeeded {
|
||||||
//
|
prober.testAndSetProbeAllNeeded(false)
|
||||||
// If an error occurs, updated and plugins are set arbitrarily.
|
return prober.probeAll()
|
||||||
func (prober *flexVolumeProber) Probe() (updated bool, plugins []volume.VolumePlugin, err error) {
|
|
||||||
probeNeeded := prober.testAndSetProbeNeeded(false)
|
|
||||||
|
|
||||||
if !probeNeeded {
|
|
||||||
return false, nil, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return prober.probeMap()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (prober *flexVolumeProber) probeMap() (events []volume.ProbeEvent, err error) {
|
||||||
|
// TODO use a concurrent map to avoid Locking the entire map
|
||||||
|
prober.mutex.Lock()
|
||||||
|
defer prober.mutex.Unlock()
|
||||||
|
probeEvents := []volume.ProbeEvent{}
|
||||||
|
allErrs := []error{}
|
||||||
|
for driverDirPathAbs, op := range prober.eventsMap {
|
||||||
|
driverDirName := filepath.Base(driverDirPathAbs) // e.g. driverDirName = vendor~cifs
|
||||||
|
probeEvent, pluginErr := prober.newProbeEvent(driverDirName, op)
|
||||||
|
if pluginErr != nil {
|
||||||
|
allErrs = append(allErrs, pluginErr)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
probeEvents = append(probeEvents, probeEvent)
|
||||||
|
|
||||||
|
delete(prober.eventsMap, driverDirPathAbs)
|
||||||
|
}
|
||||||
|
return probeEvents, errors.NewAggregate(allErrs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (prober *flexVolumeProber) probeAll() (events []volume.ProbeEvent, err error) {
|
||||||
|
probeEvents := []volume.ProbeEvent{}
|
||||||
|
allErrs := []error{}
|
||||||
files, err := prober.fs.ReadDir(prober.pluginDir)
|
files, err := prober.fs.ReadDir(prober.pluginDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, nil, fmt.Errorf("Error reading the Flexvolume directory: %s", err)
|
return nil, fmt.Errorf("Error reading the Flexvolume directory: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
plugins = []volume.VolumePlugin{}
|
|
||||||
allErrs := []error{}
|
|
||||||
for _, f := range files {
|
for _, f := range files {
|
||||||
// only directories with names that do not begin with '.' are counted as plugins
|
// only directories with names that do not begin with '.' are counted as plugins
|
||||||
// and pluginDir/dirname/dirname should be an executable
|
// and pluginDir/dirname/dirname should be an executable
|
||||||
@ -100,20 +111,39 @@ func (prober *flexVolumeProber) Probe() (updated bool, plugins []volume.VolumePl
|
|||||||
// e.g. dirname = vendor~cifs
|
// e.g. dirname = vendor~cifs
|
||||||
// then, executable will be pluginDir/dirname/cifs
|
// then, executable will be pluginDir/dirname/cifs
|
||||||
if f.IsDir() && filepath.Base(f.Name())[0] != '.' {
|
if f.IsDir() && filepath.Base(f.Name())[0] != '.' {
|
||||||
plugin, pluginErr := prober.factory.NewFlexVolumePlugin(prober.pluginDir, f.Name())
|
probeEvent, pluginErr := prober.newProbeEvent(f.Name(), volume.ProbeAddOrUpdate)
|
||||||
if pluginErr != nil {
|
if pluginErr != nil {
|
||||||
pluginErr = fmt.Errorf(
|
|
||||||
"Error creating Flexvolume plugin from directory %s, skipping. Error: %s",
|
|
||||||
f.Name(), pluginErr)
|
|
||||||
allErrs = append(allErrs, pluginErr)
|
allErrs = append(allErrs, pluginErr)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
probeEvents = append(probeEvents, probeEvent)
|
||||||
plugins = append(plugins, plugin)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return probeEvents, errors.NewAggregate(allErrs)
|
||||||
|
}
|
||||||
|
|
||||||
return true, plugins, errors.NewAggregate(allErrs)
|
func (prober *flexVolumeProber) newProbeEvent(driverDirName string, op volume.ProbeOperation) (volume.ProbeEvent, error) {
|
||||||
|
probeEvent := volume.ProbeEvent{
|
||||||
|
Op: op,
|
||||||
|
}
|
||||||
|
if op == volume.ProbeAddOrUpdate {
|
||||||
|
plugin, pluginErr := prober.factory.NewFlexVolumePlugin(prober.pluginDir, driverDirName)
|
||||||
|
if pluginErr != nil {
|
||||||
|
pluginErr = fmt.Errorf(
|
||||||
|
"Error creating Flexvolume plugin from directory %s, skipping. Error: %s",
|
||||||
|
driverDirName, pluginErr)
|
||||||
|
return probeEvent, pluginErr
|
||||||
|
}
|
||||||
|
probeEvent.Plugin = plugin
|
||||||
|
probeEvent.PluginName = plugin.GetPluginName()
|
||||||
|
} else if op == volume.ProbeRemove {
|
||||||
|
driverName := utilstrings.UnescapePluginName(driverDirName)
|
||||||
|
probeEvent.PluginName = flexVolumePluginNamePrefix + driverName
|
||||||
|
|
||||||
|
} else {
|
||||||
|
return probeEvent, fmt.Errorf("Unknown Operation on directory: %s. ", driverDirName)
|
||||||
|
}
|
||||||
|
return probeEvent, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (prober *flexVolumeProber) handleWatchEvent(event fsnotify.Event) error {
|
func (prober *flexVolumeProber) handleWatchEvent(event fsnotify.Event) error {
|
||||||
@ -127,46 +157,67 @@ func (prober *flexVolumeProber) handleWatchEvent(event fsnotify.Event) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
parentPathAbs := filepath.Dir(eventPathAbs)
|
||||||
pluginDirAbs, err := filepath.Abs(prober.pluginDir)
|
pluginDirAbs, err := filepath.Abs(prober.pluginDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the Flexvolume plugin directory is removed, need to recreate it
|
// event of pluginDirAbs
|
||||||
// in order to keep it under watch.
|
if eventPathAbs == pluginDirAbs {
|
||||||
if eventOpIs(event, fsnotify.Remove) && eventPathAbs == pluginDirAbs {
|
// If the Flexvolume plugin directory is removed, need to recreate it
|
||||||
if err := prober.createPluginDir(); err != nil {
|
// in order to keep it under watch.
|
||||||
return err
|
if eventOpIs(event, fsnotify.Remove) {
|
||||||
|
if err := prober.createPluginDir(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := prober.addWatchRecursive(pluginDirAbs); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if err := prober.addWatchRecursive(pluginDirAbs); err != nil {
|
return nil
|
||||||
return err
|
}
|
||||||
}
|
|
||||||
} else if eventOpIs(event, fsnotify.Create) {
|
// watch newly added subdirectories inside a driver directory
|
||||||
|
if eventOpIs(event, fsnotify.Create) {
|
||||||
if err := prober.addWatchRecursive(eventPathAbs); err != nil {
|
if err := prober.addWatchRecursive(eventPathAbs); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
prober.updateProbeNeeded()
|
eventRelPathToPluginDir, err := filepath.Rel(pluginDirAbs, eventPathAbs)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// event inside specific driver dir
|
||||||
|
if len(eventRelPathToPluginDir) > 0 {
|
||||||
|
driverDirName := strings.Split(eventRelPathToPluginDir, string(os.PathSeparator))[0]
|
||||||
|
driverDirAbs := filepath.Join(pluginDirAbs, driverDirName)
|
||||||
|
// executable is removed, will trigger ProbeRemove event
|
||||||
|
if eventOpIs(event, fsnotify.Remove) && (eventRelPathToPluginDir == getExecutablePathRel(driverDirName) || parentPathAbs == pluginDirAbs) {
|
||||||
|
prober.updateEventsMap(driverDirAbs, volume.ProbeRemove)
|
||||||
|
} else {
|
||||||
|
prober.updateEventsMap(driverDirAbs, volume.ProbeAddOrUpdate)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (prober *flexVolumeProber) updateProbeNeeded() {
|
// getExecutableName returns the executableName of a flex plugin
|
||||||
// Within 'watchEventInterval' seconds, a max of 'watchEventLimit' watch events is processed.
|
func getExecutablePathRel(driverDirName string) string {
|
||||||
// The watch event will not be registered if the limit is reached.
|
parts := strings.Split(driverDirName, "~")
|
||||||
// This prevents increased disk usage from Probe() being triggered too frequently (either
|
return filepath.Join(driverDirName, parts[len(parts)-1])
|
||||||
// accidentally or maliciously).
|
}
|
||||||
if time.Since(prober.lastUpdated) > watchEventInterval {
|
|
||||||
// Update, then reset the timer and watch count.
|
func (prober *flexVolumeProber) updateEventsMap(eventDirAbs string, op volume.ProbeOperation) {
|
||||||
prober.testAndSetProbeNeeded(true)
|
prober.mutex.Lock()
|
||||||
prober.lastUpdated = time.Now()
|
defer prober.mutex.Unlock()
|
||||||
prober.watchEventCount = 1
|
if prober.probeAllNeeded {
|
||||||
} else if prober.watchEventCount < watchEventLimit {
|
return
|
||||||
prober.testAndSetProbeNeeded(true)
|
|
||||||
prober.watchEventCount++
|
|
||||||
}
|
}
|
||||||
|
prober.eventsMap[eventDirAbs] = op
|
||||||
}
|
}
|
||||||
|
|
||||||
// Recursively adds to watch all directories inside and including the file specified by the given filename.
|
// Recursively adds to watch all directories inside and including the file specified by the given filename.
|
||||||
@ -222,10 +273,10 @@ func (prober *flexVolumeProber) createPluginDir() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (prober *flexVolumeProber) testAndSetProbeNeeded(newval bool) (oldval bool) {
|
func (prober *flexVolumeProber) testAndSetProbeAllNeeded(newval bool) (oldval bool) {
|
||||||
prober.mutex.Lock()
|
prober.mutex.Lock()
|
||||||
defer prober.mutex.Unlock()
|
defer prober.mutex.Unlock()
|
||||||
oldval, prober.probeNeeded = prober.probeNeeded, newval
|
oldval, prober.probeAllNeeded = prober.probeAllNeeded, newval
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -38,13 +38,13 @@ func TestProberExistingDriverBeforeInit(t *testing.T) {
|
|||||||
driverPath, _, watcher, prober := initTestEnvironment(t)
|
driverPath, _, watcher, prober := initTestEnvironment(t)
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
updated, plugins, err := prober.Probe()
|
events, err := prober.Probe()
|
||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
// Probe occurs, 1 plugin should be returned, and 2 watches (pluginDir and all its
|
// Probe occurs, 1 plugin should be returned, and 2 watches (pluginDir and all its
|
||||||
// current subdirectories) registered.
|
// current subdirectories) registered.
|
||||||
assert.True(t, updated)
|
assert.Equal(t, 1, len(events))
|
||||||
assert.Equal(t, 1, len(plugins))
|
assert.Equal(t, volume.ProbeAddOrUpdate, events[0].Op)
|
||||||
assert.Equal(t, pluginDir, watcher.watches[0])
|
assert.Equal(t, pluginDir, watcher.watches[0])
|
||||||
assert.Equal(t, driverPath, watcher.watches[1])
|
assert.Equal(t, driverPath, watcher.watches[1])
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
@ -52,67 +52,120 @@ func TestProberExistingDriverBeforeInit(t *testing.T) {
|
|||||||
// Should no longer probe.
|
// Should no longer probe.
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
updated, plugins, err = prober.Probe()
|
events, err = prober.Probe()
|
||||||
// Assert
|
// Assert
|
||||||
assert.False(t, updated)
|
assert.Equal(t, 0, len(events))
|
||||||
assert.Equal(t, 0, len(plugins))
|
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Probes newly added drivers after prober is running.
|
// Probes newly added drivers after prober is running.
|
||||||
func TestProberAddDriver(t *testing.T) {
|
func TestProberAddRemoveDriver(t *testing.T) {
|
||||||
// Arrange
|
// Arrange
|
||||||
_, fs, watcher, prober := initTestEnvironment(t)
|
_, fs, watcher, prober := initTestEnvironment(t)
|
||||||
prober.Probe()
|
prober.Probe()
|
||||||
updated, _, _ := prober.Probe()
|
events, err := prober.Probe()
|
||||||
assert.False(t, updated)
|
assert.Equal(t, 0, len(events))
|
||||||
|
|
||||||
// Call probe after a file is added. Should return true.
|
// Call probe after a file is added. Should return 1 event.
|
||||||
|
|
||||||
// Arrange
|
// add driver
|
||||||
const driverName2 = "fake-driver2"
|
const driverName2 = "fake-driver2"
|
||||||
driverPath := path.Join(pluginDir, driverName2)
|
driverPath := path.Join(pluginDir, driverName2)
|
||||||
|
executablePath := path.Join(driverPath, driverName2)
|
||||||
installDriver(driverName2, fs)
|
installDriver(driverName2, fs)
|
||||||
watcher.TriggerEvent(fsnotify.Create, driverPath)
|
watcher.TriggerEvent(fsnotify.Create, driverPath)
|
||||||
watcher.TriggerEvent(fsnotify.Create, path.Join(driverPath, driverName2))
|
watcher.TriggerEvent(fsnotify.Create, executablePath)
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
updated, plugins, err := prober.Probe()
|
events, err = prober.Probe()
|
||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
assert.True(t, updated)
|
assert.Equal(t, 1, len(events))
|
||||||
assert.Equal(t, 2, len(plugins)) // 1 existing, 1 newly added
|
assert.Equal(t, volume.ProbeAddOrUpdate, events[0].Op) // 1 newly added
|
||||||
assert.Equal(t, driverPath, watcher.watches[len(watcher.watches)-1]) // Checks most recent watch
|
assert.Equal(t, driverPath, watcher.watches[len(watcher.watches)-1]) // Checks most recent watch
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
// Call probe again, should return false.
|
// Call probe again, should return 0 event.
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
updated, _, err = prober.Probe()
|
events, err = prober.Probe()
|
||||||
// Assert
|
// Assert
|
||||||
assert.False(t, updated)
|
assert.Equal(t, 0, len(events))
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
// Call probe after a non-driver file is added in a subdirectory. Should return true.
|
// Call probe after a non-driver file is added in a subdirectory. should return 1 event.
|
||||||
|
|
||||||
// Arrange
|
|
||||||
fp := path.Join(driverPath, "dummyfile")
|
fp := path.Join(driverPath, "dummyfile")
|
||||||
fs.Create(fp)
|
fs.Create(fp)
|
||||||
watcher.TriggerEvent(fsnotify.Create, fp)
|
watcher.TriggerEvent(fsnotify.Create, fp)
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
updated, plugins, err = prober.Probe()
|
events, err = prober.Probe()
|
||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
assert.True(t, updated)
|
assert.Equal(t, 1, len(events))
|
||||||
assert.Equal(t, 2, len(plugins)) // Number of plugins should not change.
|
assert.Equal(t, volume.ProbeAddOrUpdate, events[0].Op)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
// Call probe again, should return false.
|
// Call probe again, should return 0 event.
|
||||||
// Act
|
// Act
|
||||||
updated, _, err = prober.Probe()
|
events, err = prober.Probe()
|
||||||
// Assert
|
// Assert
|
||||||
assert.False(t, updated)
|
assert.Equal(t, 0, len(events))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// Call probe after a subdirectory is added in a driver directory. should return 1 event.
|
||||||
|
subdirPath := path.Join(driverPath, "subdir")
|
||||||
|
fs.Create(subdirPath)
|
||||||
|
watcher.TriggerEvent(fsnotify.Create, subdirPath)
|
||||||
|
|
||||||
|
// Act
|
||||||
|
events, err = prober.Probe()
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
assert.Equal(t, 1, len(events))
|
||||||
|
assert.Equal(t, volume.ProbeAddOrUpdate, events[0].Op)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// Call probe again, should return 0 event.
|
||||||
|
// Act
|
||||||
|
events, err = prober.Probe()
|
||||||
|
// Assert
|
||||||
|
assert.Equal(t, 0, len(events))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// Call probe after a subdirectory is removed in a driver directory. should return 1 event.
|
||||||
|
fs.Remove(subdirPath)
|
||||||
|
watcher.TriggerEvent(fsnotify.Remove, subdirPath)
|
||||||
|
|
||||||
|
// Act
|
||||||
|
events, err = prober.Probe()
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
assert.Equal(t, 1, len(events))
|
||||||
|
assert.Equal(t, volume.ProbeAddOrUpdate, events[0].Op)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// Call probe again, should return 0 event.
|
||||||
|
// Act
|
||||||
|
events, err = prober.Probe()
|
||||||
|
// Assert
|
||||||
|
assert.Equal(t, 0, len(events))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// Call probe after a driver executable and driver directory is remove. should return 1 event.
|
||||||
|
fs.Remove(executablePath)
|
||||||
|
fs.Remove(driverPath)
|
||||||
|
watcher.TriggerEvent(fsnotify.Remove, executablePath)
|
||||||
|
watcher.TriggerEvent(fsnotify.Remove, driverPath)
|
||||||
|
// Act and Assert: 1 ProbeRemove event
|
||||||
|
events, err = prober.Probe()
|
||||||
|
assert.Equal(t, 1, len(events))
|
||||||
|
assert.Equal(t, volume.ProbeRemove, events[0].Op)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// Act and Assert: 0 event
|
||||||
|
events, err = prober.Probe()
|
||||||
|
assert.Equal(t, 0, len(events))
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -130,11 +183,10 @@ func TestEmptyPluginDir(t *testing.T) {
|
|||||||
prober.Init()
|
prober.Init()
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
updated, plugins, err := prober.Probe()
|
events, err := prober.Probe()
|
||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
assert.True(t, updated)
|
assert.Equal(t, 0, len(events))
|
||||||
assert.Equal(t, 0, len(plugins))
|
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -154,7 +206,37 @@ func TestRemovePluginDir(t *testing.T) {
|
|||||||
assert.Equal(t, pluginDir, watcher.watches[len(watcher.watches)-1])
|
assert.Equal(t, pluginDir, watcher.watches[len(watcher.watches)-1])
|
||||||
}
|
}
|
||||||
|
|
||||||
// Issue multiple events and probe multiple times. Should give true, false, false...
|
// Issue an event to remove plugindir. New directory should still be watched.
|
||||||
|
func TestNestedDriverDir(t *testing.T) {
|
||||||
|
// Arrange
|
||||||
|
_, fs, watcher, _ := initTestEnvironment(t)
|
||||||
|
// Assert
|
||||||
|
assert.Equal(t, 2, len(watcher.watches)) // 2 from initial setup
|
||||||
|
|
||||||
|
// test add testDriverName
|
||||||
|
testDriverName := "testDriverName"
|
||||||
|
testDriverPath := path.Join(pluginDir, testDriverName)
|
||||||
|
fs.MkdirAll(testDriverPath, 0666)
|
||||||
|
watcher.TriggerEvent(fsnotify.Create, testDriverPath)
|
||||||
|
// Assert
|
||||||
|
assert.Equal(t, 3, len(watcher.watches)) // 2 from initial setup, 1 from new watch.
|
||||||
|
assert.Equal(t, testDriverPath, watcher.watches[len(watcher.watches)-1])
|
||||||
|
|
||||||
|
// test add nested subdir inside testDriverName
|
||||||
|
basePath := testDriverPath
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
subdirName := "subdirName"
|
||||||
|
subdirPath := path.Join(basePath, subdirName)
|
||||||
|
fs.MkdirAll(subdirPath, 0666)
|
||||||
|
watcher.TriggerEvent(fsnotify.Create, subdirPath)
|
||||||
|
// Assert
|
||||||
|
assert.Equal(t, 4+i, len(watcher.watches)) // 3 + newly added
|
||||||
|
assert.Equal(t, subdirPath, watcher.watches[len(watcher.watches)-1])
|
||||||
|
basePath = subdirPath
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Issue multiple events and probe multiple times.
|
||||||
func TestProberMultipleEvents(t *testing.T) {
|
func TestProberMultipleEvents(t *testing.T) {
|
||||||
const iterations = 5
|
const iterations = 5
|
||||||
|
|
||||||
@ -169,49 +251,20 @@ func TestProberMultipleEvents(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
updated, _, err := prober.Probe()
|
events, err := prober.Probe()
|
||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
assert.True(t, updated)
|
assert.Equal(t, 2, len(events))
|
||||||
|
assert.Equal(t, volume.ProbeAddOrUpdate, events[0].Op)
|
||||||
|
assert.Equal(t, volume.ProbeAddOrUpdate, events[1].Op)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
for i := 0; i < iterations-1; i++ {
|
for i := 0; i < iterations-1; i++ {
|
||||||
updated, _, err = prober.Probe()
|
events, err = prober.Probe()
|
||||||
assert.False(t, updated)
|
assert.Equal(t, 0, len(events))
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// When many events are triggered quickly in succession, events should stop triggering a probe update
|
|
||||||
// after a certain limit.
|
|
||||||
func TestProberRateLimit(t *testing.T) {
|
|
||||||
// Arrange
|
|
||||||
driverPath, _, watcher, prober := initTestEnvironment(t)
|
|
||||||
for i := 0; i < watchEventLimit; i++ {
|
|
||||||
watcher.TriggerEvent(fsnotify.Write, path.Join(driverPath, driverName))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Act
|
|
||||||
updated, plugins, err := prober.Probe()
|
|
||||||
|
|
||||||
// Assert
|
|
||||||
// The probe results should not be different from what it would be if none of the events
|
|
||||||
// are triggered.
|
|
||||||
assert.True(t, updated)
|
|
||||||
assert.Equal(t, 1, len(plugins))
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
// Arrange
|
|
||||||
watcher.TriggerEvent(fsnotify.Write, path.Join(driverPath, driverName))
|
|
||||||
|
|
||||||
// Act
|
|
||||||
updated, _, err = prober.Probe()
|
|
||||||
|
|
||||||
// Assert
|
|
||||||
// The last event is outside the event limit. Should not trigger a probe.
|
|
||||||
assert.False(t, updated)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestProberError(t *testing.T) {
|
func TestProberError(t *testing.T) {
|
||||||
fs := utilfs.NewFakeFs()
|
fs := utilfs.NewFakeFs()
|
||||||
watcher := NewFakeWatcher()
|
watcher := NewFakeWatcher()
|
||||||
@ -224,7 +277,7 @@ func TestProberError(t *testing.T) {
|
|||||||
installDriver(driverName, fs)
|
installDriver(driverName, fs)
|
||||||
prober.Init()
|
prober.Init()
|
||||||
|
|
||||||
_, _, err := prober.Probe()
|
_, err := prober.Probe()
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -35,12 +35,22 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/util/mount"
|
"k8s.io/kubernetes/pkg/util/mount"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type ProbeOperation uint32
|
||||||
|
type ProbeEvent struct {
|
||||||
|
Plugin VolumePlugin // VolumePlugin that was added/updated/removed. if ProbeEvent.Op is 'ProbeRemove', Plugin should be nil
|
||||||
|
PluginName string
|
||||||
|
Op ProbeOperation // The operation to the plugin
|
||||||
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// Common parameter which can be specified in StorageClass to specify the desired FSType
|
// Common parameter which can be specified in StorageClass to specify the desired FSType
|
||||||
// Provisioners SHOULD implement support for this if they are block device based
|
// Provisioners SHOULD implement support for this if they are block device based
|
||||||
// Must be a filesystem type supported by the host operating system.
|
// Must be a filesystem type supported by the host operating system.
|
||||||
// Ex. "ext4", "xfs", "ntfs". Default value depends on the provisioner
|
// Ex. "ext4", "xfs", "ntfs". Default value depends on the provisioner
|
||||||
VolumeParameterFSType = "fstype"
|
VolumeParameterFSType = "fstype"
|
||||||
|
|
||||||
|
ProbeAddOrUpdate ProbeOperation = 1 << iota
|
||||||
|
ProbeRemove
|
||||||
)
|
)
|
||||||
|
|
||||||
// VolumeOptions contains option information about a volume.
|
// VolumeOptions contains option information about a volume.
|
||||||
@ -74,12 +84,8 @@ type VolumeOptions struct {
|
|||||||
type DynamicPluginProber interface {
|
type DynamicPluginProber interface {
|
||||||
Init() error
|
Init() error
|
||||||
|
|
||||||
// If an update has occurred since the last probe, updated = true
|
// If an error occurs, events are undefined.
|
||||||
// and the list of probed plugins is returned.
|
Probe() (events []ProbeEvent, err error)
|
||||||
// Otherwise, update = false and probedPlugins = nil.
|
|
||||||
//
|
|
||||||
// If an error occurs, updated and probedPlugins are undefined.
|
|
||||||
Probe() (updated bool, probedPlugins []VolumePlugin, err error)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// VolumePlugin is an interface to volume plugins that can be used on a
|
// VolumePlugin is an interface to volume plugins that can be used on a
|
||||||
@ -313,7 +319,7 @@ type VolumePluginMgr struct {
|
|||||||
mutex sync.Mutex
|
mutex sync.Mutex
|
||||||
plugins map[string]VolumePlugin
|
plugins map[string]VolumePlugin
|
||||||
prober DynamicPluginProber
|
prober DynamicPluginProber
|
||||||
probedPlugins []VolumePlugin
|
probedPlugins map[string]VolumePlugin
|
||||||
Host VolumeHost
|
Host VolumeHost
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -430,6 +436,9 @@ func (pm *VolumePluginMgr) InitPlugins(plugins []VolumePlugin, prober DynamicPlu
|
|||||||
if pm.plugins == nil {
|
if pm.plugins == nil {
|
||||||
pm.plugins = map[string]VolumePlugin{}
|
pm.plugins = map[string]VolumePlugin{}
|
||||||
}
|
}
|
||||||
|
if pm.probedPlugins == nil {
|
||||||
|
pm.probedPlugins = map[string]VolumePlugin{}
|
||||||
|
}
|
||||||
|
|
||||||
allErrs := []error{}
|
allErrs := []error{}
|
||||||
for _, plugin := range plugins {
|
for _, plugin := range plugins {
|
||||||
@ -543,21 +552,25 @@ func (pm *VolumePluginMgr) FindPluginByName(name string) (VolumePlugin, error) {
|
|||||||
// Check if probedPlugin cache update is required.
|
// Check if probedPlugin cache update is required.
|
||||||
// If it is, initialize all probed plugins and replace the cache with them.
|
// If it is, initialize all probed plugins and replace the cache with them.
|
||||||
func (pm *VolumePluginMgr) refreshProbedPlugins() {
|
func (pm *VolumePluginMgr) refreshProbedPlugins() {
|
||||||
updated, plugins, err := pm.prober.Probe()
|
events, err := pm.prober.Probe()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Error dynamically probing plugins: %s", err)
|
glog.Errorf("Error dynamically probing plugins: %s", err)
|
||||||
return // Use cached plugins upon failure.
|
return // Use cached plugins upon failure.
|
||||||
}
|
}
|
||||||
|
|
||||||
if updated {
|
for _, event := range events {
|
||||||
pm.probedPlugins = []VolumePlugin{}
|
if event.Op == ProbeAddOrUpdate {
|
||||||
for _, plugin := range plugins {
|
if err := pm.initProbedPlugin(event.Plugin); err != nil {
|
||||||
if err := pm.initProbedPlugin(plugin); err != nil {
|
|
||||||
glog.Errorf("Error initializing dynamically probed plugin %s; error: %s",
|
glog.Errorf("Error initializing dynamically probed plugin %s; error: %s",
|
||||||
plugin.GetPluginName(), err)
|
event.Plugin.GetPluginName(), err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
pm.probedPlugins = append(pm.probedPlugins, plugin)
|
pm.probedPlugins[event.Plugin.GetPluginName()] = event.Plugin
|
||||||
|
} else if event.Op == ProbeRemove {
|
||||||
|
delete(pm.probedPlugins, event.Plugin.GetPluginName())
|
||||||
|
} else {
|
||||||
|
glog.Errorf("Unknown Operation on PluginName: %s.",
|
||||||
|
event.Plugin.GetPluginName())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -802,5 +815,5 @@ func ValidateRecyclerPodTemplate(pod *v1.Pod) error {
|
|||||||
|
|
||||||
type dummyPluginProber struct{}
|
type dummyPluginProber struct{}
|
||||||
|
|
||||||
func (*dummyPluginProber) Init() error { return nil }
|
func (*dummyPluginProber) Init() error { return nil }
|
||||||
func (*dummyPluginProber) Probe() (bool, []VolumePlugin, error) { return false, nil, nil }
|
func (*dummyPluginProber) Probe() ([]ProbeEvent, error) { return nil, nil }
|
||||||
|
Loading…
Reference in New Issue
Block a user