Modify kubelet watcher to support old versions

Modify kubelet plugin watcher to support older CSI drivers that use an
the old plugins directory for socket registration.
Also modify CSI plugin registration to support multiple versions of CSI
registering with the same name.
This commit is contained in:
saad-ali 2018-11-20 20:06:10 -08:00
parent f8983a8988
commit 8f666d9e41
10 changed files with 898 additions and 42 deletions

View File

@ -246,9 +246,13 @@ func (m *ManagerImpl) GetWatcherHandler() watcher.PluginHandler {
}
// ValidatePlugin validates a plugin if the version is correct and the name has the format of an extended resource
func (m *ManagerImpl) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
func (m *ManagerImpl) ValidatePlugin(pluginName string, endpoint string, versions []string, foundInDeprecatedDir bool) error {
klog.V(2).Infof("Got Plugin %s at endpoint %s with versions %v", pluginName, endpoint, versions)
if foundInDeprecatedDir {
return fmt.Errorf("device plugin socket was found in a directory that is no longer supported")
}
if !m.isVersionCompatibleWithPlugin(versions) {
return fmt.Errorf("manager version, %s, is not among plugin supported versions %v", pluginapi.Version, versions)
}
@ -263,7 +267,7 @@ func (m *ManagerImpl) ValidatePlugin(pluginName string, endpoint string, version
// RegisterPlugin starts the endpoint and registers it
// TODO: Start the endpoint and wait for the First ListAndWatch call
// before registering the plugin
func (m *ManagerImpl) RegisterPlugin(pluginName string, endpoint string) error {
func (m *ManagerImpl) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
klog.V(2).Infof("Registering Plugin %s at endpoint %s", pluginName, endpoint)
e, err := newEndpointImpl(endpoint, pluginName, m.callback)

View File

@ -248,7 +248,7 @@ func setupDevicePlugin(t *testing.T, devs []*pluginapi.Device, pluginSocketName
}
func setupPluginWatcher(pluginSocketName string, m Manager) *pluginwatcher.Watcher {
w := pluginwatcher.NewWatcher(filepath.Dir(pluginSocketName))
w := pluginwatcher.NewWatcher(filepath.Dir(pluginSocketName), "" /* deprecatedSockDir */)
w.AddHandler(watcherapi.DevicePlugin, m.GetWatcherHandler())
w.Start()

View File

@ -789,7 +789,10 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
return nil, err
}
if klet.enablePluginsWatcher {
klet.pluginWatcher = pluginwatcher.NewWatcher(klet.getPluginsRegistrationDir())
klet.pluginWatcher = pluginwatcher.NewWatcher(
klet.getPluginsRegistrationDir(), /* sockDir */
klet.getPluginsDir(), /* deprecatedSockDir */
)
}
// If the experimentalMounterPathFlag is set, we do not want to

View File

@ -38,6 +38,8 @@ type exampleHandler struct {
m sync.Mutex
count int
permitDeprecatedDir bool
}
type examplePluginEvent int
@ -50,16 +52,21 @@ const (
)
// NewExampleHandler provide a example handler
func NewExampleHandler(supportedVersions []string) *exampleHandler {
func NewExampleHandler(supportedVersions []string, permitDeprecatedDir bool) *exampleHandler {
return &exampleHandler{
SupportedVersions: supportedVersions,
ExpectedNames: make(map[string]int),
eventChans: make(map[string]chan examplePluginEvent),
eventChans: make(map[string]chan examplePluginEvent),
permitDeprecatedDir: permitDeprecatedDir,
}
}
func (p *exampleHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
func (p *exampleHandler) ValidatePlugin(pluginName string, endpoint string, versions []string, foundInDeprecatedDir bool) error {
if foundInDeprecatedDir && !p.permitDeprecatedDir {
return fmt.Errorf("device plugin socket was found in a directory that is no longer supported and this test does not permit plugins from deprecated dir")
}
p.SendEvent(pluginName, exampleEventValidate)
n, ok := p.DecreasePluginCount(pluginName)
@ -79,7 +86,7 @@ func (p *exampleHandler) ValidatePlugin(pluginName string, endpoint string, vers
return nil
}
func (p *exampleHandler) RegisterPlugin(pluginName, endpoint string) error {
func (p *exampleHandler) RegisterPlugin(pluginName, endpoint string, versions []string) error {
p.SendEvent(pluginName, exampleEventRegister)
// Verifies the grpcServer is ready to serve services.

View File

@ -20,6 +20,7 @@ import (
"fmt"
"net"
"os"
"path/filepath"
"strings"
"sync"
"time"
@ -36,11 +37,12 @@ import (
// Watcher is the plugin watcher
type Watcher struct {
path string
stopCh chan interface{}
fs utilfs.Filesystem
fsWatcher *fsnotify.Watcher
wg sync.WaitGroup
path string
deprecatedPath string
stopCh chan interface{}
fs utilfs.Filesystem
fsWatcher *fsnotify.Watcher
wg sync.WaitGroup
mutex sync.Mutex
handlers map[string]PluginHandler
@ -54,10 +56,13 @@ type pathInfo struct {
}
// NewWatcher provides a new watcher
func NewWatcher(sockDir string) *Watcher {
// deprecatedSockDir refers to a pre-GA directory that was used by older plugins
// for socket registration. New plugins should not use this directory.
func NewWatcher(sockDir string, deprecatedSockDir string) *Watcher {
return &Watcher{
path: sockDir,
fs: &utilfs.DefaultFs{},
path: sockDir,
deprecatedPath: deprecatedSockDir,
fs: &utilfs.DefaultFs{},
handlers: make(map[string]PluginHandler),
plugins: make(map[string]pathInfo),
@ -137,7 +142,15 @@ func (w *Watcher) Start() error {
// Traverse plugin dir after starting the plugin processing goroutine
if err := w.traversePluginDir(w.path); err != nil {
w.Stop()
return fmt.Errorf("failed to traverse plugin socket path, err: %v", err)
return fmt.Errorf("failed to traverse plugin socket path %q, err: %v", w.path, err)
}
// Traverse deprecated plugin dir, if specified.
if len(w.deprecatedPath) != 0 {
if err := w.traversePluginDir(w.deprecatedPath); err != nil {
w.Stop()
return fmt.Errorf("failed to traverse deprecated plugin socket path %q, err: %v", w.deprecatedPath, err)
}
}
return nil
@ -190,6 +203,10 @@ func (w *Watcher) traversePluginDir(dir string) error {
switch mode := info.Mode(); {
case mode.IsDir():
if w.containsBlacklistedDir(path) {
return filepath.SkipDir
}
if err := w.fsWatcher.Add(path); err != nil {
return fmt.Errorf("failed to watch %s, err: %v", path, err)
}
@ -216,6 +233,10 @@ func (w *Watcher) traversePluginDir(dir string) error {
func (w *Watcher) handleCreateEvent(event fsnotify.Event) error {
klog.V(6).Infof("Handling create event: %v", event)
if w.containsBlacklistedDir(event.Name) {
return nil
}
fi, err := os.Stat(event.Name)
if err != nil {
return fmt.Errorf("stat file %s failed: %v", event.Name, err)
@ -271,8 +292,10 @@ func (w *Watcher) handlePluginRegistration(socketPath string) error {
infoResp.Endpoint = socketPath
}
foundInDeprecatedDir := w.foundInDeprecatedDir(socketPath)
// calls handler callback to verify registration request
if err := handler.ValidatePlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
if err := handler.ValidatePlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions, foundInDeprecatedDir); err != nil {
return w.notifyPlugin(client, false, fmt.Sprintf("plugin validation failed with err: %v", err))
}
@ -280,7 +303,7 @@ func (w *Watcher) handlePluginRegistration(socketPath string) error {
// so that if we receive a delete event during Register Plugin, we can process it as a DeRegister call.
w.registerPlugin(socketPath, infoResp.Type, infoResp.Name)
if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint); err != nil {
if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
return w.notifyPlugin(client, false, fmt.Sprintf("plugin registration failed with err: %v", err))
}
@ -417,3 +440,27 @@ func dial(unixSocketPath string, timeout time.Duration) (registerapi.Registratio
return registerapi.NewRegistrationClient(c), c, nil
}
// While deprecated dir is supported, to add extra protection around #69015
// we will explicitly blacklist kubernetes.io directory.
func (w *Watcher) containsBlacklistedDir(path string) bool {
return strings.HasPrefix(path, w.deprecatedPath+"/kubernetes.io/") ||
path == w.deprecatedPath+"/kubernetes.io"
}
func (w *Watcher) foundInDeprecatedDir(socketPath string) bool {
if len(w.deprecatedPath) != 0 {
if socketPath == w.deprecatedPath {
return true
}
deprecatedPath := w.deprecatedPath
if !strings.HasSuffix(deprecatedPath, "/") {
deprecatedPath = deprecatedPath + "/"
}
if strings.HasPrefix(socketPath, deprecatedPath) {
return true
}
}
return false
}

View File

@ -32,7 +32,8 @@ import (
)
var (
socketDir string
socketDir string
deprecatedSocketDir string
supportedVersions = []string{"v1beta1", "v1beta2"}
)
@ -50,19 +51,27 @@ func init() {
panic(fmt.Sprintf("Could not create a temp directory: %s", d))
}
d2, err := ioutil.TempDir("", "deprecated_plugin_test")
if err != nil {
panic(fmt.Sprintf("Could not create a temp directory: %s", d))
}
socketDir = d
deprecatedSocketDir = d2
}
func cleanup(t *testing.T) {
require.NoError(t, os.RemoveAll(socketDir))
require.NoError(t, os.RemoveAll(deprecatedSocketDir))
os.MkdirAll(socketDir, 0755)
os.MkdirAll(deprecatedSocketDir, 0755)
}
func TestPluginRegistration(t *testing.T) {
defer cleanup(t)
hdlr := NewExampleHandler(supportedVersions)
w := newWatcherWithHandler(t, hdlr)
hdlr := NewExampleHandler(supportedVersions, false /* permitDeprecatedDir */)
w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */)
defer func() { require.NoError(t, w.Stop()) }()
for i := 0; i < 10; i++ {
@ -84,13 +93,40 @@ func TestPluginRegistration(t *testing.T) {
}
}
func TestPluginRegistrationDeprecated(t *testing.T) {
defer cleanup(t)
hdlr := NewExampleHandler(supportedVersions, true /* permitDeprecatedDir */)
w := newWatcherWithHandler(t, hdlr, true /* testDeprecatedDir */)
defer func() { require.NoError(t, w.Stop()) }()
// Test plugins in deprecated dir
for i := 0; i < 10; i++ {
endpoint := fmt.Sprintf("%s/dep-plugin-%d.sock", deprecatedSocketDir, i)
pluginName := fmt.Sprintf("dep-example-plugin-%d", i)
hdlr.AddPluginName(pluginName)
p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, endpoint, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName)))
require.True(t, waitForEvent(t, exampleEventRegister, hdlr.EventChan(p.pluginName)))
require.True(t, waitForPluginRegistrationStatus(t, p.registrationStatus))
require.NoError(t, p.Stop())
require.True(t, waitForEvent(t, exampleEventDeRegister, hdlr.EventChan(p.pluginName)))
}
}
func TestPluginReRegistration(t *testing.T) {
defer cleanup(t)
pluginName := fmt.Sprintf("example-plugin")
hdlr := NewExampleHandler(supportedVersions)
hdlr := NewExampleHandler(supportedVersions, false /* permitDeprecatedDir */)
w := newWatcherWithHandler(t, hdlr)
w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */)
defer func() { require.NoError(t, w.Stop()) }()
plugins := make([]*examplePlugin, 10)
@ -122,7 +158,7 @@ func TestPluginReRegistration(t *testing.T) {
func TestPluginRegistrationAtKubeletStart(t *testing.T) {
defer cleanup(t)
hdlr := NewExampleHandler(supportedVersions)
hdlr := NewExampleHandler(supportedVersions, false /* permitDeprecatedDir */)
plugins := make([]*examplePlugin, 10)
for i := 0; i < len(plugins); i++ {
@ -137,7 +173,7 @@ func TestPluginRegistrationAtKubeletStart(t *testing.T) {
plugins[i] = p
}
w := newWatcherWithHandler(t, hdlr)
w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */)
defer func() { require.NoError(t, w.Stop()) }()
var wg sync.WaitGroup
@ -173,10 +209,10 @@ func TestPluginRegistrationFailureWithUnsupportedVersion(t *testing.T) {
pluginName := fmt.Sprintf("example-plugin")
socketPath := socketDir + "/plugin.sock"
hdlr := NewExampleHandler(supportedVersions)
hdlr := NewExampleHandler(supportedVersions, false /* permitDeprecatedDir */)
hdlr.AddPluginName(pluginName)
w := newWatcherWithHandler(t, hdlr)
w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */)
defer func() { require.NoError(t, w.Stop()) }()
// Advertise v1beta3 but don't serve anything else than the plugin service
@ -199,10 +235,10 @@ func TestPlugiRegistrationFailureWithUnsupportedVersionAtKubeletStart(t *testing
require.NoError(t, p.Serve())
defer func() { require.NoError(t, p.Stop()) }()
hdlr := NewExampleHandler(supportedVersions)
hdlr := NewExampleHandler(supportedVersions, false /* permitDeprecatedDir */)
hdlr.AddPluginName(pluginName)
w := newWatcherWithHandler(t, hdlr)
w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */)
defer func() { require.NoError(t, w.Stop()) }()
require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName)))
@ -230,11 +266,215 @@ func waitForEvent(t *testing.T, expected examplePluginEvent, eventChan chan exam
return false
}
func newWatcherWithHandler(t *testing.T, hdlr PluginHandler) *Watcher {
w := NewWatcher(socketDir)
func newWatcherWithHandler(t *testing.T, hdlr PluginHandler, testDeprecatedDir bool) *Watcher {
depSocketDir := ""
if testDeprecatedDir {
depSocketDir = deprecatedSocketDir
}
w := NewWatcher(socketDir, depSocketDir)
w.AddHandler(registerapi.DevicePlugin, hdlr)
require.NoError(t, w.Start())
return w
}
func TestFoundInDeprecatedDir(t *testing.T) {
testCases := []struct {
sockDir string
deprecatedSockDir string
socketPath string
expectFoundInDeprecatedDir bool
}{
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
socketPath: "/var/lib/kubelet/plugins_registry/mydriver.foo/csi.sock",
expectFoundInDeprecatedDir: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
socketPath: "/var/lib/kubelet/plugins/mydriver.foo/csi.sock",
expectFoundInDeprecatedDir: true,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
socketPath: "/var/lib/kubelet/plugins_registry",
expectFoundInDeprecatedDir: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
socketPath: "/var/lib/kubelet/plugins",
expectFoundInDeprecatedDir: true,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
socketPath: "/var/lib/kubelet/plugins/kubernetes.io",
expectFoundInDeprecatedDir: true,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
socketPath: "/var/lib/kubelet/plugins/my.driver.com",
expectFoundInDeprecatedDir: true,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
socketPath: "/var/lib/kubelet/plugins_registry",
expectFoundInDeprecatedDir: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
socketPath: "/var/lib/kubelet/plugins_registry/kubernetes.io",
expectFoundInDeprecatedDir: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
socketPath: "/var/lib/kubelet/plugins_registry/my.driver.com",
expectFoundInDeprecatedDir: false,
},
}
for _, tc := range testCases {
// Arrange & Act
watcher := NewWatcher(tc.sockDir, tc.deprecatedSockDir)
actualFoundInDeprecatedDir := watcher.foundInDeprecatedDir(tc.socketPath)
// Assert
if tc.expectFoundInDeprecatedDir != actualFoundInDeprecatedDir {
t.Fatalf("expecting actualFoundInDeprecatedDir=%v, but got %v for testcase: %#v", tc.expectFoundInDeprecatedDir, actualFoundInDeprecatedDir, tc)
}
}
}
func TestContainsBlacklistedDir(t *testing.T) {
testCases := []struct {
sockDir string
deprecatedSockDir string
path string
expected bool
}{
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins_registry/mydriver.foo/csi.sock",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/mydriver.foo/csi.sock",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins_registry",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/kubernetes.io",
expected: true,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/kubernetes.io/csi.sock",
expected: true,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/kubernetes.io/my.plugin/csi.sock",
expected: true,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/kubernetes.io/",
expected: true,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/my.driver.com",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins_registry",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins_registry/kubernetes.io",
expected: false, // New (non-deprecated dir) has no blacklist
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins_registry/my.driver.com",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/my-kubernetes.io-plugin",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/my-kubernetes.io-plugin/csi.sock",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/kubernetes.io-plugin",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/kubernetes.io-plugin/csi.sock",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/kubernetes.io-plugin/",
expected: false,
},
}
for _, tc := range testCases {
// Arrange & Act
watcher := NewWatcher(tc.sockDir, tc.deprecatedSockDir)
actual := watcher.containsBlacklistedDir(tc.path)
// Assert
if tc.expected != actual {
t.Fatalf("expecting %v but got %v for testcase: %#v", tc.expected, actual, tc)
}
}
}

View File

@ -48,11 +48,11 @@ package pluginwatcher
type PluginHandler interface {
// Validate returns an error if the information provided by
// the potential plugin is erroneous (unsupported version, ...)
ValidatePlugin(pluginName string, endpoint string, versions []string) error
ValidatePlugin(pluginName string, endpoint string, versions []string, foundInDeprecatedDir bool) error
// RegisterPlugin is called so that the plugin can be register by any
// plugin consumer
// Error encountered here can still be Notified to the plugin.
RegisterPlugin(pluginName, endpoint string) error
RegisterPlugin(pluginName, endpoint string, versions []string) error
// DeRegister is called once the pluginwatcher observes that the socket has
// been deleted.
DeRegisterPlugin(pluginName string)

View File

@ -21,6 +21,7 @@ import (
"fmt"
"os"
"path"
"sort"
"strings"
"sync"
"time"
@ -33,6 +34,7 @@ import (
apierrs "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilversion "k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
@ -61,6 +63,8 @@ const (
csiResyncPeriod = time.Minute
)
var deprecatedSocketDirVersions = []string{"0.1.0", "0.2.0", "0.3.0", "0.4.0"}
type csiPlugin struct {
host volume.VolumeHost
blockEnabled bool
@ -81,8 +85,9 @@ func ProbeVolumePlugins() []volume.VolumePlugin {
var _ volume.VolumePlugin = &csiPlugin{}
type csiDriver struct {
driverName string
driverEndpoint string
driverName string
driverEndpoint string
highestSupportedVersion *utilversion.Version
}
type csiDriversStore struct {
@ -107,17 +112,35 @@ var PluginHandler = &RegistrationHandler{}
// ValidatePlugin is called by kubelet's plugin watcher upon detection
// of a new registration socket opened by CSI Driver registrar side car.
func (h *RegistrationHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
klog.Infof(log("Trying to register a new plugin with name: %s endpoint: %s versions: %s",
pluginName, endpoint, strings.Join(versions, ",")))
func (h *RegistrationHandler) ValidatePlugin(pluginName string, endpoint string, versions []string, foundInDeprecatedDir bool) error {
klog.Infof(log("Trying to validate a new CSI Driver with name: %s endpoint: %s versions: %s, foundInDeprecatedDir: %v",
pluginName, endpoint, strings.Join(versions, ","), foundInDeprecatedDir))
return nil
if foundInDeprecatedDir {
// CSI 0.x drivers used /var/lib/kubelet/plugins as the socket dir.
// This was deprecated as the socket dir for kubelet drivers, in lieu of a dedicated dir /var/lib/kubelet/plugins_registry
// The deprecated dir will only be allowed for a whitelisted set of old versions.
// CSI 1.x drivers should use the /var/lib/kubelet/plugins_registry
if !isDeprecatedSocketDirAllowed(versions) {
err := fmt.Errorf("socket for CSI driver %q versions %v was found in a deprecated dir. Drivers implementing CSI 1.x+ must use the new dir", pluginName, versions)
klog.Error(err)
return err
}
}
_, err := h.validateVersions("ValidatePlugin", pluginName, endpoint, versions)
return err
}
// RegisterPlugin is called when a plugin can be registered
func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string) error {
func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
klog.Infof(log("Register new plugin with name: %s at endpoint: %s", pluginName, endpoint))
highestSupportedVersion, err := h.validateVersions("RegisterPlugin", pluginName, endpoint, versions)
if err != nil {
return err
}
func() {
// Storing endpoint of newly registered CSI driver into the map, where CSI driver name will be the key
// all other CSI components will be able to get the actual socket of CSI drivers by its name.
@ -127,7 +150,7 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string)
// updated in the rest of the function.
csiDrivers.Lock()
defer csiDrivers.Unlock()
csiDrivers.driversMap[pluginName] = csiDriver{driverName: pluginName, driverEndpoint: endpoint}
csiDrivers.driversMap[pluginName] = csiDriver{driverName: pluginName, driverEndpoint: endpoint, highestSupportedVersion: highestSupportedVersion}
}()
// Get node info from the driver.
@ -159,6 +182,41 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string)
return nil
}
func (h *RegistrationHandler) validateVersions(callerName, pluginName string, endpoint string, versions []string) (*utilversion.Version, error) {
if len(versions) == 0 {
err := fmt.Errorf("%s for CSI driver %q failed. Plugin returned an empty list for supported versions", callerName, pluginName)
klog.Error(err)
return nil, err
}
// Validate version
newDriverHighestVersion, err := highestSupportedVersion(versions)
if err != nil {
err := fmt.Errorf("%s for CSI driver %q failed. None of the versions specified %q are supported. err=%v", callerName, pluginName, versions, err)
klog.Error(err)
return nil, err
}
// Check for existing drivers with the same name
var existingDriver csiDriver
driverExists := false
func() {
csiDrivers.RLock()
defer csiDrivers.RUnlock()
existingDriver, driverExists = csiDrivers.driversMap[pluginName]
}()
if driverExists {
if !existingDriver.highestSupportedVersion.LessThan(newDriverHighestVersion) {
err := fmt.Errorf("%s for CSI driver %q failed. Another driver with the same name is already registered with a higher supported version: %q", callerName, pluginName, existingDriver.highestSupportedVersion)
klog.Error(err)
return nil, err
}
}
return newDriverHighestVersion, nil
}
// DeRegisterPlugin is called when a plugin removed its socket, signaling
// it is no longer available
func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) {
@ -593,3 +651,69 @@ func unregisterDriver(driverName string) error {
return nil
}
// Return the highest supported version
func highestSupportedVersion(versions []string) (*utilversion.Version, error) {
if len(versions) == 0 {
return nil, fmt.Errorf("CSI driver reporting empty array for supported versions")
}
// Sort by lowest to highest version
sort.Slice(versions, func(i, j int) bool {
parsedVersionI, err := utilversion.ParseGeneric(versions[i])
if err != nil {
// Push bad values to the bottom
return true
}
parsedVersionJ, err := utilversion.ParseGeneric(versions[j])
if err != nil {
// Push bad values to the bottom
return false
}
return parsedVersionI.LessThan(parsedVersionJ)
})
for i := len(versions) - 1; i >= 0; i-- {
highestSupportedVersion, err := utilversion.ParseGeneric(versions[i])
if err != nil {
return nil, err
}
if highestSupportedVersion.Major() <= 1 {
return highestSupportedVersion, nil
}
}
return nil, fmt.Errorf("None of the CSI versions reported by this driver are supported")
}
// Only CSI 0.x drivers are allowed to use deprecated socket dir.
func isDeprecatedSocketDirAllowed(versions []string) bool {
for _, version := range versions {
if !isV0Version(version) {
return false
}
}
return true
}
func isV0Version(version string) bool {
parsedVersion, err := utilversion.ParseGeneric(version)
if err != nil {
return false
}
return parsedVersion.Major() == 0
}
func isV1Version(version string) bool {
parsedVersion, err := utilversion.ParseGeneric(version)
if err != nil {
return false
}
return parsedVersion.Major() == 1
}

View File

@ -524,3 +524,433 @@ func TestPluginConstructBlockVolumeSpec(t *testing.T) {
}
}
}
func TestValidatePlugin(t *testing.T) {
testCases := []struct {
pluginName string
endpoint string
versions []string
foundInDeprecatedDir bool
shouldFail bool
}{
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"v1.0.0"},
foundInDeprecatedDir: false,
shouldFail: false,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"0.3.0"},
foundInDeprecatedDir: false,
shouldFail: false,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"0.2.0"},
foundInDeprecatedDir: false,
shouldFail: false,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions: []string{"v1.0.0"},
foundInDeprecatedDir: true,
shouldFail: true,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions: []string{"v0.3.0"},
foundInDeprecatedDir: true,
shouldFail: false,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions: []string{"0.2.0"},
foundInDeprecatedDir: true,
shouldFail: false,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"0.2.0", "v0.3.0"},
foundInDeprecatedDir: false,
shouldFail: false,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions: []string{"0.2.0", "v0.3.0"},
foundInDeprecatedDir: true,
shouldFail: false,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"0.2.0", "v1.0.0"},
foundInDeprecatedDir: false,
shouldFail: false,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions: []string{"0.2.0", "v1.0.0"},
foundInDeprecatedDir: true,
shouldFail: true,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"0.2.0", "v1.2.3"},
foundInDeprecatedDir: false,
shouldFail: false,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions: []string{"0.2.0", "v1.2.3"},
foundInDeprecatedDir: true,
shouldFail: true,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"v1.2.3", "v0.3.0"},
foundInDeprecatedDir: false,
shouldFail: false,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions: []string{"v1.2.3", "v0.3.0"},
foundInDeprecatedDir: true,
shouldFail: true,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"v1.2.3", "v0.3.0", "2.0.1"},
foundInDeprecatedDir: false,
shouldFail: false,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions: []string{"v1.2.3", "v0.3.0", "2.0.1"},
foundInDeprecatedDir: true,
shouldFail: true,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions: []string{"v0.3.0", "2.0.1"},
foundInDeprecatedDir: true,
shouldFail: true,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"v1.2.3", "4.9.12", "v0.3.0", "2.0.1"},
foundInDeprecatedDir: false,
shouldFail: false,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions: []string{"v1.2.3", "4.9.12", "v0.3.0", "2.0.1"},
foundInDeprecatedDir: true,
shouldFail: true,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"v1.2.3", "boo", "v0.3.0", "2.0.1"},
foundInDeprecatedDir: false,
shouldFail: false,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions: []string{"v1.2.3", "boo", "v0.3.0", "2.0.1"},
foundInDeprecatedDir: true,
shouldFail: true,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"4.9.12", "2.0.1"},
foundInDeprecatedDir: false,
shouldFail: true,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions: []string{"4.9.12", "2.0.1"},
foundInDeprecatedDir: true,
shouldFail: true,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{},
foundInDeprecatedDir: false,
shouldFail: true,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions: []string{},
foundInDeprecatedDir: true,
shouldFail: true,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"var", "boo", "foo"},
foundInDeprecatedDir: false,
shouldFail: true,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions: []string{"var", "boo", "foo"},
foundInDeprecatedDir: true,
shouldFail: true,
},
}
for _, tc := range testCases {
// Arrange & Act
err := PluginHandler.ValidatePlugin(tc.pluginName, tc.endpoint, tc.versions, tc.foundInDeprecatedDir)
// Assert
if tc.shouldFail && err == nil {
t.Fatalf("expecting ValidatePlugin to fail, but got nil error for testcase: %#v", tc)
}
if !tc.shouldFail && err != nil {
t.Fatalf("unexpected error during ValidatePlugin for testcase: %#v\r\n err:%v", tc, err)
}
}
}
func TestValidatePluginExistingDriver(t *testing.T) {
testCases := []struct {
pluginName1 string
endpoint1 string
versions1 []string
pluginName2 string
endpoint2 string
versions2 []string
foundInDeprecatedDir2 bool
shouldFail bool
}{
{
pluginName1: "test.plugin",
endpoint1: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions1: []string{"v1.0.0"},
pluginName2: "test.plugin2",
endpoint2: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions2: []string{"v1.0.0"},
foundInDeprecatedDir2: false,
shouldFail: false,
},
{
pluginName1: "test.plugin",
endpoint1: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions1: []string{"v1.0.0"},
pluginName2: "test.plugin2",
endpoint2: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions2: []string{"v1.0.0"},
foundInDeprecatedDir2: true,
shouldFail: true,
},
{
pluginName1: "test.plugin",
endpoint1: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions1: []string{"v1.0.0"},
pluginName2: "test.plugin",
endpoint2: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions2: []string{"v1.0.0"},
foundInDeprecatedDir2: false,
shouldFail: true,
},
{
pluginName1: "test.plugin",
endpoint1: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions1: []string{"v1.0.0"},
pluginName2: "test.plugin",
endpoint2: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions2: []string{"v1.0.0"},
foundInDeprecatedDir2: false,
shouldFail: true,
},
{
pluginName1: "test.plugin",
endpoint1: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions1: []string{"v1.0.0"},
pluginName2: "test.plugin",
endpoint2: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions2: []string{"v1.0.0"},
foundInDeprecatedDir2: true,
shouldFail: true,
},
{
pluginName1: "test.plugin",
endpoint1: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions1: []string{"v0.3.0", "0.2.0"},
pluginName2: "test.plugin",
endpoint2: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions2: []string{"1.0.0"},
foundInDeprecatedDir2: false,
shouldFail: false,
},
{
pluginName1: "test.plugin",
endpoint1: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions1: []string{"v0.3.0", "0.2.0"},
pluginName2: "test.plugin",
endpoint2: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions2: []string{"1.0.0"},
foundInDeprecatedDir2: true,
shouldFail: true,
},
}
for _, tc := range testCases {
// Arrange & Act
csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}}
highestSupportedVersions1, err := highestSupportedVersion(tc.versions1)
if err != nil {
t.Fatalf("unexpected error parsing version for testcase: %#v", tc)
}
csiDrivers.driversMap[tc.pluginName1] = csiDriver{driverName: tc.pluginName1, driverEndpoint: tc.endpoint1, highestSupportedVersion: highestSupportedVersions1}
// Arrange & Act
err = PluginHandler.ValidatePlugin(tc.pluginName2, tc.endpoint2, tc.versions2, tc.foundInDeprecatedDir2)
// Assert
if tc.shouldFail && err == nil {
t.Fatalf("expecting ValidatePlugin to fail, but got nil error for testcase: %#v", tc)
}
if !tc.shouldFail && err != nil {
t.Fatalf("unexpected error during ValidatePlugin for testcase: %#v\r\n err:%v", tc, err)
}
}
}
func TestHighestSupportedVersion(t *testing.T) {
testCases := []struct {
versions []string
expectedHighestSupportedVersion string
shouldFail bool
}{
{
versions: []string{"v1.0.0"},
expectedHighestSupportedVersion: "1.0.0",
shouldFail: false,
},
{
versions: []string{"0.3.0"},
expectedHighestSupportedVersion: "0.3.0",
shouldFail: false,
},
{
versions: []string{"0.2.0"},
expectedHighestSupportedVersion: "0.2.0",
shouldFail: false,
},
{
versions: []string{"1.0.0"},
expectedHighestSupportedVersion: "1.0.0",
shouldFail: false,
},
{
versions: []string{"v0.3.0"},
expectedHighestSupportedVersion: "0.3.0",
shouldFail: false,
},
{
versions: []string{"0.2.0"},
expectedHighestSupportedVersion: "0.2.0",
shouldFail: false,
},
{
versions: []string{"0.2.0", "v0.3.0"},
expectedHighestSupportedVersion: "0.3.0",
shouldFail: false,
},
{
versions: []string{"0.2.0", "v1.0.0"},
expectedHighestSupportedVersion: "1.0.0",
shouldFail: false,
},
{
versions: []string{"0.2.0", "v1.2.3"},
expectedHighestSupportedVersion: "1.2.3",
shouldFail: false,
},
{
versions: []string{"v1.2.3", "v0.3.0"},
expectedHighestSupportedVersion: "1.2.3",
shouldFail: false,
},
{
versions: []string{"v1.2.3", "v0.3.0", "2.0.1"},
expectedHighestSupportedVersion: "1.2.3",
shouldFail: false,
},
{
versions: []string{"v1.2.3", "4.9.12", "v0.3.0", "2.0.1"},
expectedHighestSupportedVersion: "1.2.3",
shouldFail: false,
},
{
versions: []string{"4.9.12", "2.0.1"},
expectedHighestSupportedVersion: "",
shouldFail: true,
},
{
versions: []string{"v1.2.3", "boo", "v0.3.0", "2.0.1"},
expectedHighestSupportedVersion: "1.2.3",
shouldFail: false,
},
{
versions: []string{},
expectedHighestSupportedVersion: "",
shouldFail: true,
},
{
versions: []string{"var", "boo", "foo"},
expectedHighestSupportedVersion: "",
shouldFail: true,
},
}
for _, tc := range testCases {
// Arrange & Act
actual, err := highestSupportedVersion(tc.versions)
// Assert
if tc.shouldFail && err == nil {
t.Fatalf("expecting highestSupportedVersion to fail, but got nil error for testcase: %#v", tc)
}
if !tc.shouldFail && err != nil {
t.Fatalf("unexpected error during ValidatePlugin for testcase: %#v\r\n err:%v", tc, err)
}
if tc.expectedHighestSupportedVersion != "" {
result, err := actual.Compare(tc.expectedHighestSupportedVersion)
if err != nil {
t.Fatalf("comparison failed with %v for testcase %#v", err, tc)
}
if result != 0 {
t.Fatalf("expectedHighestSupportedVersion %v, but got %v for tc: %#v", tc.expectedHighestSupportedVersion, actual, tc)
}
}
}
}

View File

@ -133,6 +133,7 @@ func (nim *nodeInfoManager) InstallCSIDriver(driverName string, driverNodeID str
// If multiple calls to UninstallCSIDriver() are made in parallel, some calls might receive Node or
// CSINodeInfo update conflicts, which causes the function to retry the corresponding update.
func (nim *nodeInfoManager) UninstallCSIDriver(driverName string) error {
// TODO: shouldn't this be feature gated?
err := nim.uninstallDriverFromCSINodeInfo(driverName)
if err != nil {
return fmt.Errorf("error uninstalling CSI driver from CSINodeInfo object %v", err)