diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 55d7cf66e9e..1a1094ad40d 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -235,8 +235,8 @@ func (s *CMServer) Run(_ []string) error { namespaceManager := namespace.NewNamespaceManager(kubeClient, s.NamespaceSyncPeriod) namespaceManager.Run() - pvclaimBinder := volumeclaimbinder.NewPersistentVolumeClaimBinder(kubeClient) - pvclaimBinder.Run(s.PVClaimBinderSyncPeriod) + pvclaimBinder := volumeclaimbinder.NewPersistentVolumeClaimBinder(kubeClient, s.PVClaimBinderSyncPeriod) + pvclaimBinder.Run() select {} return nil diff --git a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go index e2a22d5b305..b655fe6a3a0 100644 --- a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go +++ b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go @@ -18,34 +18,35 @@ package volumeclaimbinder import ( "fmt" - "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + "github.com/golang/glog" + "reflect" ) // PersistentVolumeClaimBinder is a controller that synchronizes PersistentVolumeClaims. type PersistentVolumeClaimBinder struct { - volumeStore *persistentVolumeOrderedIndex - claimStore cache.Store - client binderClient - - // protects access to binding - lock sync.RWMutex + volumeIndex *persistentVolumeOrderedIndex + volumeController *framework.Controller + claimController *framework.Controller + client binderClient } // NewPersistentVolumeClaimBinder creates a new PersistentVolumeClaimBinder -func NewPersistentVolumeClaimBinder(kubeClient client.Interface) *PersistentVolumeClaimBinder { - volumeStore := NewPersistentVolumeOrderedIndex() - volumeReflector := cache.NewReflector( +func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time.Duration) *PersistentVolumeClaimBinder { + volumeIndex := NewPersistentVolumeOrderedIndex() + binderClient := NewBinderClient(kubeClient) + + _, volumeController := framework.NewInformer( &cache.ListWatch{ ListFunc: func() (runtime.Object, error) { return kubeClient.PersistentVolumes().List(labels.Everything(), fields.Everything()) @@ -55,13 +56,28 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface) *PersistentVolu }, }, &api.PersistentVolume{}, - volumeStore, - 0, + syncPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + volume := obj.(*api.PersistentVolume) + volumeIndex.Indexer.Add(volume) + syncVolume(binderClient, volume) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + oldVolume := oldObj.(*api.PersistentVolume) + newVolume := newObj.(*api.PersistentVolume) + volumeIndex.Indexer.Update(newVolume) + if updateRequired(oldVolume, newVolume) { + syncVolume(binderClient, newVolume) + } + }, + DeleteFunc: func(obj interface{}) { + volume := obj.(*api.PersistentVolume) + volumeIndex.Indexer.Delete(volume) + }, + }, ) - volumeReflector.Run() - - claimStore := cache.NewStore(cache.MetaNamespaceKeyFunc) - claimReflector := cache.NewReflector( + _, claimController := framework.NewInformer( &cache.ListWatch{ ListFunc: func() (runtime.Object, error) { return kubeClient.PersistentVolumeClaims(api.NamespaceAll).List(labels.Everything(), fields.Everything()) @@ -71,51 +87,75 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface) *PersistentVolu }, }, &api.PersistentVolumeClaim{}, - claimStore, - 0, + syncPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + claim := obj.(*api.PersistentVolumeClaim) + syncClaim(volumeIndex, binderClient, claim) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + // oldClaim := newObj.(*api.PersistentVolumeClaim) + newClaim := newObj.(*api.PersistentVolumeClaim) + if newClaim.Status.VolumeRef == nil { + syncClaim(volumeIndex, binderClient, newClaim) + } + }, + }, ) - claimReflector.Run() binder := &PersistentVolumeClaimBinder{ - volumeStore: volumeStore, - claimStore: claimStore, - client: NewBinderClient(kubeClient), + volumeController: volumeController, + claimController: claimController, + volumeIndex: volumeIndex, + client: binderClient, } return binder } -// syncPersistentVolume inspects all bound PVs to determine if their bound PersistentVolumeClaim still exists. -func (controller *PersistentVolumeClaimBinder) syncPersistentVolume(obj interface{}) error { - volume := obj.(*api.PersistentVolume) +func updateRequired(oldVolume, newVolume *api.PersistentVolume) bool { + // Spec changes affect indexing and sorting volumes + if !reflect.DeepEqual(oldVolume.Spec, newVolume.Spec) { + return true + } + if !reflect.DeepEqual(oldVolume.Status, newVolume.Status) { + return true + } + return false +} + +func syncVolume(binderClient binderClient, volume *api.PersistentVolume) (err error) { glog.V(5).Infof("Synchronizing PersistentVolume[%s]\n", volume.Name) if volume.Spec.ClaimRef != nil { if volume.Status.Phase == api.VolumeAvailable { volume.Status.Phase = api.VolumeBound - _, err := controller.client.UpdatePersistentVolumeStatus(volume) + _, err := binderClient.UpdatePersistentVolumeStatus(volume) if err != nil { return fmt.Errorf("Error updating pv.status: %v\n", err) } } // verify the volume is still claimed by a user - if claim, err := controller.client.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name); err == nil { + if claim, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name); err == nil { glog.V(5).Infof("PersistentVolume[%s] is bound to PersistentVolumeClaim[%s]\n", volume.Name, volume.Spec.ClaimRef.Name) - controller.syncPersistentVolumeClaimStatus(volume, claim) + // rebuild the Claim's Status as needed + if claim.Status.VolumeRef == nil { + syncClaimStatus(binderClient, volume, claim) + } } else { //claim was deleted by user. glog.V(3).Infof("PersistentVolumeClaim[%s] unbound from PersistentVolume[%s]\n", volume.Spec.ClaimRef.Name, volume.Name) // volume.Spec.ClaimRef is deliberately left non-nil so that another process can recycle the newly released volume volume.Status.Phase = api.VolumeReleased - volume, err = controller.client.UpdatePersistentVolumeStatus(volume) + volume, err = binderClient.UpdatePersistentVolumeStatus(volume) if err != nil { return fmt.Errorf("Error updating pv: %+v\n", err) } } } else { volume.Status.Phase = api.VolumeAvailable - _, err := controller.client.UpdatePersistentVolumeStatus(volume) + _, err := binderClient.UpdatePersistentVolumeStatus(volume) if err != nil { return fmt.Errorf("Error updating pv.status: %v\n", err) } @@ -123,58 +163,7 @@ func (controller *PersistentVolumeClaimBinder) syncPersistentVolume(obj interfac return nil } -func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaim(obj interface{}) error { - controller.lock.Lock() - defer controller.lock.Unlock() - - claim := obj.(*api.PersistentVolumeClaim) - glog.V(5).Infof("Synchronizing PersistentVolumeClaim[%s]\n", claim.Name) - - if claim.Status.VolumeRef != nil { - glog.V(5).Infof("PersistentVolumeClaim[%s] is bound to PersistentVolume[%s]\n", claim.Name, claim.Status.VolumeRef.Name) - return nil - } - - pv, err := controller.volumeStore.FindBestMatchForClaim(claim) - if err != nil { - return err - } - - if pv != nil { - claimRef, err := api.GetReference(claim) - if err != nil { - return fmt.Errorf("Unexpected error getting claim reference: %v\n", err) - } - - // make a binding reference to the claim - pv.Spec.ClaimRef = claimRef - pv, err = controller.client.UpdatePersistentVolume(pv) - - if err != nil { - // volume no longer bound - pv.Spec.ClaimRef = nil - return fmt.Errorf("Error updating volume: %+v\n", err) - } else { - glog.V(3).Infof("PersistentVolumeClaim[%s] bound to PersistentVolume[%s]\n", claim.Name, pv.Name) - pv.Status.Phase = api.VolumeBound - err := controller.syncPersistentVolumeClaimStatus(pv, claim) - if err != nil { - return fmt.Errorf("Error updating pvclaim.status: %v\n", err) - } - } - } else { - glog.V(5).Infof("No volume match found for PersistentVolumeClaim[%s]\n", claim.UID) - claim.Status.Phase = api.ClaimPending - _, err := controller.client.UpdatePersistentVolumeClaimStatus(claim) - if err != nil { - return fmt.Errorf("Error updating pvclaim.status: %v\n", err) - } - } - return nil -} - -// syncPersistentVolumeClaimStatus builds and persistens a PVClaim's Status, rolling back to empty values if the update fails -func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaimStatus(volume *api.PersistentVolume, claim *api.PersistentVolumeClaim) error { +func syncClaimStatus(binderClient binderClient, volume *api.PersistentVolume, claim *api.PersistentVolumeClaim) (err error) { volumeRef, err := api.GetReference(volume) if err != nil { return fmt.Errorf("Unexpected error getting volume reference: %v\n", err) @@ -186,7 +175,7 @@ func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaimStatus(v claim.Status.AccessModes = volume.Spec.AccessModes claim.Status.Capacity = volume.Spec.Capacity - _, err = controller.client.UpdatePersistentVolumeClaimStatus(claim) + _, err = binderClient.UpdatePersistentVolumeClaimStatus(claim) if err != nil { claim.Status.Phase = api.ClaimPending @@ -198,54 +187,56 @@ func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaimStatus(v return err } -func (controller *PersistentVolumeClaimBinder) Run(period time.Duration) { +func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, claim *api.PersistentVolumeClaim) (err error) { + glog.V(5).Infof("Synchronizing PersistentVolumeClaim[%s]\n", claim.Name) + + if claim.Status.VolumeRef != nil { + glog.V(5).Infof("PersistentVolumeClaim[%s] is bound to PersistentVolume[%s]\n", claim.Name, claim.Status.VolumeRef.Name) + return nil + } + + volume, err := volumeIndex.FindBestMatchForClaim(claim) + if err != nil { + return err + } + + if volume != nil { + claimRef, err := api.GetReference(claim) + if err != nil { + return fmt.Errorf("Unexpected error getting claim reference: %v\n", err) + } + + // make a binding reference to the claim + volume.Spec.ClaimRef = claimRef + volume, err = binderClient.UpdatePersistentVolume(volume) + + if err != nil { + // volume no longer bound + volume.Spec.ClaimRef = nil + return fmt.Errorf("Error updating volume: %+v\n", err) + } else { + err = syncClaimStatus(binderClient, volume, claim) + if err != nil { + return fmt.Errorf("Error update claim.status: %+v\n", err) + } + } + } else { + glog.V(5).Infof("No volume match found for PersistentVolumeClaim[%s]\n", claim.UID) + if claim.Status.Phase != api.ClaimPending { + claim.Status.Phase = api.ClaimPending + _, err := binderClient.UpdatePersistentVolumeClaimStatus(claim) + if err != nil { + return fmt.Errorf("Error updating pvclaim.status: %v\n", err) + } + } + } + return nil +} + +func (controller *PersistentVolumeClaimBinder) Run() { glog.V(5).Infof("Starting PersistentVolumeClaimBinder\n") - go util.Forever(func() { controller.synchronize() }, period) -} - -// Synchronizer is a generic List/ProcessFunc used by the Reconcile function & reconciliation loop, -// because we're reconciling two Kinds in this component and don't want to dupe the loop -// TODO MarkT - refactor to new DeltaFifo and new controller framework -type Synchronizer struct { - ListFunc func() []interface{} - ReconcileFunc func(interface{}) error -} - -func (controller *PersistentVolumeClaimBinder) synchronize() { - volumeSynchronizer := Synchronizer{ - ListFunc: controller.volumeStore.List, - ReconcileFunc: controller.syncPersistentVolume, - } - - claimsSynchronizer := Synchronizer{ - ListFunc: controller.claimStore.List, - ReconcileFunc: controller.syncPersistentVolumeClaim, - } - - controller.reconcile(volumeSynchronizer, claimsSynchronizer) -} - -func (controller *PersistentVolumeClaimBinder) reconcile(synchronizers ...Synchronizer) { - for _, synchronizer := range synchronizers { - items := synchronizer.ListFunc() - if len(items) == 0 { - continue - } - wg := sync.WaitGroup{} - wg.Add(len(items)) - for ix := range items { - go func(ix int) { - defer wg.Done() - obj := items[ix] - glog.V(5).Infof("Reconciliation of %v", obj) - err := synchronizer.ReconcileFunc(obj) - if err != nil { - glog.Errorf("Error reconciling: %v", err) - } - }(ix) - } - wg.Wait() - } + go controller.claimController.Run(make(chan struct{})) + go controller.volumeController.Run(make(chan struct{})) } // binderClient abstracts access to PVs and PVCs diff --git a/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go b/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go index a310e86c6fc..4b6eab019e8 100644 --- a/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go +++ b/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go @@ -144,6 +144,54 @@ func TestExampleObjects(t *testing.T) { } } +func TestRequiresUpdate(t *testing.T) { + old := &api.PersistentVolume{ + Spec: api.PersistentVolumeSpec{ + AccessModes: []api.AccessModeType{api.ReadWriteOnce}, + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("5Gi"), + }, + PersistentVolumeSource: api.PersistentVolumeSource{ + HostPath: &api.HostPathVolumeSource{ + Path: "/tmp/data02", + }, + }, + }, + } + + new := &api.PersistentVolume{ + Spec: api.PersistentVolumeSpec{ + AccessModes: []api.AccessModeType{api.ReadWriteOnce}, + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("5Gi"), + }, + PersistentVolumeSource: api.PersistentVolumeSource{ + HostPath: &api.HostPathVolumeSource{ + Path: "/tmp/data02", + }, + }, + ClaimRef: &api.ObjectReference{Name: "foo"}, + }, + } + + if !updateRequired(old, new) { + t.Errorf("Update expected for the new volume with added ClaimRef") + } + + old.Spec.ClaimRef = new.Spec.ClaimRef + old.Status.Phase = api.VolumeBound + + if !updateRequired(old, new) { + t.Errorf("Update expected for the new volume with added Status") + } + + new.Status.Phase = old.Status.Phase + + if updateRequired(old, new) { + t.Errorf("No updated expected for identical objects") + } +} + func TestBindingWithExamples(t *testing.T) { api.ForTesting_ReferencesAllowBlankSelfLinks = true @@ -167,33 +215,39 @@ func TestBindingWithExamples(t *testing.T) { t.Error("Unexpected error getting PVC from client: %v", err) } + volumeIndex := NewPersistentVolumeOrderedIndex() mockClient := &mockBinderClient{ volume: pv, claim: claim, } - controller := PersistentVolumeClaimBinder{ - volumeStore: NewPersistentVolumeOrderedIndex(), - client: mockClient, - } - - controller.volumeStore.Add(pv) - controller.syncPersistentVolume(pv) + volumeIndex.Add(pv) + syncVolume(mockClient, pv) if pv.Status.Phase != api.VolumeAvailable { t.Errorf("Expected phase %s but got %s", api.VolumeBound, pv.Status.Phase) } - controller.syncPersistentVolumeClaim(claim) - - if pv.Status.Phase != api.VolumeBound { - t.Errorf("Expected phase %s but got %s", api.VolumeBound, pv.Status.Phase) + if pv.Spec.ClaimRef != nil { + t.Errorf("Expected nil ClaimRef but got %+v\n", pv.Spec.ClaimRef) } + syncClaim(volumeIndex, mockClient, claim) + + if pv.Spec.ClaimRef == nil { + t.Errorf("Expected ClaimRef but got nil for volume: %+v\n", pv) + } + + syncVolume(mockClient, pv) + if claim.Status.VolumeRef == nil { t.Error("Expected claim to be bound to volume") } + if pv.Status.Phase != api.VolumeBound { + t.Errorf("Expected phase %s but got %s", api.VolumeBound, pv.Status.Phase) + } + if claim.Status.Phase != api.ClaimBound { t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase) } @@ -206,8 +260,8 @@ func TestBindingWithExamples(t *testing.T) { // pretend the user deleted their claim mockClient.claim = nil + syncVolume(mockClient, pv) - controller.syncPersistentVolume(pv) if pv.Status.Phase != api.VolumeReleased { t.Errorf("Expected phase %s but got %s", api.VolumeReleased, pv.Status.Phase) }