diff --git a/cmd/kube-controller-manager/app/BUILD b/cmd/kube-controller-manager/app/BUILD index 56bc8f57d0f..7fbb43e3716 100644 --- a/cmd/kube-controller-manager/app/BUILD +++ b/cmd/kube-controller-manager/app/BUILD @@ -125,6 +125,7 @@ go_library( "//staging/src/k8s.io/client-go/tools/leaderelection/resourcelock:go_default_library", "//staging/src/k8s.io/client-go/util/cert:go_default_library", "//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library", + "//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library", "//staging/src/k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1:go_default_library", "//staging/src/k8s.io/metrics/pkg/client/custom_metrics:go_default_library", "//staging/src/k8s.io/metrics/pkg/client/external_metrics:go_default_library", diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 0e05f02845b..c7a93e0e3b1 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -36,6 +36,7 @@ import ( cacheddiscovery "k8s.io/client-go/discovery/cached" "k8s.io/client-go/dynamic" clientset "k8s.io/client-go/kubernetes" + csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned" "k8s.io/kubernetes/pkg/controller" endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint" "k8s.io/kubernetes/pkg/controller/garbagecollector" @@ -192,9 +193,14 @@ func startAttachDetachController(ctx ControllerContext) (http.Handler, bool, err if ctx.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration < time.Second { return nil, true, fmt.Errorf("Duration time must be greater than one second as set via command line option reconcile-sync-loop-period.") } + csiClientConfig := ctx.ClientBuilder.ConfigOrDie("attachdetach-controller") + // csiClient works with CRDs that support json only + csiClientConfig.ContentType = "application/json" + attachDetachController, attachDetachControllerErr := attachdetach.NewAttachDetachController( ctx.ClientBuilder.ClientOrDie("attachdetach-controller"), + csiclientset.NewForConfigOrDie(csiClientConfig), ctx.InformerFactory.Core().V1().Pods(), ctx.InformerFactory.Core().V1().Nodes(), ctx.InformerFactory.Core().V1().PersistentVolumeClaims(), diff --git a/cmd/kubelet/app/BUILD b/cmd/kubelet/app/BUILD index 1f4fe45ee24..5f5ab8dd246 100644 --- a/cmd/kubelet/app/BUILD +++ b/cmd/kubelet/app/BUILD @@ -121,6 +121,7 @@ go_library( "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/client-go/util/cert:go_default_library", "//staging/src/k8s.io/client-go/util/certificate:go_default_library", + "//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library", "//vendor/github.com/coreos/go-systemd/daemon:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/spf13/cobra:go_default_library", diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 945957d9073..f0cf3ac7cb5 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -55,6 +55,7 @@ import ( "k8s.io/client-go/tools/record" certutil "k8s.io/client-go/util/cert" "k8s.io/client-go/util/certificate" + csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned" "k8s.io/kubernetes/cmd/kubelet/app/options" "k8s.io/kubernetes/pkg/api/legacyscheme" api "k8s.io/kubernetes/pkg/apis/core" @@ -387,6 +388,7 @@ func UnsecuredDependencies(s *options.KubeletServer) (*kubelet.Dependencies, err DockerClientConfig: dockerClientConfig, KubeClient: nil, HeartbeatClient: nil, + CSIClient: nil, EventClient: nil, Mounter: mounter, OOMAdjuster: oom.NewOOMAdjuster(), @@ -607,6 +609,13 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan glog.Warningf("Failed to create API Server client for heartbeat: %v", err) } + // csiClient works with CRDs that support json only + clientConfig.ContentType = "application/json" + csiClient, err := csiclientset.NewForConfig(clientConfig) + if err != nil { + glog.Warningf("Failed to create CSI API client: %v", err) + } + kubeDeps.KubeClient = kubeClient if heartbeatClient != nil { kubeDeps.HeartbeatClient = heartbeatClient @@ -615,6 +624,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan if eventClient != nil { kubeDeps.EventClient = eventClient } + kubeDeps.CSIClient = csiClient } // If the kubelet config controller is available, and dynamic config is enabled, start the config and status sync loops diff --git a/pkg/controller/volume/attachdetach/BUILD b/pkg/controller/volume/attachdetach/BUILD index 6344b14e8cd..b3e919746bf 100644 --- a/pkg/controller/volume/attachdetach/BUILD +++ b/pkg/controller/volume/attachdetach/BUILD @@ -39,6 +39,7 @@ go_library( "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", + "//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library", "//vendor/github.com/golang/glog:go_default_library", ], ) diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go index 537d7b3be7e..774bcc6fb9f 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go @@ -39,6 +39,7 @@ import ( kcache "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" + csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache" @@ -96,6 +97,7 @@ type AttachDetachController interface { // NewAttachDetachController returns a new instance of AttachDetachController. func NewAttachDetachController( kubeClient clientset.Interface, + csiClient csiclientset.Interface, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, pvcInformer coreinformers.PersistentVolumeClaimInformer, @@ -122,6 +124,7 @@ func NewAttachDetachController( // deleted (probably can't do this with sharedInformer), etc. adc := &attachDetachController{ kubeClient: kubeClient, + csiClient: csiClient, pvcLister: pvcInformer.Lister(), pvcsSynced: pvcInformer.Informer().HasSynced, pvLister: pvInformer.Lister(), @@ -237,6 +240,10 @@ type attachDetachController struct { // the API server. kubeClient clientset.Interface + // csiClient is the csi.storage.k8s.io API client used by volumehost to communicate with + // the API server. + csiClient csiclientset.Interface + // pvcLister is the shared PVC lister used to fetch and store PVC // objects from the API server. It is shared with other controllers and // therefore the PVC objects in its store should be treated as immutable. @@ -751,3 +758,7 @@ func (adc *attachDetachController) GetNodeName() types.NodeName { func (adc *attachDetachController) GetEventRecorder() record.EventRecorder { return adc.recorder } + +func (adc *attachDetachController) GetCSIClient() csiclientset.Interface { + return adc.csiClient +} diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go index 283a910f24c..84cdd95c8c9 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go @@ -40,6 +40,7 @@ func Test_NewAttachDetachController_Positive(t *testing.T) { // Act _, err := NewAttachDetachController( fakeKubeClient, + nil, /* csiClient */ informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Nodes(), informerFactory.Core().V1().PersistentVolumeClaims(), @@ -214,6 +215,7 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2 // Create the controller adcObj, err := NewAttachDetachController( fakeKubeClient, + nil, informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Nodes(), informerFactory.Core().V1().PersistentVolumeClaims(), diff --git a/pkg/controller/volume/expand/BUILD b/pkg/controller/volume/expand/BUILD index 7f7e73c2103..c1047396131 100644 --- a/pkg/controller/volume/expand/BUILD +++ b/pkg/controller/volume/expand/BUILD @@ -38,6 +38,7 @@ go_library( "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", + "//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library", "//vendor/github.com/golang/glog:go_default_library", ], ) diff --git a/pkg/controller/volume/expand/expand_controller.go b/pkg/controller/volume/expand/expand_controller.go index 550cfbd2c16..b7609baeb2e 100644 --- a/pkg/controller/volume/expand/expand_controller.go +++ b/pkg/controller/volume/expand/expand_controller.go @@ -37,6 +37,7 @@ import ( corelisters "k8s.io/client-go/listers/core/v1" kcache "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/volume/events" @@ -324,3 +325,8 @@ func (expc *expandController) GetNodeName() types.NodeName { func (expc *expandController) GetEventRecorder() record.EventRecorder { return expc.recorder } + +func (expc *expandController) GetCSIClient() csiclientset.Interface { + // No volume plugin in expand controller needs csi.storage.k8s.io + return nil +} diff --git a/pkg/controller/volume/persistentvolume/BUILD b/pkg/controller/volume/persistentvolume/BUILD index dd80246dcd3..6936a128ef0 100644 --- a/pkg/controller/volume/persistentvolume/BUILD +++ b/pkg/controller/volume/persistentvolume/BUILD @@ -56,6 +56,7 @@ go_library( "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/client-go/tools/reference:go_default_library", "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", + "//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library", "//vendor/github.com/golang/glog:go_default_library", ], ) diff --git a/pkg/controller/volume/persistentvolume/volume_host.go b/pkg/controller/volume/persistentvolume/volume_host.go index 74adf6d7a7d..fe54b06c624 100644 --- a/pkg/controller/volume/persistentvolume/volume_host.go +++ b/pkg/controller/volume/persistentvolume/volume_host.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/types" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" + csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/util/mount" vol "k8s.io/kubernetes/pkg/volume" @@ -123,3 +124,8 @@ func (ctrl *PersistentVolumeController) GetNodeName() types.NodeName { func (ctrl *PersistentVolumeController) GetEventRecorder() record.EventRecorder { return ctrl.eventRecorder } + +func (ctrl *PersistentVolumeController) GetCSIClient() csiclientset.Interface { + // No volume plugin needs csi.storage.k8s.io client in PV controller. + return nil +} diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index 96e3f08ee96..d1c0b61beed 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -135,6 +135,7 @@ go_library( "//staging/src/k8s.io/client-go/util/certificate:go_default_library", "//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library", "//staging/src/k8s.io/client-go/util/integer:go_default_library", + "//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library", "//third_party/forked/golang/expansion:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/golang/groupcache/lru:go_default_library", diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index d7f111cbea8..c8a58100e79 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -53,6 +53,7 @@ import ( "k8s.io/client-go/util/certificate" "k8s.io/client-go/util/flowcontrol" "k8s.io/client-go/util/integer" + csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/features" @@ -243,6 +244,7 @@ type Dependencies struct { HeartbeatClient clientset.Interface OnHeartbeatFailure func() KubeClient clientset.Interface + CSIClient csiclientset.Interface Mounter mount.Interface OOMAdjuster *oom.OOMAdjuster OSInterface kubecontainer.OSInterface @@ -484,6 +486,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, hostnameOverridden: len(hostnameOverride) > 0, nodeName: nodeName, kubeClient: kubeDeps.KubeClient, + csiClient: kubeDeps.CSIClient, heartbeatClient: kubeDeps.HeartbeatClient, onRepeatedHeartbeatFailure: kubeDeps.OnHeartbeatFailure, rootDirectory: rootDirectory, @@ -876,6 +879,7 @@ type Kubelet struct { nodeName types.NodeName runtimeCache kubecontainer.RuntimeCache kubeClient clientset.Interface + csiClient csiclientset.Interface heartbeatClient clientset.Interface iptClient utilipt.Interface rootDirectory string diff --git a/pkg/kubelet/volume_host.go b/pkg/kubelet/volume_host.go index 0b01f4c7e91..fda1f356c90 100644 --- a/pkg/kubelet/volume_host.go +++ b/pkg/kubelet/volume_host.go @@ -29,6 +29,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" + csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/configmap" @@ -121,6 +122,10 @@ func (kvh *kubeletVolumeHost) GetKubeClient() clientset.Interface { return kvh.kubelet.kubeClient } +func (kvh *kubeletVolumeHost) GetCSIClient() csiclientset.Interface { + return kvh.kubelet.csiClient +} + func (kvh *kubeletVolumeHost) NewWrapperMounter( volName string, spec volume.Spec, diff --git a/pkg/volume/BUILD b/pkg/volume/BUILD index 778cffc8539..bbe76dd8dcd 100644 --- a/pkg/volume/BUILD +++ b/pkg/volume/BUILD @@ -30,6 +30,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", + "//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library", "//vendor/github.com/golang/glog:go_default_library", ], ) diff --git a/pkg/volume/csi/BUILD b/pkg/volume/csi/BUILD index e5f55a15d8a..c8968b6c9c7 100644 --- a/pkg/volume/csi/BUILD +++ b/pkg/volume/csi/BUILD @@ -23,9 +23,12 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", + "//staging/src/k8s.io/csi-api/pkg/client/informers/externalversions:go_default_library", + "//staging/src/k8s.io/csi-api/pkg/client/informers/externalversions/csi/v1alpha1:go_default_library", "//vendor/github.com/container-storage-interface/spec/lib/go/csi/v0:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/google.golang.org/grpc:go_default_library", diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index cc9e3c97901..96bef31397e 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -26,11 +26,15 @@ import ( "time" "context" + "github.com/golang/glog" api "k8s.io/api/core/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" + csiapiinformer "k8s.io/csi-api/pkg/client/informers/externalversions" + csiinformer "k8s.io/csi-api/pkg/client/informers/externalversions/csi/v1alpha1" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/csi/labelmanager" @@ -48,11 +52,15 @@ const ( volNameSep = "^" volDataFileName = "vol_data.json" fsTypeBlockName = "block" + + // TODO: increase to something useful + csiResyncPeriod = time.Minute ) type csiPlugin struct { - host volume.VolumeHost - blockEnabled bool + host volume.VolumeHost + blockEnabled bool + csiDriverInformer csiinformer.CSIDriverInformer } // ProbeVolumePlugins returns implemented plugins @@ -132,6 +140,14 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error { csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}} lm = labelmanager.NewLabelManager(host.GetNodeName(), host.GetKubeClient()) + csiClient := host.GetCSIClient() + if csiClient != nil { + // Start informer for CSIDrivers. + factory := csiapiinformer.NewSharedInformerFactory(csiClient, csiResyncPeriod) + p.csiDriverInformer = factory.Csi().V1alpha1().CSIDrivers() + go factory.Start(wait.NeverStop) + } + return nil } diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index 8ea4584f996..e9ccf7e2967 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -32,6 +32,7 @@ import ( "k8s.io/apimachinery/pkg/util/validation" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" + csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume/util/recyclerclient" @@ -309,6 +310,9 @@ type VolumeHost interface { // GetKubeClient returns a client interface GetKubeClient() clientset.Interface + // GetCSIClient returns a client interface to csi.storage.k8s.io + GetCSIClient() csiclientset.Interface + // NewWrapperMounter finds an appropriate plugin with which to handle // the provided spec. This is used to implement volume plugins which // "wrap" other plugins. For example, the "secret" volume is diff --git a/pkg/volume/testing/BUILD b/pkg/volume/testing/BUILD index 5cbf2e57da7..e81a029c58f 100644 --- a/pkg/volume/testing/BUILD +++ b/pkg/volume/testing/BUILD @@ -29,6 +29,7 @@ go_library( "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/client-go/util/testing:go_default_library", + "//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library", "//vendor/github.com/stretchr/testify/mock:go_default_library", ], ) diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index cacbcb3ca3c..94565eaaf73 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -36,6 +36,7 @@ import ( clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" utiltesting "k8s.io/client-go/util/testing" + csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/util/mount" utilstrings "k8s.io/kubernetes/pkg/util/strings" @@ -49,6 +50,7 @@ import ( type fakeVolumeHost struct { rootDir string kubeClient clientset.Interface + csiClient csiclientset.Interface pluginMgr VolumePluginMgr cloud cloudprovider.Interface mounter mount.Interface @@ -113,6 +115,10 @@ func (f *fakeVolumeHost) GetKubeClient() clientset.Interface { return f.kubeClient } +func (f *fakeVolumeHost) GetCSIClient() csiclientset.Interface { + return f.csiClient +} + func (f *fakeVolumeHost) GetCloudProvider() cloudprovider.Interface { return f.cloud } diff --git a/test/integration/volume/attach_detach_test.go b/test/integration/volume/attach_detach_test.go index 5b12d706fc6..aa72838c058 100644 --- a/test/integration/volume/attach_detach_test.go +++ b/test/integration/volume/attach_detach_test.go @@ -412,6 +412,7 @@ func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, sy informers := informers.NewSharedInformerFactory(testClient, resyncPeriod) ctrl, err := attachdetach.NewAttachDetachController( testClient, + nil, /* csiClient */ informers.Core().V1().Pods(), informers.Core().V1().Nodes(), informers.Core().V1().PersistentVolumeClaims(),