Merge pull request #73653 from ddebroy/migprov1

Support dynamic provisioning for CSI migration scenarios
This commit is contained in:
Kubernetes Prow Robot 2019-02-28 01:52:55 -08:00 committed by GitHub
commit 36787041cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 62 additions and 8 deletions

View File

@ -62,6 +62,7 @@ go_library(
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume/errors:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
"//staging/src/k8s.io/csi-translation-lib:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],

View File

@ -988,6 +988,22 @@ func wrapTestWithProvisionCalls(expectedProvisionCalls []provisionCall, toWrap t
return wrapTestWithPluginCalls(nil, nil, expectedProvisionCalls, toWrap)
}
// wrapTestWithCSIMigrationProvisionCalls returns a testCall that:
// - configures controller with a volume plugin that emulates CSI migration
// - calls given testCall
func wrapTestWithCSIMigrationProvisionCalls(toWrap testCall) testCall {
return func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
plugin := &mockVolumePlugin{
isMigratedToCSI: true,
}
ctrl.volumePluginMgr.InitPlugins([]vol.VolumePlugin{plugin}, nil /* prober */, ctrl)
ctrl.csiNameFromIntreeNameHook = func(string) (string, error) {
return "vendor.com/MockCSIPlugin", nil
}
return toWrap(ctrl, reactor, test)
}
}
// wrapTestWithInjectedOperation returns a testCall that:
// - starts the controller and lets it run original testCall until
// scheduleOperation() call. It blocks the controller there and calls the
@ -1229,6 +1245,7 @@ type mockVolumePlugin struct {
deleteCallCounter int
recycleCalls []error
recycleCallCounter int
isMigratedToCSI bool
provisionOptions vol.VolumeOptions
}
@ -1259,7 +1276,7 @@ func (plugin *mockVolumePlugin) CanSupport(spec *vol.Spec) bool {
}
func (plugin *mockVolumePlugin) IsMigratedToCSI() bool {
return false
return plugin.isMigratedToCSI
}
func (plugin *mockVolumePlugin) RequiresRemount() bool {

View File

@ -421,6 +421,17 @@ func TestProvisionSync(t *testing.T) {
[]string{"Warning ProvisioningFailed Mount options"},
noerrors, wrapTestWithProvisionCalls([]provisionCall{}, testSyncClaim),
},
{
// No provisioning due to CSI migration + normal event with external provisioner
"11-21 - external provisioner for CSI migration",
novolumes,
novolumes,
newClaimArray("claim11-21", "uid11-21", "1Gi", "", v1.ClaimPending, &classGold),
claimWithAnnotation(annStorageProvisioner, "vendor.com/MockCSIPlugin",
newClaimArray("claim11-21", "uid11-21", "1Gi", "", v1.ClaimPending, &classGold)),
[]string{"Normal ExternalProvisioning"},
noerrors, wrapTestWithCSIMigrationProvisionCalls(testSyncClaim),
},
}
runSyncTests(t, tests, storageClasses, []*v1.Pod{})
}

View File

@ -39,6 +39,7 @@ import (
"k8s.io/client-go/util/workqueue"
cloudprovider "k8s.io/cloud-provider"
volerr "k8s.io/cloud-provider/volume/errors"
csitranslation "k8s.io/csi-translation-lib"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/controller/volume/events"
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume/metrics"
@ -230,6 +231,10 @@ type PersistentVolumeController struct {
createProvisionedPVRetryCount int
createProvisionedPVInterval time.Duration
// For testing only: hook to intercept CSI driver name <=> Intree plugin name mapping
// Not used when set to nil
csiNameFromIntreeNameHook func(pluginName string) (string, error)
}
// syncClaim is the main controller method to decide what to do with a claim.
@ -1346,6 +1351,13 @@ func (ctrl *PersistentVolumeController) provisionClaim(claim *v1.PersistentVolum
return nil
}
func (ctrl *PersistentVolumeController) getCSINameFromIntreeName(pluginName string) (string, error) {
if ctrl.csiNameFromIntreeNameHook != nil {
return ctrl.csiNameFromIntreeNameHook(pluginName)
}
return csitranslation.GetCSINameFromIntreeName(pluginName)
}
// provisionClaimOperation provisions a volume. This method is running in
// standalone goroutine and already has all necessary locks.
func (ctrl *PersistentVolumeController) provisionClaimOperation(claim *v1.PersistentVolumeClaim) (string, error) {
@ -1362,12 +1374,26 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claim *v1.Persis
}
var pluginName string
provisionerName := storageClass.Provisioner
if plugin != nil {
pluginName = plugin.GetPluginName()
if plugin.IsMigratedToCSI() {
// pluginName is not set here to align with existing behavior
// of not setting pluginName for external provisioners (including CSI)
// Set provisionerName to CSI plugin name for setClaimProvisioner
provisionerName, err = ctrl.getCSINameFromIntreeName(storageClass.Provisioner)
if err != nil {
strerr := fmt.Sprintf("error getting CSI name for In tree plugin %s: %v", storageClass.Provisioner, err)
klog.V(2).Infof("%s", strerr)
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
return "", err
}
} else {
pluginName = plugin.GetPluginName()
}
}
// Add provisioner annotation so external provisioners know when to start
newClaim, err := ctrl.setClaimProvisioner(claim, storageClass)
newClaim, err := ctrl.setClaimProvisioner(claim, provisionerName)
if err != nil {
// Save failed, the controller will retry in the next sync
klog.V(2).Infof("error saving claim %s: %v", claimToClaimKey(claim), err)
@ -1375,7 +1401,7 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claim *v1.Persis
}
claim = newClaim
if plugin == nil {
if plugin == nil || plugin.IsMigratedToCSI() {
// findProvisionablePlugin returned no error nor plugin.
// This means that an unknown provisioner is requested. Report an event
// and wait for the external provisioner

View File

@ -22,7 +22,6 @@ import (
"time"
"k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -428,8 +427,8 @@ func (ctrl *PersistentVolumeController) resync() {
// setClaimProvisioner saves
// claim.Annotations[annStorageProvisioner] = class.Provisioner
func (ctrl *PersistentVolumeController) setClaimProvisioner(claim *v1.PersistentVolumeClaim, class *storage.StorageClass) (*v1.PersistentVolumeClaim, error) {
if val, ok := claim.Annotations[annStorageProvisioner]; ok && val == class.Provisioner {
func (ctrl *PersistentVolumeController) setClaimProvisioner(claim *v1.PersistentVolumeClaim, provisionerName string) (*v1.PersistentVolumeClaim, error) {
if val, ok := claim.Annotations[annStorageProvisioner]; ok && val == provisionerName {
// annotation is already set, nothing to do
return claim, nil
}
@ -437,7 +436,7 @@ func (ctrl *PersistentVolumeController) setClaimProvisioner(claim *v1.Persistent
// The volume from method args can be pointing to watcher cache. We must not
// modify these, therefore create a copy.
claimClone := claim.DeepCopy()
metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, annStorageProvisioner, class.Provisioner)
metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, annStorageProvisioner, provisionerName)
newClaim, err := ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claimClone)
if err != nil {
return newClaim, err