diff --git a/pkg/controller/persistentvolume/controller_base.go b/pkg/controller/persistentvolume/controller_base.go index 9949d272938..89c0de3fd9c 100644 --- a/pkg/controller/persistentvolume/controller_base.go +++ b/pkg/controller/persistentvolume/controller_base.go @@ -142,7 +142,11 @@ func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSour return } for _, volume := range volumeList.Items { - storeObjectUpdate(ctrl.volumes.store, volume, "volume") + // Ignore template volumes from kubernetes 1.2 + deleted := ctrl.upgradeVolumeFrom1_2(volume.(*api.PersistentVolume)) + if !deleted { + storeObjectUpdate(ctrl.volumes.store, volume, "volume") + } } claimListObj, err := claimSource.List(api.ListOptions{}) @@ -164,6 +168,17 @@ func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSour // addVolume is callback from framework.Controller watching PersistentVolume // events. func (ctrl *PersistentVolumeController) addVolume(obj interface{}) { + pv, ok := obj.(*api.PersistentVolume) + if !ok { + glog.Errorf("expected PersistentVolume but handler received %+v", obj) + return + } + + if ctrl.upgradeVolumeFrom1_2(pv) { + // volume deleted + return + } + // Store the new volume version in the cache and do not process it if this // is an old version. new, err := storeObjectUpdate(ctrl.volumes.store, obj, "volume") @@ -174,11 +189,6 @@ func (ctrl *PersistentVolumeController) addVolume(obj interface{}) { return } - pv, ok := obj.(*api.PersistentVolume) - if !ok { - glog.Errorf("expected PersistentVolume but handler received %+v", obj) - return - } if err := ctrl.syncVolume(pv); err != nil { if errors.IsConflict(err) { // Version conflict error happens quite often and the controller @@ -193,6 +203,17 @@ func (ctrl *PersistentVolumeController) addVolume(obj interface{}) { // updateVolume is callback from framework.Controller watching PersistentVolume // events. func (ctrl *PersistentVolumeController) updateVolume(oldObj, newObj interface{}) { + newVolume, ok := newObj.(*api.PersistentVolume) + if !ok { + glog.Errorf("Expected PersistentVolume but handler received %+v", newObj) + return + } + + if ctrl.upgradeVolumeFrom1_2(newVolume) { + // volume deleted + return + } + // Store the new volume version in the cache and do not process it if this // is an old version. new, err := storeObjectUpdate(ctrl.volumes.store, newObj, "volume") @@ -203,11 +224,6 @@ func (ctrl *PersistentVolumeController) updateVolume(oldObj, newObj interface{}) return } - newVolume, ok := newObj.(*api.PersistentVolume) - if !ok { - glog.Errorf("Expected PersistentVolume but handler received %+v", newObj) - return - } if err := ctrl.syncVolume(newVolume); err != nil { if errors.IsConflict(err) { // Version conflict error happens quite often and the controller @@ -400,6 +416,44 @@ func (ctrl *PersistentVolumeController) Stop() { close(ctrl.claimControllerStopCh) } +const ( + // these pair of constants are used by the provisioner in Kubernetes 1.2. + pvProvisioningRequiredAnnotationKey = "volume.experimental.kubernetes.io/provisioning-required" + pvProvisioningCompletedAnnotationValue = "volume.experimental.kubernetes.io/provisioning-completed" +) + +// upgradeVolumeFrom1_2 updates PV from Kubernetes 1.2 to 1.3 and newer. In 1.2, +// we used template PersistentVolume instances for dynamic provisioning. In 1.3 +// and later, these template (and not provisioned) instances must be removed to +// make the controller to provision a new PV. +// It returns true if the volume was deleted. +// TODO: remove this function when upgrade from 1.2 becomes unsupported. +func (ctrl *PersistentVolumeController) upgradeVolumeFrom1_2(volume *api.PersistentVolume) bool { + annValue, found := volume.Annotations[pvProvisioningRequiredAnnotationKey] + if !found { + // The volume is not template + return false + } + if annValue == pvProvisioningCompletedAnnotationValue { + // The volume is already fully provisioned. The new controller will + // ignore this annotation and it will obey its ReclaimPolicy, which is + // likely to delete the volume when appropriate claim is deleted. + return false + } + glog.V(2).Infof("deleting unprovisioned template volume %q from Kubernetes 1.2.", volume.Name) + err := ctrl.kubeClient.Core().PersistentVolumes().Delete(volume.Name, nil) + if err != nil { + glog.Errorf("cannot delete unprovisioned template volume %q: %v", volume.Name, err) + } + // Remove from local cache + err = ctrl.volumes.store.Delete(volume) + if err != nil { + glog.Errorf("cannot remove volume %q from local cache: %v", volume.Name, err) + } + + return true +} + // Stateless functions func hasAnnotation(obj api.ObjectMeta, ann string) bool { diff --git a/pkg/controller/persistentvolume/controller_test.go b/pkg/controller/persistentvolume/controller_test.go index a2c5a6f99e9..121998e394f 100644 --- a/pkg/controller/persistentvolume/controller_test.go +++ b/pkg/controller/persistentvolume/controller_test.go @@ -33,7 +33,7 @@ import ( // can't reliably simulate periodic sync of volumes/claims - it would be // either very timing-sensitive or slow to wait for real periodic sync. func TestControllerSync(t *testing.T) { - expectedChanges := []int{4, 1, 1} + expectedChanges := []int{4, 1, 1, 2, 1, 1, 1} tests := []controllerTest{ // [Unit test set 5] - controller tests. // We test the controller as if @@ -87,6 +87,74 @@ func TestControllerSync(t *testing.T) { return nil }, }, + { + // addVolume with provisioned volume from Kubernetes 1.2. No "action" + // is expected - it should stay bound. + "5-5 - add bound volume from 1.2", + novolumes, + []*api.PersistentVolume{addVolumeAnnotation(newVolume("volume5-5", "10Gi", "uid5-5", "claim5-5", api.VolumeBound, api.PersistentVolumeReclaimDelete), pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue)}, + newClaimArray("claim5-5", "uid5-5", "1Gi", "", api.ClaimPending), + newClaimArray("claim5-5", "uid5-5", "1Gi", "volume5-5", api.ClaimBound, annBindCompleted, annBoundByController), + noevents, noerrors, + // Custom test function that generates a add event + func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error { + volume := newVolume("volume5-5", "10Gi", "uid5-5", "claim5-5", api.VolumeBound, api.PersistentVolumeReclaimDelete) + volume = addVolumeAnnotation(volume, pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue) + reactor.addVolumeEvent(volume) + return nil + }, + }, + { + // updateVolume with provisioned volume from Kubernetes 1.2. No + // "action" is expected - it should stay bound. + "5-6 - update bound volume from 1.2", + []*api.PersistentVolume{addVolumeAnnotation(newVolume("volume5-6", "10Gi", "uid5-6", "claim5-6", api.VolumeBound, api.PersistentVolumeReclaimDelete), pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue)}, + []*api.PersistentVolume{addVolumeAnnotation(newVolume("volume5-6", "10Gi", "uid5-6", "claim5-6", api.VolumeBound, api.PersistentVolumeReclaimDelete), pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue)}, + newClaimArray("claim5-6", "uid5-6", "1Gi", "volume5-6", api.ClaimBound), + newClaimArray("claim5-6", "uid5-6", "1Gi", "volume5-6", api.ClaimBound, annBindCompleted), + noevents, noerrors, + // Custom test function that generates a add event + func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error { + volume := newVolume("volume5-6", "10Gi", "uid5-6", "claim5-6", api.VolumeBound, api.PersistentVolumeReclaimDelete) + volume = addVolumeAnnotation(volume, pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue) + reactor.modifyVolumeEvent(volume) + return nil + }, + }, + { + // addVolume with unprovisioned volume from Kubernetes 1.2. The + // volume should be deleted. + "5-7 - add unprovisioned volume from 1.2", + novolumes, + novolumes, + newClaimArray("claim5-7", "uid5-7", "1Gi", "", api.ClaimPending), + newClaimArray("claim5-7", "uid5-7", "1Gi", "", api.ClaimPending), + noevents, noerrors, + // Custom test function that generates a add event + func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error { + volume := newVolume("volume5-7", "10Gi", "uid5-7", "claim5-7", api.VolumeBound, api.PersistentVolumeReclaimDelete) + volume = addVolumeAnnotation(volume, pvProvisioningRequiredAnnotationKey, "yes") + reactor.addVolumeEvent(volume) + return nil + }, + }, + { + // updateVolume with unprovisioned volume from Kubernetes 1.2. The + // volume should be deleted. + "5-8 - update bound volume from 1.2", + novolumes, + novolumes, + newClaimArray("claim5-8", "uid5-8", "1Gi", "", api.ClaimPending), + newClaimArray("claim5-8", "uid5-8", "1Gi", "", api.ClaimPending), + noevents, noerrors, + // Custom test function that generates a add event + func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error { + volume := newVolume("volume5-8", "10Gi", "uid5-8", "claim5-8", api.VolumeBound, api.PersistentVolumeReclaimDelete) + volume = addVolumeAnnotation(volume, pvProvisioningRequiredAnnotationKey, "yes") + reactor.modifyVolumeEvent(volume) + return nil + }, + }, } for ix, test := range tests { @@ -108,7 +176,7 @@ func TestControllerSync(t *testing.T) { } // Start the controller - defer ctrl.Stop() + count := reactor.getChangeCount() go ctrl.Run() // Wait for the controller to pass initial sync and fill its caches. @@ -121,8 +189,6 @@ func TestControllerSync(t *testing.T) { } glog.V(4).Infof("controller synced, starting test") - count := reactor.getChangeCount() - // Call the tested function err := test.test(ctrl, reactor, test) if err != nil { @@ -133,10 +199,15 @@ func TestControllerSync(t *testing.T) { ctrl.claims.Resync() ctrl.volumes.store.Resync() + // Wait at least once, just in case expectedChanges[ix] == 0 + reactor.waitTest() + // Wait for expected number of operations. for reactor.getChangeCount() < count+expectedChanges[ix] { reactor.waitTest() } + ctrl.Stop() + evaluateTestResults(ctrl, reactor, test, t) } } @@ -208,3 +279,11 @@ func TestControllerCacheParsingError(t *testing.T) { t.Errorf("Expected parsing error, got nil instead") } } + +func addVolumeAnnotation(volume *api.PersistentVolume, annName, annValue string) *api.PersistentVolume { + if volume.Annotations == nil { + volume.Annotations = make(map[string]string) + } + volume.Annotations[annName] = annValue + return volume +} diff --git a/pkg/controller/persistentvolume/framework_test.go b/pkg/controller/persistentvolume/framework_test.go index 27c1cda2388..821a108b1b2 100644 --- a/pkg/controller/persistentvolume/framework_test.go +++ b/pkg/controller/persistentvolume/framework_test.go @@ -253,6 +253,7 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj _, found := r.volumes[name] if found { delete(r.volumes, name) + r.changedSinceLastSync++ return true, nil, nil } else { return true, nil, fmt.Errorf("Cannot delete volume %s: not found", name) @@ -264,6 +265,7 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj _, found := r.volumes[name] if found { delete(r.claims, name) + r.changedSinceLastSync++ return true, nil, nil } else { return true, nil, fmt.Errorf("Cannot delete claim %s: not found", name) @@ -519,6 +521,20 @@ func (r *volumeReactor) addVolumeEvent(volume *api.PersistentVolume) { r.volumeSource.Add(volume) } +// modifyVolumeEvent simulates that a volume has been modified in etcd and the +// controller receives 'volume modified' event. +func (r *volumeReactor) modifyVolumeEvent(volume *api.PersistentVolume) { + r.lock.Lock() + defer r.lock.Unlock() + + r.volumes[volume.Name] = volume + // Generate deletion event. Cloned volume is needed to prevent races (and we + // would get a clone from etcd too). + clone, _ := conversion.NewCloner().DeepCopy(volume) + volumeClone := clone.(*api.PersistentVolume) + r.volumeSource.Modify(volumeClone) +} + // addClaimEvent simulates that a claim has been deleted in etcd and the // controller receives 'claim added' event. func (r *volumeReactor) addClaimEvent(claim *api.PersistentVolumeClaim) {