Merge pull request #36390 from nikhiljindal/cascDelSvc

Automatic merge from submit-queue (batch tested with PRs 38212, 38792, 39641, 36390, 39005)

Updating federated service controller to support cascading deletion

Ref https://github.com/kubernetes/kubernetes/issues/33612

Service controller is special than other federation controllers because it does not use federatedinformer and updater to sync services (it was written before we had those frameworks).
Updating service controller code to instantiate these frameworks and then use deletion helper to perform cascading deletion.
Note that, I havent changed the queuing logic in this PR so we still dont use federated informer to manage the queue. Will do that in the next PR.

cc @kubernetes/sig-federation-misc  @mwielgus @quinton-hoole


```release-note
federation: Adding support for DeleteOptions.OrphanDependents for federated services. Setting it to false while deleting a federated service also deletes the corresponding services from all registered clusters.
```
This commit is contained in:
Kubernetes Submit Queue 2017-01-10 19:48:14 -08:00 committed by GitHub
commit 94cca27385
6 changed files with 270 additions and 35 deletions

View File

@ -26,6 +26,7 @@ go_library(
"//federation/pkg/dnsprovider:go_default_library",
"//federation/pkg/dnsprovider/rrstype:go_default_library",
"//federation/pkg/federation-controller/util:go_default_library",
"//federation/pkg/federation-controller/util/deletionhelper:go_default_library",
"//pkg/api:go_default_library",
"//pkg/api/errors:go_default_library",
"//pkg/api/v1:go_default_library",
@ -35,6 +36,7 @@ go_library(
"//pkg/client/record:go_default_library",
"//pkg/client/restclient:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/conversion:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/util/runtime:go_default_library",
"//pkg/util/sets:go_default_library",

View File

@ -28,6 +28,9 @@ import (
federationcache "k8s.io/kubernetes/federation/client/cache"
fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
"k8s.io/kubernetes/federation/pkg/dnsprovider"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
v1 "k8s.io/kubernetes/pkg/api/v1"
@ -36,6 +39,7 @@ import (
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/conversion"
pkgruntime "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
@ -68,6 +72,10 @@ const (
KubeAPIBurst = 30
maxNoOfClusters = 100
updateTimeout = 30 * time.Second
allClustersKey = "ALL_CLUSTERS"
clusterAvailableDelay = time.Second * 20
)
type cachedService struct {
@ -114,6 +122,7 @@ type ServiceController struct {
serviceStore cache.StoreToServiceLister
// Watches changes to all services
serviceController *cache.Controller
federatedInformer fedutil.FederatedInformer
// A store of services, populated by the serviceController
clusterStore federationcache.StoreToClusterLister
// Watches changes to all services
@ -133,6 +142,12 @@ type ServiceController struct {
serviceWorkerMap map[string]bool
// channel for worker to signal that it is going out of existence
serviceWorkerDoneChan chan string
// For triggering all services reconciliation. This is used when
// a new cluster becomes available.
clusterDeliverer *util.DelayingDeliverer
deletionHelper *deletionhelper.DeletionHelper
}
// New returns a new service controller to keep DNS provider service resources
@ -162,6 +177,7 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
queue: workqueue.New(),
knownClusterSet: make(sets.String),
}
s.clusterDeliverer = util.NewDelayingDeliverer()
s.serviceStore.Indexer, s.serviceController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options v1.ListOptions) (pkgruntime.Object, error) {
@ -224,6 +240,66 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
},
)
clusterLifecycle := fedutil.ClusterLifecycleHandlerFuncs{
ClusterAvailable: func(cluster *v1beta1.Cluster) {
s.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay)
},
}
fedInformerFactory := func(cluster *v1beta1.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.ControllerInterface) {
return cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options v1.ListOptions) (pkgruntime.Object, error) {
return targetClient.Core().Services(v1.NamespaceAll).List(options)
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
return targetClient.Core().Services(v1.NamespaceAll).Watch(options)
},
},
&v1.Service{},
controller.NoResyncPeriodFunc(),
// Trigger reconciliation whenever something in federated cluster is changed. In most cases it
// would be just confirmation that some service operation succeeded.
util.NewTriggerOnAllChanges(
func(obj pkgruntime.Object) {
// TODO: Use this to enque services.
},
))
}
s.federatedInformer = fedutil.NewFederatedInformer(federationClient, fedInformerFactory, &clusterLifecycle)
federatedUpdater := fedutil.NewFederatedUpdater(s.federatedInformer,
func(client kubeclientset.Interface, obj pkgruntime.Object) error {
svc := obj.(*v1.Service)
_, err := client.Core().Services(svc.Namespace).Create(svc)
return err
},
func(client kubeclientset.Interface, obj pkgruntime.Object) error {
svc := obj.(*v1.Service)
_, err := client.Core().Services(svc.Namespace).Update(svc)
return err
},
func(client kubeclientset.Interface, obj pkgruntime.Object) error {
svc := obj.(*v1.Service)
err := client.Core().Services(svc.Namespace).Delete(svc.Name, &v1.DeleteOptions{})
return err
})
s.deletionHelper = deletionhelper.NewDeletionHelper(
s.hasFinalizerFunc,
s.removeFinalizerFunc,
s.addFinalizerFunc,
// objNameFunc
func(obj pkgruntime.Object) string {
service := obj.(*v1.Service)
return service.Name
},
updateTimeout,
s.eventRecorder,
s.federatedInformer,
federatedUpdater,
)
s.endpointWorkerMap = make(map[string]bool)
s.serviceWorkerMap = make(map[string]bool)
s.endpointWorkerDoneChan = make(chan string, maxNoOfClusters)
@ -231,6 +307,54 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
return s
}
// Returns true if the given object has the given finalizer in its ObjectMeta.
func (s *ServiceController) hasFinalizerFunc(obj pkgruntime.Object, finalizer string) bool {
service := obj.(*v1.Service)
for i := range service.ObjectMeta.Finalizers {
if string(service.ObjectMeta.Finalizers[i]) == finalizer {
return true
}
}
return false
}
// Removes the finalizer from the given objects ObjectMeta.
// Assumes that the given object is a service.
func (s *ServiceController) removeFinalizerFunc(obj pkgruntime.Object, finalizer string) (pkgruntime.Object, error) {
service := obj.(*v1.Service)
newFinalizers := []string{}
hasFinalizer := false
for i := range service.ObjectMeta.Finalizers {
if string(service.ObjectMeta.Finalizers[i]) != finalizer {
newFinalizers = append(newFinalizers, service.ObjectMeta.Finalizers[i])
} else {
hasFinalizer = true
}
}
if !hasFinalizer {
// Nothing to do.
return obj, nil
}
service.ObjectMeta.Finalizers = newFinalizers
service, err := s.federationClient.Core().Services(service.Namespace).Update(service)
if err != nil {
return nil, fmt.Errorf("failed to remove finalizer %s from service %s: %v", finalizer, service.Name, err)
}
return service, nil
}
// Adds the given finalizer to the given objects ObjectMeta.
// Assumes that the given object is a service.
func (s *ServiceController) addFinalizerFunc(obj pkgruntime.Object, finalizer string) (pkgruntime.Object, error) {
service := obj.(*v1.Service)
service.ObjectMeta.Finalizers = append(service.ObjectMeta.Finalizers, finalizer)
service, err := s.federationClient.Core().Services(service.Namespace).Update(service)
if err != nil {
return nil, fmt.Errorf("failed to add finalizer %s to service %s: %v", finalizer, service.Name, err)
}
return service, nil
}
// obj could be an *api.Service, or a DeletionFinalStateUnknown marker item.
func (s *ServiceController) enqueueService(obj interface{}) {
key, err := controller.KeyFunc(obj)
@ -257,6 +381,10 @@ func (s *ServiceController) Run(workers int, stopCh <-chan struct{}) error {
return err
}
defer runtime.HandleCrash()
s.federatedInformer.Start()
s.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) {
// TODO: Use this new clusterDeliverer to reconcile services in new clusters.
})
go s.serviceController.Run(stopCh)
go s.clusterController.Run(stopCh)
for i := 0; i < workers; i++ {
@ -268,6 +396,7 @@ func (s *ServiceController) Run(workers int, stopCh <-chan struct{}) error {
<-stopCh
glog.Infof("Shutting down Federation Service Controller")
s.queue.ShutDown()
s.federatedInformer.Stop()
return nil
}
@ -361,20 +490,16 @@ func (s *ServiceController) processServiceForCluster(cachedService *cachedServic
// should be retried.
func (s *ServiceController) updateFederationService(key string, cachedService *cachedService) (error, bool) {
// Clone federation service, and create them in underlying k8s cluster
clone, err := api.Scheme.DeepCopy(cachedService.lastState)
if err != nil {
return err, !retryable
}
service, ok := clone.(*v1.Service)
if !ok {
return fmt.Errorf("Unexpected service cast error : %v\n", service), !retryable
desiredService := &v1.Service{
ObjectMeta: util.DeepCopyRelevantObjectMeta(cachedService.lastState.ObjectMeta),
Spec: *(util.DeepCopyApiTypeOrPanic(&cachedService.lastState.Spec).(*v1.ServiceSpec)),
}
// handle available clusters one by one
var hasErr bool
for clusterName, cache := range s.clusterCache.clientMap {
go func(cache *clusterCache, clusterName string) {
err = s.processServiceForCluster(cachedService, clusterName, service, cache.clientset)
err := s.processServiceForCluster(cachedService, clusterName, desiredService, cache.clientset)
if err != nil {
hasErr = true
}
@ -382,7 +507,7 @@ func (s *ServiceController) updateFederationService(key string, cachedService *c
}
if hasErr {
// detail error has been dumped inside the loop
return fmt.Errorf("Service %s/%s was not successfully updated to all clusters", service.Namespace, service.Name), retryable
return fmt.Errorf("Service %s/%s was not successfully updated to all clusters", desiredService.Namespace, desiredService.Name), retryable
}
return nil, !retryable
}
@ -812,18 +937,25 @@ func (s *ServiceController) syncService(key string) error {
glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime))
}()
// obj holds the latest service info from apiserver
obj, exists, err := s.serviceStore.Indexer.GetByKey(key)
objFromStore, exists, err := s.serviceStore.Indexer.GetByKey(key)
if err != nil {
glog.Errorf("Unable to retrieve service %v from store: %v", key, err)
s.queue.Add(key)
return err
}
if !exists {
// service absence in store means watcher caught the deletion, ensure LB info is cleaned
glog.Infof("Service has been deleted %v", key)
err, retryDelay = s.processServiceDeletion(key)
}
// Create a copy before modifying the obj to prevent race condition with
// other readers of obj from store.
obj, err := conversion.NewCloner().DeepCopy(objFromStore)
if err != nil {
glog.Errorf("Error in deep copying service %v retrieved from store: %v", key, err)
s.queue.Add(key)
return err
}
if exists {
service, ok := obj.(*v1.Service)
@ -857,6 +989,29 @@ func (s *ServiceController) processServiceUpdate(cachedService *cachedService, s
cachedService.rwlock.Lock()
defer cachedService.rwlock.Unlock()
if service.DeletionTimestamp != nil {
if err := s.delete(service); err != nil {
glog.Errorf("Failed to delete %s: %v", service, err)
s.eventRecorder.Eventf(service, api.EventTypeNormal, "DeleteFailed",
"Service delete failed: %v", err)
return err, cachedService.nextRetryDelay()
}
return nil, doNotRetry
}
glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for service: %s",
service.Name)
// Add the required finalizers before creating a service in underlying clusters.
updatedServiceObj, err := s.deletionHelper.EnsureFinalizers(service)
if err != nil {
glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in service %s: %v",
service.Name, err)
return err, cachedService.nextRetryDelay()
}
service = updatedServiceObj.(*v1.Service)
glog.V(3).Infof("Syncing service %s in underlying clusters", service.Name)
// Update the cached service (used above for populating synthetic deletes)
// alway trust service, which is retrieve from serviceStore, which keeps the latest service info getting from apiserver
// if the same service is changed before this go routine finished, there will be another queue entry to handle that.
@ -885,6 +1040,26 @@ func (s *ServiceController) processServiceUpdate(cachedService *cachedService, s
return nil, doNotRetry
}
// delete deletes the given service or returns error if the deletion was not complete.
func (s *ServiceController) delete(service *v1.Service) error {
glog.V(3).Infof("Handling deletion of service: %v", *service)
_, err := s.deletionHelper.HandleObjectInUnderlyingClusters(service)
if err != nil {
return err
}
err = s.federationClient.Core().Services(service.Namespace).Delete(service.Name, nil)
if err != nil {
// Its all good if the error is not found error. That means it is deleted already and we do not have to do anything.
// This is expected when we are processing an update as a result of service finalizer deletion.
// The process that deleted the last finalizer is also going to delete the service and we do not have to do anything.
if !errors.IsNotFound(err) {
return fmt.Errorf("failed to delete service: %v", err)
}
}
return nil
}
// processServiceDeletion returns an error if processing the service deletion failed, along with a time.Duration
// indicating whether processing should be retried; zero means no-retry; otherwise
// we should retry in that Duration.

View File

@ -154,7 +154,7 @@ var _ = framework.KubeDescribe("Federated ingresses [Feature:Federation]", func(
AfterEach(func() {
deleteBackendPodsOrFail(clusters, ns)
if service != nil {
deleteServiceOrFail(f.FederationClientset_1_5, ns, service.Name)
deleteServiceOrFail(f.FederationClientset_1_5, ns, service.Name, nil)
cleanupServiceShardsAndProviderResources(ns, service, clusters)
service = nil
} else {

View File

@ -21,11 +21,15 @@ import (
"os"
"reflect"
"strconv"
"strings"
"time"
fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/v1"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/ginkgo"
@ -116,28 +120,29 @@ var _ = framework.KubeDescribe("[Feature:Federation]", func() {
waitForServiceShardsOrFail(nsName, service, clusters)
})
It("should not be deleted from underlying clusters when it is deleted", func() {
It("should be deleted from underlying clusters when OrphanDependents is false", func() {
framework.SkipUnlessFederated(f.ClientSet)
nsName = f.FederationNamespace.Name
service = createServiceOrFail(f.FederationClientset_1_5, nsName, FederatedServiceName)
By(fmt.Sprintf("Successfully created federated service %q in namespace %q. Waiting for shards to appear in underlying clusters", service.Name, nsName))
waitForServiceShardsOrFail(nsName, service, clusters)
By(fmt.Sprintf("Deleting service %s", service.Name))
err := f.FederationClientset_1_5.Services(nsName).Delete(service.Name, &v1.DeleteOptions{})
framework.ExpectNoError(err, "Error deleting service %q in namespace %q", service.Name, service.Namespace)
By(fmt.Sprintf("Deletion of service %q in namespace %q succeeded.", service.Name, nsName))
By(fmt.Sprintf("Verifying that services in underlying clusters are not deleted"))
for clusterName, clusterClientset := range clusters {
_, err := clusterClientset.Core().Services(service.Namespace).Get(service.Name, metav1.GetOptions{})
if err != nil {
framework.Failf("Unexpected error in fetching service %s in cluster %s, %s", service.Name, clusterName, err)
}
}
})
nsName := f.FederationNamespace.Name
orphanDependents := false
verifyCascadingDeletionForService(f.FederationClientset_1_5, clusters, &orphanDependents, nsName)
By(fmt.Sprintf("Verified that services were deleted from underlying clusters"))
})
It("should not be deleted from underlying clusters when OrphanDependents is true", func() {
framework.SkipUnlessFederated(f.ClientSet)
nsName := f.FederationNamespace.Name
orphanDependents := true
verifyCascadingDeletionForService(f.FederationClientset_1_5, clusters, &orphanDependents, nsName)
By(fmt.Sprintf("Verified that services were not deleted from underlying clusters"))
})
It("should not be deleted from underlying clusters when OrphanDependents is nil", func() {
framework.SkipUnlessFederated(f.ClientSet)
nsName := f.FederationNamespace.Name
verifyCascadingDeletionForService(f.FederationClientset_1_5, clusters, nil, nsName)
By(fmt.Sprintf("Verified that services were not deleted from underlying clusters"))
})
})
var _ = Describe("DNS", func() {
var (
@ -211,7 +216,7 @@ var _ = framework.KubeDescribe("[Feature:Federation]", func() {
deleteBackendPodsOrFail(clusters, nsName)
if service != nil {
deleteServiceOrFail(f.FederationClientset_1_5, nsName, service.Name)
deleteServiceOrFail(f.FederationClientset_1_5, nsName, service.Name, nil)
service = nil
} else {
By("No service to delete. Service is nil")
@ -308,6 +313,48 @@ var _ = framework.KubeDescribe("[Feature:Federation]", func() {
})
})
// verifyCascadingDeletionForService verifies that services are deleted from
// underlying clusters when orphan dependents is false and they are not
// deleted when orphan dependents is true.
func verifyCascadingDeletionForService(clientset *fedclientset.Clientset, clusters map[string]*cluster, orphanDependents *bool, nsName string) {
service := createServiceOrFail(clientset, nsName, FederatedServiceName)
serviceName := service.Name
// Check subclusters if the service was created there.
By(fmt.Sprintf("Waiting for service %s to be created in all underlying clusters", serviceName))
err := wait.Poll(5*time.Second, 2*time.Minute, func() (bool, error) {
for _, cluster := range clusters {
_, err := cluster.Core().Services(nsName).Get(serviceName, metav1.GetOptions{})
if err != nil {
if !errors.IsNotFound(err) {
return false, err
}
return false, nil
}
}
return true, nil
})
framework.ExpectNoError(err, "Not all services created")
By(fmt.Sprintf("Deleting service %s", serviceName))
deleteServiceOrFail(clientset, nsName, serviceName, orphanDependents)
By(fmt.Sprintf("Verifying services %s in underlying clusters", serviceName))
errMessages := []string{}
// service should be present in underlying clusters unless orphanDependents is false.
shouldExist := orphanDependents == nil || *orphanDependents == true
for clusterName, clusterClientset := range clusters {
_, err := clusterClientset.Core().Services(nsName).Get(serviceName, metav1.GetOptions{})
if shouldExist && errors.IsNotFound(err) {
errMessages = append(errMessages, fmt.Sprintf("unexpected NotFound error for service %s in cluster %s, expected service to exist", serviceName, clusterName))
} else if !shouldExist && !errors.IsNotFound(err) {
errMessages = append(errMessages, fmt.Sprintf("expected NotFound error for service %s in cluster %s, got error: %v", serviceName, clusterName, err))
}
}
if len(errMessages) != 0 {
framework.Failf("%s", strings.Join(errMessages, "; "))
}
}
/*
equivalent returns true if the two services are equivalent. Fields which are expected to differ between
federated services and the underlying cluster services (e.g. ClusterIP, LoadBalancerIP etc) are ignored.

View File

@ -42,7 +42,7 @@ var _ = framework.KubeDescribe("[Feature:Federation]", func() {
nsName := f.FederationNamespace.Name
svc := createServiceOrFail(f.FederationClientset_1_5, nsName, FederatedServiceName)
deleteServiceOrFail(f.FederationClientset_1_5, nsName, svc.Name)
deleteServiceOrFail(f.FederationClientset_1_5, nsName, svc.Name, nil)
})
It("should not accept cluster resources when the client has invalid authentication credentials", func() {
@ -62,7 +62,7 @@ var _ = framework.KubeDescribe("[Feature:Federation]", func() {
svc, err := createService(fcs, nsName, FederatedServiceName)
Expect(errors.IsUnauthorized(err)).To(BeTrue())
if err == nil && svc != nil {
deleteServiceOrFail(fcs, nsName, svc.Name)
deleteServiceOrFail(fcs, nsName, svc.Name, nil)
}
})
@ -76,7 +76,7 @@ var _ = framework.KubeDescribe("[Feature:Federation]", func() {
svc, err := createService(fcs, nsName, FederatedServiceName)
Expect(errors.IsUnauthorized(err)).To(BeTrue())
if err == nil && svc != nil {
deleteServiceOrFail(fcs, nsName, svc.Name)
deleteServiceOrFail(fcs, nsName, svc.Name, nil)
}
})
})

View File

@ -283,12 +283,23 @@ func createServiceOrFail(clientset *fedclientset.Clientset, namespace, name stri
return service
}
func deleteServiceOrFail(clientset *fedclientset.Clientset, namespace string, serviceName string) {
func deleteServiceOrFail(clientset *fedclientset.Clientset, namespace string, serviceName string, orphanDependents *bool) {
if clientset == nil || len(namespace) == 0 || len(serviceName) == 0 {
Fail(fmt.Sprintf("Internal error: invalid parameters passed to deleteServiceOrFail: clientset: %v, namespace: %v, service: %v", clientset, namespace, serviceName))
}
err := clientset.Services(namespace).Delete(serviceName, v1.NewDeleteOptions(0))
err := clientset.Services(namespace).Delete(serviceName, &v1.DeleteOptions{OrphanDependents: orphanDependents})
framework.ExpectNoError(err, "Error deleting service %q from namespace %q", serviceName, namespace)
// Wait for the service to be deleted.
err = wait.Poll(5*time.Second, 3*wait.ForeverTestTimeout, func() (bool, error) {
_, err := clientset.Core().Services(namespace).Get(serviceName, metav1.GetOptions{})
if err != nil && errors.IsNotFound(err) {
return true, nil
}
return false, err
})
if err != nil {
framework.Failf("Error in deleting service %s: %v", serviceName, err)
}
}
func cleanupServiceShardsAndProviderResources(namespace string, service *v1.Service, clusters map[string]*cluster) {