diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index 0f5c46b5d77..1a2bf5ca1c7 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -384,6 +384,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 		provisioner,
 		ProbeRecyclableVolumePlugins(s.VolumeConfiguration),
 		cloud,
+		s.ClusterName,
 	)
 	volumeController.Run()
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go
index 46d9c48ea9e..36ee2e6546d 100644
--- a/contrib/mesos/pkg/controllermanager/controllermanager.go
+++ b/contrib/mesos/pkg/controllermanager/controllermanager.go
@@ -282,6 +282,7 @@ func (s *CMServer) Run(_ []string) error {
 		provisioner,
 		kubecontrollermanager.ProbeRecyclableVolumePlugins(s.VolumeConfiguration),
 		cloud,
+		s.ClusterName,
 	)
 
 	volumeController.Run()
diff --git a/pkg/controller/persistentvolume/persistentvolume_controller.go b/pkg/controller/persistentvolume/persistentvolume_controller.go
index 9e3f15ddcff..6482597ee14 100644
--- a/pkg/controller/persistentvolume/persistentvolume_controller.go
+++ b/pkg/controller/persistentvolume/persistentvolume_controller.go
@@ -80,6 +80,30 @@ const annBoundByController = "pv.kubernetes.io/bound-by-controller"
 // Value of this annotation should be empty.
 const annClass = "volume.alpha.kubernetes.io/storage-class"
 
+// This annotation is added to a PV that has been dynamically provisioned by
+// Kubernetes. Its value is the name of the volume plugin that created the
+// volume. It serves both users (to show where a PV comes from) and Kubernetes
+// (to recognize dynamically provisioned PVs in its decisions).
+const annDynamicallyProvisioned = "pv.kubernetes.io/provisioned-by"
+
+// Name of a tag attached to a real volume in the cloud (e.g. AWS EBS or GCE PD)
+// with the namespace of the persistent volume claim used to create this volume.
+const cloudVolumeCreatedForClaimNamespaceTag = "kubernetes.io/created-for/pvc/namespace"
+
+// Name of a tag attached to a real volume in the cloud (e.g. AWS EBS or GCE PD)
+// with the name of the persistent volume claim used to create this volume.
+const cloudVolumeCreatedForClaimNameTag = "kubernetes.io/created-for/pvc/name"
+
+// Name of a tag attached to a real volume in the cloud (e.g. AWS EBS or GCE PD)
+// with the name of the corresponding Kubernetes persistent volume.
+const cloudVolumeCreatedForVolumeNameTag = "kubernetes.io/created-for/pv/name"
+
+// Number of retries when we create a PV object for a provisioned volume.
+const createProvisionedPVRetryCount = 5
+
+// Interval between retries when we create a PV object for a provisioned volume.
+const createProvisionedPVInterval = 10 * time.Second
+
 // PersistentVolumeController is a controller that synchronizes
 // PersistentVolumeClaims and PersistentVolumes. It starts two
 // framework.Controllers that watch PerstentVolume and PersistentVolumeClaim
@@ -95,6 +119,8 @@ type PersistentVolumeController struct {
 	eventRecorder    record.EventRecorder
 	cloud            cloudprovider.Interface
 	recyclePluginMgr vol.VolumePluginMgr
+	provisioner      vol.ProvisionableVolumePlugin
+	clusterName      string
 
 	// PersistentVolumeController keeps track of long running operations and
 	// makes sure it won't start the same operation twice in parallel.
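The annDynamicallyProvisioned annotation introduced above is what later lets controller logic distinguish dynamically provisioned PVs from pre-provisioned ones. A minimal sketch of such a check, assuming the surrounding persistentvolume package context; isDynamicallyProvisioned is a hypothetical helper and is not part of this patch:

// Hypothetical helper (not in this patch): reports whether a PV was created
// by a dynamic provisioner, based on the annotation added above.
func isDynamicallyProvisioned(pv *api.PersistentVolume) bool {
	_, found := pv.ObjectMeta.Annotations[annDynamicallyProvisioned]
	return found
}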
@@ -112,6 +138,9 @@ type PersistentVolumeController struct {
 	// For testing only: hook to call before an asynchronous operation starts.
 	// Not used when set to nil.
 	preOperationHook func(operationName string, operationArgument interface{})
+
+	createProvisionedPVRetryCount int
+	createProvisionedPVInterval   time.Duration
 }
 
 // NewPersistentVolumeController creates a new PersistentVolumeController
@@ -120,19 +149,29 @@ func NewPersistentVolumeController(
 	syncPeriod time.Duration,
 	provisioner vol.ProvisionableVolumePlugin,
 	recyclers []vol.VolumePlugin,
-	cloud cloudprovider.Interface) *PersistentVolumeController {
+	cloud cloudprovider.Interface,
+	clusterName string) *PersistentVolumeController {
 	broadcaster := record.NewBroadcaster()
 	broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
 	recorder := broadcaster.NewRecorder(api.EventSource{Component: "persistentvolume-controller"})
 
 	controller := &PersistentVolumeController{
-		kubeClient:        kubeClient,
-		eventRecorder:     recorder,
-		runningOperations: make(map[string]bool),
-		cloud:             cloud,
+		kubeClient:                    kubeClient,
+		eventRecorder:                 recorder,
+		runningOperations:             make(map[string]bool),
+		cloud:                         cloud,
+		provisioner:                   provisioner,
+		clusterName:                   clusterName,
+		createProvisionedPVRetryCount: createProvisionedPVRetryCount,
+		createProvisionedPVInterval:   createProvisionedPVInterval,
 	}
 
 	controller.recyclePluginMgr.InitPlugins(recyclers, controller)
+	if controller.provisioner != nil {
+		if err := controller.provisioner.Init(controller); err != nil {
+			glog.Errorf("PersistentVolumeController: error initializing provisioner plugin: %v", err)
+		}
+	}
 
 	volumeSource := &cache.ListWatch{
 		ListFunc: func(options api.ListOptions) (runtime.Object, error) {
@@ -376,25 +415,10 @@ func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *api.PersistentVo
 	// No PV could be found
 	// OBSERVATION: pvc is "Pending", will retry
 	if hasAnnotation(claim.ObjectMeta, annClass) {
-		// TODO: provisioning
-		//plugin := findProvisionerPluginForPV(pv) // Need to flesh this out
-		//if plugin != nil {
-		//FIXME: left off here
-		// No match was found and provisioning was requested.
-		//
-		// maintain a map with the current provisioner goroutines that are running
-		// if the key is already present in the map, return
-		//
-		// launch the goroutine that:
-		// 1. calls plugin.Provision to make the storage asset
-		// 2. gets back a PV object (partially filled)
-		// 3. create the PV API object, with claimRef -> pvc
-		// 4. deletes itself from the map when it's done
-		// return
-		//} else {
-		// make an event calling out that no provisioner was configured
-		// return, try later?
-		//}
+		if err = ctrl.provisionClaim(claim); err != nil {
+			return err
+		}
+		return nil
 	}
 	// Mark the claim as Pending and try to find a match in the next
 	// periodic syncClaim
@@ -1036,7 +1060,6 @@ func (ctrl *PersistentVolumeController) recycleVolumeOperation(arg interface{})
 		glog.Errorf("Cannot convert recycleVolumeOperation argument to volume, got %+v", arg)
 		return
 	}
-
 	glog.V(4).Infof("recycleVolumeOperation [%s] started", volume.Name)
 
 	// This method may have been waiting for a volume lock for some time.
@@ -1241,6 +1264,149 @@ func (ctrl *PersistentVolumeController) doDeleteVolume(volume *api.PersistentVol
 	return nil
 }
 
+// provisionClaim starts a new asynchronous operation to provision a claim.
+func (ctrl *PersistentVolumeController) provisionClaim(claim *api.PersistentVolumeClaim) error {
+	glog.V(4).Infof("provisionClaim[%s]: started", claimToClaimKey(claim))
+	ctrl.scheduleOperation("provision-"+string(claim.UID), ctrl.provisionClaimOperation, claim)
+	return nil
+}
+
+// provisionClaimOperation provisions a volume. This method runs in a
+// standalone goroutine and already holds all necessary locks.
+func (ctrl *PersistentVolumeController) provisionClaimOperation(claimObj interface{}) {
+	claim, ok := claimObj.(*api.PersistentVolumeClaim)
+	if !ok {
+		glog.Errorf("Cannot convert provisionClaimOperation argument to claim, got %+v", claimObj)
+		return
+	}
+	glog.V(4).Infof("provisionClaimOperation [%s] started", claimToClaimKey(claim))
+
+	// A previous provisionClaimOperation may just have finished while we were
+	// waiting for the locks. Check that the PV (with its deterministic name)
+	// hasn't been provisioned yet.
+	pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
+	volume, err := ctrl.kubeClient.Core().PersistentVolumes().Get(pvName)
+	if err == nil && volume != nil {
+		// Volume has already been provisioned, nothing to do.
+		glog.V(4).Infof("provisionClaimOperation [%s]: volume already exists, skipping", claimToClaimKey(claim))
+		return
+	}
+
+	// Prepare a claimRef to the claim early (to fail before a volume is
+	// provisioned)
+	claimRef, err := api.GetReference(claim)
+	if err != nil {
+		glog.V(3).Infof("unexpected error getting claim reference: %v", err)
+		return
+	}
+
+	// TODO: find a provisionable plugin based on a class/profile
+	plugin := ctrl.provisioner
+	if plugin == nil {
+		// No provisioner found. Emit an event.
+		ctrl.eventRecorder.Event(claim, api.EventTypeWarning, "ProvisioningFailed", "No provisioner plugin found for the claim!")
+		glog.V(2).Infof("no provisioner plugin found for claim %s!", claimToClaimKey(claim))
+		// The controller will retry provisioning the volume in every
+		// syncVolume() call.
+		return
+	}
+
+	// Gather provisioning options
+	tags := make(map[string]string)
+	tags[cloudVolumeCreatedForClaimNamespaceTag] = claim.Namespace
+	tags[cloudVolumeCreatedForClaimNameTag] = claim.Name
+	tags[cloudVolumeCreatedForVolumeNameTag] = pvName
+
+	options := vol.VolumeOptions{
+		Capacity:                      claim.Spec.Resources.Requests[api.ResourceName(api.ResourceStorage)],
+		AccessModes:                   claim.Spec.AccessModes,
+		PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimDelete,
+		CloudTags:                     &tags,
+		ClusterName:                   ctrl.clusterName,
+		PVName:                        pvName,
+	}
+
+	// Provision the volume
+	provisioner, err := plugin.NewProvisioner(options)
+	if err != nil {
+		strerr := fmt.Sprintf("Failed to create provisioner: %v", err)
+		glog.V(2).Infof("failed to create provisioner for claim %q: %v", claimToClaimKey(claim), err)
+		ctrl.eventRecorder.Event(claim, api.EventTypeWarning, "ProvisioningFailed", strerr)
+		return
+	}
+
+	volume, err = provisioner.Provision()
+	if err != nil {
+		strerr := fmt.Sprintf("Failed to provision volume: %v", err)
+		glog.V(2).Infof("failed to provision volume for claim %q: %v", claimToClaimKey(claim), err)
+		ctrl.eventRecorder.Event(claim, api.EventTypeWarning, "ProvisioningFailed", strerr)
+		return
+	}
+
+	glog.V(3).Infof("volume %q for claim %q created", volume.Name, claimToClaimKey(claim))
+
+	// Create a Kubernetes PV object for the volume.
+	volume.Name = pvName
+	// Bind it to the claim
+	volume.Spec.ClaimRef = claimRef
+	volume.Status.Phase = api.VolumeBound
+
+	// Add annBoundByController (used in deleting the volume)
+	setAnnotation(&volume.ObjectMeta, annBoundByController, "yes")
+	setAnnotation(&volume.ObjectMeta, annDynamicallyProvisioned, plugin.Name())
+
+	// Try to create the PV object several times
+	for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
+		glog.V(4).Infof("provisionClaimOperation [%s]: trying to save volume %s", claimToClaimKey(claim), volume.Name)
+		if _, err = ctrl.kubeClient.Core().PersistentVolumes().Create(volume); err == nil {
+			// Save succeeded.
+			glog.V(3).Infof("volume %q for claim %q saved", volume.Name, claimToClaimKey(claim))
+			break
+		}
+		// Save failed, try again after a while.
+		glog.V(3).Infof("failed to save volume %q for claim %q: %v", volume.Name, claimToClaimKey(claim), err)
+		time.Sleep(ctrl.createProvisionedPVInterval)
+	}
+
+	if err != nil {
+		// Save failed. Now we have a storage asset outside of Kubernetes,
+		// but we don't have an appropriate PV object for it.
+		// Emit an event and try to delete the storage asset several times.
+		strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), err)
+		glog.V(3).Info(strerr)
+		ctrl.eventRecorder.Event(claim, api.EventTypeWarning, "ProvisioningFailed", strerr)
+
+		for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
+			if err = ctrl.doDeleteVolume(volume); err == nil {
+				// Delete succeeded
+				glog.V(4).Infof("provisionClaimOperation [%s]: cleaning volume %s succeeded", claimToClaimKey(claim), volume.Name)
+				break
+			}
+			// Delete failed, try again after a while.
+			glog.V(3).Infof("failed to delete volume %q: %v", volume.Name, err)
+			time.Sleep(ctrl.createProvisionedPVInterval)
+		}
+
+		if err != nil {
+			// Delete failed several times. There is an orphaned volume and
+			// there is nothing we can do about it.
+			strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), err)
+			glog.V(2).Info(strerr)
+			ctrl.eventRecorder.Event(claim, api.EventTypeWarning, "ProvisioningCleanupFailed", strerr)
+		}
+	} else {
+		glog.V(2).Infof("volume %q provisioned for claim %q", volume.Name, claimToClaimKey(claim))
+	}
+}
+
+// getProvisionedVolumeNameForClaim returns PV.Name for the provisioned volume.
+// The name must be unique.
+func (ctrl *PersistentVolumeController) getProvisionedVolumeNameForClaim(claim *api.PersistentVolumeClaim) string {
+	return "pv-provisioned-for-" + string(claim.UID)
+}
+
 // scheduleOperation starts given asynchronous operation on given volume. It
 // makes sure the operation is not already running.
 func (ctrl *PersistentVolumeController) scheduleOperation(operationName string, operation func(arg interface{}), arg interface{}) {
diff --git a/pkg/controller/persistentvolume/persistentvolume_framework_test.go b/pkg/controller/persistentvolume/persistentvolume_framework_test.go
index fda289b6b33..daca2f085c8 100644
--- a/pkg/controller/persistentvolume/persistentvolume_framework_test.go
+++ b/pkg/controller/persistentvolume/persistentvolume_framework_test.go
@@ -424,6 +424,10 @@ func newPersistentVolumeController(kubeClient clientset.Interface) *PersistentVo
 		kubeClient:        kubeClient,
 		eventRecorder:     record.NewFakeRecorder(1000),
 		runningOperations: make(map[string]bool),
+
+		// Speed up testing
+		createProvisionedPVRetryCount: createProvisionedPVRetryCount,
+		createProvisionedPVInterval:   5 * time.Millisecond,
 	}
 	return ctrl
 }
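The provisioning path above rests on two small patterns: the PV name is derived deterministically from the claim UID, so a repeated provision operation can detect an earlier, half-finished run with a plain Get, and the PV save is retried a fixed number of times with a fixed interval (createProvisionedPVRetryCount / createProvisionedPVInterval). The following standalone sketch illustrates both patterns using only the standard library; the names (claim, provisionedVolumeName, saveWithRetry) are illustrative and not part of this patch:

package main

import (
	"errors"
	"fmt"
	"time"
)

// claim is a hypothetical stand-in; the real controller uses api.PersistentVolumeClaim.
type claim struct{ UID string }

// provisionedVolumeName mirrors getProvisionedVolumeNameForClaim: the name is a
// pure function of the claim UID, so every retry of the same provision
// operation targets the same PV object.
func provisionedVolumeName(c claim) string {
	return "pv-provisioned-for-" + c.UID
}

// saveWithRetry mirrors the retry loop used when saving the provisioned PV:
// a fixed number of attempts with a fixed sleep, returning the last error.
func saveWithRetry(attempts int, interval time.Duration, save func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = save(); err == nil {
			return nil
		}
		time.Sleep(interval)
	}
	return err
}

func main() {
	c := claim{UID: "1234-abcd"}
	fmt.Println(provisionedVolumeName(c)) // pv-provisioned-for-1234-abcd

	// Simulate an API server that fails twice before accepting the PV.
	calls := 0
	err := saveWithRetry(5, 5*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errors.New("apiserver temporarily unavailable")
		}
		return nil
	})
	fmt.Println(calls, err) // 3 <nil>
}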