Updating federation replicaset controller to support cascading deletion

This commit is contained in:
nikhiljindal 2016-11-06 14:14:44 -08:00
parent 6983262914
commit 54c2fbbfdf

View File

@ -29,16 +29,19 @@ import (
fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_5"
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/planner"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/podanalyzer"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/cache"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/wait"
@ -94,6 +97,8 @@ type ReplicaSetController struct {
// For events
eventRecorder record.EventRecorder
deletionHelper *deletionhelper.DeletionHelper
defaultPlanner *planner.Planner
}
@ -205,9 +210,72 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe
return err
})
frsc.deletionHelper = deletionhelper.NewDeletionHelper(
frsc.hasFinalizerFunc,
frsc.removeFinalizerFunc,
frsc.addFinalizerFunc,
// objNameFunc
func(obj runtime.Object) string {
replicaset := obj.(*extensionsv1.ReplicaSet)
return replicaset.Name
},
updateTimeout,
frsc.eventRecorder,
frsc.fedReplicaSetInformer,
frsc.fedUpdater,
)
return frsc
}
// Returns true if the given object has the given finalizer in its ObjectMeta.
func (frsc *ReplicaSetController) hasFinalizerFunc(obj runtime.Object, finalizer string) bool {
replicaset := obj.(*extensionsv1.ReplicaSet)
for i := range replicaset.ObjectMeta.Finalizers {
if string(replicaset.ObjectMeta.Finalizers[i]) == finalizer {
return true
}
}
return false
}
// Removes the finalizer from the given objects ObjectMeta.
// Assumes that the given object is a replicaset.
func (frsc *ReplicaSetController) removeFinalizerFunc(obj runtime.Object, finalizer string) (runtime.Object, error) {
replicaset := obj.(*extensionsv1.ReplicaSet)
newFinalizers := []string{}
hasFinalizer := false
for i := range replicaset.ObjectMeta.Finalizers {
if string(replicaset.ObjectMeta.Finalizers[i]) != finalizer {
newFinalizers = append(newFinalizers, replicaset.ObjectMeta.Finalizers[i])
} else {
hasFinalizer = true
}
}
if !hasFinalizer {
// Nothing to do.
return obj, nil
}
replicaset.ObjectMeta.Finalizers = newFinalizers
replicaset, err := frsc.fedClient.Extensions().ReplicaSets(replicaset.Namespace).Update(replicaset)
if err != nil {
return nil, fmt.Errorf("failed to remove finalizer %s from replicaset %s: %v", finalizer, replicaset.Name, err)
}
return replicaset, nil
}
// Adds the given finalizer to the given objects ObjectMeta.
// Assumes that the given object is a replicaset.
func (frsc *ReplicaSetController) addFinalizerFunc(obj runtime.Object, finalizer string) (runtime.Object, error) {
replicaset := obj.(*extensionsv1.ReplicaSet)
replicaset.ObjectMeta.Finalizers = append(replicaset.ObjectMeta.Finalizers, finalizer)
replicaset, err := frsc.fedClient.Extensions().ReplicaSets(replicaset.Namespace).Update(replicaset)
if err != nil {
return nil, fmt.Errorf("failed to add finalizer %s to replicaset %s: %v", finalizer, replicaset.Name, err)
}
return replicaset, nil
}
func (frsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
go frsc.replicaSetController.Run(stopCh)
frsc.fedReplicaSetInformer.Start()
@ -413,7 +481,7 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) (reconciliatio
startTime := time.Now()
defer glog.V(4).Infof("Finished reconcile replicaset %q (%v)", key, time.Now().Sub(startTime))
obj, exists, err := frsc.replicaSetStore.Indexer.GetByKey(key)
objFromStore, exists, err := frsc.replicaSetStore.Indexer.GetByKey(key)
if err != nil {
return statusError, err
}
@ -421,7 +489,37 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) (reconciliatio
// don't delete local replicasets for now. Do not reconcile it anymore.
return statusAllOk, nil
}
frs := obj.(*extensionsv1.ReplicaSet)
obj, err := conversion.NewCloner().DeepCopy(objFromStore)
frs, ok := obj.(*extensionsv1.ReplicaSet)
if err != nil || !ok {
glog.Errorf("Error in retrieving obj from store: %v, %v", ok, err)
frsc.deliverReplicaSetByKey(key, 0, true)
return statusError, err
}
if frs.DeletionTimestamp != nil {
if err := frsc.delete(frs); err != nil {
glog.Errorf("Failed to delete %s: %v", frs, err)
frsc.eventRecorder.Eventf(frs, api.EventTypeNormal, "DeleteFailed",
"ReplicaSet delete failed: %v", err)
frsc.deliverReplicaSetByKey(key, 0, true)
return statusError, err
}
return statusAllOk, nil
}
glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for replicaset: %s",
frs.Name)
// Add the required finalizers before creating a replicaset in underlying clusters.
updatedRsObj, err := frsc.deletionHelper.EnsureFinalizers(frs)
if err != nil {
glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in replicaset %s: %v",
frs.Name, err)
frsc.deliverReplicaSetByKey(key, 0, false)
return statusError, err
}
frs = updatedRsObj.(*extensionsv1.ReplicaSet)
glog.V(3).Infof("Syncing replicaset %s in underlying clusters", frs.Name)
clusters, err := frsc.fedReplicaSetInformer.GetReadyClusters()
if err != nil {
@ -535,3 +633,23 @@ func (frsc *ReplicaSetController) reconcileReplicaSetsOnClusterChange() {
frsc.deliverReplicaSetByKey(key, 0, false)
}
}
// delete deletes the given replicaset or returns error if the deletion was not complete.
func (frsc *ReplicaSetController) delete(replicaset *extensionsv1.ReplicaSet) error {
glog.V(3).Infof("Handling deletion of replicaset: %v", *replicaset)
_, err := frsc.deletionHelper.HandleObjectInUnderlyingClusters(replicaset)
if err != nil {
return err
}
err = frsc.fedClient.Extensions().ReplicaSets(replicaset.Namespace).Delete(replicaset.Name, nil)
if err != nil {
// Its all good if the error is not found error. That means it is deleted already and we do not have to do anything.
// This is expected when we are processing an update as a result of replicaset finalizer deletion.
// The process that deleted the last finalizer is also going to delete the replicaset and we do not have to do anything.
if !errors.IsNotFound(err) {
return fmt.Errorf("failed to delete replicaset: %v", err)
}
}
return nil
}