diff --git a/hack/.golint_failures b/hack/.golint_failures index 8c9725d4132..48de3c403b8 100644 --- a/hack/.golint_failures +++ b/hack/.golint_failures @@ -382,7 +382,9 @@ pkg/volume/azure_dd pkg/volume/azure_file pkg/volume/cephfs pkg/volume/configmap +pkg/volume/csi pkg/volume/csi/fake +pkg/volume/csi/labelmanager pkg/volume/empty_dir pkg/volume/fc pkg/volume/flexvolume diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index e63c5dc958f..8b1d44826da 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -102,6 +102,7 @@ go_library( "//pkg/util/removeall:go_default_library", "//pkg/version:go_default_library", "//pkg/volume:go_default_library", + "//pkg/volume/csi:go_default_library", "//pkg/volume/util:go_default_library", "//pkg/volume/util/types:go_default_library", "//pkg/volume/util/volumepathhandler:go_default_library", diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 4b94359117e..0484cfdfc3c 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -106,6 +106,7 @@ import ( nodeutil "k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/pkg/util/oom" "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/csi" utilexec "k8s.io/utils/exec" ) @@ -1290,6 +1291,9 @@ func (kl *Kubelet) initializeModules() error { } } if kl.enablePluginsWatcher { + // Adding Registration Callback function for CSI Driver + kl.pluginWatcher.AddHandler("CSIPlugin", csi.RegistrationCallback) + // Start the plugin watcher if err := kl.pluginWatcher.Start(); err != nil { return fmt.Errorf("failed to start Plugin Watcher. err: %v", err) diff --git a/pkg/volume/csi/BUILD b/pkg/volume/csi/BUILD index 63e72468708..c8e7efc1f9d 100644 --- a/pkg/volume/csi/BUILD +++ b/pkg/volume/csi/BUILD @@ -12,9 +12,11 @@ go_library( importpath = "k8s.io/kubernetes/pkg/volume/csi", visibility = ["//visibility:public"], deps = [ + "//pkg/features:go_default_library", "//pkg/util/mount:go_default_library", "//pkg/util/strings:go_default_library", "//pkg/volume:go_default_library", + "//pkg/volume/csi/labelmanager:go_default_library", "//pkg/volume/util:go_default_library", "//vendor/github.com/container-storage-interface/spec/lib/go/csi/v0:go_default_library", "//vendor/github.com/golang/glog:go_default_library", @@ -25,6 +27,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", + "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", ], ) @@ -70,6 +73,7 @@ filegroup( srcs = [ ":package-srcs", "//pkg/volume/csi/fake:all-srcs", + "//pkg/volume/csi/labelmanager:all-srcs", ], tags = ["automanaged"], visibility = ["//visibility:public"], diff --git a/pkg/volume/csi/csi_client.go b/pkg/volume/csi/csi_client.go index 6398ce455d1..c6d11abb8cb 100644 --- a/pkg/volume/csi/csi_client.go +++ b/pkg/volume/csi/csi_client.go @@ -27,6 +27,8 @@ import ( "github.com/golang/glog" "google.golang.org/grpc" api "k8s.io/api/core/v1" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/kubernetes/pkg/features" ) type csiClient interface { @@ -255,9 +257,16 @@ func newGrpcConn(driverName string) (*grpc.ClientConn, error) { if driverName == "" { return nil, fmt.Errorf("driver name is empty") } - - network := "unix" addr := fmt.Sprintf(csiAddrTemplate, driverName) + // TODO once KubeletPluginsWatcher graduates to beta, remove FeatureGate check + if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPluginsWatcher) { + driver, ok := csiDrivers.driversMap[driverName] + if !ok { + return nil, fmt.Errorf("driver name %s not found in the list of registered CSI drivers", driverName) + } + addr = driver.driverEndpoint + } + network := "unix" glog.V(4).Infof(log("creating new gRPC connection for [%s://%s]", network, addr)) return grpc.Dial( diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index 8a993afb920..48b2a9835ad 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -21,6 +21,8 @@ import ( "fmt" "os" "path" + "strings" + "sync" "time" "github.com/golang/glog" @@ -29,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/csi/labelmanager" ) const ( @@ -59,9 +62,54 @@ func ProbeVolumePlugins() []volume.VolumePlugin { // volume.VolumePlugin methods var _ volume.VolumePlugin = &csiPlugin{} +type csiDriver struct { + driverName string + driverEndpoint string +} + +type csiDriversStore struct { + driversMap map[string]csiDriver + sync.RWMutex +} + +// csiDrivers map keep track of all registered CSI drivers on the node and their +// corresponding sockets +var csiDrivers csiDriversStore + +var lm labelmanager.Interface + +// RegistrationCallback is called by kubelet's plugin watcher upon detection +// of a new registration socket opened by CSI Driver registrar side car. +func RegistrationCallback(pluginName string, endpoint string, versions []string, socketPath string) (error, chan bool) { + + glog.Infof(log("Callback from kubelet with plugin name: %s endpoint: %s versions: %s socket path: %s", + pluginName, endpoint, strings.Join(versions, ","), socketPath)) + + if endpoint == "" { + endpoint = socketPath + } + // Calling nodeLabelManager to update label for newly registered CSI driver + err := lm.AddLabels(pluginName) + if err != nil { + return err, nil + } + // Storing endpoint of newly registered CSI driver into the map, where CSI driver name will be the key + // all other CSI components will be able to get the actual socket of CSI drivers by its name. + csiDrivers.Lock() + defer csiDrivers.Unlock() + csiDrivers.driversMap[pluginName] = csiDriver{driverName: pluginName, driverEndpoint: endpoint} + + return nil, nil +} + func (p *csiPlugin) Init(host volume.VolumeHost) error { glog.Info(log("plugin initializing...")) p.host = host + + // Initializing csiDrivers map and label management channels + csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}} + lm = labelmanager.NewLabelManager(host.GetNodeName(), host.GetKubeClient()) + return nil }