diff --git a/federation/pkg/federation-controller/service/dns.go b/federation/pkg/federation-controller/service/dns.go
index 1bc53f25000..789a45f162c 100644
--- a/federation/pkg/federation-controller/service/dns.go
+++ b/federation/pkg/federation-controller/service/dns.go
@@ -24,8 +24,10 @@ import (
 	"strings"
 
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/kubernetes/federation/pkg/dnsprovider"
 	"k8s.io/kubernetes/federation/pkg/dnsprovider/rrstype"
+	"k8s.io/kubernetes/pkg/api/v1"
 )
 
 const (
@@ -34,7 +36,7 @@ const (
 )
 
 // getHealthyEndpoints returns the hostnames and/or IP addresses of healthy endpoints for the service, at a zone, region and global level (or an error)
-func (s *ServiceController) getHealthyEndpoints(clusterName string, cachedService *cachedService) (zoneEndpoints, regionEndpoints, globalEndpoints []string, err error) {
+func (s *ServiceController) getHealthyEndpoints(clusterName string, service *v1.Service) (zoneEndpoints, regionEndpoints, globalEndpoints []string, err error) {
 	var (
 		zoneNames  []string
 		regionName string
@@ -42,16 +44,24 @@ func (s *ServiceController) getHealthyEndpoints(clusterName string, cachedServic
 	if zoneNames, regionName, err = s.getClusterZoneNames(clusterName); err != nil {
 		return nil, nil, nil, err
 	}
-	for lbClusterName, lbStatus := range cachedService.serviceStatusMap {
+
+	// If the federated service is deleted, return empty endpoints so that its DNS records are removed.
+	if service.DeletionTimestamp != nil {
+		return zoneEndpoints, regionEndpoints, globalEndpoints, nil
+	}
+
+	serviceIngress, err := ParseFederatedServiceIngress(service)
+	if err != nil {
+		return nil, nil, nil, err
+	}
+
+	for _, lbClusterIngress := range serviceIngress.Items {
+		lbClusterName := lbClusterIngress.Cluster
 		lbZoneNames, lbRegionName, err := s.getClusterZoneNames(lbClusterName)
 		if err != nil {
 			return nil, nil, nil, err
 		}
-		for _, ingress := range lbStatus.Ingress {
-			readyEndpoints, ok := cachedService.endpointMap[lbClusterName]
-			if !ok || readyEndpoints == 0 {
-				continue
-			}
+		for _, ingress := range lbClusterIngress.Items {
 			var address string
 			// We should get either an IP address or a hostname - use whichever one we get
 			if ingress.IP != "" {
@@ -61,7 +71,7 @@ func (s *ServiceController) getHealthyEndpoints(clusterName string, cachedServic
 			}
 			if len(address) <= 0 {
 				return nil, nil, nil, fmt.Errorf("Service %s/%s in cluster %s has neither LoadBalancerStatus.ingress.ip nor LoadBalancerStatus.ingress.hostname. Cannot use it as endpoint for federated service.",
-					cachedService.lastState.Name, cachedService.lastState.Namespace, clusterName)
+					service.Name, service.Namespace, clusterName)
 			}
 			for _, lbZoneName := range lbZoneNames {
 				for _, zoneName := range zoneNames {
@@ -80,15 +90,12 @@ func (s *ServiceController) getHealthyEndpoints(clusterName string, cachedServic
 }
 
 // getClusterZoneNames returns the name of the zones (and the region) where the specified cluster exists (e.g. zones "us-east1-c" on GCE, or "us-east-1b" on AWS)
-func (s *ServiceController) getClusterZoneNames(clusterName string) (zones []string, region string, err error) {
-	client, ok := s.clusterCache.clientMap[clusterName]
-	if !ok {
-		return nil, "", fmt.Errorf("Cluster cache does not contain entry for cluster %s", clusterName)
+func (s *ServiceController) getClusterZoneNames(clusterName string) ([]string, string, error) {
+	cluster, err := s.federationClient.Federation().Clusters().Get(clusterName, metav1.GetOptions{})
+	if err != nil {
+		return nil, "", err
 	}
-	if client.cluster == nil {
-		return nil, "", fmt.Errorf("Cluster cache entry for cluster %s is nil", clusterName)
-	}
-	return client.cluster.Status.Zones, client.cluster.Status.Region, nil
+	return cluster.Status.Zones, cluster.Status.Region, nil
 }
 
 // getServiceDnsSuffix returns the DNS suffix to use when creating federated-service DNS records
@@ -284,7 +291,7 @@ given the current state of that service in that cluster. This should be called
 (or vice versa). Only shards of the service which have both a loadbalancer ingress IP address or hostname AND at least one healthy
 backend endpoint are included in DNS records for that service (at all of zone, region and global levels). All other addresses are
 removed. Also, if no shards exist in the zone or region of the cluster, a CNAME reference to the next higher level is ensured to exist.
 */
-func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *cachedService) error {
+func (s *ServiceController) ensureDnsRecords(clusterName string, service *v1.Service) error {
 	// Quinton: Pseudocode....
 	// See https://github.com/kubernetes/kubernetes/pull/25107#issuecomment-218026648
 	// For each service we need the following DNS names:
@@ -298,21 +305,21 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *
 	// - a set of A records to IP addresses of all healthy shards in all regions, if one or more of these exist.
 	// - no record (NXRECORD response) if no healthy shards exist in any regions
 	//
-	// For each cached service, cachedService.lastState tracks the current known state of the service, while cachedService.appliedState contains
-	// the state of the service when we last successfully synced its DNS records.
-	// So this time around we only need to patch that (add new records, remove deleted records, and update changed records).
+	// Each service stores the current known state of its load balancer ingress in the federated clusters in an annotation.
+	// So generate the desired DNS records from that state, and ensure that they match the
+	// actual DNS records (add new records, remove deleted records, and update changed records).
 	//
 	if s == nil {
-		return fmt.Errorf("nil ServiceController passed to ServiceController.ensureDnsRecords(clusterName: %s, cachedService: %v)", clusterName, cachedService)
+		return fmt.Errorf("nil ServiceController passed to ServiceController.ensureDnsRecords(clusterName: %s, service: %v)", clusterName, service)
 	}
 	if s.dns == nil {
 		return nil
 	}
-	if cachedService == nil {
-		return fmt.Errorf("nil cachedService passed to ServiceController.ensureDnsRecords(clusterName: %s, cachedService: %v)", clusterName, cachedService)
+	if service == nil {
+		return fmt.Errorf("nil service passed to ServiceController.ensureDnsRecords(clusterName: %s, service: %v)", clusterName, service)
 	}
-	serviceName := cachedService.lastState.Name
-	namespaceName := cachedService.lastState.Namespace
+	serviceName := service.Name
+	namespaceName := service.Namespace
 	zoneNames, regionName, err := s.getClusterZoneNames(clusterName)
 	if err != nil {
 		return err
@@ -324,7 +331,7 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *
 	if err != nil {
 		return err
 	}
-	zoneEndpoints, regionEndpoints, globalEndpoints, err := s.getHealthyEndpoints(clusterName, cachedService)
+	zoneEndpoints, regionEndpoints, globalEndpoints, err := s.getHealthyEndpoints(clusterName, service)
 	if err != nil {
 		return err
 	}
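Reviewer sketch, not part of the patch: getHealthyEndpoints now derives DNS endpoints from the ingress annotation parsed by ParseFederatedServiceIngress rather than from the service cache. The standalone program below models that annotation round-trip; the JSON field layout is an assumption for illustration (the real FederatedServiceIngress type lives under federation/apis/federation and carries []v1.LoadBalancerIngress per cluster).

    package main

    import (
    	"encoding/json"
    	"fmt"
    )

    // clusterIngress mirrors the per-cluster entry consumed above (assumed shape).
    type clusterIngress struct {
    	Cluster string   `json:"cluster"`
    	Items   []string `json:"items"` // the real type carries []v1.LoadBalancerIngress
    }

    // serviceIngress mirrors the annotation payload (assumed shape).
    type serviceIngress struct {
    	Items []clusterIngress `json:"items"`
    }

    func main() {
    	annotation := `{"items":[{"cluster":"testcluster","items":["198.51.100.1"]}]}`
    	var ingress serviceIngress
    	if err := json.Unmarshal([]byte(annotation), &ingress); err != nil {
    		panic(err)
    	}
    	for _, ci := range ingress.Items {
    		fmt.Println(ci.Cluster, ci.Items) // each cluster contributes its reachable addresses
    	}
    }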
diff --git a/federation/pkg/federation-controller/service/dns_test.go b/federation/pkg/federation-controller/service/dns_test.go
index 6836449e7e1..2153cfff040 100644
--- a/federation/pkg/federation-controller/service/dns_test.go
+++ b/federation/pkg/federation-controller/service/dns_test.go
@@ -27,11 +27,15 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/kubernetes/federation/apis/federation/v1beta1"
+	fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake"
 	"k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes.
+	. "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
 	"k8s.io/kubernetes/pkg/api/v1"
 )
 
 func TestServiceController_ensureDnsRecords(t *testing.T) {
+	clusterName := "testcluster"
+
 	tests := []struct {
 		name          string
 		service       v1.Service
@@ -44,6 +48,10 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
 			ObjectMeta: metav1.ObjectMeta{
 				Name:      "servicename",
 				Namespace: "servicenamespace",
+				Annotations: map[string]string{
+					FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
+						AddEndpoints(clusterName, []string{"198.51.100.1"}).
+						String()},
 			},
 		},
 		serviceStatus: buildServiceStatus([][]string{{"198.51.100.1", ""}}),
@@ -92,7 +100,10 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
 		if !ok {
 			t.Error("Unable to fetch zones")
 		}
+		fakeClient := &fakefedclientset.Clientset{}
+		RegisterFakeClusterGet(&fakeClient.Fake, &v1beta1.ClusterList{Items: []v1beta1.Cluster{*NewCluster(clusterName, v1.ConditionTrue)}})
 		serviceController := ServiceController{
+			federationClient: fakeClient,
 			dns:              fakedns,
 			dnsZones:         fakednsZones,
 			serviceDnsSuffix: "federation.example.com",
@@ -106,8 +117,6 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
 			knownClusterSet: make(sets.String),
 		}
 
-		clusterName := "testcluster"
-
 		serviceController.clusterCache.clientMap[clusterName] = &clusterCache{
 			cluster: &v1beta1.Cluster{
 				Status: v1beta1.ClusterStatus{
@@ -127,7 +136,7 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
 			cachedService.serviceStatusMap[clusterName] = test.serviceStatus
 		}
 
-		err := serviceController.ensureDnsRecords(clusterName, cachedService)
+		err := serviceController.ensureDnsRecords(clusterName, &test.service)
 		if err != nil {
 			t.Errorf("Test failed for %s, unexpected error %v", test.name, err)
 		}
diff --git a/federation/pkg/federation-controller/service/endpoint_helper.go b/federation/pkg/federation-controller/service/endpoint_helper.go
index 53295c4e523..b95d50082e2 100644
--- a/federation/pkg/federation-controller/service/endpoint_helper.go
+++ b/federation/pkg/federation-controller/service/endpoint_helper.go
@@ -122,7 +122,7 @@ func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedServi
 	glog.V(4).Infof("Cached endpoint was found for %s/%s, cluster %s, removing", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
 	delete(cachedService.endpointMap, clusterName)
 	for i := 0; i < clientRetryCount; i++ {
-		err := serviceController.ensureDnsRecords(clusterName, cachedService)
+		err := serviceController.ensureDnsRecords(clusterName, cachedService.lastState)
 		if err == nil {
 			return nil
 		}
@@ -154,7 +154,7 @@ func (cc *clusterClientCache) processEndpointUpdate(cachedService *cachedService
 	glog.V(4).Infof("Reachable endpoint was found for %s/%s, cluster %s, building endpointMap", endpoint.Namespace, endpoint.Name, clusterName)
 	cachedService.endpointMap[clusterName] = 1
 	for i := 0; i < clientRetryCount; i++ {
-		err := serviceController.ensureDnsRecords(clusterName, cachedService)
+		err := serviceController.ensureDnsRecords(clusterName, cachedService.lastState)
 		if err == nil {
 			return nil
 		}
@@ -175,7 +175,7 @@ func (cc *clusterClientCache) processEndpointUpdate(cachedService *cachedService
 	glog.V(4).Infof("Reachable endpoint was lost for %s/%s, cluster %s, deleting endpointMap", endpoint.Namespace, endpoint.Name, clusterName)
 	delete(cachedService.endpointMap, clusterName)
 	for i := 0; i < clientRetryCount; i++ {
-		err := serviceController.ensureDnsRecords(clusterName, cachedService)
+		err := serviceController.ensureDnsRecords(clusterName, cachedService.lastState)
 		if err == nil {
 			return nil
 		}
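Reviewer sketch, not part of the patch: each call site above repeats the same clientRetryCount loop around ensureDnsRecords. A hypothetical helper of this shape captures the pattern:

    // retry is a hypothetical helper: it invokes fn up to attempts times,
    // stopping at the first success and returning the last error otherwise.
    func retry(attempts int, fn func() error) error {
    	var err error
    	for i := 0; i < attempts; i++ {
    		if err = fn(); err == nil {
    			return nil
    		}
    	}
    	return err
    }

The deletion path above could then be written as: retry(clientRetryCount, func() error { return serviceController.ensureDnsRecords(clusterName, cachedService.lastState) }).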
"k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake" "k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes. + . "k8s.io/kubernetes/federation/pkg/federation-controller/util/test" v1 "k8s.io/kubernetes/pkg/api/v1" ) var fakeDns, _ = clouddns.NewFakeInterface() // No need to check for unsupported interfaces, as the fake interface supports everything that's required. var fakeDnsZones, _ = fakeDns.Zones() +var fakeClient = &fakefedclientset.Clientset{} var fakeServiceController = ServiceController{ + federationClient: fakeClient, dns: fakeDns, dnsZones: fakeDnsZones, federationName: "fed1", @@ -68,6 +72,7 @@ func TestProcessEndpointUpdate(t *testing.T) { }, }, } + RegisterFakeClusterGet(&fakeClient.Fake, &v1beta1.ClusterList{Items: []v1beta1.Cluster{*NewCluster(clusterName, v1.ConditionTrue)}}) tests := []struct { name string cachedService *cachedService @@ -121,6 +126,7 @@ func TestProcessEndpointDeletion(t *testing.T) { }, }, } + RegisterFakeClusterGet(&fakeClient.Fake, &v1beta1.ClusterList{Items: []v1beta1.Cluster{*NewCluster(clusterName, v1.ConditionTrue)}}) tests := []struct { name string cachedService *cachedService diff --git a/federation/pkg/federation-controller/service/service_helper.go b/federation/pkg/federation-controller/service/service_helper.go index 93672290b6c..7efcfff8774 100644 --- a/federation/pkg/federation-controller/service/service_helper.go +++ b/federation/pkg/federation-controller/service/service_helper.go @@ -108,7 +108,7 @@ func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache if needUpdate { for i := 0; i < clientRetryCount; i++ { - err := sc.ensureDnsRecords(clusterName, cachedService) + err := sc.ensureDnsRecords(clusterName, service) if err == nil { break } diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go index ac7c46c95c6..8617b5f3192 100644 --- a/federation/pkg/federation-controller/service/servicecontroller.go +++ b/federation/pkg/federation-controller/service/servicecontroller.go @@ -18,6 +18,8 @@ package service import ( "fmt" + "sort" + "strings" "sync" "time" @@ -27,8 +29,10 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/conversion" + "k8s.io/apimachinery/pkg/labels" pkgruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" @@ -36,7 +40,9 @@ import ( clientv1 "k8s.io/client-go/pkg/api/v1" cache "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/flowcontrol" "k8s.io/client-go/util/workqueue" + fedapi "k8s.io/kubernetes/federation/apis/federation" v1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" federationcache "k8s.io/kubernetes/federation/client/cache" fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" @@ -76,6 +82,7 @@ const ( maxNoOfClusters = 100 + reviewDelay = 10 * time.Second updateTimeout = 30 * time.Second allClustersKey = "ALL_CLUSTERS" clusterAvailableDelay = time.Second * 20 @@ -156,6 +163,15 @@ type ServiceController struct { clusterDeliverer *util.DelayingDeliverer deletionHelper *deletionhelper.DeletionHelper + + reviewDelay time.Duration + clusterAvailableDelay time.Duration + updateTimeout time.Duration 
diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go
index ac7c46c95c6..8617b5f3192 100644
--- a/federation/pkg/federation-controller/service/servicecontroller.go
+++ b/federation/pkg/federation-controller/service/servicecontroller.go
@@ -18,6 +18,8 @@ package service
 
 import (
 	"fmt"
+	"sort"
+	"strings"
 	"sync"
 	"time"
 
@@ -27,8 +29,10 @@ import (
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/conversion"
+	"k8s.io/apimachinery/pkg/labels"
 	pkgruntime "k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
@@ -36,7 +40,9 @@ import (
 	clientv1 "k8s.io/client-go/pkg/api/v1"
 	cache "k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
+	"k8s.io/client-go/util/flowcontrol"
 	"k8s.io/client-go/util/workqueue"
+	fedapi "k8s.io/kubernetes/federation/apis/federation"
 	v1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
 	federationcache "k8s.io/kubernetes/federation/client/cache"
 	fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
@@ -76,6 +82,7 @@ const (
 	maxNoOfClusters = 100
 
+	reviewDelay           = 10 * time.Second
 	updateTimeout         = 30 * time.Second
 	allClustersKey        = "ALL_CLUSTERS"
 	clusterAvailableDelay = time.Second * 20
@@ -156,6 +163,15 @@ type ServiceController struct {
 	clusterDeliverer *util.DelayingDeliverer
 
 	deletionHelper *deletionhelper.DeletionHelper
+
+	reviewDelay           time.Duration
+	clusterAvailableDelay time.Duration
+	updateTimeout         time.Duration
+
+	endpointFederatedInformer fedutil.FederatedInformer
+	federatedUpdater          fedutil.FederatedUpdater
+	objectDeliverer           *util.DelayingDeliverer
+	flowcontrolBackoff        *flowcontrol.Backoff
 }
 
 // New returns a new service controller to keep DNS provider service resources
@@ -180,11 +196,16 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
 			rwlock:    sync.Mutex{},
 			clientMap: make(map[string]*clusterCache),
 		},
-		eventBroadcaster: broadcaster,
-		eventRecorder:    recorder,
-		queue:            workqueue.New(),
-		knownClusterSet:  make(sets.String),
+		eventBroadcaster:      broadcaster,
+		eventRecorder:         recorder,
+		queue:                 workqueue.New(),
+		knownClusterSet:       make(sets.String),
+		reviewDelay:           reviewDelay,
+		clusterAvailableDelay: clusterAvailableDelay,
+		updateTimeout:         updateTimeout,
+		flowcontrolBackoff:    flowcontrol.NewBackOff(5*time.Second, time.Minute),
 	}
+	s.objectDeliverer = util.NewDelayingDeliverer()
 	s.clusterDeliverer = util.NewDelayingDeliverer()
 	var serviceIndexer cache.Indexer
 	serviceIndexer, s.serviceController = cache.NewIndexerInformer(
@@ -198,57 +219,13 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
 		},
 		&v1.Service{},
 		serviceSyncPeriod,
-		cache.ResourceEventHandlerFuncs{
-			AddFunc: s.enqueueService,
-			UpdateFunc: func(old, cur interface{}) {
-				// there is case that old and new are equals but we still catch the event now.
-				if !reflect.DeepEqual(old, cur) {
-					s.enqueueService(cur)
-				}
-			},
-			DeleteFunc: s.enqueueService,
-		},
+		util.NewTriggerOnAllChanges(func(obj pkgruntime.Object) {
+			glog.V(5).Infof("Delivering notification from federation: %v", obj)
+			s.deliverObject(obj, 0, false)
+		}),
 		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
 	)
 	s.serviceStore = corelisters.NewServiceLister(serviceIndexer)
-	s.clusterStore.Store, s.clusterController = cache.NewInformer(
-		&cache.ListWatch{
-			ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
-				return s.federationClient.Federation().Clusters().List(options)
-			},
-			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-				return s.federationClient.Federation().Clusters().Watch(options)
-			},
-		},
-		&v1beta1.Cluster{},
-		clusterSyncPeriod,
-		cache.ResourceEventHandlerFuncs{
-			DeleteFunc: s.clusterCache.delFromClusterSet,
-			AddFunc:    s.clusterCache.addToClientMap,
-			UpdateFunc: func(old, cur interface{}) {
-				oldCluster, ok := old.(*v1beta1.Cluster)
-				if !ok {
-					return
-				}
-				curCluster, ok := cur.(*v1beta1.Cluster)
-				if !ok {
-					return
-				}
-				if !reflect.DeepEqual(oldCluster.Spec, curCluster.Spec) {
-					// update when spec is changed
-					s.clusterCache.addToClientMap(cur)
-				}
-
-				pred := getClusterConditionPredicate()
-				// only update when condition changed to ready from not-ready
-				if !pred(*oldCluster) && pred(*curCluster) {
-					s.clusterCache.addToClientMap(cur)
-				}
-				// did not handle ready -> not-ready
-				// how could we stop a controller?
-			},
-		},
-	)
 
 	clusterLifecycle := fedutil.ClusterLifecycleHandlerFuncs{
 		ClusterAvailable: func(cluster *v1beta1.Cluster) {
@@ -271,14 +248,15 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
 				// would be just confirmation that some service operation succeeded.
 				util.NewTriggerOnAllChanges(
 					func(obj pkgruntime.Object) {
-						// TODO: Use this to enque services.
+						glog.V(5).Infof("Delivering service notification from federated cluster %s: %v", cluster.Name, obj)
+						s.deliverObject(obj, s.reviewDelay, false)
 					},
 				))
 	}
 
 	s.federatedInformer = fedutil.NewFederatedInformer(federationClient, fedInformerFactory, &clusterLifecycle)
 
-	federatedUpdater := fedutil.NewFederatedUpdater(s.federatedInformer,
+	s.federatedUpdater = fedutil.NewFederatedUpdater(s.federatedInformer,
 		func(client kubeclientset.Interface, obj pkgruntime.Object) error {
 			svc := obj.(*v1.Service)
 			_, err := client.Core().Services(svc.Namespace).Create(svc)
@@ -293,9 +271,41 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
 			svc := obj.(*v1.Service)
 			orphanDependents := false
 			err := client.Core().Services(svc.Namespace).Delete(svc.Name, &metav1.DeleteOptions{OrphanDependents: &orphanDependents})
+			// An IsNotFound error is fine, since that means the object is deleted already.
+			if errors.IsNotFound(err) {
+				return nil
+			}
 			return err
 		})
 
+	// Federated informers on endpoints in federated clusters.
+	// This makes it possible to check whether service ingress endpoints in federated clusters are reachable.
+	s.endpointFederatedInformer = fedutil.NewFederatedInformer(
+		federationClient,
+		func(cluster *v1beta1.Cluster, targetClient kubeclientset.Interface) (
+			cache.Store, cache.Controller) {
+			return cache.NewInformer(
+				&cache.ListWatch{
+					ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
+						return targetClient.Core().Endpoints(metav1.NamespaceAll).List(options)
+					},
+					WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+						return targetClient.Core().Endpoints(metav1.NamespaceAll).Watch(options)
+					},
+				},
+				&v1.Endpoints{},
+				controller.NoResyncPeriodFunc(),
+				fedutil.NewTriggerOnMetaAndFieldChanges(
+					"Subsets",
+					func(obj pkgruntime.Object) {
+						glog.V(5).Infof("Delivering endpoint notification from federated cluster %s: %v", cluster.Name, obj)
+						s.deliverObject(obj, s.reviewDelay, false)
+					},
+				))
+		},
+		&fedutil.ClusterLifecycleHandlerFuncs{},
+	)
+
 	s.deletionHelper = deletionhelper.NewDeletionHelper(
 		s.hasFinalizerFunc,
 		s.removeFinalizerFunc,
@@ -308,7 +318,7 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
 		updateTimeout,
 		s.eventRecorder,
 		s.federatedInformer,
-		federatedUpdater,
+		s.federatedUpdater,
 	)
 
 	s.endpointWorkerMap = make(map[string]bool)
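Reviewer sketch, not part of the patch: the endpoints informer above registers its trigger via fedutil.NewTriggerOnMetaAndFieldChanges("Subsets", ...), i.e. it should fire only when endpoint subsets (or relevant metadata) actually change. A simplified stand-in for that idea, under the assumption that only labels and subsets matter:

    // triggerOnSubsetsChange is an illustrative stand-in, not the real fedutil implementation:
    // deliver only when the watched field (Subsets) or the metadata we care about changed,
    // so that periodic resyncs do not cause redundant reconciliations.
    func triggerOnSubsetsChange(old, cur *v1.Endpoints, deliver func(*v1.Endpoints)) {
    	if !reflect.DeepEqual(old.Subsets, cur.Subsets) || !reflect.DeepEqual(old.Labels, cur.Labels) {
    		deliver(cur)
    	}
    }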
@@ -393,22 +403,27 @@ func (s *ServiceController) Run(workers int, stopCh <-chan struct{}) error {
 	}
 	defer runtime.HandleCrash()
 	s.federatedInformer.Start()
-	s.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) {
-		// TODO: Use this new clusterDeliverer to reconcile services in new clusters.
+	s.endpointFederatedInformer.Start()
+	s.objectDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) {
+		s.queue.Add(item.Value.(string))
 	})
+	s.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) {
+		s.deliverServicesOnClusterChange()
+	})
+	fedutil.StartBackoffGC(s.flowcontrolBackoff, stopCh)
 	go s.serviceController.Run(stopCh)
-	go s.clusterController.Run(stopCh)
+
 	for i := 0; i < workers; i++ {
 		go wait.Until(s.fedServiceWorker, time.Second, stopCh)
 	}
-	go wait.Until(s.clusterEndpointWorker, time.Second, stopCh)
-	go wait.Until(s.clusterServiceWorker, time.Second, stopCh)
-	go wait.Until(s.clusterSyncLoop, time.Second, stopCh)
 	go func() {
 		<-stopCh
 		glog.Infof("Shutting down Federation Service Controller")
 		s.queue.ShutDown()
 		s.federatedInformer.Stop()
+		s.endpointFederatedInformer.Stop()
+		s.objectDeliverer.Stop()
+		s.clusterDeliverer.Stop()
 	}()
 	return nil
 }
@@ -461,8 +476,15 @@ func (s *ServiceController) init() error {
 	return nil
 }
 
+type reconciliationStatus string
+
+const (
+	statusAllOk     = reconciliationStatus("ALL_OK")
+	statusError     = reconciliationStatus("ERROR")
+	statusNotSynced = reconciliationStatus("NOSYNC")
+)
+
 // fedServiceWorker runs a worker thread that just dequeues items, processes them, and marks them done.
-// It enforces that the syncService is never invoked concurrently with the same key.
 func (s *ServiceController) fedServiceWorker() {
 	for {
 		func() {
@@ -470,11 +492,21 @@ func (s *ServiceController) fedServiceWorker() {
 			if quit {
 				return
 			}
-			defer s.queue.Done(key)
-			err := s.syncService(key.(string))
-			if err != nil {
-				glog.Errorf("Error syncing service: %v", err)
+			service := key.(string)
+			status, err := s.reconcileService(service)
+			switch status {
+			case statusAllOk:
+				break
+			case statusError:
+				runtime.HandleError(fmt.Errorf("Error reconciling service %q: %v, delivering again", service, err))
+				s.deliverService(service, 0, true)
+			case statusNotSynced:
+				glog.V(5).Infof("Delivering notification for %q after clusterAvailableDelay", service)
+				s.deliverService(service, s.clusterAvailableDelay, false)
+			default:
+				runtime.HandleError(fmt.Errorf("Unhandled reconciliation status for %q: %s, delivering again", service, status))
+				s.deliverService(service, s.reviewDelay, false)
 			}
 		}()
 	}
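Reviewer sketch, not part of the patch: the worker body above reduces to a dequeue/reconcile/redeliver loop. Extracted into a hypothetical standalone function over the same workqueue interface (note that the patch drops the queue.Done call; a standalone version would normally keep it):

    // worker is a hypothetical distillation of fedServiceWorker above.
    func worker(queue workqueue.Interface, reconcile func(string) (reconciliationStatus, error), redeliver func(key string, failed bool)) {
    	for {
    		key, quit := queue.Get()
    		if quit {
    			return
    		}
    		service := key.(string)
    		status, _ := reconcile(service)
    		if status != statusAllOk {
    			// Errors are redelivered with backoff; not-synced items wait for the cluster list.
    			redeliver(service, status == statusError)
    		}
    		queue.Done(key)
    	}
    }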
@@ -871,7 +903,7 @@ func (s *ServiceController) lockedUpdateDNSRecords(service *cachedService, clust
 	for key := range s.clusterCache.clientMap {
 		for _, clusterName := range clusterNames {
 			if key == clusterName {
-				err := s.ensureDnsRecords(clusterName, service)
+				err := s.ensureDnsRecords(clusterName, service.lastState)
 				if err != nil {
 					unensuredCount += 1
 					glog.V(4).Infof("Failed to update DNS records for service %v from cluster %s: %v", service, clusterName, err)
@@ -1065,6 +1097,24 @@ func (s *ServiceController) delete(service *v1.Service) error {
 		return err
 	}
 
+	// Ensure that DNS records are removed for the service
+	if wantsDNSRecords(service) {
+		key := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
+		serviceIngress, err := ParseFederatedServiceIngress(service)
+		if err != nil {
+			runtime.HandleError(fmt.Errorf("Failed to parse endpoint annotations for service %s: %v", key, err))
+			return err
+		}
+		for _, ingress := range serviceIngress.Items {
+			err := s.ensureDnsRecords(ingress.Cluster, service)
+			if err != nil {
+				glog.V(4).Infof("Error ensuring DNS Records for service %s on cluster %s: %v", key, ingress.Cluster, err)
+				return err
+			}
+			glog.V(4).Infof("Ensured DNS records for Service %s in cluster %q", key, ingress.Cluster)
+		}
+	}
+
 	err = s.federationClient.Core().Services(service.Namespace).Delete(service.Name, nil)
 	if err != nil {
 		// Its all good if the error is not found error. That means it is deleted already and we do not have to do anything.
@@ -1085,3 +1135,410 @@ func (s *ServiceController) processServiceDeletion(key string) (error, time.Dura
 	s.serviceCache.delete(key)
 	return nil, doNotRetry
 }
+
+func (s *ServiceController) deliverServicesOnClusterChange() {
+	if !s.isSynced() {
+		s.clusterDeliverer.DeliverAfter(allClustersKey, nil, s.clusterAvailableDelay)
+	}
+	glog.V(5).Infof("Delivering all services as cluster status changed")
+	serviceList, err := s.serviceStore.List(labels.Everything())
+	if err != nil {
+		runtime.HandleError(fmt.Errorf("error listing federated services: %v", err))
+		s.clusterDeliverer.DeliverAfter(allClustersKey, nil, 0)
+	}
+	for _, service := range serviceList {
+		s.deliverObject(service, 0, false)
+	}
+}
+
+func (s *ServiceController) deliverObject(object interface{}, delay time.Duration, failed bool) {
+	switch value := object.(type) {
+	case *v1.Service:
+		s.deliverService(types.NamespacedName{Namespace: value.Namespace, Name: value.Name}.String(), delay, failed)
+	case *v1.Endpoints:
+		s.deliverService(types.NamespacedName{Namespace: value.Namespace, Name: value.Name}.String(), delay, failed)
+	default:
+		glog.Warningf("Unknown object received: %v", object)
+	}
+}
+
+// Adds backoff to delay if this delivery is related to some failure. Resets backoff if there was no failure.
+func (s *ServiceController) deliverService(key string, delay time.Duration, failed bool) {
+	if failed {
+		s.flowcontrolBackoff.Next(key, time.Now())
+		delay = delay + s.flowcontrolBackoff.Get(key)
+	} else {
+		s.flowcontrolBackoff.Reset(key)
+	}
+	s.objectDeliverer.DeliverAfter(key, key, delay)
+}
+
+// Check whether all data stores are in sync. False is returned if any of the informers/stores is not yet synced with
+// the corresponding api server.
+func (s *ServiceController) isSynced() bool {
+	if !s.federatedInformer.ClustersSynced() {
+		glog.V(2).Infof("Cluster list not synced")
+		return false
+	}
+	serviceClusters, err := s.federatedInformer.GetReadyClusters()
+	if err != nil {
+		runtime.HandleError(fmt.Errorf("Failed to get ready clusters: %v", err))
+		return false
+	}
+	if !s.federatedInformer.GetTargetStore().ClustersSynced(serviceClusters) {
+		return false
+	}
+
+	if !s.endpointFederatedInformer.ClustersSynced() {
+		glog.V(2).Infof("Cluster list not synced")
+		return false
+	}
+	endpointClusters, err := s.endpointFederatedInformer.GetReadyClusters()
+	if err != nil {
+		runtime.HandleError(fmt.Errorf("Failed to get ready clusters: %v", err))
+		return false
+	}
+	if !s.endpointFederatedInformer.GetTargetStore().ClustersSynced(endpointClusters) {
+		return false
+	}
+
+	return true
+}
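Reviewer sketch, not part of the patch: deliverService above leans on client-go's flowcontrol.Backoff, whose behaviour is roughly as follows (values match the NewBackOff call in New):

    // backoffDemo shows the flowcontrol.Backoff calls used by deliverService:
    // each recorded failure grows the per-key delay, and a success resets it.
    func backoffDemo() time.Duration {
    	backoff := flowcontrol.NewBackOff(5*time.Second, time.Minute)
    	key := "servicenamespace/servicename"
    	backoff.Next(key, time.Now()) // record a failure; the delay for key becomes the 5s base
    	delay := backoff.Get(key)     // deliverService adds this on top of the requested delay
    	backoff.Reset(key)            // a non-failed delivery clears the backoff for key
    	return delay
    }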
+
+// reconcileService triggers reconciliation of a federated service with the corresponding services in federated clusters.
+// This function is called on service addition/deletion/update, either in a federated cluster or in the federation.
+func (s *ServiceController) reconcileService(key string) (reconciliationStatus, error) {
+	if !s.isSynced() {
+		glog.V(4).Infof("Data store not synced, delaying reconciliation: %v", key)
+		return statusNotSynced, nil
+	}
+
+	namespace, name, err := cache.SplitMetaNamespaceKey(key)
+	if err != nil {
+		runtime.HandleError(fmt.Errorf("Failed to retrieve federated service %q from store: %v", key, err))
+		return statusError, err
+	}
+
+	service, err := s.serviceStore.Services(namespace).Get(name)
+	if errors.IsNotFound(err) {
+		// Not a federated service, ignoring.
+		return statusAllOk, nil
+	} else if err != nil {
+		return statusError, err
+	}
+
+	glog.V(3).Infof("Reconciling federated service: %s", key)
+
+	// Create a copy before modifying the service, to prevent a race with other readers of the service from the store
+	fedServiceObj, err := api.Scheme.DeepCopy(service)
+	fedService, ok := fedServiceObj.(*v1.Service)
+	if err != nil || !ok {
+		runtime.HandleError(fmt.Errorf("Error in retrieving obj from store: %s, %v", key, err))
+		return statusError, err
+	}
+
+	// Handle deletion of the federated service
+	if fedService.DeletionTimestamp != nil {
+		if err := s.delete(fedService); err != nil {
+			runtime.HandleError(fmt.Errorf("Failed to delete %s: %v", key, err))
+			s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "DeleteFailed", "Deleting service failed: %v", err)
+			return statusError, err
+		}
+		glog.V(3).Infof("Deleting federated service succeeded: %s", key)
+		s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "DeleteSucceed", "Deleting service succeeded")
+		return statusAllOk, nil
+	}
+
+	// Add the required finalizers before creating a service in underlying clusters. This ensures that the
+	// dependent services in underlying clusters are deleted when the federated service is deleted.
+	updatedServiceObj, err := s.deletionHelper.EnsureFinalizers(fedService)
+	if err != nil {
+		glog.Warningf("Failed to ensure setting finalizer for service %s: %v", key, err)
+		return statusError, err
+	}
+	fedService = updatedServiceObj.(*v1.Service)
+
+	// Synchronize the federated service in all underlying ready clusters.
+	clusters, err := s.federatedInformer.GetReadyClusters()
+	if err != nil {
+		runtime.HandleError(fmt.Errorf("Failed to get ready cluster list: %v", err))
+		return statusError, err
+	}
+
+	newLBStatus := newLoadbalancerStatus()
+	newServiceIngress := NewFederatedServiceIngress()
+	operations := make([]fedutil.FederatedOperation, 0)
+	for _, cluster := range clusters {
+		// Aggregate all operations to perform on all federated clusters
+		operation, err := s.getOperationsToPerformOnCluster(cluster, fedService)
+		if err != nil {
+			return statusError, err
+		}
+		if operation != nil {
+			operations = append(operations, *operation)
+		}
+
+		// Aggregate LoadBalancerStatus from all services in federated clusters to update the status in the federated service
+		lbStatus, err := s.getServiceStatusInCluster(cluster, key)
+		if err != nil {
+			return statusError, err
+		}
+		if len(lbStatus.Ingress) > 0 {
+			newLBStatus.Ingress = append(newLBStatus.Ingress, lbStatus.Ingress...)
+
+			// Add/update federated service ingress only if there are reachable endpoints backing the lb service
+			endpoints, err := s.getServiceEndpointsInCluster(cluster, key)
+			if err != nil {
+				return statusError, err
+			}
+			// If there are no endpoints created for the service, then the loadbalancer ingress
+			// is not reachable, so do not consider such loadbalancer ingresses for federated
+			// service ingresses
+			if len(endpoints) > 0 {
+				clusterIngress := fedapi.ClusterServiceIngress{
+					Cluster: cluster.Name,
+					Items:   lbStatus.Ingress,
+				}
+				newServiceIngress.Items = append(newServiceIngress.Items, clusterIngress)
+			}
+		}
+	}
+
+	if len(operations) != 0 {
+		err = s.federatedUpdater.UpdateWithOnError(operations, s.updateTimeout,
+			func(op fedutil.FederatedOperation, operror error) {
+				runtime.HandleError(fmt.Errorf("Service update in cluster %s failed: %v", op.ClusterName, operror))
+				s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "UpdateInClusterFailed", "Service update in cluster %s failed: %v", op.ClusterName, operror)
+			})
+		if err != nil {
+			if !errors.IsAlreadyExists(err) {
+				runtime.HandleError(fmt.Errorf("Failed to execute updates for %s: %v", key, err))
+				return statusError, err
+			}
+		}
+	}
+
+	// Update the federated service if there are any updates in the clustered services (status/endpoints)
+	err = s.updateFederatedService(fedService, newLBStatus, newServiceIngress)
+	if err != nil {
+		return statusError, err
+	}
+
+	glog.V(5).Infof("Everything is in order in federated clusters for service %s", key)
+	return statusAllOk, nil
+}
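Reviewer sketch, not part of the patch: the rule applied in the loop above — a cluster's load balancer ingress is published only when ready endpoints back it — can be stated as a tiny helper (the name is illustrative):

    // reachableIngress returns the ingress list only when at least one ready
    // endpoint address backs the load balancer; otherwise it is not published.
    func reachableIngress(lb v1.LoadBalancerStatus, readyAddresses []v1.EndpointAddress) []v1.LoadBalancerIngress {
    	if len(readyAddresses) == 0 {
    		return nil
    	}
    	return lb.Ingress
    }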
+
+// getOperationsToPerformOnCluster returns the operations to be performed so that the clustered service is in sync with the federated service
+func (s *ServiceController) getOperationsToPerformOnCluster(cluster *v1beta1.Cluster, fedService *v1.Service) (*fedutil.FederatedOperation, error) {
+	var operation *fedutil.FederatedOperation
+
+	key := types.NamespacedName{Namespace: fedService.Namespace, Name: fedService.Name}.String()
+	clusterServiceObj, serviceFound, err := s.federatedInformer.GetTargetStore().GetByKey(cluster.Name, key)
+	if err != nil {
+		runtime.HandleError(fmt.Errorf("Failed to get %s service from %s: %v", key, cluster.Name, err))
+		return nil, err
+	}
+	if !serviceFound {
+		desiredService := &v1.Service{
+			ObjectMeta: fedutil.DeepCopyRelevantObjectMeta(fedService.ObjectMeta),
+			Spec:       *(fedutil.DeepCopyApiTypeOrPanic(&fedService.Spec).(*v1.ServiceSpec)),
+		}
+		desiredService.ResourceVersion = ""
+
+		glog.V(4).Infof("Creating service in underlying cluster %s: %+v", cluster.Name, desiredService)
+		s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "CreateInCluster", "Creating service in cluster %s", cluster.Name)
+
+		operation = &fedutil.FederatedOperation{
+			Type:        fedutil.OperationTypeAdd,
+			Obj:         desiredService,
+			ClusterName: cluster.Name,
+		}
+	} else {
+		clusterService, ok := clusterServiceObj.(*v1.Service)
+		if !ok {
+			runtime.HandleError(fmt.Errorf("Unexpected error for %q: %v", key, err))
+			return nil, err
+		}
+
+		desiredService := &v1.Service{
+			ObjectMeta: fedutil.DeepCopyRelevantObjectMeta(clusterService.ObjectMeta),
+			Spec:       *(fedutil.DeepCopyApiTypeOrPanic(&fedService.Spec).(*v1.ServiceSpec)),
+		}
+
+		// ClusterIP and NodePort are allocated to the Service by the cluster, so retain them, if any, while updating
+		desiredService.Spec.ClusterIP = clusterService.Spec.ClusterIP
+		for _, cPort := range clusterService.Spec.Ports {
+			for i, fPort := range desiredService.Spec.Ports {
+				if fPort.Name == cPort.Name && fPort.Protocol == cPort.Protocol && fPort.Port == cPort.Port {
+					desiredService.Spec.Ports[i].NodePort = cPort.NodePort
+				}
+			}
+		}
+
+		// Update the existing service, if needed.
+		if !Equivalent(desiredService, clusterService) {
+			glog.V(4).Infof("Service in underlying cluster %s does not match, Desired: %+v, Existing: %+v", cluster.Name, desiredService, clusterService)
+			s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "UpdateInCluster", "Updating service in cluster %s. Desired: %+v\n Actual: %+v\n", cluster.Name, desiredService, clusterService)
+
+			// The ResourceVersion of the cluster service can be different from the federated service,
+			// so do not update the ResourceVersion while updating the cluster service
+			desiredService.ResourceVersion = clusterService.ResourceVersion
+
+			operation = &fedutil.FederatedOperation{
+				Type:        fedutil.OperationTypeUpdate,
+				Obj:         desiredService,
+				ClusterName: cluster.Name,
+			}
+		} else {
+			glog.V(5).Infof("Service in underlying cluster %s is up to date: %+v", cluster.Name, desiredService)
+		}
+	}
+	return operation, nil
+}
+
+// getServiceStatusInCluster returns the service status in a federated cluster
+func (s *ServiceController) getServiceStatusInCluster(cluster *v1beta1.Cluster, key string) (*v1.LoadBalancerStatus, error) {
+	lbStatus := &v1.LoadBalancerStatus{}
+
+	clusterServiceObj, serviceFound, err := s.federatedInformer.GetTargetStore().GetByKey(cluster.Name, key)
+	if err != nil {
+		runtime.HandleError(fmt.Errorf("Failed to get %s service from %s: %v", key, cluster.Name, err))
+		return lbStatus, err
+	}
+	if serviceFound {
+		clusterService, ok := clusterServiceObj.(*v1.Service)
+		if !ok {
+			err = fmt.Errorf("Unknown object received: %v", clusterServiceObj)
+			runtime.HandleError(err)
+			return lbStatus, err
+		}
+		lbStatus = &clusterService.Status.LoadBalancer
+		newLbStatus := &loadbalancerStatus{*lbStatus}
+		sort.Sort(newLbStatus)
+	}
+	return lbStatus, nil
+}
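Reviewer sketch, not part of the patch: the sort in getServiceStatusInCluster (and in updateFederatedService below) exists so that later reflect.DeepEqual comparisons are order-insensitive. A minimal demonstration using the loadbalancerStatus sort defined at the end of this file:

    func sortedCompareDemo() bool {
    	a := loadbalancerStatus{v1.LoadBalancerStatus{Ingress: []v1.LoadBalancerIngress{{IP: "203.0.113.2"}, {IP: "203.0.113.1"}}}}
    	b := loadbalancerStatus{v1.LoadBalancerStatus{Ingress: []v1.LoadBalancerIngress{{IP: "203.0.113.1"}, {IP: "203.0.113.2"}}}}
    	sort.Sort(a)
    	sort.Sort(b)
    	return reflect.DeepEqual(a.Ingress, b.Ingress) // true once both sides are sorted
    }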
+
+// getServiceEndpointsInCluster returns the ready endpoints corresponding to the service in a federated cluster
+func (s *ServiceController) getServiceEndpointsInCluster(cluster *v1beta1.Cluster, key string) ([]v1.EndpointAddress, error) {
+	addresses := []v1.EndpointAddress{}
+
+	clusterEndpointsObj, endpointsFound, err := s.endpointFederatedInformer.GetTargetStore().GetByKey(cluster.Name, key)
+	if err != nil {
+		runtime.HandleError(fmt.Errorf("Failed to get %s endpoint from %s: %v", key, cluster.Name, err))
+		return addresses, err
+	}
+	if endpointsFound {
+		clusterEndpoints, ok := clusterEndpointsObj.(*v1.Endpoints)
+		if !ok {
+			glog.Warningf("Unknown object received: %v", clusterEndpointsObj)
+			return addresses, fmt.Errorf("Unknown object received: %v", clusterEndpointsObj)
+		}
+		for _, subset := range clusterEndpoints.Subsets {
+			if len(subset.Addresses) > 0 {
+				addresses = append(addresses, subset.Addresses...)
+			}
+		}
+	}
+	return addresses, nil
+}
+
+// updateFederatedService updates the federated service with the aggregated lbStatus and serviceIngresses,
+// and also updates the DNS records as needed
+func (s *ServiceController) updateFederatedService(fedService *v1.Service, newLBStatus *loadbalancerStatus, newServiceIngress *FederatedServiceIngress) error {
+	key := types.NamespacedName{Namespace: fedService.Namespace, Name: fedService.Name}.String()
+	needUpdate := false
+
+	// Sort the endpoints so that we can compare
+	sort.Sort(newLBStatus)
+	if !reflect.DeepEqual(fedService.Status.LoadBalancer.Ingress, newLBStatus.Ingress) {
+		fedService.Status.LoadBalancer.Ingress = newLBStatus.Ingress
+		glog.V(3).Infof("Federated service loadbalancer status updated for %s: %v", key, newLBStatus.Ingress)
+		needUpdate = true
+	}
+
+	existingServiceIngress, err := ParseFederatedServiceIngress(fedService)
+	if err != nil {
+		runtime.HandleError(fmt.Errorf("Failed to parse endpoint annotations for service %s: %v", key, err))
+		return err
+	}
+
+	// TODO: We should have a reliable cluster health check (which should consider quorum) to detect that a cluster is
+	// unreachable and remove its DNS records. Until a reliable cluster health check is available, the code below is
+	// a workaround to not remove the existing DNS records, which were created before the cluster went offline.
+	unreadyClusters, err := s.federatedInformer.GetUnreadyClusters()
+	if err != nil {
+		runtime.HandleError(fmt.Errorf("Failed to get unready cluster list: %v", err))
+		return err
+	}
+	for _, cluster := range unreadyClusters {
+		lbIngress := existingServiceIngress.GetClusterLoadBalancerIngresses(cluster.Name)
+		newServiceIngress.AddClusterLoadBalancerIngresses(cluster.Name, lbIngress)
+		glog.V(5).Infof("Cluster %s is offline, preserving previously available status for Service %s", cluster.Name, key)
+	}
+
+	// Update the federated service status and/or ingress annotations if changed
+	sort.Sort(newServiceIngress)
+	if !reflect.DeepEqual(existingServiceIngress.Items, newServiceIngress.Items) {
+		fedService = UpdateIngressAnnotation(fedService, newServiceIngress)
+		glog.V(3).Infof("Federated service loadbalancer ingress updated for %s: existing: %#v, desired: %#v", key, existingServiceIngress, newServiceIngress)
+		needUpdate = true
+	}
+
+	if needUpdate {
+		var err error
+		fedService, err = s.federationClient.Core().Services(fedService.Namespace).UpdateStatus(fedService)
+		if err != nil {
+			runtime.HandleError(fmt.Errorf("Error updating the federation service object %s: %v", key, err))
+			return err
+		}
+	}
+
+	// Ensure DNS records based on the annotations in the federated service, for all federated clusters
+	if needUpdate && wantsDNSRecords(fedService) {
+		for _, ingress := range newServiceIngress.Items {
+			err := s.ensureDnsRecords(ingress.Cluster, fedService)
+			if err != nil {
+				runtime.HandleError(fmt.Errorf("Error ensuring DNS Records for service %s on cluster %q: %v", key, ingress.Cluster, err))
+				return err
+			}
+			glog.V(4).Infof("Ensured DNS records for Service %s in cluster %q", key, ingress.Cluster)
+		}
+	}
+	return nil
+}
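Reviewer sketch, not part of the patch: the unready-cluster workaround above carries previously recorded ingress forward instead of dropping it. Isolated into a hypothetical helper over the FederatedServiceIngress accessors used in this patch:

    // preserveOfflineIngress copies the ingress recorded for each unready cluster from the
    // existing annotation into the desired one, so DNS records created before the cluster
    // went offline are not removed.
    func preserveOfflineIngress(existing, desired *FederatedServiceIngress, unreadyClusterNames []string) {
    	for _, name := range unreadyClusterNames {
    		desired.AddClusterLoadBalancerIngresses(name, existing.GetClusterLoadBalancerIngresses(name))
    	}
    }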
+
+// Equivalent checks whether the cluster-independent, user-provided data in two given services is equal. If in the future the
+// services structure is expanded, then any field that is not populated by the api server should be included here.
+func Equivalent(s1, s2 *v1.Service) bool {
+	// TODO: should also check for all annotations except FederationServiceIngressAnnotation
+	return s1.Name == s2.Name && s1.Namespace == s2.Namespace &&
+		(reflect.DeepEqual(s1.Labels, s2.Labels) || (len(s1.Labels) == 0 && len(s2.Labels) == 0)) &&
+		reflect.DeepEqual(s1.Spec, s2.Spec)
+}
+
+type loadbalancerStatus struct {
+	v1.LoadBalancerStatus
+}
+
+func newLoadbalancerStatus() *loadbalancerStatus {
+	return &loadbalancerStatus{}
+}
+
+func (lbs loadbalancerStatus) Len() int {
+	return len(lbs.Ingress)
+}
+
+func (lbs loadbalancerStatus) Less(i, j int) bool {
+	ipComparison := strings.Compare(lbs.Ingress[i].IP, lbs.Ingress[j].IP)
+	hostnameComparison := strings.Compare(lbs.Ingress[i].Hostname, lbs.Ingress[j].Hostname)
+	if ipComparison < 0 || (ipComparison == 0 && hostnameComparison < 0) {
+		return true
+	}
+	return false
+}
+
+func (lbs loadbalancerStatus) Swap(i, j int) {
+	lbs.Ingress[i].IP, lbs.Ingress[j].IP = lbs.Ingress[j].IP, lbs.Ingress[i].IP
+	lbs.Ingress[i].Hostname, lbs.Ingress[j].Hostname = lbs.Ingress[j].Hostname, lbs.Ingress[i].Hostname
+}
diff --git a/federation/pkg/federation-controller/util/test/test_helper.go b/federation/pkg/federation-controller/util/test/test_helper.go
index 0dd86f156c1..b7c545be074 100644
--- a/federation/pkg/federation-controller/util/test/test_helper.go
+++ b/federation/pkg/federation-controller/util/test/test_helper.go
@@ -367,6 +367,8 @@ func NewCluster(name string, readyStatus apiv1.ConditionStatus) *federationapi.C
 			Conditions: []federationapi.ClusterCondition{
 				{Type: federationapi.ClusterReady, Status: readyStatus},
 			},
+			Zones:  []string{"foozone"},
+			Region: "fooregion",
 		},
 	}
 }
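Reviewer sketch, not part of the patch: the Zones/Region defaults added to NewCluster above are what allow getClusterZoneNames to succeed against fake clusters in the tests in this patch. An illustrative check (the test name is assumed):

    func TestNewClusterHasZonesAndRegion(t *testing.T) {
    	cluster := NewCluster("testcluster", apiv1.ConditionTrue)
    	if len(cluster.Status.Zones) == 0 || cluster.Status.Region == "" {
    		t.Fatalf("expected default zones/region, got %v / %q", cluster.Status.Zones, cluster.Status.Region)
    	}
    }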