mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #25865 from jsafrane/devel/pv-convert-from-12
Automatic merge from submit-queue volume controller: Convert PersistentVolumes from Kubernetes 1.2 In Kubernetes 1.2 we used template PersistentVolume for provisioning. When a claim for dynamic volume was detected, Kubernetes did: - create template PV for the claim with dummy pointer to storage asset - allocate storage asset such as AWS EBS - fill real pointer to the created storage asset to the template PV In refactored volume provisioner, Kubernetes allocates the storage asset first and then creates a Kubernetes PV instance already with the correct pointer to the storage asset. To support seamles upgrade from 1.2 to 1.3 we need to remove these unprovisioned template PVs. The new controller does not use them, it will see PVC for dynamic provisioning and create real PV instead. See https://github.com/pmorie/pv-haxxz/pull/3 for pseudocode.
This commit is contained in:
commit
bd2bc25308
@ -142,7 +142,11 @@ func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSour
|
||||
return
|
||||
}
|
||||
for _, volume := range volumeList.Items {
|
||||
storeObjectUpdate(ctrl.volumes.store, volume, "volume")
|
||||
// Ignore template volumes from kubernetes 1.2
|
||||
deleted := ctrl.upgradeVolumeFrom1_2(volume.(*api.PersistentVolume))
|
||||
if !deleted {
|
||||
storeObjectUpdate(ctrl.volumes.store, volume, "volume")
|
||||
}
|
||||
}
|
||||
|
||||
claimListObj, err := claimSource.List(api.ListOptions{})
|
||||
@ -164,6 +168,17 @@ func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSour
|
||||
// addVolume is callback from framework.Controller watching PersistentVolume
|
||||
// events.
|
||||
func (ctrl *PersistentVolumeController) addVolume(obj interface{}) {
|
||||
pv, ok := obj.(*api.PersistentVolume)
|
||||
if !ok {
|
||||
glog.Errorf("expected PersistentVolume but handler received %+v", obj)
|
||||
return
|
||||
}
|
||||
|
||||
if ctrl.upgradeVolumeFrom1_2(pv) {
|
||||
// volume deleted
|
||||
return
|
||||
}
|
||||
|
||||
// Store the new volume version in the cache and do not process it if this
|
||||
// is an old version.
|
||||
new, err := storeObjectUpdate(ctrl.volumes.store, obj, "volume")
|
||||
@ -174,11 +189,6 @@ func (ctrl *PersistentVolumeController) addVolume(obj interface{}) {
|
||||
return
|
||||
}
|
||||
|
||||
pv, ok := obj.(*api.PersistentVolume)
|
||||
if !ok {
|
||||
glog.Errorf("expected PersistentVolume but handler received %+v", obj)
|
||||
return
|
||||
}
|
||||
if err := ctrl.syncVolume(pv); err != nil {
|
||||
if errors.IsConflict(err) {
|
||||
// Version conflict error happens quite often and the controller
|
||||
@ -193,6 +203,17 @@ func (ctrl *PersistentVolumeController) addVolume(obj interface{}) {
|
||||
// updateVolume is callback from framework.Controller watching PersistentVolume
|
||||
// events.
|
||||
func (ctrl *PersistentVolumeController) updateVolume(oldObj, newObj interface{}) {
|
||||
newVolume, ok := newObj.(*api.PersistentVolume)
|
||||
if !ok {
|
||||
glog.Errorf("Expected PersistentVolume but handler received %+v", newObj)
|
||||
return
|
||||
}
|
||||
|
||||
if ctrl.upgradeVolumeFrom1_2(newVolume) {
|
||||
// volume deleted
|
||||
return
|
||||
}
|
||||
|
||||
// Store the new volume version in the cache and do not process it if this
|
||||
// is an old version.
|
||||
new, err := storeObjectUpdate(ctrl.volumes.store, newObj, "volume")
|
||||
@ -203,11 +224,6 @@ func (ctrl *PersistentVolumeController) updateVolume(oldObj, newObj interface{})
|
||||
return
|
||||
}
|
||||
|
||||
newVolume, ok := newObj.(*api.PersistentVolume)
|
||||
if !ok {
|
||||
glog.Errorf("Expected PersistentVolume but handler received %+v", newObj)
|
||||
return
|
||||
}
|
||||
if err := ctrl.syncVolume(newVolume); err != nil {
|
||||
if errors.IsConflict(err) {
|
||||
// Version conflict error happens quite often and the controller
|
||||
@ -400,6 +416,44 @@ func (ctrl *PersistentVolumeController) Stop() {
|
||||
close(ctrl.claimControllerStopCh)
|
||||
}
|
||||
|
||||
const (
|
||||
// these pair of constants are used by the provisioner in Kubernetes 1.2.
|
||||
pvProvisioningRequiredAnnotationKey = "volume.experimental.kubernetes.io/provisioning-required"
|
||||
pvProvisioningCompletedAnnotationValue = "volume.experimental.kubernetes.io/provisioning-completed"
|
||||
)
|
||||
|
||||
// upgradeVolumeFrom1_2 updates PV from Kubernetes 1.2 to 1.3 and newer. In 1.2,
|
||||
// we used template PersistentVolume instances for dynamic provisioning. In 1.3
|
||||
// and later, these template (and not provisioned) instances must be removed to
|
||||
// make the controller to provision a new PV.
|
||||
// It returns true if the volume was deleted.
|
||||
// TODO: remove this function when upgrade from 1.2 becomes unsupported.
|
||||
func (ctrl *PersistentVolumeController) upgradeVolumeFrom1_2(volume *api.PersistentVolume) bool {
|
||||
annValue, found := volume.Annotations[pvProvisioningRequiredAnnotationKey]
|
||||
if !found {
|
||||
// The volume is not template
|
||||
return false
|
||||
}
|
||||
if annValue == pvProvisioningCompletedAnnotationValue {
|
||||
// The volume is already fully provisioned. The new controller will
|
||||
// ignore this annotation and it will obey its ReclaimPolicy, which is
|
||||
// likely to delete the volume when appropriate claim is deleted.
|
||||
return false
|
||||
}
|
||||
glog.V(2).Infof("deleting unprovisioned template volume %q from Kubernetes 1.2.", volume.Name)
|
||||
err := ctrl.kubeClient.Core().PersistentVolumes().Delete(volume.Name, nil)
|
||||
if err != nil {
|
||||
glog.Errorf("cannot delete unprovisioned template volume %q: %v", volume.Name, err)
|
||||
}
|
||||
// Remove from local cache
|
||||
err = ctrl.volumes.store.Delete(volume)
|
||||
if err != nil {
|
||||
glog.Errorf("cannot remove volume %q from local cache: %v", volume.Name, err)
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// Stateless functions
|
||||
|
||||
func hasAnnotation(obj api.ObjectMeta, ann string) bool {
|
||||
|
@ -33,7 +33,7 @@ import (
|
||||
// can't reliably simulate periodic sync of volumes/claims - it would be
|
||||
// either very timing-sensitive or slow to wait for real periodic sync.
|
||||
func TestControllerSync(t *testing.T) {
|
||||
expectedChanges := []int{4, 1, 1}
|
||||
expectedChanges := []int{4, 1, 1, 2, 1, 1, 1}
|
||||
tests := []controllerTest{
|
||||
// [Unit test set 5] - controller tests.
|
||||
// We test the controller as if
|
||||
@ -87,6 +87,74 @@ func TestControllerSync(t *testing.T) {
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
// addVolume with provisioned volume from Kubernetes 1.2. No "action"
|
||||
// is expected - it should stay bound.
|
||||
"5-5 - add bound volume from 1.2",
|
||||
novolumes,
|
||||
[]*api.PersistentVolume{addVolumeAnnotation(newVolume("volume5-5", "10Gi", "uid5-5", "claim5-5", api.VolumeBound, api.PersistentVolumeReclaimDelete), pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue)},
|
||||
newClaimArray("claim5-5", "uid5-5", "1Gi", "", api.ClaimPending),
|
||||
newClaimArray("claim5-5", "uid5-5", "1Gi", "volume5-5", api.ClaimBound, annBindCompleted, annBoundByController),
|
||||
noevents, noerrors,
|
||||
// Custom test function that generates a add event
|
||||
func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
|
||||
volume := newVolume("volume5-5", "10Gi", "uid5-5", "claim5-5", api.VolumeBound, api.PersistentVolumeReclaimDelete)
|
||||
volume = addVolumeAnnotation(volume, pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue)
|
||||
reactor.addVolumeEvent(volume)
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
// updateVolume with provisioned volume from Kubernetes 1.2. No
|
||||
// "action" is expected - it should stay bound.
|
||||
"5-6 - update bound volume from 1.2",
|
||||
[]*api.PersistentVolume{addVolumeAnnotation(newVolume("volume5-6", "10Gi", "uid5-6", "claim5-6", api.VolumeBound, api.PersistentVolumeReclaimDelete), pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue)},
|
||||
[]*api.PersistentVolume{addVolumeAnnotation(newVolume("volume5-6", "10Gi", "uid5-6", "claim5-6", api.VolumeBound, api.PersistentVolumeReclaimDelete), pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue)},
|
||||
newClaimArray("claim5-6", "uid5-6", "1Gi", "volume5-6", api.ClaimBound),
|
||||
newClaimArray("claim5-6", "uid5-6", "1Gi", "volume5-6", api.ClaimBound, annBindCompleted),
|
||||
noevents, noerrors,
|
||||
// Custom test function that generates a add event
|
||||
func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
|
||||
volume := newVolume("volume5-6", "10Gi", "uid5-6", "claim5-6", api.VolumeBound, api.PersistentVolumeReclaimDelete)
|
||||
volume = addVolumeAnnotation(volume, pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue)
|
||||
reactor.modifyVolumeEvent(volume)
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
// addVolume with unprovisioned volume from Kubernetes 1.2. The
|
||||
// volume should be deleted.
|
||||
"5-7 - add unprovisioned volume from 1.2",
|
||||
novolumes,
|
||||
novolumes,
|
||||
newClaimArray("claim5-7", "uid5-7", "1Gi", "", api.ClaimPending),
|
||||
newClaimArray("claim5-7", "uid5-7", "1Gi", "", api.ClaimPending),
|
||||
noevents, noerrors,
|
||||
// Custom test function that generates a add event
|
||||
func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
|
||||
volume := newVolume("volume5-7", "10Gi", "uid5-7", "claim5-7", api.VolumeBound, api.PersistentVolumeReclaimDelete)
|
||||
volume = addVolumeAnnotation(volume, pvProvisioningRequiredAnnotationKey, "yes")
|
||||
reactor.addVolumeEvent(volume)
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
// updateVolume with unprovisioned volume from Kubernetes 1.2. The
|
||||
// volume should be deleted.
|
||||
"5-8 - update bound volume from 1.2",
|
||||
novolumes,
|
||||
novolumes,
|
||||
newClaimArray("claim5-8", "uid5-8", "1Gi", "", api.ClaimPending),
|
||||
newClaimArray("claim5-8", "uid5-8", "1Gi", "", api.ClaimPending),
|
||||
noevents, noerrors,
|
||||
// Custom test function that generates a add event
|
||||
func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
|
||||
volume := newVolume("volume5-8", "10Gi", "uid5-8", "claim5-8", api.VolumeBound, api.PersistentVolumeReclaimDelete)
|
||||
volume = addVolumeAnnotation(volume, pvProvisioningRequiredAnnotationKey, "yes")
|
||||
reactor.modifyVolumeEvent(volume)
|
||||
return nil
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for ix, test := range tests {
|
||||
@ -108,7 +176,7 @@ func TestControllerSync(t *testing.T) {
|
||||
}
|
||||
|
||||
// Start the controller
|
||||
defer ctrl.Stop()
|
||||
count := reactor.getChangeCount()
|
||||
go ctrl.Run()
|
||||
|
||||
// Wait for the controller to pass initial sync and fill its caches.
|
||||
@ -121,8 +189,6 @@ func TestControllerSync(t *testing.T) {
|
||||
}
|
||||
glog.V(4).Infof("controller synced, starting test")
|
||||
|
||||
count := reactor.getChangeCount()
|
||||
|
||||
// Call the tested function
|
||||
err := test.test(ctrl, reactor, test)
|
||||
if err != nil {
|
||||
@ -133,10 +199,15 @@ func TestControllerSync(t *testing.T) {
|
||||
ctrl.claims.Resync()
|
||||
ctrl.volumes.store.Resync()
|
||||
|
||||
// Wait at least once, just in case expectedChanges[ix] == 0
|
||||
reactor.waitTest()
|
||||
// Wait for expected number of operations.
|
||||
for reactor.getChangeCount() < count+expectedChanges[ix] {
|
||||
reactor.waitTest()
|
||||
}
|
||||
|
||||
ctrl.Stop()
|
||||
|
||||
evaluateTestResults(ctrl, reactor, test, t)
|
||||
}
|
||||
}
|
||||
@ -208,3 +279,11 @@ func TestControllerCacheParsingError(t *testing.T) {
|
||||
t.Errorf("Expected parsing error, got nil instead")
|
||||
}
|
||||
}
|
||||
|
||||
func addVolumeAnnotation(volume *api.PersistentVolume, annName, annValue string) *api.PersistentVolume {
|
||||
if volume.Annotations == nil {
|
||||
volume.Annotations = make(map[string]string)
|
||||
}
|
||||
volume.Annotations[annName] = annValue
|
||||
return volume
|
||||
}
|
||||
|
@ -253,6 +253,7 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
|
||||
_, found := r.volumes[name]
|
||||
if found {
|
||||
delete(r.volumes, name)
|
||||
r.changedSinceLastSync++
|
||||
return true, nil, nil
|
||||
} else {
|
||||
return true, nil, fmt.Errorf("Cannot delete volume %s: not found", name)
|
||||
@ -264,6 +265,7 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
|
||||
_, found := r.volumes[name]
|
||||
if found {
|
||||
delete(r.claims, name)
|
||||
r.changedSinceLastSync++
|
||||
return true, nil, nil
|
||||
} else {
|
||||
return true, nil, fmt.Errorf("Cannot delete claim %s: not found", name)
|
||||
@ -519,6 +521,20 @@ func (r *volumeReactor) addVolumeEvent(volume *api.PersistentVolume) {
|
||||
r.volumeSource.Add(volume)
|
||||
}
|
||||
|
||||
// modifyVolumeEvent simulates that a volume has been modified in etcd and the
|
||||
// controller receives 'volume modified' event.
|
||||
func (r *volumeReactor) modifyVolumeEvent(volume *api.PersistentVolume) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
|
||||
r.volumes[volume.Name] = volume
|
||||
// Generate deletion event. Cloned volume is needed to prevent races (and we
|
||||
// would get a clone from etcd too).
|
||||
clone, _ := conversion.NewCloner().DeepCopy(volume)
|
||||
volumeClone := clone.(*api.PersistentVolume)
|
||||
r.volumeSource.Modify(volumeClone)
|
||||
}
|
||||
|
||||
// addClaimEvent simulates that a claim has been deleted in etcd and the
|
||||
// controller receives 'claim added' event.
|
||||
func (r *volumeReactor) addClaimEvent(claim *api.PersistentVolumeClaim) {
|
||||
|
Loading…
Reference in New Issue
Block a user