diff --git a/federation/pkg/federation-controller/service/BUILD b/federation/pkg/federation-controller/service/BUILD index b7b754e6b96..5e82faf1d20 100644 --- a/federation/pkg/federation-controller/service/BUILD +++ b/federation/pkg/federation-controller/service/BUILD @@ -26,6 +26,7 @@ go_library( "//federation/pkg/dnsprovider:go_default_library", "//federation/pkg/dnsprovider/rrstype:go_default_library", "//federation/pkg/federation-controller/util:go_default_library", + "//federation/pkg/federation-controller/util/deletionhelper:go_default_library", "//pkg/api:go_default_library", "//pkg/api/errors:go_default_library", "//pkg/api/v1:go_default_library", @@ -35,6 +36,7 @@ go_library( "//pkg/client/record:go_default_library", "//pkg/client/restclient:go_default_library", "//pkg/controller:go_default_library", + "//pkg/conversion:go_default_library", "//pkg/runtime:go_default_library", "//pkg/util/runtime:go_default_library", "//pkg/util/sets:go_default_library", diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go index 9929b1684f6..a02bd26505f 100644 --- a/federation/pkg/federation-controller/service/servicecontroller.go +++ b/federation/pkg/federation-controller/service/servicecontroller.go @@ -28,6 +28,9 @@ import ( federationcache "k8s.io/kubernetes/federation/client/cache" fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" "k8s.io/kubernetes/federation/pkg/dnsprovider" + "k8s.io/kubernetes/federation/pkg/federation-controller/util" + fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" + "k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" v1 "k8s.io/kubernetes/pkg/api/v1" @@ -36,6 +39,7 @@ import ( kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/conversion" pkgruntime "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" @@ -68,6 +72,10 @@ const ( KubeAPIBurst = 30 maxNoOfClusters = 100 + + updateTimeout = 30 * time.Second + allClustersKey = "ALL_CLUSTERS" + clusterAvailableDelay = time.Second * 20 ) type cachedService struct { @@ -114,6 +122,7 @@ type ServiceController struct { serviceStore cache.StoreToServiceLister // Watches changes to all services serviceController *cache.Controller + federatedInformer fedutil.FederatedInformer // A store of services, populated by the serviceController clusterStore federationcache.StoreToClusterLister // Watches changes to all services @@ -133,6 +142,12 @@ type ServiceController struct { serviceWorkerMap map[string]bool // channel for worker to signal that it is going out of existence serviceWorkerDoneChan chan string + + // For triggering all services reconciliation. This is used when + // a new cluster becomes available. + clusterDeliverer *util.DelayingDeliverer + + deletionHelper *deletionhelper.DeletionHelper } // New returns a new service controller to keep DNS provider service resources @@ -162,6 +177,7 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface, queue: workqueue.New(), knownClusterSet: make(sets.String), } + s.clusterDeliverer = util.NewDelayingDeliverer() s.serviceStore.Indexer, s.serviceController = cache.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options v1.ListOptions) (pkgruntime.Object, error) { @@ -224,6 +240,66 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface, }, ) + clusterLifecycle := fedutil.ClusterLifecycleHandlerFuncs{ + ClusterAvailable: func(cluster *v1beta1.Cluster) { + s.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay) + }, + } + fedInformerFactory := func(cluster *v1beta1.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.ControllerInterface) { + return cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (pkgruntime.Object, error) { + return targetClient.Core().Services(v1.NamespaceAll).List(options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + return targetClient.Core().Services(v1.NamespaceAll).Watch(options) + }, + }, + &v1.Service{}, + controller.NoResyncPeriodFunc(), + // Trigger reconciliation whenever something in federated cluster is changed. In most cases it + // would be just confirmation that some service operation succeeded. + util.NewTriggerOnAllChanges( + func(obj pkgruntime.Object) { + // TODO: Use this to enque services. + }, + )) + } + + s.federatedInformer = fedutil.NewFederatedInformer(federationClient, fedInformerFactory, &clusterLifecycle) + + federatedUpdater := fedutil.NewFederatedUpdater(s.federatedInformer, + func(client kubeclientset.Interface, obj pkgruntime.Object) error { + svc := obj.(*v1.Service) + _, err := client.Core().Services(svc.Namespace).Create(svc) + return err + }, + func(client kubeclientset.Interface, obj pkgruntime.Object) error { + svc := obj.(*v1.Service) + _, err := client.Core().Services(svc.Namespace).Update(svc) + return err + }, + func(client kubeclientset.Interface, obj pkgruntime.Object) error { + svc := obj.(*v1.Service) + err := client.Core().Services(svc.Namespace).Delete(svc.Name, &v1.DeleteOptions{}) + return err + }) + + s.deletionHelper = deletionhelper.NewDeletionHelper( + s.hasFinalizerFunc, + s.removeFinalizerFunc, + s.addFinalizerFunc, + // objNameFunc + func(obj pkgruntime.Object) string { + service := obj.(*v1.Service) + return service.Name + }, + updateTimeout, + s.eventRecorder, + s.federatedInformer, + federatedUpdater, + ) + s.endpointWorkerMap = make(map[string]bool) s.serviceWorkerMap = make(map[string]bool) s.endpointWorkerDoneChan = make(chan string, maxNoOfClusters) @@ -231,6 +307,54 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface, return s } +// Returns true if the given object has the given finalizer in its ObjectMeta. +func (s *ServiceController) hasFinalizerFunc(obj pkgruntime.Object, finalizer string) bool { + service := obj.(*v1.Service) + for i := range service.ObjectMeta.Finalizers { + if string(service.ObjectMeta.Finalizers[i]) == finalizer { + return true + } + } + return false +} + +// Removes the finalizer from the given objects ObjectMeta. +// Assumes that the given object is a service. +func (s *ServiceController) removeFinalizerFunc(obj pkgruntime.Object, finalizer string) (pkgruntime.Object, error) { + service := obj.(*v1.Service) + newFinalizers := []string{} + hasFinalizer := false + for i := range service.ObjectMeta.Finalizers { + if string(service.ObjectMeta.Finalizers[i]) != finalizer { + newFinalizers = append(newFinalizers, service.ObjectMeta.Finalizers[i]) + } else { + hasFinalizer = true + } + } + if !hasFinalizer { + // Nothing to do. + return obj, nil + } + service.ObjectMeta.Finalizers = newFinalizers + service, err := s.federationClient.Core().Services(service.Namespace).Update(service) + if err != nil { + return nil, fmt.Errorf("failed to remove finalizer %s from service %s: %v", finalizer, service.Name, err) + } + return service, nil +} + +// Adds the given finalizer to the given objects ObjectMeta. +// Assumes that the given object is a service. +func (s *ServiceController) addFinalizerFunc(obj pkgruntime.Object, finalizer string) (pkgruntime.Object, error) { + service := obj.(*v1.Service) + service.ObjectMeta.Finalizers = append(service.ObjectMeta.Finalizers, finalizer) + service, err := s.federationClient.Core().Services(service.Namespace).Update(service) + if err != nil { + return nil, fmt.Errorf("failed to add finalizer %s to service %s: %v", finalizer, service.Name, err) + } + return service, nil +} + // obj could be an *api.Service, or a DeletionFinalStateUnknown marker item. func (s *ServiceController) enqueueService(obj interface{}) { key, err := controller.KeyFunc(obj) @@ -257,6 +381,10 @@ func (s *ServiceController) Run(workers int, stopCh <-chan struct{}) error { return err } defer runtime.HandleCrash() + s.federatedInformer.Start() + s.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) { + // TODO: Use this new clusterDeliverer to reconcile services in new clusters. + }) go s.serviceController.Run(stopCh) go s.clusterController.Run(stopCh) for i := 0; i < workers; i++ { @@ -268,6 +396,7 @@ func (s *ServiceController) Run(workers int, stopCh <-chan struct{}) error { <-stopCh glog.Infof("Shutting down Federation Service Controller") s.queue.ShutDown() + s.federatedInformer.Stop() return nil } @@ -361,20 +490,16 @@ func (s *ServiceController) processServiceForCluster(cachedService *cachedServic // should be retried. func (s *ServiceController) updateFederationService(key string, cachedService *cachedService) (error, bool) { // Clone federation service, and create them in underlying k8s cluster - clone, err := api.Scheme.DeepCopy(cachedService.lastState) - if err != nil { - return err, !retryable - } - service, ok := clone.(*v1.Service) - if !ok { - return fmt.Errorf("Unexpected service cast error : %v\n", service), !retryable + desiredService := &v1.Service{ + ObjectMeta: util.DeepCopyRelevantObjectMeta(cachedService.lastState.ObjectMeta), + Spec: *(util.DeepCopyApiTypeOrPanic(&cachedService.lastState.Spec).(*v1.ServiceSpec)), } // handle available clusters one by one var hasErr bool for clusterName, cache := range s.clusterCache.clientMap { go func(cache *clusterCache, clusterName string) { - err = s.processServiceForCluster(cachedService, clusterName, service, cache.clientset) + err := s.processServiceForCluster(cachedService, clusterName, desiredService, cache.clientset) if err != nil { hasErr = true } @@ -382,7 +507,7 @@ func (s *ServiceController) updateFederationService(key string, cachedService *c } if hasErr { // detail error has been dumped inside the loop - return fmt.Errorf("Service %s/%s was not successfully updated to all clusters", service.Namespace, service.Name), retryable + return fmt.Errorf("Service %s/%s was not successfully updated to all clusters", desiredService.Namespace, desiredService.Name), retryable } return nil, !retryable } @@ -812,18 +937,25 @@ func (s *ServiceController) syncService(key string) error { glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime)) }() // obj holds the latest service info from apiserver - obj, exists, err := s.serviceStore.Indexer.GetByKey(key) + objFromStore, exists, err := s.serviceStore.Indexer.GetByKey(key) if err != nil { glog.Errorf("Unable to retrieve service %v from store: %v", key, err) s.queue.Add(key) return err } - if !exists { // service absence in store means watcher caught the deletion, ensure LB info is cleaned glog.Infof("Service has been deleted %v", key) err, retryDelay = s.processServiceDeletion(key) } + // Create a copy before modifying the obj to prevent race condition with + // other readers of obj from store. + obj, err := conversion.NewCloner().DeepCopy(objFromStore) + if err != nil { + glog.Errorf("Error in deep copying service %v retrieved from store: %v", key, err) + s.queue.Add(key) + return err + } if exists { service, ok := obj.(*v1.Service) @@ -857,6 +989,29 @@ func (s *ServiceController) processServiceUpdate(cachedService *cachedService, s cachedService.rwlock.Lock() defer cachedService.rwlock.Unlock() + if service.DeletionTimestamp != nil { + if err := s.delete(service); err != nil { + glog.Errorf("Failed to delete %s: %v", service, err) + s.eventRecorder.Eventf(service, api.EventTypeNormal, "DeleteFailed", + "Service delete failed: %v", err) + return err, cachedService.nextRetryDelay() + } + return nil, doNotRetry + } + + glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for service: %s", + service.Name) + // Add the required finalizers before creating a service in underlying clusters. + updatedServiceObj, err := s.deletionHelper.EnsureFinalizers(service) + if err != nil { + glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in service %s: %v", + service.Name, err) + return err, cachedService.nextRetryDelay() + } + service = updatedServiceObj.(*v1.Service) + + glog.V(3).Infof("Syncing service %s in underlying clusters", service.Name) + // Update the cached service (used above for populating synthetic deletes) // alway trust service, which is retrieve from serviceStore, which keeps the latest service info getting from apiserver // if the same service is changed before this go routine finished, there will be another queue entry to handle that. @@ -885,6 +1040,26 @@ func (s *ServiceController) processServiceUpdate(cachedService *cachedService, s return nil, doNotRetry } +// delete deletes the given service or returns error if the deletion was not complete. +func (s *ServiceController) delete(service *v1.Service) error { + glog.V(3).Infof("Handling deletion of service: %v", *service) + _, err := s.deletionHelper.HandleObjectInUnderlyingClusters(service) + if err != nil { + return err + } + + err = s.federationClient.Core().Services(service.Namespace).Delete(service.Name, nil) + if err != nil { + // Its all good if the error is not found error. That means it is deleted already and we do not have to do anything. + // This is expected when we are processing an update as a result of service finalizer deletion. + // The process that deleted the last finalizer is also going to delete the service and we do not have to do anything. + if !errors.IsNotFound(err) { + return fmt.Errorf("failed to delete service: %v", err) + } + } + return nil +} + // processServiceDeletion returns an error if processing the service deletion failed, along with a time.Duration // indicating whether processing should be retried; zero means no-retry; otherwise // we should retry in that Duration. diff --git a/test/e2e/federated-ingress.go b/test/e2e/federated-ingress.go index 4d0a432d062..0b7238707c1 100644 --- a/test/e2e/federated-ingress.go +++ b/test/e2e/federated-ingress.go @@ -154,7 +154,7 @@ var _ = framework.KubeDescribe("Federated ingresses [Feature:Federation]", func( AfterEach(func() { deleteBackendPodsOrFail(clusters, ns) if service != nil { - deleteServiceOrFail(f.FederationClientset_1_5, ns, service.Name) + deleteServiceOrFail(f.FederationClientset_1_5, ns, service.Name, nil) cleanupServiceShardsAndProviderResources(ns, service, clusters) service = nil } else { diff --git a/test/e2e/federated-service.go b/test/e2e/federated-service.go index 6ab87251179..ac90ed8355b 100644 --- a/test/e2e/federated-service.go +++ b/test/e2e/federated-service.go @@ -21,11 +21,15 @@ import ( "os" "reflect" "strconv" + "strings" "time" + fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/v1" metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/test/e2e/framework" . "github.com/onsi/ginkgo" @@ -116,28 +120,29 @@ var _ = framework.KubeDescribe("[Feature:Federation]", func() { waitForServiceShardsOrFail(nsName, service, clusters) }) - It("should not be deleted from underlying clusters when it is deleted", func() { + It("should be deleted from underlying clusters when OrphanDependents is false", func() { framework.SkipUnlessFederated(f.ClientSet) - nsName = f.FederationNamespace.Name - service = createServiceOrFail(f.FederationClientset_1_5, nsName, FederatedServiceName) - By(fmt.Sprintf("Successfully created federated service %q in namespace %q. Waiting for shards to appear in underlying clusters", service.Name, nsName)) + nsName := f.FederationNamespace.Name + orphanDependents := false + verifyCascadingDeletionForService(f.FederationClientset_1_5, clusters, &orphanDependents, nsName) + By(fmt.Sprintf("Verified that services were deleted from underlying clusters")) + }) - waitForServiceShardsOrFail(nsName, service, clusters) + It("should not be deleted from underlying clusters when OrphanDependents is true", func() { + framework.SkipUnlessFederated(f.ClientSet) + nsName := f.FederationNamespace.Name + orphanDependents := true + verifyCascadingDeletionForService(f.FederationClientset_1_5, clusters, &orphanDependents, nsName) + By(fmt.Sprintf("Verified that services were not deleted from underlying clusters")) + }) - By(fmt.Sprintf("Deleting service %s", service.Name)) - err := f.FederationClientset_1_5.Services(nsName).Delete(service.Name, &v1.DeleteOptions{}) - framework.ExpectNoError(err, "Error deleting service %q in namespace %q", service.Name, service.Namespace) - By(fmt.Sprintf("Deletion of service %q in namespace %q succeeded.", service.Name, nsName)) - By(fmt.Sprintf("Verifying that services in underlying clusters are not deleted")) - for clusterName, clusterClientset := range clusters { - _, err := clusterClientset.Core().Services(service.Namespace).Get(service.Name, metav1.GetOptions{}) - if err != nil { - framework.Failf("Unexpected error in fetching service %s in cluster %s, %s", service.Name, clusterName, err) - } - } + It("should not be deleted from underlying clusters when OrphanDependents is nil", func() { + framework.SkipUnlessFederated(f.ClientSet) + nsName := f.FederationNamespace.Name + verifyCascadingDeletionForService(f.FederationClientset_1_5, clusters, nil, nsName) + By(fmt.Sprintf("Verified that services were not deleted from underlying clusters")) }) }) - var _ = Describe("DNS", func() { var ( @@ -211,7 +216,7 @@ var _ = framework.KubeDescribe("[Feature:Federation]", func() { deleteBackendPodsOrFail(clusters, nsName) if service != nil { - deleteServiceOrFail(f.FederationClientset_1_5, nsName, service.Name) + deleteServiceOrFail(f.FederationClientset_1_5, nsName, service.Name, nil) service = nil } else { By("No service to delete. Service is nil") @@ -308,6 +313,48 @@ var _ = framework.KubeDescribe("[Feature:Federation]", func() { }) }) +// verifyCascadingDeletionForService verifies that services are deleted from +// underlying clusters when orphan dependents is false and they are not +// deleted when orphan dependents is true. +func verifyCascadingDeletionForService(clientset *fedclientset.Clientset, clusters map[string]*cluster, orphanDependents *bool, nsName string) { + service := createServiceOrFail(clientset, nsName, FederatedServiceName) + serviceName := service.Name + // Check subclusters if the service was created there. + By(fmt.Sprintf("Waiting for service %s to be created in all underlying clusters", serviceName)) + err := wait.Poll(5*time.Second, 2*time.Minute, func() (bool, error) { + for _, cluster := range clusters { + _, err := cluster.Core().Services(nsName).Get(serviceName, metav1.GetOptions{}) + if err != nil { + if !errors.IsNotFound(err) { + return false, err + } + return false, nil + } + } + return true, nil + }) + framework.ExpectNoError(err, "Not all services created") + + By(fmt.Sprintf("Deleting service %s", serviceName)) + deleteServiceOrFail(clientset, nsName, serviceName, orphanDependents) + + By(fmt.Sprintf("Verifying services %s in underlying clusters", serviceName)) + errMessages := []string{} + // service should be present in underlying clusters unless orphanDependents is false. + shouldExist := orphanDependents == nil || *orphanDependents == true + for clusterName, clusterClientset := range clusters { + _, err := clusterClientset.Core().Services(nsName).Get(serviceName, metav1.GetOptions{}) + if shouldExist && errors.IsNotFound(err) { + errMessages = append(errMessages, fmt.Sprintf("unexpected NotFound error for service %s in cluster %s, expected service to exist", serviceName, clusterName)) + } else if !shouldExist && !errors.IsNotFound(err) { + errMessages = append(errMessages, fmt.Sprintf("expected NotFound error for service %s in cluster %s, got error: %v", serviceName, clusterName, err)) + } + } + if len(errMessages) != 0 { + framework.Failf("%s", strings.Join(errMessages, "; ")) + } +} + /* equivalent returns true if the two services are equivalent. Fields which are expected to differ between federated services and the underlying cluster services (e.g. ClusterIP, LoadBalancerIP etc) are ignored. diff --git a/test/e2e/federation-authn.go b/test/e2e/federation-authn.go index 9ee73c02ee4..ef228c339fb 100644 --- a/test/e2e/federation-authn.go +++ b/test/e2e/federation-authn.go @@ -42,7 +42,7 @@ var _ = framework.KubeDescribe("[Feature:Federation]", func() { nsName := f.FederationNamespace.Name svc := createServiceOrFail(f.FederationClientset_1_5, nsName, FederatedServiceName) - deleteServiceOrFail(f.FederationClientset_1_5, nsName, svc.Name) + deleteServiceOrFail(f.FederationClientset_1_5, nsName, svc.Name, nil) }) It("should not accept cluster resources when the client has invalid authentication credentials", func() { @@ -62,7 +62,7 @@ var _ = framework.KubeDescribe("[Feature:Federation]", func() { svc, err := createService(fcs, nsName, FederatedServiceName) Expect(errors.IsUnauthorized(err)).To(BeTrue()) if err == nil && svc != nil { - deleteServiceOrFail(fcs, nsName, svc.Name) + deleteServiceOrFail(fcs, nsName, svc.Name, nil) } }) @@ -76,7 +76,7 @@ var _ = framework.KubeDescribe("[Feature:Federation]", func() { svc, err := createService(fcs, nsName, FederatedServiceName) Expect(errors.IsUnauthorized(err)).To(BeTrue()) if err == nil && svc != nil { - deleteServiceOrFail(fcs, nsName, svc.Name) + deleteServiceOrFail(fcs, nsName, svc.Name, nil) } }) }) diff --git a/test/e2e/federation-util.go b/test/e2e/federation-util.go index 005aae38e43..266e029604a 100644 --- a/test/e2e/federation-util.go +++ b/test/e2e/federation-util.go @@ -283,12 +283,23 @@ func createServiceOrFail(clientset *fedclientset.Clientset, namespace, name stri return service } -func deleteServiceOrFail(clientset *fedclientset.Clientset, namespace string, serviceName string) { +func deleteServiceOrFail(clientset *fedclientset.Clientset, namespace string, serviceName string, orphanDependents *bool) { if clientset == nil || len(namespace) == 0 || len(serviceName) == 0 { Fail(fmt.Sprintf("Internal error: invalid parameters passed to deleteServiceOrFail: clientset: %v, namespace: %v, service: %v", clientset, namespace, serviceName)) } - err := clientset.Services(namespace).Delete(serviceName, v1.NewDeleteOptions(0)) + err := clientset.Services(namespace).Delete(serviceName, &v1.DeleteOptions{OrphanDependents: orphanDependents}) framework.ExpectNoError(err, "Error deleting service %q from namespace %q", serviceName, namespace) + // Wait for the service to be deleted. + err = wait.Poll(5*time.Second, 3*wait.ForeverTestTimeout, func() (bool, error) { + _, err := clientset.Core().Services(namespace).Get(serviceName, metav1.GetOptions{}) + if err != nil && errors.IsNotFound(err) { + return true, nil + } + return false, err + }) + if err != nil { + framework.Failf("Error in deleting service %s: %v", serviceName, err) + } } func cleanupServiceShardsAndProviderResources(namespace string, service *v1.Service, clusters map[string]*cluster) {