diff --git a/hack/.golint_failures b/hack/.golint_failures index 8c9725d4132..48de3c403b8 100644 --- a/hack/.golint_failures +++ b/hack/.golint_failures @@ -382,7 +382,9 @@ pkg/volume/azure_dd pkg/volume/azure_file pkg/volume/cephfs pkg/volume/configmap +pkg/volume/csi pkg/volume/csi/fake +pkg/volume/csi/labelmanager pkg/volume/empty_dir pkg/volume/fc pkg/volume/flexvolume diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index ef4c3a5dafe..305cfd970ec 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -102,6 +102,7 @@ go_library( "//pkg/util/removeall:go_default_library", "//pkg/version:go_default_library", "//pkg/volume:go_default_library", + "//pkg/volume/csi:go_default_library", "//pkg/volume/util:go_default_library", "//pkg/volume/util/types:go_default_library", "//pkg/volume/util/volumepathhandler:go_default_library", diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 4b94359117e..0484cfdfc3c 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -106,6 +106,7 @@ import ( nodeutil "k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/pkg/util/oom" "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/csi" utilexec "k8s.io/utils/exec" ) @@ -1290,6 +1291,9 @@ func (kl *Kubelet) initializeModules() error { } } if kl.enablePluginsWatcher { + // Adding Registration Callback function for CSI Driver + kl.pluginWatcher.AddHandler("CSIPlugin", csi.RegistrationCallback) + // Start the plugin watcher if err := kl.pluginWatcher.Start(); err != nil { return fmt.Errorf("failed to start Plugin Watcher. err: %v", err) diff --git a/pkg/volume/csi/BUILD b/pkg/volume/csi/BUILD index 63e72468708..c8e7efc1f9d 100644 --- a/pkg/volume/csi/BUILD +++ b/pkg/volume/csi/BUILD @@ -12,9 +12,11 @@ go_library( importpath = "k8s.io/kubernetes/pkg/volume/csi", visibility = ["//visibility:public"], deps = [ + "//pkg/features:go_default_library", "//pkg/util/mount:go_default_library", "//pkg/util/strings:go_default_library", "//pkg/volume:go_default_library", + "//pkg/volume/csi/labelmanager:go_default_library", "//pkg/volume/util:go_default_library", "//vendor/github.com/container-storage-interface/spec/lib/go/csi/v0:go_default_library", "//vendor/github.com/golang/glog:go_default_library", @@ -25,6 +27,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", + "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", ], ) @@ -70,6 +73,7 @@ filegroup( srcs = [ ":package-srcs", "//pkg/volume/csi/fake:all-srcs", + "//pkg/volume/csi/labelmanager:all-srcs", ], tags = ["automanaged"], visibility = ["//visibility:public"], diff --git a/pkg/volume/csi/csi_client.go b/pkg/volume/csi/csi_client.go index 6398ce455d1..c6d11abb8cb 100644 --- a/pkg/volume/csi/csi_client.go +++ b/pkg/volume/csi/csi_client.go @@ -27,6 +27,8 @@ import ( "github.com/golang/glog" "google.golang.org/grpc" api "k8s.io/api/core/v1" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/kubernetes/pkg/features" ) type csiClient interface { @@ -255,9 +257,16 @@ func newGrpcConn(driverName string) (*grpc.ClientConn, error) { if driverName == "" { return nil, fmt.Errorf("driver name is empty") } - - network := "unix" addr := fmt.Sprintf(csiAddrTemplate, driverName) + // TODO once KubeletPluginsWatcher graduates to beta, remove FeatureGate check + if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPluginsWatcher) { + driver, ok := csiDrivers.driversMap[driverName] + if !ok { + return nil, fmt.Errorf("driver name %s not found in the list of registered CSI drivers", driverName) + } + addr = driver.driverEndpoint + } + network := "unix" glog.V(4).Infof(log("creating new gRPC connection for [%s://%s]", network, addr)) return grpc.Dial( diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index 8a993afb920..48b2a9835ad 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -21,6 +21,8 @@ import ( "fmt" "os" "path" + "strings" + "sync" "time" "github.com/golang/glog" @@ -29,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/csi/labelmanager" ) const ( @@ -59,9 +62,54 @@ func ProbeVolumePlugins() []volume.VolumePlugin { // volume.VolumePlugin methods var _ volume.VolumePlugin = &csiPlugin{} +type csiDriver struct { + driverName string + driverEndpoint string +} + +type csiDriversStore struct { + driversMap map[string]csiDriver + sync.RWMutex +} + +// csiDrivers map keep track of all registered CSI drivers on the node and their +// corresponding sockets +var csiDrivers csiDriversStore + +var lm labelmanager.Interface + +// RegistrationCallback is called by kubelet's plugin watcher upon detection +// of a new registration socket opened by CSI Driver registrar side car. +func RegistrationCallback(pluginName string, endpoint string, versions []string, socketPath string) (error, chan bool) { + + glog.Infof(log("Callback from kubelet with plugin name: %s endpoint: %s versions: %s socket path: %s", + pluginName, endpoint, strings.Join(versions, ","), socketPath)) + + if endpoint == "" { + endpoint = socketPath + } + // Calling nodeLabelManager to update label for newly registered CSI driver + err := lm.AddLabels(pluginName) + if err != nil { + return err, nil + } + // Storing endpoint of newly registered CSI driver into the map, where CSI driver name will be the key + // all other CSI components will be able to get the actual socket of CSI drivers by its name. + csiDrivers.Lock() + defer csiDrivers.Unlock() + csiDrivers.driversMap[pluginName] = csiDriver{driverName: pluginName, driverEndpoint: endpoint} + + return nil, nil +} + func (p *csiPlugin) Init(host volume.VolumeHost) error { glog.Info(log("plugin initializing...")) p.host = host + + // Initializing csiDrivers map and label management channels + csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}} + lm = labelmanager.NewLabelManager(host.GetNodeName(), host.GetKubeClient()) + return nil } diff --git a/pkg/volume/csi/labelmanager/BUILD b/pkg/volume/csi/labelmanager/BUILD new file mode 100644 index 00000000000..1fed8335642 --- /dev/null +++ b/pkg/volume/csi/labelmanager/BUILD @@ -0,0 +1,30 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["labelmanager.go"], + importpath = "k8s.io/kubernetes/pkg/volume/csi/labelmanager", + visibility = ["//visibility:public"], + deps = [ + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", + "//vendor/k8s.io/client-go/util/retry:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/volume/csi/labelmanager/labelmanager.go b/pkg/volume/csi/labelmanager/labelmanager.go new file mode 100644 index 00000000000..162d7d3dc84 --- /dev/null +++ b/pkg/volume/csi/labelmanager/labelmanager.go @@ -0,0 +1,251 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package labelmanager includes internal functions used to add/delete labels to +// kubernetes nodes for corresponding CSI drivers +package labelmanager // import "k8s.io/kubernetes/pkg/volume/csi/labelmanager" + +import ( + "encoding/json" + "fmt" + + "github.com/golang/glog" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/util/retry" +) + +const ( + // Name of node annotation that contains JSON map of driver names to node + // names + annotationKey = "csi.volume.kubernetes.io/nodeid" + csiPluginName = "kubernetes.io/csi" +) + +// labelManagementStruct is struct of channels used for communication between the driver registration +// code and the go routine responsible for managing the node's labels +type labelManagerStruct struct { + nodeName types.NodeName + k8s kubernetes.Interface +} + +// Interface implements an interface for managing labels of a node +type Interface interface { + AddLabels(driverName string) error +} + +// NewLabelManager initializes labelManagerStruct and returns available interfaces +func NewLabelManager(nodeName types.NodeName, kubeClient kubernetes.Interface) Interface { + return labelManagerStruct{ + nodeName: nodeName, + k8s: kubeClient, + } +} + +// nodeLabelManager waits for labeling requests initiated by the driver's registration +// process. +func (lm labelManagerStruct) AddLabels(driverName string) error { + err := verifyAndAddNodeId(string(lm.nodeName), lm.k8s.CoreV1().Nodes(), driverName, string(lm.nodeName)) + if err != nil { + return fmt.Errorf("failed to update node %s's annotation with error: %+v", lm.nodeName, err) + } + return nil +} + +// Clones the given map and returns a new map with the given key and value added. +// Returns the given map, if annotationKey is empty. +func cloneAndAddAnnotation( + annotations map[string]string, + annotationKey, + annotationValue string) map[string]string { + if annotationKey == "" { + // Don't need to add an annotation. + return annotations + } + // Clone. + newAnnotations := map[string]string{} + for key, value := range annotations { + newAnnotations[key] = value + } + newAnnotations[annotationKey] = annotationValue + return newAnnotations +} + +func verifyAndAddNodeId( + k8sNodeName string, + k8sNodesClient corev1.NodeInterface, + csiDriverName string, + csiDriverNodeId string) error { + // Add or update annotation on Node object + retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { + // Retrieve the latest version of Node before attempting update, so that + // existing changes are not overwritten. RetryOnConflict uses + // exponential backoff to avoid exhausting the apiserver. + result, getErr := k8sNodesClient.Get(k8sNodeName, metav1.GetOptions{}) + if getErr != nil { + glog.Errorf("Failed to get latest version of Node: %v", getErr) + return getErr // do not wrap error + } + + var previousAnnotationValue string + if result.ObjectMeta.Annotations != nil { + previousAnnotationValue = + result.ObjectMeta.Annotations[annotationKey] + glog.V(3).Infof( + "previousAnnotationValue=%q", previousAnnotationValue) + } + + existingDriverMap := map[string]string{} + if previousAnnotationValue != "" { + // Parse previousAnnotationValue as JSON + if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil { + return fmt.Errorf( + "failed to parse node's %q annotation value (%q) err=%v", + annotationKey, + previousAnnotationValue, + err) + } + } + + if val, ok := existingDriverMap[csiDriverName]; ok { + if val == csiDriverNodeId { + // Value already exists in node annotation, nothing more to do + glog.V(1).Infof( + "The key value {%q: %q} alredy eixst in node %q annotation, no need to update: %v", + csiDriverName, + csiDriverNodeId, + annotationKey, + previousAnnotationValue) + return nil + } + } + + // Add/update annotation value + existingDriverMap[csiDriverName] = csiDriverNodeId + jsonObj, err := json.Marshal(existingDriverMap) + if err != nil { + return fmt.Errorf( + "failed while trying to add key value {%q: %q} to node %q annotation. Existing value: %v", + csiDriverName, + csiDriverNodeId, + annotationKey, + previousAnnotationValue) + } + + result.ObjectMeta.Annotations = cloneAndAddAnnotation( + result.ObjectMeta.Annotations, + annotationKey, + string(jsonObj)) + _, updateErr := k8sNodesClient.Update(result) + if updateErr == nil { + fmt.Printf( + "Updated node %q successfully for CSI driver %q and CSI node name %q", + k8sNodeName, + csiDriverName, + csiDriverNodeId) + } + return updateErr // do not wrap error + }) + if retryErr != nil { + return fmt.Errorf("node update failed: %v", retryErr) + } + return nil +} + +// Fetches Kubernetes node API object corresponding to k8sNodeName. +// If the csiDriverName is present in the node annotation, it is removed. +func verifyAndDeleteNodeId( + k8sNodeName string, + k8sNodesClient corev1.NodeInterface, + csiDriverName string) error { + retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { + // Retrieve the latest version of Node before attempting update, so that + // existing changes are not overwritten. RetryOnConflict uses + // exponential backoff to avoid exhausting the apiserver. + result, getErr := k8sNodesClient.Get(k8sNodeName, metav1.GetOptions{}) + if getErr != nil { + glog.Errorf("failed to get latest version of Node: %v", getErr) + return getErr // do not wrap error + } + + var previousAnnotationValue string + if result.ObjectMeta.Annotations != nil { + previousAnnotationValue = + result.ObjectMeta.Annotations[annotationKey] + glog.V(3).Infof( + "previousAnnotationValue=%q", previousAnnotationValue) + } + + existingDriverMap := map[string]string{} + if previousAnnotationValue == "" { + // Value already exists in node annotation, nothing more to do + glog.V(1).Infof( + "The key %q does not exist in node %q annotation, no need to cleanup.", + csiDriverName, + annotationKey) + return nil + } + + // Parse previousAnnotationValue as JSON + if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil { + return fmt.Errorf( + "failed to parse node's %q annotation value (%q) err=%v", + annotationKey, + previousAnnotationValue, + err) + } + + if _, ok := existingDriverMap[csiDriverName]; !ok { + // Value already exists in node annotation, nothing more to do + glog.V(1).Infof( + "The key %q does not eixst in node %q annotation, no need to cleanup: %v", + csiDriverName, + annotationKey, + previousAnnotationValue) + return nil + } + + // Add/update annotation value + delete(existingDriverMap, csiDriverName) + jsonObj, err := json.Marshal(existingDriverMap) + if err != nil { + return fmt.Errorf( + "failed while trying to remove key %q from node %q annotation. Existing data: %v", + csiDriverName, + annotationKey, + previousAnnotationValue) + } + + result.ObjectMeta.Annotations = cloneAndAddAnnotation( + result.ObjectMeta.Annotations, + annotationKey, + string(jsonObj)) + _, updateErr := k8sNodesClient.Update(result) + if updateErr == nil { + fmt.Printf( + "Updated node %q annotation to remove CSI driver %q.", + k8sNodeName, + csiDriverName) + } + return updateErr // do not wrap error + }) + if retryErr != nil { + return fmt.Errorf("node update failed: %v", retryErr) + } + return nil +}