From fb412e47e4ec905500b183590c980090c38e9841 Mon Sep 17 00:00:00 2001 From: markturansky Date: Thu, 16 Apr 2015 13:26:08 -0400 Subject: [PATCH] Addressed feedback, improved flow and comments --- .../app/controllermanager.go | 7 + .../persistent_volume_claim_binder.go | 188 ++++++++++-------- 2 files changed, 110 insertions(+), 85 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 6b74e2cd61f..80386764f47 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -45,6 +45,7 @@ import ( "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus" "github.com/spf13/pflag" + "github.com/GoogleCloudPlatform/kubernetes/pkg/volumeclaimbinder" ) // CMServer is the main context object for the controller manager. @@ -58,6 +59,7 @@ type CMServer struct { NodeSyncPeriod time.Duration ResourceQuotaSyncPeriod time.Duration NamespaceSyncPeriod time.Duration + PVClaimBinderSyncPeriod time.Duration RegisterRetryCount int MachineList util.StringList SyncNodeList bool @@ -90,6 +92,7 @@ func NewCMServer() *CMServer { NodeSyncPeriod: 10 * time.Second, ResourceQuotaSyncPeriod: 10 * time.Second, NamespaceSyncPeriod: 5 * time.Minute, + PVClaimBinderSyncPeriod: 10 * time.Second, RegisterRetryCount: 10, PodEvictionTimeout: 5 * time.Minute, NodeMilliCPU: 1000, @@ -113,6 +116,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { "fewer calls to cloud provider, but may delay addition of new nodes to cluster.") fs.DurationVar(&s.ResourceQuotaSyncPeriod, "resource_quota_sync_period", s.ResourceQuotaSyncPeriod, "The period for syncing quota usage status in the system") fs.DurationVar(&s.NamespaceSyncPeriod, "namespace_sync_period", s.NamespaceSyncPeriod, "The period for syncing namespace life-cycle updates") + fs.DurationVar(&s.PVClaimBinderSyncPeriod, "pvclaimbinder_sync_period", s.PVClaimBinderSyncPeriod, "The period for syncing persistent volumes and persistent volume claims") fs.DurationVar(&s.PodEvictionTimeout, "pod_eviction_timeout", s.PodEvictionTimeout, "The grace peroid for deleting pods on failed nodes.") fs.Float32Var(&s.DeletingPodsQps, "deleting_pods_qps", 0.1, "Number of nodes per second on which pods are deleted in case of node failure.") fs.IntVar(&s.DeletingPodsBurst, "deleting_pods_burst", 10, "Number of nodes on which pods are bursty deleted in case of node failure. For more details look into RateLimiter.") @@ -231,6 +235,9 @@ func (s *CMServer) Run(_ []string) error { namespaceManager := namespace.NewNamespaceManager(kubeClient, s.NamespaceSyncPeriod) namespaceManager.Run() + pvclaimBinder := volumeclaimbinder.NewPersistentVolumeClaimBinder(kubeClient) + pvclaimBinder.Run(s.PVClaimBinderSyncPeriod) + select {} return nil } diff --git a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go index 3109531fbbf..4fb23498fad 100644 --- a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go +++ b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go @@ -17,10 +17,10 @@ limitations under the License. package volumeclaimbinder import ( + "fmt" "sync" "time" - "fmt" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" @@ -37,6 +37,8 @@ type PersistentVolumeClaimBinder struct { volumeStore *persistentVolumeOrderedIndex claimStore cache.Store client client.Interface + // protects access to binding + lock sync.RWMutex } // NewPersistentVolumeClaimBinder creates a new PersistentVolumeClaimBinder @@ -82,6 +84,106 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface) *PersistentVolu return binder } +// syncPersistentVolume inspects all bound PVs to determine if their bound PersistentVolumeClaim still exists. +func (controller *PersistentVolumeClaimBinder) syncPersistentVolume(obj interface{}) error { + volume := obj.(*api.PersistentVolume) + glog.V(5).Infof("Synchronizing PersistentVolume[%s]%s\n", volume.Name) + + if volume.Spec.ClaimRef != nil { + if volume.Status.Phase == api.VolumeAvailable { + volume.Status.Phase = api.VolumeBound + _, err := controller.client.PersistentVolumes().Update(volume) + if err != nil { + return fmt.Errorf("Error updating pv.status: %v\n", err) + } + } + + // verify the volume is still claimed by a user + if claim, err := controller.client.PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name); err == nil { + glog.V(5).Infof("PersistentVolume[%s] is bound to PersistentVolumeClaim[%s]\n", volume.Name, volume.Spec.ClaimRef.Name) + controller.syncPersistentVolumeClaimStatus(volume, claim) + } else { + //claim was deleted by user. + glog.V(3).Infof("PersistentVolumeClaim[%s] unbound from PersistentVolume[%s]\n", volume.Spec.ClaimRef.Name, volume.Name) + // volume.Spec.ClaimRef is deliberately left non-nil so that another process can recycle the newly release volume + volume.Status.Phase = api.VolumeReleased + volume, err = controller.client.PersistentVolumes().UpdateStatus(volume) + if err != nil { + return fmt.Errorf("Error updating pv: %+v\n", err) + } + } + } + return nil +} + +func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaim(obj interface{}) error { + claim := obj.(*api.PersistentVolumeClaim) + glog.V(5).Infof("Synchronizing PersistentVolumeClaim[%s]\n", claim.Name) + + if claim.Status.VolumeRef != nil { + glog.V(5).Infof("PersistentVolumeClaim[%s] is bound to PersistentVolume[%s]\n", claim.Name, claim.Status.VolumeRef.Name) + return nil + } + + pv, err := controller.volumeStore.FindBestMatchForClaim(claim) + if err != nil { + return err + } + + if pv != nil { + claimRef, err := api.GetReference(claim) + if err != nil { + return fmt.Errorf("Unexpected error getting claim reference: %v\n", err) + } + + // make a binding reference to the claim + pv.Spec.ClaimRef = claimRef + pv, err = controller.client.PersistentVolumes().Update(pv) + + if err != nil { + // volume no longer bound + pv.Spec.ClaimRef = nil + return fmt.Errorf("Error updating volume: %+v\n", err) + } else { + glog.V(3).Infof("PersistentVolumeClaim[%s] bound to PersistentVolume[%s]\n", claim.Name, pv.Name) + pv.Status.Phase = api.VolumeBound + err := controller.syncPersistentVolumeClaimStatus(pv, claim) + if err != nil { + return fmt.Errorf("Error updating pvclaim.status: %v\n", err) + } + } + } else { + glog.V(5).Infof("No volume match found for PersistentVolumeClaim[%s]\n", claim.UID) + } + return nil +} + +// syncPersistentVolumeClaimStatus builds and persistens a PVClaim's Status, rolling back to empty values if the update fails +func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaimStatus(volume *api.PersistentVolume, claim *api.PersistentVolumeClaim) error { + volumeRef, err := api.GetReference(volume) + if err != nil { + return fmt.Errorf("Unexpected error getting volume reference: %v\n", err) + } + // all "actuals" are transferred from PV to PVC so the user knows what + // type of volume they actually got for their claim + claim.Status.Phase = api.ClaimBound + claim.Status.VolumeRef = volumeRef + claim.Status.AccessModes = volume.Spec.AccessModes + claim.Status.Capacity = volume.Spec.Capacity + + _, err = controller.client.PersistentVolumeClaims(claim.Namespace).UpdateStatus(claim) + + if err != nil { + claim.Status.Phase = api.ClaimPending + claim.Status.VolumeRef = nil + claim.Status.AccessModes = nil + claim.Status.Capacity = nil + } + + return err +} + + func (controller *PersistentVolumeClaimBinder) Run(period time.Duration) { glog.V(5).Infof("Starting PersistentVolumeClaimBinder\n") go util.Forever(func() { controller.synchronize() }, period) @@ -130,87 +232,3 @@ func (controller *PersistentVolumeClaimBinder) reconcile(synchronizers ...Synchr wg.Wait() } } - -// syncPersistentVolume inspects all bound PVs to determine if their bound PersistentVolumeClaim still exists. -func (controller *PersistentVolumeClaimBinder) syncPersistentVolume(obj interface{}) error { - volume := obj.(*api.PersistentVolume) - glog.V(5).Infof("Synchronizing persistent volume: %s\n", volume.Name) - - // verify the volume is still claimed by a user - if volume.Spec.ClaimRef != nil { - if _, err := controller.client.PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name); err == nil { - glog.V(5).Infof("PersistentVolume[%s] is bound to PersistentVolumeClaim[%s]\n", volume.Name, volume.Spec.ClaimRef.Name) - } else { - //claim was deleted by user. - glog.V(3).Infof("PersistentVolumeClaim[UID=%s] unbound from PersistentVolume[UID=%s]\n", volume.Spec.ClaimRef.UID, volume.UID) - volume.Spec.ClaimRef = nil - volume.Status.Phase = api.VolumeReleased - volume, err = controller.client.PersistentVolumes().Update(volume) - if err != nil { - glog.V(3).Infof("Error updating volume: %+v\n", err) - } - } - } - return nil -} - -func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaim(obj interface{}) error { - claim := obj.(*api.PersistentVolumeClaim) - glog.V(5).Infof("Synchronizing persistent volume claim: %s\n", claim.Name) - - if claim.Status.VolumeRef != nil { - glog.V(5).Infof("PersistentVolumeClaim[UID=%s] is bound to PersistentVolume[UID=%s]\n", claim.Name, claim.Status.VolumeRef.Name) - return nil - } - - volume, err := controller.volumeStore.FindBestMatchForClaim(claim) - if err != nil { - return err - } - - if volume != nil { - claimRef, err := api.GetReference(claim) - if err != nil { - return fmt.Errorf("Unexpected error getting claim reference: %v\n", err) - } - - volumeRef, err := api.GetReference(volume) - if err != nil { - return fmt.Errorf("Unexpected error getting volume reference: %v\n", err) - } - - // make a binding reference to the claim - volume.Spec.ClaimRef = claimRef - volume, err = controller.client.PersistentVolumes().Update(volume) - - if err != nil { - glog.V(3).Infof("Error updating volume: %+v\n", err) - } else { - - // all "actuals" are transferred from PV to PVC so the user knows what - // type of volume they actually got for their claim - claim.Status.Phase = api.ClaimBound - claim.Status.VolumeRef = volumeRef - claim.Status.AccessModes = volume.Spec.AccessModes - claim.Status.Capacity = volume.Spec.Capacity - - _, err = controller.client.PersistentVolumeClaims(claim.Namespace).UpdateStatus(claim) - if err != nil { - glog.V(3).Infof("Error updating claim: %+v\n", err) - - // uset ClaimRef on the pointer to make it available for binding again - volume.Spec.ClaimRef = nil - volume, err = controller.client.PersistentVolumes().Update(volume) - - // unset VolumeRef on the pointer so this claim can be processed next sync loop - claim.Status.VolumeRef = nil - } else { - glog.V(2).Infof("PersistentVolumeClaim[UID=%s] bound to PersistentVolume[UID=%s]\n", claim.UID, volume.UID) - } - } - } else { - glog.V(5).Infof("No volume match found for %s\n", claim.UID) - } - - return nil -}