From 128c07fb1e0d784fdffa1c01db585728f4d4b31a Mon Sep 17 00:00:00 2001 From: linyouchong Date: Sat, 20 Jan 2018 01:51:19 +0800 Subject: [PATCH] automatic plugin discovery should trigger plugin init only for the relevant plugin --- pkg/volume/flexvolume/probe.go | 183 +++++++++++++++++---------- pkg/volume/flexvolume/probe_test.go | 187 ++++++++++++++++++---------- pkg/volume/plugins.go | 45 ++++--- 3 files changed, 266 insertions(+), 149 deletions(-) diff --git a/pkg/volume/flexvolume/probe.go b/pkg/volume/flexvolume/probe.go index d91ad388b09..74bae96249b 100644 --- a/pkg/volume/flexvolume/probe.go +++ b/pkg/volume/flexvolume/probe.go @@ -25,31 +25,24 @@ import ( "fmt" "path/filepath" "sync" - "time" "github.com/fsnotify/fsnotify" "k8s.io/apimachinery/pkg/util/errors" utilfs "k8s.io/kubernetes/pkg/util/filesystem" + utilstrings "k8s.io/kubernetes/pkg/util/strings" + "strings" ) type flexVolumeProber struct { - mutex sync.Mutex - pluginDir string // Flexvolume driver directory - watcher utilfs.FSWatcher - probeNeeded bool // Must only read and write this through testAndSetProbeNeeded. - lastUpdated time.Time // Last time probeNeeded was updated. - watchEventCount int - factory PluginFactory - fs utilfs.Filesystem + mutex sync.Mutex + pluginDir string // Flexvolume driver directory + watcher utilfs.FSWatcher + factory PluginFactory + fs utilfs.Filesystem + probeAllNeeded bool + eventsMap map[string]volume.ProbeOperation // the key is the driver directory path, the value is the coresponding operation } -const ( - // TODO (cxing) Tune these params based on test results. - // watchEventLimit is the max allowable number of processed watches within watchEventInterval. - watchEventInterval = 5 * time.Second - watchEventLimit = 20 -) - func GetDynamicPluginProber(pluginDir string) volume.DynamicPluginProber { return &flexVolumeProber{ pluginDir: pluginDir, @@ -60,8 +53,8 @@ func GetDynamicPluginProber(pluginDir string) volume.DynamicPluginProber { } func (prober *flexVolumeProber) Init() error { - prober.testAndSetProbeNeeded(true) - prober.lastUpdated = time.Now() + prober.testAndSetProbeAllNeeded(true) + prober.eventsMap = map[string]volume.ProbeOperation{} if err := prober.createPluginDir(); err != nil { return err @@ -73,26 +66,44 @@ func (prober *flexVolumeProber) Init() error { return nil } -// Probes for Flexvolume drivers. -// If a filesystem update has occurred since the last probe, updated = true -// and the list of probed plugins is returned. -// Otherwise, update = false and probedPlugins = nil. -// -// If an error occurs, updated and plugins are set arbitrarily. -func (prober *flexVolumeProber) Probe() (updated bool, plugins []volume.VolumePlugin, err error) { - probeNeeded := prober.testAndSetProbeNeeded(false) - - if !probeNeeded { - return false, nil, nil +// If probeAllNeeded is true, probe all pluginDir +// else probe events in eventsMap +func (prober *flexVolumeProber) Probe() (events []volume.ProbeEvent, err error) { + if prober.probeAllNeeded { + prober.testAndSetProbeAllNeeded(false) + return prober.probeAll() } + return prober.probeMap() +} + +func (prober *flexVolumeProber) probeMap() (events []volume.ProbeEvent, err error) { + // TODO use a concurrent map to avoid Locking the entire map + prober.mutex.Lock() + defer prober.mutex.Unlock() + probeEvents := []volume.ProbeEvent{} + allErrs := []error{} + for driverDirPathAbs, op := range prober.eventsMap { + driverDirName := filepath.Base(driverDirPathAbs) // e.g. driverDirName = vendor~cifs + probeEvent, pluginErr := prober.newProbeEvent(driverDirName, op) + if pluginErr != nil { + allErrs = append(allErrs, pluginErr) + continue + } + probeEvents = append(probeEvents, probeEvent) + + delete(prober.eventsMap, driverDirPathAbs) + } + return probeEvents, errors.NewAggregate(allErrs) +} + +func (prober *flexVolumeProber) probeAll() (events []volume.ProbeEvent, err error) { + probeEvents := []volume.ProbeEvent{} + allErrs := []error{} files, err := prober.fs.ReadDir(prober.pluginDir) if err != nil { - return false, nil, fmt.Errorf("Error reading the Flexvolume directory: %s", err) + return nil, fmt.Errorf("Error reading the Flexvolume directory: %s", err) } - - plugins = []volume.VolumePlugin{} - allErrs := []error{} for _, f := range files { // only directories with names that do not begin with '.' are counted as plugins // and pluginDir/dirname/dirname should be an executable @@ -100,20 +111,39 @@ func (prober *flexVolumeProber) Probe() (updated bool, plugins []volume.VolumePl // e.g. dirname = vendor~cifs // then, executable will be pluginDir/dirname/cifs if f.IsDir() && filepath.Base(f.Name())[0] != '.' { - plugin, pluginErr := prober.factory.NewFlexVolumePlugin(prober.pluginDir, f.Name()) + probeEvent, pluginErr := prober.newProbeEvent(f.Name(), volume.ProbeAddOrUpdate) if pluginErr != nil { - pluginErr = fmt.Errorf( - "Error creating Flexvolume plugin from directory %s, skipping. Error: %s", - f.Name(), pluginErr) allErrs = append(allErrs, pluginErr) continue } - - plugins = append(plugins, plugin) + probeEvents = append(probeEvents, probeEvent) } } + return probeEvents, errors.NewAggregate(allErrs) +} - return true, plugins, errors.NewAggregate(allErrs) +func (prober *flexVolumeProber) newProbeEvent(driverDirName string, op volume.ProbeOperation) (volume.ProbeEvent, error) { + probeEvent := volume.ProbeEvent{ + Op: op, + } + if op == volume.ProbeAddOrUpdate { + plugin, pluginErr := prober.factory.NewFlexVolumePlugin(prober.pluginDir, driverDirName) + if pluginErr != nil { + pluginErr = fmt.Errorf( + "Error creating Flexvolume plugin from directory %s, skipping. Error: %s", + driverDirName, pluginErr) + return probeEvent, pluginErr + } + probeEvent.Plugin = plugin + probeEvent.PluginName = plugin.GetPluginName() + } else if op == volume.ProbeRemove { + driverName := utilstrings.UnescapePluginName(driverDirName) + probeEvent.PluginName = flexVolumePluginNamePrefix + driverName + + } else { + return probeEvent, fmt.Errorf("Unknown Operation on directory: %s. ", driverDirName) + } + return probeEvent, nil } func (prober *flexVolumeProber) handleWatchEvent(event fsnotify.Event) error { @@ -127,46 +157,67 @@ func (prober *flexVolumeProber) handleWatchEvent(event fsnotify.Event) error { if err != nil { return err } - + parentPathAbs := filepath.Dir(eventPathAbs) pluginDirAbs, err := filepath.Abs(prober.pluginDir) if err != nil { return err } - // If the Flexvolume plugin directory is removed, need to recreate it - // in order to keep it under watch. - if eventOpIs(event, fsnotify.Remove) && eventPathAbs == pluginDirAbs { - if err := prober.createPluginDir(); err != nil { - return err + // event of pluginDirAbs + if eventPathAbs == pluginDirAbs { + // If the Flexvolume plugin directory is removed, need to recreate it + // in order to keep it under watch. + if eventOpIs(event, fsnotify.Remove) { + if err := prober.createPluginDir(); err != nil { + return err + } + if err := prober.addWatchRecursive(pluginDirAbs); err != nil { + return err + } } - if err := prober.addWatchRecursive(pluginDirAbs); err != nil { - return err - } - } else if eventOpIs(event, fsnotify.Create) { + return nil + } + + // watch newly added subdirectories inside a driver directory + if eventOpIs(event, fsnotify.Create) { if err := prober.addWatchRecursive(eventPathAbs); err != nil { return err } } - prober.updateProbeNeeded() + eventRelPathToPluginDir, err := filepath.Rel(pluginDirAbs, eventPathAbs) + if err != nil { + return err + } + + // event inside specific driver dir + if len(eventRelPathToPluginDir) > 0 { + driverDirName := strings.Split(eventRelPathToPluginDir, string(os.PathSeparator))[0] + driverDirAbs := filepath.Join(pluginDirAbs, driverDirName) + // executable is removed, will trigger ProbeRemove event + if eventOpIs(event, fsnotify.Remove) && (eventRelPathToPluginDir == getExecutablePathRel(driverDirName) || parentPathAbs == pluginDirAbs) { + prober.updateEventsMap(driverDirAbs, volume.ProbeRemove) + } else { + prober.updateEventsMap(driverDirAbs, volume.ProbeAddOrUpdate) + } + } return nil } -func (prober *flexVolumeProber) updateProbeNeeded() { - // Within 'watchEventInterval' seconds, a max of 'watchEventLimit' watch events is processed. - // The watch event will not be registered if the limit is reached. - // This prevents increased disk usage from Probe() being triggered too frequently (either - // accidentally or maliciously). - if time.Since(prober.lastUpdated) > watchEventInterval { - // Update, then reset the timer and watch count. - prober.testAndSetProbeNeeded(true) - prober.lastUpdated = time.Now() - prober.watchEventCount = 1 - } else if prober.watchEventCount < watchEventLimit { - prober.testAndSetProbeNeeded(true) - prober.watchEventCount++ +// getExecutableName returns the executableName of a flex plugin +func getExecutablePathRel(driverDirName string) string { + parts := strings.Split(driverDirName, "~") + return filepath.Join(driverDirName, parts[len(parts)-1]) +} + +func (prober *flexVolumeProber) updateEventsMap(eventDirAbs string, op volume.ProbeOperation) { + prober.mutex.Lock() + defer prober.mutex.Unlock() + if prober.probeAllNeeded { + return } + prober.eventsMap[eventDirAbs] = op } // Recursively adds to watch all directories inside and including the file specified by the given filename. @@ -222,10 +273,10 @@ func (prober *flexVolumeProber) createPluginDir() error { return nil } -func (prober *flexVolumeProber) testAndSetProbeNeeded(newval bool) (oldval bool) { +func (prober *flexVolumeProber) testAndSetProbeAllNeeded(newval bool) (oldval bool) { prober.mutex.Lock() defer prober.mutex.Unlock() - oldval, prober.probeNeeded = prober.probeNeeded, newval + oldval, prober.probeAllNeeded = prober.probeAllNeeded, newval return } diff --git a/pkg/volume/flexvolume/probe_test.go b/pkg/volume/flexvolume/probe_test.go index 789aa18b710..b52ec5aa7a6 100644 --- a/pkg/volume/flexvolume/probe_test.go +++ b/pkg/volume/flexvolume/probe_test.go @@ -38,13 +38,13 @@ func TestProberExistingDriverBeforeInit(t *testing.T) { driverPath, _, watcher, prober := initTestEnvironment(t) // Act - updated, plugins, err := prober.Probe() + events, err := prober.Probe() // Assert // Probe occurs, 1 plugin should be returned, and 2 watches (pluginDir and all its // current subdirectories) registered. - assert.True(t, updated) - assert.Equal(t, 1, len(plugins)) + assert.Equal(t, 1, len(events)) + assert.Equal(t, volume.ProbeAddOrUpdate, events[0].Op) assert.Equal(t, pluginDir, watcher.watches[0]) assert.Equal(t, driverPath, watcher.watches[1]) assert.NoError(t, err) @@ -52,67 +52,120 @@ func TestProberExistingDriverBeforeInit(t *testing.T) { // Should no longer probe. // Act - updated, plugins, err = prober.Probe() + events, err = prober.Probe() // Assert - assert.False(t, updated) - assert.Equal(t, 0, len(plugins)) + assert.Equal(t, 0, len(events)) assert.NoError(t, err) } // Probes newly added drivers after prober is running. -func TestProberAddDriver(t *testing.T) { +func TestProberAddRemoveDriver(t *testing.T) { // Arrange _, fs, watcher, prober := initTestEnvironment(t) prober.Probe() - updated, _, _ := prober.Probe() - assert.False(t, updated) + events, err := prober.Probe() + assert.Equal(t, 0, len(events)) - // Call probe after a file is added. Should return true. + // Call probe after a file is added. Should return 1 event. - // Arrange + // add driver const driverName2 = "fake-driver2" driverPath := path.Join(pluginDir, driverName2) + executablePath := path.Join(driverPath, driverName2) installDriver(driverName2, fs) watcher.TriggerEvent(fsnotify.Create, driverPath) - watcher.TriggerEvent(fsnotify.Create, path.Join(driverPath, driverName2)) + watcher.TriggerEvent(fsnotify.Create, executablePath) // Act - updated, plugins, err := prober.Probe() + events, err = prober.Probe() // Assert - assert.True(t, updated) - assert.Equal(t, 2, len(plugins)) // 1 existing, 1 newly added + assert.Equal(t, 1, len(events)) + assert.Equal(t, volume.ProbeAddOrUpdate, events[0].Op) // 1 newly added assert.Equal(t, driverPath, watcher.watches[len(watcher.watches)-1]) // Checks most recent watch assert.NoError(t, err) - // Call probe again, should return false. + // Call probe again, should return 0 event. // Act - updated, _, err = prober.Probe() + events, err = prober.Probe() // Assert - assert.False(t, updated) + assert.Equal(t, 0, len(events)) assert.NoError(t, err) - // Call probe after a non-driver file is added in a subdirectory. Should return true. - - // Arrange + // Call probe after a non-driver file is added in a subdirectory. should return 1 event. fp := path.Join(driverPath, "dummyfile") fs.Create(fp) watcher.TriggerEvent(fsnotify.Create, fp) // Act - updated, plugins, err = prober.Probe() + events, err = prober.Probe() // Assert - assert.True(t, updated) - assert.Equal(t, 2, len(plugins)) // Number of plugins should not change. + assert.Equal(t, 1, len(events)) + assert.Equal(t, volume.ProbeAddOrUpdate, events[0].Op) assert.NoError(t, err) - // Call probe again, should return false. + // Call probe again, should return 0 event. // Act - updated, _, err = prober.Probe() + events, err = prober.Probe() // Assert - assert.False(t, updated) + assert.Equal(t, 0, len(events)) + assert.NoError(t, err) + + // Call probe after a subdirectory is added in a driver directory. should return 1 event. + subdirPath := path.Join(driverPath, "subdir") + fs.Create(subdirPath) + watcher.TriggerEvent(fsnotify.Create, subdirPath) + + // Act + events, err = prober.Probe() + + // Assert + assert.Equal(t, 1, len(events)) + assert.Equal(t, volume.ProbeAddOrUpdate, events[0].Op) + assert.NoError(t, err) + + // Call probe again, should return 0 event. + // Act + events, err = prober.Probe() + // Assert + assert.Equal(t, 0, len(events)) + assert.NoError(t, err) + + // Call probe after a subdirectory is removed in a driver directory. should return 1 event. + fs.Remove(subdirPath) + watcher.TriggerEvent(fsnotify.Remove, subdirPath) + + // Act + events, err = prober.Probe() + + // Assert + assert.Equal(t, 1, len(events)) + assert.Equal(t, volume.ProbeAddOrUpdate, events[0].Op) + assert.NoError(t, err) + + // Call probe again, should return 0 event. + // Act + events, err = prober.Probe() + // Assert + assert.Equal(t, 0, len(events)) + assert.NoError(t, err) + + // Call probe after a driver executable and driver directory is remove. should return 1 event. + fs.Remove(executablePath) + fs.Remove(driverPath) + watcher.TriggerEvent(fsnotify.Remove, executablePath) + watcher.TriggerEvent(fsnotify.Remove, driverPath) + // Act and Assert: 1 ProbeRemove event + events, err = prober.Probe() + assert.Equal(t, 1, len(events)) + assert.Equal(t, volume.ProbeRemove, events[0].Op) + assert.NoError(t, err) + + // Act and Assert: 0 event + events, err = prober.Probe() + assert.Equal(t, 0, len(events)) assert.NoError(t, err) } @@ -130,11 +183,10 @@ func TestEmptyPluginDir(t *testing.T) { prober.Init() // Act - updated, plugins, err := prober.Probe() + events, err := prober.Probe() // Assert - assert.True(t, updated) - assert.Equal(t, 0, len(plugins)) + assert.Equal(t, 0, len(events)) assert.NoError(t, err) } @@ -154,7 +206,37 @@ func TestRemovePluginDir(t *testing.T) { assert.Equal(t, pluginDir, watcher.watches[len(watcher.watches)-1]) } -// Issue multiple events and probe multiple times. Should give true, false, false... +// Issue an event to remove plugindir. New directory should still be watched. +func TestNestedDriverDir(t *testing.T) { + // Arrange + _, fs, watcher, _ := initTestEnvironment(t) + // Assert + assert.Equal(t, 2, len(watcher.watches)) // 2 from initial setup + + // test add testDriverName + testDriverName := "testDriverName" + testDriverPath := path.Join(pluginDir, testDriverName) + fs.MkdirAll(testDriverPath, 0666) + watcher.TriggerEvent(fsnotify.Create, testDriverPath) + // Assert + assert.Equal(t, 3, len(watcher.watches)) // 2 from initial setup, 1 from new watch. + assert.Equal(t, testDriverPath, watcher.watches[len(watcher.watches)-1]) + + // test add nested subdir inside testDriverName + basePath := testDriverPath + for i := 0; i < 10; i++ { + subdirName := "subdirName" + subdirPath := path.Join(basePath, subdirName) + fs.MkdirAll(subdirPath, 0666) + watcher.TriggerEvent(fsnotify.Create, subdirPath) + // Assert + assert.Equal(t, 4+i, len(watcher.watches)) // 3 + newly added + assert.Equal(t, subdirPath, watcher.watches[len(watcher.watches)-1]) + basePath = subdirPath + } +} + +// Issue multiple events and probe multiple times. func TestProberMultipleEvents(t *testing.T) { const iterations = 5 @@ -169,49 +251,20 @@ func TestProberMultipleEvents(t *testing.T) { } // Act - updated, _, err := prober.Probe() + events, err := prober.Probe() // Assert - assert.True(t, updated) + assert.Equal(t, 2, len(events)) + assert.Equal(t, volume.ProbeAddOrUpdate, events[0].Op) + assert.Equal(t, volume.ProbeAddOrUpdate, events[1].Op) assert.NoError(t, err) for i := 0; i < iterations-1; i++ { - updated, _, err = prober.Probe() - assert.False(t, updated) + events, err = prober.Probe() + assert.Equal(t, 0, len(events)) assert.NoError(t, err) } } -// When many events are triggered quickly in succession, events should stop triggering a probe update -// after a certain limit. -func TestProberRateLimit(t *testing.T) { - // Arrange - driverPath, _, watcher, prober := initTestEnvironment(t) - for i := 0; i < watchEventLimit; i++ { - watcher.TriggerEvent(fsnotify.Write, path.Join(driverPath, driverName)) - } - - // Act - updated, plugins, err := prober.Probe() - - // Assert - // The probe results should not be different from what it would be if none of the events - // are triggered. - assert.True(t, updated) - assert.Equal(t, 1, len(plugins)) - assert.NoError(t, err) - - // Arrange - watcher.TriggerEvent(fsnotify.Write, path.Join(driverPath, driverName)) - - // Act - updated, _, err = prober.Probe() - - // Assert - // The last event is outside the event limit. Should not trigger a probe. - assert.False(t, updated) - assert.NoError(t, err) -} - func TestProberError(t *testing.T) { fs := utilfs.NewFakeFs() watcher := NewFakeWatcher() @@ -224,7 +277,7 @@ func TestProberError(t *testing.T) { installDriver(driverName, fs) prober.Init() - _, _, err := prober.Probe() + _, err := prober.Probe() assert.Error(t, err) } diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index 42ceaa40d2f..ce21a738a97 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -35,12 +35,22 @@ import ( "k8s.io/kubernetes/pkg/util/mount" ) +type ProbeOperation uint32 +type ProbeEvent struct { + Plugin VolumePlugin // VolumePlugin that was added/updated/removed. if ProbeEvent.Op is 'ProbeRemove', Plugin should be nil + PluginName string + Op ProbeOperation // The operation to the plugin +} + const ( // Common parameter which can be specified in StorageClass to specify the desired FSType // Provisioners SHOULD implement support for this if they are block device based // Must be a filesystem type supported by the host operating system. // Ex. "ext4", "xfs", "ntfs". Default value depends on the provisioner VolumeParameterFSType = "fstype" + + ProbeAddOrUpdate ProbeOperation = 1 << iota + ProbeRemove ) // VolumeOptions contains option information about a volume. @@ -74,12 +84,8 @@ type VolumeOptions struct { type DynamicPluginProber interface { Init() error - // If an update has occurred since the last probe, updated = true - // and the list of probed plugins is returned. - // Otherwise, update = false and probedPlugins = nil. - // - // If an error occurs, updated and probedPlugins are undefined. - Probe() (updated bool, probedPlugins []VolumePlugin, err error) + // If an error occurs, events are undefined. + Probe() (events []ProbeEvent, err error) } // VolumePlugin is an interface to volume plugins that can be used on a @@ -313,7 +319,7 @@ type VolumePluginMgr struct { mutex sync.Mutex plugins map[string]VolumePlugin prober DynamicPluginProber - probedPlugins []VolumePlugin + probedPlugins map[string]VolumePlugin Host VolumeHost } @@ -430,6 +436,9 @@ func (pm *VolumePluginMgr) InitPlugins(plugins []VolumePlugin, prober DynamicPlu if pm.plugins == nil { pm.plugins = map[string]VolumePlugin{} } + if pm.probedPlugins == nil { + pm.probedPlugins = map[string]VolumePlugin{} + } allErrs := []error{} for _, plugin := range plugins { @@ -543,21 +552,25 @@ func (pm *VolumePluginMgr) FindPluginByName(name string) (VolumePlugin, error) { // Check if probedPlugin cache update is required. // If it is, initialize all probed plugins and replace the cache with them. func (pm *VolumePluginMgr) refreshProbedPlugins() { - updated, plugins, err := pm.prober.Probe() + events, err := pm.prober.Probe() if err != nil { glog.Errorf("Error dynamically probing plugins: %s", err) return // Use cached plugins upon failure. } - if updated { - pm.probedPlugins = []VolumePlugin{} - for _, plugin := range plugins { - if err := pm.initProbedPlugin(plugin); err != nil { + for _, event := range events { + if event.Op == ProbeAddOrUpdate { + if err := pm.initProbedPlugin(event.Plugin); err != nil { glog.Errorf("Error initializing dynamically probed plugin %s; error: %s", - plugin.GetPluginName(), err) + event.Plugin.GetPluginName(), err) continue } - pm.probedPlugins = append(pm.probedPlugins, plugin) + pm.probedPlugins[event.Plugin.GetPluginName()] = event.Plugin + } else if event.Op == ProbeRemove { + delete(pm.probedPlugins, event.Plugin.GetPluginName()) + } else { + glog.Errorf("Unknown Operation on PluginName: %s.", + event.Plugin.GetPluginName()) } } } @@ -802,5 +815,5 @@ func ValidateRecyclerPodTemplate(pod *v1.Pod) error { type dummyPluginProber struct{} -func (*dummyPluginProber) Init() error { return nil } -func (*dummyPluginProber) Probe() (bool, []VolumePlugin, error) { return false, nil, nil } +func (*dummyPluginProber) Init() error { return nil } +func (*dummyPluginProber) Probe() ([]ProbeEvent, error) { return nil, nil }