mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-24 19:38:02 +00:00
Added pending phase for volumes. added defaults for PV/PVC. refactored to better phase transitioning in control loops
This commit is contained in:
@@ -18,10 +18,11 @@ package volumeclaimbinder
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
|
||||
@@ -40,12 +41,17 @@ type PersistentVolumeClaimBinder struct {
|
||||
claimController *framework.Controller
|
||||
client binderClient
|
||||
stopChannels map[string]chan struct{}
|
||||
lock sync.RWMutex
|
||||
}
|
||||
|
||||
// NewPersistentVolumeClaimBinder creates a new PersistentVolumeClaimBinder
|
||||
func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time.Duration) *PersistentVolumeClaimBinder {
|
||||
volumeIndex := NewPersistentVolumeOrderedIndex()
|
||||
binderClient := NewBinderClient(kubeClient)
|
||||
binder := &PersistentVolumeClaimBinder{
|
||||
volumeIndex: volumeIndex,
|
||||
client: binderClient,
|
||||
}
|
||||
|
||||
_, volumeController := framework.NewInformer(
|
||||
&cache.ListWatch{
|
||||
@@ -59,23 +65,9 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time
|
||||
&api.PersistentVolume{},
|
||||
syncPeriod,
|
||||
framework.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
volume := obj.(*api.PersistentVolume)
|
||||
volumeIndex.Indexer.Add(volume)
|
||||
syncVolume(binderClient, volume)
|
||||
},
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
oldVolume := oldObj.(*api.PersistentVolume)
|
||||
newVolume := newObj.(*api.PersistentVolume)
|
||||
volumeIndex.Indexer.Update(newVolume)
|
||||
if updateRequired(oldVolume, newVolume) {
|
||||
syncVolume(binderClient, newVolume)
|
||||
}
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
volume := obj.(*api.PersistentVolume)
|
||||
volumeIndex.Indexer.Delete(volume)
|
||||
},
|
||||
AddFunc: binder.addVolume,
|
||||
UpdateFunc: binder.updateVolume,
|
||||
DeleteFunc: binder.deleteVolume,
|
||||
},
|
||||
)
|
||||
_, claimController := framework.NewInformer(
|
||||
@@ -90,75 +82,186 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time
|
||||
&api.PersistentVolumeClaim{},
|
||||
syncPeriod,
|
||||
framework.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
claim := obj.(*api.PersistentVolumeClaim)
|
||||
syncClaim(volumeIndex, binderClient, claim)
|
||||
},
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
newClaim := newObj.(*api.PersistentVolumeClaim)
|
||||
if newClaim.Status.VolumeRef == nil {
|
||||
syncClaim(volumeIndex, binderClient, newClaim)
|
||||
}
|
||||
},
|
||||
AddFunc: binder.addClaim,
|
||||
UpdateFunc: binder.updateClaim,
|
||||
},
|
||||
)
|
||||
|
||||
binder := &PersistentVolumeClaimBinder{
|
||||
volumeController: volumeController,
|
||||
claimController: claimController,
|
||||
volumeIndex: volumeIndex,
|
||||
client: binderClient,
|
||||
}
|
||||
binder.claimController = claimController
|
||||
binder.volumeController = volumeController
|
||||
|
||||
return binder
|
||||
}
|
||||
|
||||
func updateRequired(oldVolume, newVolume *api.PersistentVolume) bool {
|
||||
// Spec changes affect indexing and sorting volumes
|
||||
if !reflect.DeepEqual(oldVolume.Spec, newVolume.Spec) {
|
||||
return true
|
||||
}
|
||||
if !reflect.DeepEqual(oldVolume.Status, newVolume.Status) {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
func (binder *PersistentVolumeClaimBinder) addVolume(obj interface{}) {
|
||||
binder.lock.Lock()
|
||||
defer binder.lock.Unlock()
|
||||
volume := obj.(*api.PersistentVolume)
|
||||
syncVolume(binder.volumeIndex, binder.client, volume)
|
||||
}
|
||||
|
||||
func syncVolume(binderClient binderClient, volume *api.PersistentVolume) (err error) {
|
||||
func (binder *PersistentVolumeClaimBinder) updateVolume(oldObj, newObj interface{}) {
|
||||
binder.lock.Lock()
|
||||
defer binder.lock.Unlock()
|
||||
newVolume := newObj.(*api.PersistentVolume)
|
||||
binder.volumeIndex.Update(newVolume)
|
||||
syncVolume(binder.volumeIndex, binder.client, newVolume)
|
||||
}
|
||||
|
||||
func (binder *PersistentVolumeClaimBinder) deleteVolume(obj interface{}) {
|
||||
binder.lock.Lock()
|
||||
defer binder.lock.Unlock()
|
||||
volume := obj.(*api.PersistentVolume)
|
||||
binder.volumeIndex.Delete(volume)
|
||||
}
|
||||
|
||||
func (binder *PersistentVolumeClaimBinder) addClaim(obj interface{}) {
|
||||
binder.lock.Lock()
|
||||
defer binder.lock.Unlock()
|
||||
claim := obj.(*api.PersistentVolumeClaim)
|
||||
syncClaim(binder.volumeIndex, binder.client, claim)
|
||||
}
|
||||
|
||||
func (binder *PersistentVolumeClaimBinder) updateClaim(oldObj, newObj interface{}) {
|
||||
binder.lock.Lock()
|
||||
defer binder.lock.Unlock()
|
||||
newClaim := newObj.(*api.PersistentVolumeClaim)
|
||||
syncClaim(binder.volumeIndex, binder.client, newClaim)
|
||||
}
|
||||
|
||||
func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, volume *api.PersistentVolume) (err error) {
|
||||
glog.V(5).Infof("Synchronizing PersistentVolume[%s]\n", volume.Name)
|
||||
|
||||
if volume.Spec.ClaimRef != nil {
|
||||
if volume.Status.Phase == api.VolumeAvailable {
|
||||
volume.Status.Phase = api.VolumeBound
|
||||
_, err := binderClient.UpdatePersistentVolumeStatus(volume)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error updating pv.status: %v\n", err)
|
||||
// volumes can be in one of the following states:
|
||||
//
|
||||
// VolumePending -- default value -- not bound to a claim and not yet processed through this controller.
|
||||
// VolumeAvailable -- not bound to a claim, but processed at least once and found in this controller's volumeIndex.
|
||||
// VolumeBound -- bound to a claim because volume.Spec.ClaimRef != nil. Claim status may not be correct.
|
||||
// VolumeReleased -- volume.Spec.ClaimRef != nil but the claim has been deleted by the user.
|
||||
currentPhase := volume.Status.Phase
|
||||
nextPhase := currentPhase
|
||||
|
||||
switch currentPhase {
|
||||
// pending volumes are available only after indexing in order to be matched to claims.
|
||||
case api.VolumePending:
|
||||
_, exists, err := volumeIndex.Get(volume)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !exists {
|
||||
volumeIndex.Add(volume)
|
||||
}
|
||||
glog.V(5).Infof("PersistentVolume[%s] is now available\n", volume.Name)
|
||||
nextPhase = api.VolumeAvailable
|
||||
|
||||
// available volumes await a claim
|
||||
case api.VolumeAvailable:
|
||||
if volume.Spec.ClaimRef != nil {
|
||||
_, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name)
|
||||
if err == nil {
|
||||
// change of phase will trigger an update event with the newly bound volume
|
||||
glog.V(5).Infof("PersistentVolume[%s] is now bound\n", volume.Name)
|
||||
nextPhase = api.VolumeBound
|
||||
} else {
|
||||
if errors.IsNotFound(err) {
|
||||
nextPhase = api.VolumeReleased
|
||||
}
|
||||
}
|
||||
}
|
||||
//bound volumes require verification of their bound claims
|
||||
case api.VolumeBound:
|
||||
if volume.Spec.ClaimRef == nil {
|
||||
return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume)
|
||||
} else {
|
||||
claim, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name)
|
||||
if err == nil {
|
||||
// bound and active. Build claim status as needed.
|
||||
if claim.Status.VolumeRef == nil {
|
||||
syncClaimStatus(binderClient, volume, claim)
|
||||
}
|
||||
} else {
|
||||
if errors.IsNotFound(err) {
|
||||
nextPhase = api.VolumeReleased
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
// released volumes require recycling
|
||||
case api.VolumeReleased:
|
||||
if volume.Spec.ClaimRef == nil {
|
||||
return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume)
|
||||
} else {
|
||||
// TODO: implement Recycle method on plugins
|
||||
}
|
||||
}
|
||||
|
||||
if currentPhase != nextPhase {
|
||||
volume.Status.Phase = nextPhase
|
||||
volume, err := binderClient.UpdatePersistentVolumeStatus(volume)
|
||||
if err != nil {
|
||||
// Rollback to previous phase
|
||||
volume.Status.Phase = currentPhase
|
||||
}
|
||||
volumeIndex.Update(volume)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, claim *api.PersistentVolumeClaim) (err error) {
|
||||
glog.V(5).Infof("Synchronizing PersistentVolumeClaim[%s]\n", claim.Name)
|
||||
|
||||
// claims can be in one of the following states:
|
||||
//
|
||||
// ClaimPending -- default value -- not bound to a claim. A volume that matches the claim may not exist.
|
||||
// ClaimBound -- bound to a volume. claim.Status.VolumeRef != nil
|
||||
currentPhase := claim.Status.Phase
|
||||
nextPhase := currentPhase
|
||||
|
||||
switch currentPhase {
|
||||
|
||||
// pending claims await a matching volume
|
||||
case api.ClaimPending:
|
||||
volume, err := volumeIndex.FindBestMatchForClaim(claim)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if volume == nil {
|
||||
return fmt.Errorf("A volume match does not exist for persistent claim: %s", claim.Name)
|
||||
}
|
||||
|
||||
// verify the volume is still claimed by a user
|
||||
if claim, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name); err == nil {
|
||||
glog.V(5).Infof("PersistentVolume[%s] is bound to PersistentVolumeClaim[%s]\n", volume.Name, volume.Spec.ClaimRef.Name)
|
||||
// rebuild the Claim's Status as needed
|
||||
if claim.Status.VolumeRef == nil {
|
||||
syncClaimStatus(binderClient, volume, claim)
|
||||
}
|
||||
} else {
|
||||
//claim was deleted by user.
|
||||
glog.V(3).Infof("PersistentVolumeClaim[%s] unbound from PersistentVolume[%s]\n", volume.Spec.ClaimRef.Name, volume.Name)
|
||||
// volume.Spec.ClaimRef is deliberately left non-nil so that another process can recycle the newly released volume
|
||||
volume.Status.Phase = api.VolumeReleased
|
||||
volume, err = binderClient.UpdatePersistentVolumeStatus(volume)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error updating pv: %+v\n", err)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
volume.Status.Phase = api.VolumeAvailable
|
||||
_, err := binderClient.UpdatePersistentVolumeStatus(volume)
|
||||
claimRef, err := api.GetReference(claim)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error updating pv.status: %v\n", err)
|
||||
return fmt.Errorf("Unexpected error getting claim reference: %v\n", err)
|
||||
}
|
||||
|
||||
// make a binding reference to the claim.
|
||||
// triggers update of the volume in this controller, which builds claim status
|
||||
volume.Spec.ClaimRef = claimRef
|
||||
volume, err = binderClient.UpdatePersistentVolume(volume)
|
||||
if err == nil {
|
||||
nextPhase = api.ClaimBound
|
||||
}
|
||||
if err != nil {
|
||||
// Rollback by unsetting the ClaimRef on the volume pointer.
|
||||
// the volume in the index will be unbound again and ready to be matched.
|
||||
volume.Spec.ClaimRef = nil
|
||||
// Rollback by restoring original phase to claim pointer
|
||||
nextPhase = api.ClaimPending
|
||||
return fmt.Errorf("Error updating volume: %+v\n", err)
|
||||
}
|
||||
|
||||
// bound claims requires no maintenance. Deletion by the user is the last lifecycle phase.
|
||||
case api.ClaimBound:
|
||||
// This is the end of a claim's lifecycle.
|
||||
// After claim deletion, a volume is recycled when it verifies its claim is unbound
|
||||
glog.V(5).Infof("PersistentVolumeClaime[%s] is bound\n", claim.Name)
|
||||
}
|
||||
|
||||
if currentPhase != nextPhase {
|
||||
claim.Status.Phase = nextPhase
|
||||
binderClient.UpdatePersistentVolumeClaimStatus(claim)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -168,6 +271,7 @@ func syncClaimStatus(binderClient binderClient, volume *api.PersistentVolume, cl
|
||||
if err != nil {
|
||||
return fmt.Errorf("Unexpected error getting volume reference: %v\n", err)
|
||||
}
|
||||
|
||||
// all "actuals" are transferred from PV to PVC so the user knows what
|
||||
// type of volume they actually got for their claim
|
||||
claim.Status.Phase = api.ClaimBound
|
||||
@@ -176,63 +280,16 @@ func syncClaimStatus(binderClient binderClient, volume *api.PersistentVolume, cl
|
||||
claim.Status.Capacity = volume.Spec.Capacity
|
||||
|
||||
_, err = binderClient.UpdatePersistentVolumeClaimStatus(claim)
|
||||
|
||||
if err != nil {
|
||||
claim.Status.Phase = api.ClaimPending
|
||||
claim.Status.VolumeRef = nil
|
||||
claim.Status.AccessModes = nil
|
||||
claim.Status.Capacity = nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, claim *api.PersistentVolumeClaim) (err error) {
|
||||
glog.V(5).Infof("Synchronizing PersistentVolumeClaim[%s]\n", claim.Name)
|
||||
|
||||
if claim.Status.VolumeRef != nil {
|
||||
glog.V(5).Infof("PersistentVolumeClaim[%s] is bound to PersistentVolume[%s]\n", claim.Name, claim.Status.VolumeRef.Name)
|
||||
return nil
|
||||
}
|
||||
|
||||
volume, err := volumeIndex.FindBestMatchForClaim(claim)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if volume != nil {
|
||||
claimRef, err := api.GetReference(claim)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Unexpected error getting claim reference: %v\n", err)
|
||||
}
|
||||
|
||||
// make a binding reference to the claim
|
||||
volume.Spec.ClaimRef = claimRef
|
||||
volume, err = binderClient.UpdatePersistentVolume(volume)
|
||||
|
||||
if err != nil {
|
||||
// volume no longer bound
|
||||
volume.Spec.ClaimRef = nil
|
||||
return fmt.Errorf("Error updating volume: %+v\n", err)
|
||||
} else {
|
||||
err = syncClaimStatus(binderClient, volume, claim)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error update claim.status: %+v\n", err)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
glog.V(5).Infof("No volume match found for PersistentVolumeClaim[%s]\n", claim.UID)
|
||||
if claim.Status.Phase != api.ClaimPending {
|
||||
claim.Status.Phase = api.ClaimPending
|
||||
_, err := binderClient.UpdatePersistentVolumeClaimStatus(claim)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error updating pvclaim.status: %v\n", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Run starts all of this binder's control loops
|
||||
func (controller *PersistentVolumeClaimBinder) Run() {
|
||||
glog.V(5).Infof("Starting PersistentVolumeClaimBinder\n")
|
||||
if controller.stopChannels == nil {
|
||||
@@ -250,6 +307,7 @@ func (controller *PersistentVolumeClaimBinder) Run() {
|
||||
}
|
||||
}
|
||||
|
||||
// Stop gracefully shuts down this binder
|
||||
func (controller *PersistentVolumeClaimBinder) Stop() {
|
||||
glog.V(5).Infof("Stopping PersistentVolumeClaimBinder\n")
|
||||
for name, stopChan := range controller.stopChannels {
|
||||
|
@@ -17,12 +17,12 @@ limitations under the License.
|
||||
package volumeclaimbinder
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient"
|
||||
@@ -51,7 +51,6 @@ func TestRunStop(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestExampleObjects(t *testing.T) {
|
||||
|
||||
scenarios := map[string]struct {
|
||||
expected interface{}
|
||||
}{
|
||||
@@ -167,56 +166,7 @@ func TestExampleObjects(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequiresUpdate(t *testing.T) {
|
||||
old := &api.PersistentVolume{
|
||||
Spec: api.PersistentVolumeSpec{
|
||||
AccessModes: []api.AccessModeType{api.ReadWriteOnce},
|
||||
Capacity: api.ResourceList{
|
||||
api.ResourceName(api.ResourceStorage): resource.MustParse("5Gi"),
|
||||
},
|
||||
PersistentVolumeSource: api.PersistentVolumeSource{
|
||||
HostPath: &api.HostPathVolumeSource{
|
||||
Path: "/tmp/data02",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
new := &api.PersistentVolume{
|
||||
Spec: api.PersistentVolumeSpec{
|
||||
AccessModes: []api.AccessModeType{api.ReadWriteOnce},
|
||||
Capacity: api.ResourceList{
|
||||
api.ResourceName(api.ResourceStorage): resource.MustParse("5Gi"),
|
||||
},
|
||||
PersistentVolumeSource: api.PersistentVolumeSource{
|
||||
HostPath: &api.HostPathVolumeSource{
|
||||
Path: "/tmp/data02",
|
||||
},
|
||||
},
|
||||
ClaimRef: &api.ObjectReference{Name: "foo"},
|
||||
},
|
||||
}
|
||||
|
||||
if !updateRequired(old, new) {
|
||||
t.Errorf("Update expected for the new volume with added ClaimRef")
|
||||
}
|
||||
|
||||
old.Spec.ClaimRef = new.Spec.ClaimRef
|
||||
old.Status.Phase = api.VolumeBound
|
||||
|
||||
if !updateRequired(old, new) {
|
||||
t.Errorf("Update expected for the new volume with added Status")
|
||||
}
|
||||
|
||||
new.Status.Phase = old.Status.Phase
|
||||
|
||||
if updateRequired(old, new) {
|
||||
t.Errorf("No updated expected for identical objects")
|
||||
}
|
||||
}
|
||||
|
||||
func TestBindingWithExamples(t *testing.T) {
|
||||
|
||||
api.ForTesting_ReferencesAllowBlankSelfLinks = true
|
||||
o := testclient.NewObjects(api.Scheme)
|
||||
if err := testclient.AddObjectsFromPath("../../examples/persistent-volumes/claims/claim-01.yaml", o); err != nil {
|
||||
@@ -245,7 +195,7 @@ func TestBindingWithExamples(t *testing.T) {
|
||||
}
|
||||
|
||||
volumeIndex.Add(pv)
|
||||
syncVolume(mockClient, pv)
|
||||
syncVolume(volumeIndex, mockClient, pv)
|
||||
|
||||
if pv.Status.Phase != api.VolumeAvailable {
|
||||
t.Errorf("Expected phase %s but got %s", api.VolumeBound, pv.Status.Phase)
|
||||
@@ -261,10 +211,13 @@ func TestBindingWithExamples(t *testing.T) {
|
||||
t.Errorf("Expected ClaimRef but got nil for volume: %+v\n", pv)
|
||||
}
|
||||
|
||||
syncVolume(mockClient, pv)
|
||||
// first sync verifies the new bound claim, advances state, triggering update
|
||||
syncVolume(volumeIndex, mockClient, pv)
|
||||
// second sync verifies claim, sees missing claim status and builds it
|
||||
syncVolume(volumeIndex, mockClient, pv)
|
||||
|
||||
if claim.Status.VolumeRef == nil {
|
||||
t.Error("Expected claim to be bound to volume")
|
||||
t.Fatalf("Expected claim to be bound to volume")
|
||||
}
|
||||
|
||||
if pv.Status.Phase != api.VolumeBound {
|
||||
@@ -283,7 +236,7 @@ func TestBindingWithExamples(t *testing.T) {
|
||||
|
||||
// pretend the user deleted their claim
|
||||
mockClient.claim = nil
|
||||
syncVolume(mockClient, pv)
|
||||
syncVolume(volumeIndex, mockClient, pv)
|
||||
|
||||
if pv.Status.Phase != api.VolumeReleased {
|
||||
t.Errorf("Expected phase %s but got %s", api.VolumeReleased, pv.Status.Phase)
|
||||
@@ -311,7 +264,7 @@ func (c *mockBinderClient) GetPersistentVolumeClaim(namespace, name string) (*ap
|
||||
if c.claim != nil {
|
||||
return c.claim, nil
|
||||
} else {
|
||||
return nil, fmt.Errorf("Claim does not exist")
|
||||
return nil, errors.NewNotFound("persistentVolume", name)
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user