From 58b2cce95e1caa868a755f4ece82bdf5156771a9 Mon Sep 17 00:00:00 2001 From: shashidharatd Date: Mon, 13 Feb 2017 11:46:52 +0530 Subject: [PATCH 1/8] Add types for federated service ingress annotation --- federation/apis/federation/types.go | 16 +++ .../federation-controller/service/ingress.go | 136 ++++++++++++++++++ 2 files changed, 152 insertions(+) create mode 100644 federation/pkg/federation-controller/service/ingress.go diff --git a/federation/apis/federation/types.go b/federation/apis/federation/types.go index fb83be487d3..baef99d07d1 100644 --- a/federation/apis/federation/types.go +++ b/federation/apis/federation/types.go @@ -19,6 +19,7 @@ package federation import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/v1" ) // ServerAddressByClientCIDR helps the client to determine the server address that they should use, depending on the clientCIDR that they match. @@ -153,3 +154,18 @@ type ClusterReplicaSetPreferences struct { // A number expressing the preference to put an additional replica to this LocalReplicaSet. 0 by default. Weight int64 } + +// Annotation for a federated service to keep record of service loadbalancer ingresses in federated cluster +type FederatedServiceIngress struct { + // List of loadbalancer ingress of a service in all federated clusters + // +optional + Items []ClusterServiceIngress `json:"items,omitempty"` +} + +// Loadbalancer ingresses of a service within a federated cluster +type ClusterServiceIngress struct { + // Cluster is the name of the federated cluster + Cluster string `json:"cluster"` + // List of loadbalancer ingresses of a federated service within a federated cluster + Items []v1.LoadBalancerIngress `json:"items"` +} diff --git a/federation/pkg/federation-controller/service/ingress.go b/federation/pkg/federation-controller/service/ingress.go new file mode 100644 index 00000000000..0fdf9103407 --- /dev/null +++ b/federation/pkg/federation-controller/service/ingress.go @@ -0,0 +1,136 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package service + +import ( + "encoding/json" + "sort" + "strings" + + fedapi "k8s.io/kubernetes/federation/apis/federation" + "k8s.io/kubernetes/pkg/api/v1" +) + +// Compile time check for interface adherence +var _ sort.Interface = &FederatedServiceIngress{} + +const ( + FederatedServiceIngressAnnotation = "federation.kubernetes.io/service-ingresses" +) + +// FederatedServiceIngress implements sort.Interface. +type FederatedServiceIngress struct { + fedapi.FederatedServiceIngress +} + +func NewFederatedServiceIngress() *FederatedServiceIngress { + return &FederatedServiceIngress{} +} + +func (ingress *FederatedServiceIngress) String() string { + annotationBytes, _ := json.Marshal(ingress) + return string(annotationBytes[:]) +} + +// Len is to satisfy of sort.Interface. +func (ingress *FederatedServiceIngress) Len() int { + return len(ingress.Items) +} + +// Less is to satisfy of sort.Interface. +func (ingress *FederatedServiceIngress) Less(i, j int) bool { + return (strings.Compare(ingress.Items[i].Cluster, ingress.Items[j].Cluster) < 0) +} + +// Swap is to satisfy of sort.Interface. +func (ingress *FederatedServiceIngress) Swap(i, j int) { + ingress.Items[i].Cluster, ingress.Items[j].Cluster = ingress.Items[j].Cluster, ingress.Items[i].Cluster + ingress.Items[i].Items, ingress.Items[j].Items = ingress.Items[j].Items, ingress.Items[i].Items +} + +// GetClusterLoadBalancerIngresses returns loadbalancer ingresses for given cluster if exist otherwise returns an empty slice +func (ingress *FederatedServiceIngress) GetClusterLoadBalancerIngresses(cluster string) []v1.LoadBalancerIngress { + for _, clusterIngress := range ingress.Items { + if cluster == clusterIngress.Cluster { + return clusterIngress.Items + } + } + return []v1.LoadBalancerIngress{} +} + +// AddClusterLoadBalancerIngresses adds the ladbalancer ingresses for a given cluster to federated service ingress +func (ingress *FederatedServiceIngress) AddClusterLoadBalancerIngresses(cluster string, loadbalancerIngresses []v1.LoadBalancerIngress) { + for i, clusterIngress := range ingress.Items { + if cluster == clusterIngress.Cluster { + ingress.Items[i].Items = append(ingress.Items[i].Items, loadbalancerIngresses...) + return + } + } + clusterNewIngress := fedapi.ClusterServiceIngress{Cluster: cluster, Items: loadbalancerIngresses} + ingress.Items = append(ingress.Items, clusterNewIngress) + sort.Sort(ingress) +} + +// AddEndpoints add one or more endpoints to federated service ingress. +// endpoints are federated cluster's loadbalancer ip/hostname for the service +func (ingress *FederatedServiceIngress) AddEndpoints(cluster string, endpoints []string) *FederatedServiceIngress { + lbIngress := []v1.LoadBalancerIngress{} + for _, endpoint := range endpoints { + lbIngress = append(lbIngress, v1.LoadBalancerIngress{IP: endpoint}) + } + ingress.AddClusterLoadBalancerIngresses(cluster, lbIngress) + return ingress +} + +// RemoveEndpoint removes a single endpoint (ip/hostname) from the federated service ingress +func (ingress *FederatedServiceIngress) RemoveEndpoint(cluster string, endpoint string) *FederatedServiceIngress { + for i, clusterIngress := range ingress.Items { + if cluster == clusterIngress.Cluster { + for j, lbIngress := range clusterIngress.Items { + if lbIngress.IP == endpoint { + ingress.Items[i].Items = append(ingress.Items[i].Items[:j], ingress.Items[i].Items[j+1:]...) + } + } + } + } + return ingress +} + +// ParseFederatedServiceIngress extracts federated service ingresses from a federated service +func ParseFederatedServiceIngress(service *v1.Service) (*FederatedServiceIngress, error) { + ingress := FederatedServiceIngress{} + if service.Annotations == nil { + return &ingress, nil + } + federatedServiceIngressString, found := service.Annotations[FederatedServiceIngressAnnotation] + if !found { + return &ingress, nil + } + if err := json.Unmarshal([]byte(federatedServiceIngressString), &ingress); err != nil { + return &ingress, err + } + return &ingress, nil +} + +// UpdateIngressAnnotation updates the federated service with service ingress annotation +func UpdateIngressAnnotation(service *v1.Service, ingress *FederatedServiceIngress) *v1.Service { + if service.Annotations == nil { + service.Annotations = make(map[string]string) + } + service.Annotations[FederatedServiceIngressAnnotation] = ingress.String() + return service +} From b28f41eb025a3f24d36a4ea641a962137e3ef36b Mon Sep 17 00:00:00 2001 From: shashidharatd Date: Wed, 8 Feb 2017 16:45:39 +0530 Subject: [PATCH 2/8] Add new ResourceEventHandlerFuncs to federated informer --- .../federation-controller/util/handlers.go | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/federation/pkg/federation-controller/util/handlers.go b/federation/pkg/federation-controller/util/handlers.go index b568faf2b92..0e2dec5bf58 100644 --- a/federation/pkg/federation-controller/util/handlers.go +++ b/federation/pkg/federation-controller/util/handlers.go @@ -77,3 +77,35 @@ func NewTriggerOnMetaAndSpecChanges(triggerFunc func(pkgruntime.Object)) *cache. }, } } + +// Returns cache.ResourceEventHandlerFuncs that trigger the given function +// on object add/delete or ObjectMeta or given field is updated. +func NewTriggerOnMetaAndFieldChanges(field string, triggerFunc func(pkgruntime.Object)) *cache.ResourceEventHandlerFuncs { + getFieldOrPanic := func(obj interface{}, fieldName string) interface{} { + val := reflect.ValueOf(obj).Elem().FieldByName(fieldName) + if val.IsValid() { + return val.Interface() + } else { + panic(fmt.Errorf("field not found: %s", fieldName)) + } + } + return &cache.ResourceEventHandlerFuncs{ + DeleteFunc: func(old interface{}) { + oldObj := old.(pkgruntime.Object) + triggerFunc(oldObj) + }, + AddFunc: func(cur interface{}) { + curObj := cur.(pkgruntime.Object) + triggerFunc(curObj) + }, + UpdateFunc: func(old, cur interface{}) { + curObj := cur.(pkgruntime.Object) + oldMeta := getFieldOrPanic(old, "ObjectMeta").(metav1.ObjectMeta) + curMeta := getFieldOrPanic(cur, "ObjectMeta").(metav1.ObjectMeta) + if !ObjectMetaEquivalent(oldMeta, curMeta) || + !reflect.DeepEqual(getFieldOrPanic(old, field), getFieldOrPanic(cur, field)) { + triggerFunc(curObj) + } + }, + } +} From bacd7b745421171c2efd37e220caeeb96ec7f057 Mon Sep 17 00:00:00 2001 From: shashidharatd Date: Wed, 15 Feb 2017 22:19:06 +0530 Subject: [PATCH 3/8] Add few new fake hooks to test --- .../util/test/test_helper.go | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/federation/pkg/federation-controller/util/test/test_helper.go b/federation/pkg/federation-controller/util/test/test_helper.go index 4a9020725e5..0dd86f156c1 100644 --- a/federation/pkg/federation-controller/util/test/test_helper.go +++ b/federation/pkg/federation-controller/util/test/test_helper.go @@ -177,6 +177,38 @@ func RegisterFakeList(resource string, client *core.Fake, obj runtime.Object) { }) } +// RegisterFakeClusterGet registers a get response for the cluster resource inside the given fake client. +func RegisterFakeClusterGet(client *core.Fake, obj runtime.Object) { + clusterList, ok := obj.(*federationapi.ClusterList) + client.AddReactor("get", "clusters", func(action core.Action) (bool, runtime.Object, error) { + name := action.(core.GetAction).GetName() + if ok { + for _, cluster := range clusterList.Items { + if cluster.Name == name { + return true, &cluster, nil + } + } + } + return false, nil, fmt.Errorf("could not find the requested cluster: %s", name) + }) +} + +// RegisterFakeOnCreate registers a reactor in the given fake client that passes +// all created objects to the given watcher. +func RegisterFakeOnCreate(resource string, client *core.Fake, watcher *WatcherDispatcher) { + client.AddReactor("create", resource, func(action core.Action) (bool, runtime.Object, error) { + createAction := action.(core.CreateAction) + originalObj := createAction.GetObject() + // Create a copy of the object here to prevent data races while reading the object in go routine. + obj := copy(originalObj) + watcher.orderExecution <- func() { + glog.V(4).Infof("Object created: %v", obj) + watcher.Add(obj) + } + return true, originalObj, nil + }) +} + // RegisterFakeCopyOnCreate registers a reactor in the given fake client that passes // all created objects to the given watcher and also copies them to a channel for // in-test inspection. @@ -197,6 +229,32 @@ func RegisterFakeCopyOnCreate(resource string, client *core.Fake, watcher *Watch return objChan } +// RegisterFakeOnUpdate registers a reactor in the given fake client that passes +// all updated objects to the given watcher. +func RegisterFakeOnUpdate(resource string, client *core.Fake, watcher *WatcherDispatcher) { + client.AddReactor("update", resource, func(action core.Action) (bool, runtime.Object, error) { + updateAction := action.(core.UpdateAction) + originalObj := updateAction.GetObject() + glog.V(7).Infof("Updating %s: %v", resource, updateAction.GetObject()) + + // Create a copy of the object here to prevent data races while reading the object in go routine. + obj := copy(originalObj) + operation := func() { + glog.V(4).Infof("Object updated %v", obj) + watcher.Modify(obj) + } + select { + case watcher.orderExecution <- operation: + break + case <-time.After(pushTimeout): + glog.Errorf("Fake client execution channel blocked") + glog.Errorf("Tried to push %v", updateAction) + } + return true, originalObj, nil + }) + return +} + // RegisterFakeCopyOnUpdate registers a reactor in the given fake client that passes // all updated objects to the given watcher and also copies them to a channel for // in-test inspection. From d00eca48da57df5ecec8dfce40b47b4229ce7563 Mon Sep 17 00:00:00 2001 From: shashidharatd Date: Mon, 13 Feb 2017 19:30:01 +0530 Subject: [PATCH 4/8] Use federated informer framework and use annotations to store lb ingress --- .../pkg/federation-controller/service/dns.go | 59 +- .../federation-controller/service/dns_test.go | 15 +- .../service/endpoint_helper.go | 6 +- .../service/endpoint_helper_test.go | 6 + .../service/service_helper.go | 2 +- .../service/servicecontroller.go | 591 ++++++++++++++++-- .../util/test/test_helper.go | 2 + 7 files changed, 581 insertions(+), 100 deletions(-) diff --git a/federation/pkg/federation-controller/service/dns.go b/federation/pkg/federation-controller/service/dns.go index 1bc53f25000..789a45f162c 100644 --- a/federation/pkg/federation-controller/service/dns.go +++ b/federation/pkg/federation-controller/service/dns.go @@ -24,8 +24,10 @@ import ( "strings" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/federation/pkg/dnsprovider" "k8s.io/kubernetes/federation/pkg/dnsprovider/rrstype" + "k8s.io/kubernetes/pkg/api/v1" ) const ( @@ -34,7 +36,7 @@ const ( ) // getHealthyEndpoints returns the hostnames and/or IP addresses of healthy endpoints for the service, at a zone, region and global level (or an error) -func (s *ServiceController) getHealthyEndpoints(clusterName string, cachedService *cachedService) (zoneEndpoints, regionEndpoints, globalEndpoints []string, err error) { +func (s *ServiceController) getHealthyEndpoints(clusterName string, service *v1.Service) (zoneEndpoints, regionEndpoints, globalEndpoints []string, err error) { var ( zoneNames []string regionName string @@ -42,16 +44,24 @@ func (s *ServiceController) getHealthyEndpoints(clusterName string, cachedServic if zoneNames, regionName, err = s.getClusterZoneNames(clusterName); err != nil { return nil, nil, nil, err } - for lbClusterName, lbStatus := range cachedService.serviceStatusMap { + + // If federated service is deleted, return empty endpoints, so that DNS records are removed + if service.DeletionTimestamp != nil { + return zoneEndpoints, regionEndpoints, globalEndpoints, nil + } + + serviceIngress, err := ParseFederatedServiceIngress(service) + if err != nil { + return nil, nil, nil, err + } + + for _, lbClusterIngress := range serviceIngress.Items { + lbClusterName := lbClusterIngress.Cluster lbZoneNames, lbRegionName, err := s.getClusterZoneNames(lbClusterName) if err != nil { return nil, nil, nil, err } - for _, ingress := range lbStatus.Ingress { - readyEndpoints, ok := cachedService.endpointMap[lbClusterName] - if !ok || readyEndpoints == 0 { - continue - } + for _, ingress := range lbClusterIngress.Items { var address string // We should get either an IP address or a hostname - use whichever one we get if ingress.IP != "" { @@ -61,7 +71,7 @@ func (s *ServiceController) getHealthyEndpoints(clusterName string, cachedServic } if len(address) <= 0 { return nil, nil, nil, fmt.Errorf("Service %s/%s in cluster %s has neither LoadBalancerStatus.ingress.ip nor LoadBalancerStatus.ingress.hostname. Cannot use it as endpoint for federated service.", - cachedService.lastState.Name, cachedService.lastState.Namespace, clusterName) + service.Name, service.Namespace, clusterName) } for _, lbZoneName := range lbZoneNames { for _, zoneName := range zoneNames { @@ -80,15 +90,12 @@ func (s *ServiceController) getHealthyEndpoints(clusterName string, cachedServic } // getClusterZoneNames returns the name of the zones (and the region) where the specified cluster exists (e.g. zones "us-east1-c" on GCE, or "us-east-1b" on AWS) -func (s *ServiceController) getClusterZoneNames(clusterName string) (zones []string, region string, err error) { - client, ok := s.clusterCache.clientMap[clusterName] - if !ok { - return nil, "", fmt.Errorf("Cluster cache does not contain entry for cluster %s", clusterName) +func (s *ServiceController) getClusterZoneNames(clusterName string) ([]string, string, error) { + cluster, err := s.federationClient.Federation().Clusters().Get(clusterName, metav1.GetOptions{}) + if err != nil { + return nil, "", err } - if client.cluster == nil { - return nil, "", fmt.Errorf("Cluster cache entry for cluster %s is nil", clusterName) - } - return client.cluster.Status.Zones, client.cluster.Status.Region, nil + return cluster.Status.Zones, cluster.Status.Region, nil } // getServiceDnsSuffix returns the DNS suffix to use when creating federated-service DNS records @@ -284,7 +291,7 @@ given the current state of that service in that cluster. This should be called (or vice versa). Only shards of the service which have both a loadbalancer ingress IP address or hostname AND at least one healthy backend endpoint are included in DNS records for that service (at all of zone, region and global levels). All other addresses are removed. Also, if no shards exist in the zone or region of the cluster, a CNAME reference to the next higher level is ensured to exist. */ -func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *cachedService) error { +func (s *ServiceController) ensureDnsRecords(clusterName string, service *v1.Service) error { // Quinton: Pseudocode.... // See https://github.com/kubernetes/kubernetes/pull/25107#issuecomment-218026648 // For each service we need the following DNS names: @@ -298,21 +305,21 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService * // - a set of A records to IP addresses of all healthy shards in all regions, if one or more of these exist. // - no record (NXRECORD response) if no healthy shards exist in any regions // - // For each cached service, cachedService.lastState tracks the current known state of the service, while cachedService.appliedState contains - // the state of the service when we last successfully synced its DNS records. - // So this time around we only need to patch that (add new records, remove deleted records, and update changed records). + // Each service has the current known state of loadbalancer ingress for the federated cluster stored in annotations. + // So generate the DNS records based on the current state and ensure those desired DNS records match the + // actual DNS records (add new records, remove deleted records, and update changed records). // if s == nil { - return fmt.Errorf("nil ServiceController passed to ServiceController.ensureDnsRecords(clusterName: %s, cachedService: %v)", clusterName, cachedService) + return fmt.Errorf("nil ServiceController passed to ServiceController.ensureDnsRecords(clusterName: %s, service: %v)", clusterName, service) } if s.dns == nil { return nil } - if cachedService == nil { - return fmt.Errorf("nil cachedService passed to ServiceController.ensureDnsRecords(clusterName: %s, cachedService: %v)", clusterName, cachedService) + if service == nil { + return fmt.Errorf("nil service passed to ServiceController.ensureDnsRecords(clusterName: %s, service: %v)", clusterName, service) } - serviceName := cachedService.lastState.Name - namespaceName := cachedService.lastState.Namespace + serviceName := service.Name + namespaceName := service.Namespace zoneNames, regionName, err := s.getClusterZoneNames(clusterName) if err != nil { return err @@ -324,7 +331,7 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService * if err != nil { return err } - zoneEndpoints, regionEndpoints, globalEndpoints, err := s.getHealthyEndpoints(clusterName, cachedService) + zoneEndpoints, regionEndpoints, globalEndpoints, err := s.getHealthyEndpoints(clusterName, service) if err != nil { return err } diff --git a/federation/pkg/federation-controller/service/dns_test.go b/federation/pkg/federation-controller/service/dns_test.go index 6836449e7e1..2153cfff040 100644 --- a/federation/pkg/federation-controller/service/dns_test.go +++ b/federation/pkg/federation-controller/service/dns_test.go @@ -27,11 +27,15 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/federation/apis/federation/v1beta1" + fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake" "k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes. + . "k8s.io/kubernetes/federation/pkg/federation-controller/util/test" "k8s.io/kubernetes/pkg/api/v1" ) func TestServiceController_ensureDnsRecords(t *testing.T) { + clusterName := "testcluster" + tests := []struct { name string service v1.Service @@ -44,6 +48,10 @@ func TestServiceController_ensureDnsRecords(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "servicename", Namespace: "servicenamespace", + Annotations: map[string]string{ + FederatedServiceIngressAnnotation: NewFederatedServiceIngress(). + AddEndpoints(clusterName, []string{"198.51.100.1"}). + String()}, }, }, serviceStatus: buildServiceStatus([][]string{{"198.51.100.1", ""}}), @@ -92,7 +100,10 @@ func TestServiceController_ensureDnsRecords(t *testing.T) { if !ok { t.Error("Unable to fetch zones") } + fakeClient := &fakefedclientset.Clientset{} + RegisterFakeClusterGet(&fakeClient.Fake, &v1beta1.ClusterList{Items: []v1beta1.Cluster{*NewCluster(clusterName, v1.ConditionTrue)}}) serviceController := ServiceController{ + federationClient: fakeClient, dns: fakedns, dnsZones: fakednsZones, serviceDnsSuffix: "federation.example.com", @@ -106,8 +117,6 @@ func TestServiceController_ensureDnsRecords(t *testing.T) { knownClusterSet: make(sets.String), } - clusterName := "testcluster" - serviceController.clusterCache.clientMap[clusterName] = &clusterCache{ cluster: &v1beta1.Cluster{ Status: v1beta1.ClusterStatus{ @@ -127,7 +136,7 @@ func TestServiceController_ensureDnsRecords(t *testing.T) { cachedService.serviceStatusMap[clusterName] = test.serviceStatus } - err := serviceController.ensureDnsRecords(clusterName, cachedService) + err := serviceController.ensureDnsRecords(clusterName, &test.service) if err != nil { t.Errorf("Test failed for %s, unexpected error %v", test.name, err) } diff --git a/federation/pkg/federation-controller/service/endpoint_helper.go b/federation/pkg/federation-controller/service/endpoint_helper.go index 53295c4e523..b95d50082e2 100644 --- a/federation/pkg/federation-controller/service/endpoint_helper.go +++ b/federation/pkg/federation-controller/service/endpoint_helper.go @@ -122,7 +122,7 @@ func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedServi glog.V(4).Infof("Cached endpoint was found for %s/%s, cluster %s, removing", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName) delete(cachedService.endpointMap, clusterName) for i := 0; i < clientRetryCount; i++ { - err := serviceController.ensureDnsRecords(clusterName, cachedService) + err := serviceController.ensureDnsRecords(clusterName, cachedService.lastState) if err == nil { return nil } @@ -154,7 +154,7 @@ func (cc *clusterClientCache) processEndpointUpdate(cachedService *cachedService glog.V(4).Infof("Reachable endpoint was found for %s/%s, cluster %s, building endpointMap", endpoint.Namespace, endpoint.Name, clusterName) cachedService.endpointMap[clusterName] = 1 for i := 0; i < clientRetryCount; i++ { - err := serviceController.ensureDnsRecords(clusterName, cachedService) + err := serviceController.ensureDnsRecords(clusterName, cachedService.lastState) if err == nil { return nil } @@ -175,7 +175,7 @@ func (cc *clusterClientCache) processEndpointUpdate(cachedService *cachedService glog.V(4).Infof("Reachable endpoint was lost for %s/%s, cluster %s, deleting endpointMap", endpoint.Namespace, endpoint.Name, clusterName) delete(cachedService.endpointMap, clusterName) for i := 0; i < clientRetryCount; i++ { - err := serviceController.ensureDnsRecords(clusterName, cachedService) + err := serviceController.ensureDnsRecords(clusterName, cachedService.lastState) if err == nil { return nil } diff --git a/federation/pkg/federation-controller/service/endpoint_helper_test.go b/federation/pkg/federation-controller/service/endpoint_helper_test.go index eb833116f92..1de1b81fbfa 100644 --- a/federation/pkg/federation-controller/service/endpoint_helper_test.go +++ b/federation/pkg/federation-controller/service/endpoint_helper_test.go @@ -21,14 +21,18 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/federation/apis/federation/v1beta1" + fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake" "k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes. + . "k8s.io/kubernetes/federation/pkg/federation-controller/util/test" v1 "k8s.io/kubernetes/pkg/api/v1" ) var fakeDns, _ = clouddns.NewFakeInterface() // No need to check for unsupported interfaces, as the fake interface supports everything that's required. var fakeDnsZones, _ = fakeDns.Zones() +var fakeClient = &fakefedclientset.Clientset{} var fakeServiceController = ServiceController{ + federationClient: fakeClient, dns: fakeDns, dnsZones: fakeDnsZones, federationName: "fed1", @@ -68,6 +72,7 @@ func TestProcessEndpointUpdate(t *testing.T) { }, }, } + RegisterFakeClusterGet(&fakeClient.Fake, &v1beta1.ClusterList{Items: []v1beta1.Cluster{*NewCluster(clusterName, v1.ConditionTrue)}}) tests := []struct { name string cachedService *cachedService @@ -121,6 +126,7 @@ func TestProcessEndpointDeletion(t *testing.T) { }, }, } + RegisterFakeClusterGet(&fakeClient.Fake, &v1beta1.ClusterList{Items: []v1beta1.Cluster{*NewCluster(clusterName, v1.ConditionTrue)}}) tests := []struct { name string cachedService *cachedService diff --git a/federation/pkg/federation-controller/service/service_helper.go b/federation/pkg/federation-controller/service/service_helper.go index 93672290b6c..7efcfff8774 100644 --- a/federation/pkg/federation-controller/service/service_helper.go +++ b/federation/pkg/federation-controller/service/service_helper.go @@ -108,7 +108,7 @@ func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache if needUpdate { for i := 0; i < clientRetryCount; i++ { - err := sc.ensureDnsRecords(clusterName, cachedService) + err := sc.ensureDnsRecords(clusterName, service) if err == nil { break } diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go index ac7c46c95c6..8617b5f3192 100644 --- a/federation/pkg/federation-controller/service/servicecontroller.go +++ b/federation/pkg/federation-controller/service/servicecontroller.go @@ -18,6 +18,8 @@ package service import ( "fmt" + "sort" + "strings" "sync" "time" @@ -27,8 +29,10 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/conversion" + "k8s.io/apimachinery/pkg/labels" pkgruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" @@ -36,7 +40,9 @@ import ( clientv1 "k8s.io/client-go/pkg/api/v1" cache "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/flowcontrol" "k8s.io/client-go/util/workqueue" + fedapi "k8s.io/kubernetes/federation/apis/federation" v1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" federationcache "k8s.io/kubernetes/federation/client/cache" fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" @@ -76,6 +82,7 @@ const ( maxNoOfClusters = 100 + reviewDelay = 10 * time.Second updateTimeout = 30 * time.Second allClustersKey = "ALL_CLUSTERS" clusterAvailableDelay = time.Second * 20 @@ -156,6 +163,15 @@ type ServiceController struct { clusterDeliverer *util.DelayingDeliverer deletionHelper *deletionhelper.DeletionHelper + + reviewDelay time.Duration + clusterAvailableDelay time.Duration + updateTimeout time.Duration + + endpointFederatedInformer fedutil.FederatedInformer + federatedUpdater fedutil.FederatedUpdater + objectDeliverer *util.DelayingDeliverer + flowcontrolBackoff *flowcontrol.Backoff } // New returns a new service controller to keep DNS provider service resources @@ -180,11 +196,16 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface, rwlock: sync.Mutex{}, clientMap: make(map[string]*clusterCache), }, - eventBroadcaster: broadcaster, - eventRecorder: recorder, - queue: workqueue.New(), - knownClusterSet: make(sets.String), + eventBroadcaster: broadcaster, + eventRecorder: recorder, + queue: workqueue.New(), + knownClusterSet: make(sets.String), + reviewDelay: reviewDelay, + clusterAvailableDelay: clusterAvailableDelay, + updateTimeout: updateTimeout, + flowcontrolBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute), } + s.objectDeliverer = util.NewDelayingDeliverer() s.clusterDeliverer = util.NewDelayingDeliverer() var serviceIndexer cache.Indexer serviceIndexer, s.serviceController = cache.NewIndexerInformer( @@ -198,57 +219,13 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface, }, &v1.Service{}, serviceSyncPeriod, - cache.ResourceEventHandlerFuncs{ - AddFunc: s.enqueueService, - UpdateFunc: func(old, cur interface{}) { - // there is case that old and new are equals but we still catch the event now. - if !reflect.DeepEqual(old, cur) { - s.enqueueService(cur) - } - }, - DeleteFunc: s.enqueueService, - }, + util.NewTriggerOnAllChanges(func(obj pkgruntime.Object) { + glog.V(5).Infof("Delivering notification from federation: %v", obj) + s.deliverObject(obj, 0, false) + }), cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) s.serviceStore = corelisters.NewServiceLister(serviceIndexer) - s.clusterStore.Store, s.clusterController = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) { - return s.federationClient.Federation().Clusters().List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return s.federationClient.Federation().Clusters().Watch(options) - }, - }, - &v1beta1.Cluster{}, - clusterSyncPeriod, - cache.ResourceEventHandlerFuncs{ - DeleteFunc: s.clusterCache.delFromClusterSet, - AddFunc: s.clusterCache.addToClientMap, - UpdateFunc: func(old, cur interface{}) { - oldCluster, ok := old.(*v1beta1.Cluster) - if !ok { - return - } - curCluster, ok := cur.(*v1beta1.Cluster) - if !ok { - return - } - if !reflect.DeepEqual(oldCluster.Spec, curCluster.Spec) { - // update when spec is changed - s.clusterCache.addToClientMap(cur) - } - - pred := getClusterConditionPredicate() - // only update when condition changed to ready from not-ready - if !pred(*oldCluster) && pred(*curCluster) { - s.clusterCache.addToClientMap(cur) - } - // did not handle ready -> not-ready - // how could we stop a controller? - }, - }, - ) clusterLifecycle := fedutil.ClusterLifecycleHandlerFuncs{ ClusterAvailable: func(cluster *v1beta1.Cluster) { @@ -271,14 +248,15 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface, // would be just confirmation that some service operation succeeded. util.NewTriggerOnAllChanges( func(obj pkgruntime.Object) { - // TODO: Use this to enque services. + glog.V(5).Infof("Delivering service notification from federated cluster %s: %v", cluster.Name, obj) + s.deliverObject(obj, s.reviewDelay, false) }, )) } s.federatedInformer = fedutil.NewFederatedInformer(federationClient, fedInformerFactory, &clusterLifecycle) - federatedUpdater := fedutil.NewFederatedUpdater(s.federatedInformer, + s.federatedUpdater = fedutil.NewFederatedUpdater(s.federatedInformer, func(client kubeclientset.Interface, obj pkgruntime.Object) error { svc := obj.(*v1.Service) _, err := client.Core().Services(svc.Namespace).Create(svc) @@ -293,9 +271,41 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface, svc := obj.(*v1.Service) orphanDependents := false err := client.Core().Services(svc.Namespace).Delete(svc.Name, &metav1.DeleteOptions{OrphanDependents: &orphanDependents}) + // IsNotFound error is fine since that means the object is deleted already. + if errors.IsNotFound(err) { + return nil + } return err }) + // Federated informers on endpoints in federated clusters. + // This will enable to check if service ingress endpoints in federated clusters are reachable + s.endpointFederatedInformer = fedutil.NewFederatedInformer( + federationClient, + func(cluster *v1beta1.Cluster, targetClient kubeclientset.Interface) ( + cache.Store, cache.Controller) { + return cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) { + return targetClient.Core().Endpoints(metav1.NamespaceAll).List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return targetClient.Core().Endpoints(metav1.NamespaceAll).Watch(options) + }, + }, + &v1.Endpoints{}, + controller.NoResyncPeriodFunc(), + fedutil.NewTriggerOnMetaAndFieldChanges( + "Subsets", + func(obj pkgruntime.Object) { + glog.V(5).Infof("Delivering endpoint notification from federated cluster %s :%v", cluster.Name, obj) + s.deliverObject(obj, s.reviewDelay, false) + }, + )) + }, + &fedutil.ClusterLifecycleHandlerFuncs{}, + ) + s.deletionHelper = deletionhelper.NewDeletionHelper( s.hasFinalizerFunc, s.removeFinalizerFunc, @@ -308,7 +318,7 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface, updateTimeout, s.eventRecorder, s.federatedInformer, - federatedUpdater, + s.federatedUpdater, ) s.endpointWorkerMap = make(map[string]bool) @@ -393,22 +403,27 @@ func (s *ServiceController) Run(workers int, stopCh <-chan struct{}) error { } defer runtime.HandleCrash() s.federatedInformer.Start() - s.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) { - // TODO: Use this new clusterDeliverer to reconcile services in new clusters. + s.endpointFederatedInformer.Start() + s.objectDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) { + s.queue.Add(item.Value.(string)) }) + s.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) { + s.deliverServicesOnClusterChange() + }) + fedutil.StartBackoffGC(s.flowcontrolBackoff, stopCh) go s.serviceController.Run(stopCh) - go s.clusterController.Run(stopCh) + for i := 0; i < workers; i++ { go wait.Until(s.fedServiceWorker, time.Second, stopCh) } - go wait.Until(s.clusterEndpointWorker, time.Second, stopCh) - go wait.Until(s.clusterServiceWorker, time.Second, stopCh) - go wait.Until(s.clusterSyncLoop, time.Second, stopCh) go func() { <-stopCh glog.Infof("Shutting down Federation Service Controller") s.queue.ShutDown() s.federatedInformer.Stop() + s.endpointFederatedInformer.Stop() + s.objectDeliverer.Stop() + s.clusterDeliverer.Stop() }() return nil } @@ -461,8 +476,15 @@ func (s *ServiceController) init() error { return nil } +type reconciliationStatus string + +const ( + statusAllOk = reconciliationStatus("ALL_OK") + statusError = reconciliationStatus("ERROR") + statusNotSynced = reconciliationStatus("NOSYNC") +) + // fedServiceWorker runs a worker thread that just dequeues items, processes them, and marks them done. -// It enforces that the syncService is never invoked concurrently with the same key. func (s *ServiceController) fedServiceWorker() { for { func() { @@ -470,11 +492,21 @@ func (s *ServiceController) fedServiceWorker() { if quit { return } - defer s.queue.Done(key) - err := s.syncService(key.(string)) - if err != nil { - glog.Errorf("Error syncing service: %v", err) + service := key.(string) + status, err := s.reconcileService(service) + switch status { + case statusAllOk: + break + case statusError: + runtime.HandleError(fmt.Errorf("Error reconciling service %q: %v, delivering again", service, err)) + s.deliverService(service, 0, true) + case statusNotSynced: + glog.V(5).Infof("Delivering notification for %q after clusterAvailableDelay", service) + s.deliverService(service, s.clusterAvailableDelay, false) + default: + runtime.HandleError(fmt.Errorf("Unhandled reconciliation status for %q: %s, delivering again", service, status)) + s.deliverService(service, s.reviewDelay, false) } }() } @@ -871,7 +903,7 @@ func (s *ServiceController) lockedUpdateDNSRecords(service *cachedService, clust for key := range s.clusterCache.clientMap { for _, clusterName := range clusterNames { if key == clusterName { - err := s.ensureDnsRecords(clusterName, service) + err := s.ensureDnsRecords(clusterName, service.lastState) if err != nil { unensuredCount += 1 glog.V(4).Infof("Failed to update DNS records for service %v from cluster %s: %v", service, clusterName, err) @@ -1065,6 +1097,24 @@ func (s *ServiceController) delete(service *v1.Service) error { return err } + // Ensure DNS records are removed for service + if wantsDNSRecords(service) { + key := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + serviceIngress, err := ParseFederatedServiceIngress(service) + if err != nil { + runtime.HandleError(fmt.Errorf("Failed to parse endpoint annotations for service %s: %v", key, err)) + return err + } + for _, ingress := range serviceIngress.Items { + err := s.ensureDnsRecords(ingress.Cluster, service) + if err != nil { + glog.V(4).Infof("Error ensuring DNS Records for service %s on cluster %s: %v", key, ingress.Cluster, err) + return err + } + glog.V(4).Infof("Ensured DNS records for Service %s in cluster %q", key, ingress.Cluster) + } + } + err = s.federationClient.Core().Services(service.Namespace).Delete(service.Name, nil) if err != nil { // Its all good if the error is not found error. That means it is deleted already and we do not have to do anything. @@ -1085,3 +1135,410 @@ func (s *ServiceController) processServiceDeletion(key string) (error, time.Dura s.serviceCache.delete(key) return nil, doNotRetry } + +func (s *ServiceController) deliverServicesOnClusterChange() { + if !s.isSynced() { + s.clusterDeliverer.DeliverAfter(allClustersKey, nil, s.clusterAvailableDelay) + } + glog.V(5).Infof("Delivering all service as cluster status changed") + serviceList, err := s.serviceStore.List(labels.Everything()) + if err != nil { + runtime.HandleError(fmt.Errorf("error listing federated services: %v", err)) + s.clusterDeliverer.DeliverAfter(allClustersKey, nil, 0) + } + for _, service := range serviceList { + s.deliverObject(service, 0, false) + } +} + +func (s *ServiceController) deliverObject(object interface{}, delay time.Duration, failed bool) { + switch value := object.(type) { + case *v1.Service: + s.deliverService(types.NamespacedName{Namespace: value.Namespace, Name: value.Name}.String(), delay, failed) + case *v1.Endpoints: + s.deliverService(types.NamespacedName{Namespace: value.Namespace, Name: value.Name}.String(), delay, failed) + default: + glog.Warningf("Unknown object received: %v", object) + } +} + +// Adds backoff to delay if this delivery is related to some failure. Resets backoff if there was no failure. +func (s *ServiceController) deliverService(key string, delay time.Duration, failed bool) { + if failed { + s.flowcontrolBackoff.Next(key, time.Now()) + delay = delay + s.flowcontrolBackoff.Get(key) + } else { + s.flowcontrolBackoff.Reset(key) + } + s.objectDeliverer.DeliverAfter(key, key, delay) +} + +// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet synced with +// the corresponding api server. +func (s *ServiceController) isSynced() bool { + if !s.federatedInformer.ClustersSynced() { + glog.V(2).Infof("Cluster list not synced") + return false + } + serviceClusters, err := s.federatedInformer.GetReadyClusters() + if err != nil { + runtime.HandleError(fmt.Errorf("Failed to get ready clusters: %v", err)) + return false + } + if !s.federatedInformer.GetTargetStore().ClustersSynced(serviceClusters) { + return false + } + + if !s.endpointFederatedInformer.ClustersSynced() { + glog.V(2).Infof("Cluster list not synced") + return false + } + endpointClusters, err := s.endpointFederatedInformer.GetReadyClusters() + if err != nil { + runtime.HandleError(fmt.Errorf("Failed to get ready clusters: %v", err)) + return false + } + if !s.endpointFederatedInformer.GetTargetStore().ClustersSynced(endpointClusters) { + return false + } + + return true +} + +// reconcileService triggers reconciliation of a federated service with corresponding services in federated clusters. +// This function is called on service Addition/Deletion/Updation either in federated cluster or in federation. +func (s *ServiceController) reconcileService(key string) (reconciliationStatus, error) { + if !s.isSynced() { + glog.V(4).Infof("Data store not synced, delaying reconcilation: %v", key) + return statusNotSynced, nil + } + + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + runtime.HandleError(fmt.Errorf("Failed to retrieve federated service %q from store: %v", key, err)) + return statusError, err + } + + service, err := s.serviceStore.Services(namespace).Get(name) + if errors.IsNotFound(err) { + // Not a federated service, ignoring. + return statusAllOk, nil + } else if err != nil { + return statusError, err + } + + glog.V(3).Infof("Reconciling federated service: %s", key) + + // Create a copy before modifying the service to prevent race condition with other readers of service from store + fedServiceObj, err := api.Scheme.DeepCopy(service) + fedService, ok := fedServiceObj.(*v1.Service) + if err != nil || !ok { + runtime.HandleError(fmt.Errorf("Error in retrieving obj from store: %s, %v", key, err)) + return statusError, err + } + + // Handle deletion of federated service + if fedService.DeletionTimestamp != nil { + if err := s.delete(fedService); err != nil { + runtime.HandleError(fmt.Errorf("Failed to delete %s: %v", key, err)) + s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "DeleteFailed", "Deleting service failed: %v", err) + return statusError, err + } + glog.V(3).Infof("Deleting federated service succeeded: %s", key) + s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "DeleteSucceed", "Deleting service succeeded") + return statusAllOk, nil + } + + // Add the required finalizers before creating a service in underlying clusters. This ensures that the + // dependent services in underlying clusters are deleted when the federated service is deleted. + updatedServiceObj, err := s.deletionHelper.EnsureFinalizers(fedService) + if err != nil { + glog.Warningf("Failed to ensure setting finalizer for service %s: %v", key, err) + return statusError, err + } + fedService = updatedServiceObj.(*v1.Service) + + // Synchronize the federated service in all underlying ready clusters. + clusters, err := s.federatedInformer.GetReadyClusters() + if err != nil { + runtime.HandleError(fmt.Errorf("Failed to get ready cluster list: %v", err)) + return statusError, err + } + + newLBStatus := newLoadbalancerStatus() + newServiceIngress := NewFederatedServiceIngress() + operations := make([]fedutil.FederatedOperation, 0) + for _, cluster := range clusters { + // Aggregate all operations to perform on all federated clusters + operation, err := s.getOperationsToPerformOnCluster(cluster, fedService) + if err != nil { + return statusError, err + } + if operation != nil { + operations = append(operations, *operation) + } + + // Aggregate LoadBalancerStatus from all services in federated clusters to update status in federated service + lbStatus, err := s.getServiceStatusInCluster(cluster, key) + if err != nil { + return statusError, err + } + if len(lbStatus.Ingress) > 0 { + newLBStatus.Ingress = append(newLBStatus.Ingress, lbStatus.Ingress...) + + // Add/Update federated service ingress only if there are reachable endpoints backing the lb service + endpoints, err := s.getServiceEndpointsInCluster(cluster, key) + if err != nil { + return statusError, err + } + // if there are no endpoints created for the service then the loadbalancer ingress + // is not reachable, so do not consider such loadbalancer ingresses for federated + // service ingresses + if len(endpoints) > 0 { + clusterIngress := fedapi.ClusterServiceIngress{ + Cluster: cluster.Name, + Items: lbStatus.Ingress, + } + newServiceIngress.Items = append(newServiceIngress.Items, clusterIngress) + } + } + } + + if len(operations) != 0 { + err = s.federatedUpdater.UpdateWithOnError(operations, s.updateTimeout, + func(op fedutil.FederatedOperation, operror error) { + runtime.HandleError(fmt.Errorf("Service update in cluster %s failed: %v", op.ClusterName, operror)) + s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "UpdateInClusterFailed", "Service update in cluster %s failed: %v", op.ClusterName, operror) + }) + if err != nil { + if !errors.IsAlreadyExists(err) { + runtime.HandleError(fmt.Errorf("Failed to execute updates for %s: %v", key, err)) + return statusError, err + } + } + } + + // Update the federated service if there are any updates in clustered service (status/endpoints) + err = s.updateFederatedService(fedService, newLBStatus, newServiceIngress) + if err != nil { + return statusError, err + } + + glog.V(5).Infof("Everything is in order in federated clusters for service %s", key) + return statusAllOk, nil +} + +// getOperationsToPerformOnCluster returns the operations to be performed so that clustered service is in sync with federated service +func (s *ServiceController) getOperationsToPerformOnCluster(cluster *v1beta1.Cluster, fedService *v1.Service) (*fedutil.FederatedOperation, error) { + var operation *fedutil.FederatedOperation + + key := types.NamespacedName{Namespace: fedService.Namespace, Name: fedService.Name}.String() + clusterServiceObj, serviceFound, err := s.federatedInformer.GetTargetStore().GetByKey(cluster.Name, key) + if err != nil { + runtime.HandleError(fmt.Errorf("Failed to get %s service from %s: %v", key, cluster.Name, err)) + return nil, err + } + if !serviceFound { + desiredService := &v1.Service{ + ObjectMeta: fedutil.DeepCopyRelevantObjectMeta(fedService.ObjectMeta), + Spec: *(fedutil.DeepCopyApiTypeOrPanic(&fedService.Spec).(*v1.ServiceSpec)), + } + desiredService.ResourceVersion = "" + + glog.V(4).Infof("Creating service in underlying cluster %s: %+v", cluster.Name, desiredService) + s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "CreateInCluster", "Creating service in cluster %s", cluster.Name) + + operation = &fedutil.FederatedOperation{ + Type: fedutil.OperationTypeAdd, + Obj: desiredService, + ClusterName: cluster.Name, + } + } else { + clusterService, ok := clusterServiceObj.(*v1.Service) + if !ok { + runtime.HandleError(fmt.Errorf("Unexpected error for %q: %v", key, err)) + return nil, err + } + + desiredService := &v1.Service{ + ObjectMeta: fedutil.DeepCopyRelevantObjectMeta(clusterService.ObjectMeta), + Spec: *(fedutil.DeepCopyApiTypeOrPanic(&fedService.Spec).(*v1.ServiceSpec)), + } + + // ClusterIP and NodePort are allocated to Service by cluster, so retain the same if any while updating + desiredService.Spec.ClusterIP = clusterService.Spec.ClusterIP + for _, cPort := range clusterService.Spec.Ports { + for i, fPort := range clusterService.Spec.Ports { + if fPort.Name == cPort.Name && fPort.Protocol == cPort.Protocol && fPort.Port == cPort.Port { + desiredService.Spec.Ports[i].NodePort = cPort.NodePort + } + } + } + + // Update existing service, if needed. + if !Equivalent(desiredService, clusterService) { + glog.V(4).Infof("Service in underlying cluster %s does not match, Desired: %+v, Existing: %+v", cluster.Name, desiredService, clusterService) + s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "UpdateInCluster", "Updating service in cluster %s. Desired: %+v\n Actual: %+v\n", cluster.Name, desiredService, clusterService) + + // ResourceVersion of cluster service can be different from federated service, + // so do not update ResourceVersion while updating cluster service + desiredService.ResourceVersion = clusterService.ResourceVersion + + operation = &fedutil.FederatedOperation{ + Type: fedutil.OperationTypeUpdate, + Obj: desiredService, + ClusterName: cluster.Name, + } + } else { + glog.V(5).Infof("Service in underlying cluster %s is up to date: %+v", cluster.Name, desiredService) + } + } + return operation, nil +} + +// getServiceStatusInCluster returns service status in federated cluster +func (s *ServiceController) getServiceStatusInCluster(cluster *v1beta1.Cluster, key string) (*v1.LoadBalancerStatus, error) { + lbStatus := &v1.LoadBalancerStatus{} + + clusterServiceObj, serviceFound, err := s.federatedInformer.GetTargetStore().GetByKey(cluster.Name, key) + if err != nil { + runtime.HandleError(fmt.Errorf("Failed to get %s service from %s: %v", key, cluster.Name, err)) + return lbStatus, err + } + if serviceFound { + clusterService, ok := clusterServiceObj.(*v1.Service) + if !ok { + err = fmt.Errorf("Unknown object received: %v", clusterServiceObj) + runtime.HandleError(err) + return lbStatus, err + } + lbStatus = &clusterService.Status.LoadBalancer + newLbStatus := &loadbalancerStatus{*lbStatus} + sort.Sort(newLbStatus) + } + return lbStatus, nil +} + +// getServiceEndpointsInCluster returns ready endpoints corresonding to service in federated cluster +func (s *ServiceController) getServiceEndpointsInCluster(cluster *v1beta1.Cluster, key string) ([]v1.EndpointAddress, error) { + addresses := []v1.EndpointAddress{} + + clusterEndpointsObj, endpointsFound, err := s.endpointFederatedInformer.GetTargetStore().GetByKey(cluster.Name, key) + if err != nil { + runtime.HandleError(fmt.Errorf("Failed to get %s endpoint from %s: %v", key, cluster.Name, err)) + return addresses, err + } + if endpointsFound { + clusterEndpoints, ok := clusterEndpointsObj.(*v1.Endpoints) + if !ok { + glog.Warningf("Unknown object received: %v", clusterEndpointsObj) + return addresses, fmt.Errorf("Unknown object received: %v", clusterEndpointsObj) + } + for _, subset := range clusterEndpoints.Subsets { + if len(subset.Addresses) > 0 { + addresses = append(addresses, subset.Addresses...) + } + } + } + return addresses, nil +} + +// updateFederatedService updates the federated service with aggregated lbStatus and serviceIngresses +// and also updates the dns records as needed +func (s *ServiceController) updateFederatedService(fedService *v1.Service, newLBStatus *loadbalancerStatus, newServiceIngress *FederatedServiceIngress) error { + key := types.NamespacedName{Namespace: fedService.Namespace, Name: fedService.Name}.String() + needUpdate := false + + // Sort the endpoints so that we can compare + sort.Sort(newLBStatus) + if !reflect.DeepEqual(fedService.Status.LoadBalancer.Ingress, newLBStatus.Ingress) { + fedService.Status.LoadBalancer.Ingress = newLBStatus.Ingress + glog.V(3).Infof("Federated service loadbalancer status updated for %s: %v", key, newLBStatus.Ingress) + needUpdate = true + } + + existingServiceIngress, err := ParseFederatedServiceIngress(fedService) + if err != nil { + runtime.HandleError(fmt.Errorf("Failed to parse endpoint annotations for service %s: %v", key, err)) + return err + } + + // TODO: We should have a reliable cluster health check(should consider quorum) to detect cluster is not + // reachable and remove dns records for them. Until a reliable cluster health check is available, below code is + // a workaround to not remove the existing dns records which were created before the cluster went offline. + unreadyClusters, err := s.federatedInformer.GetUnreadyClusters() + if err != nil { + runtime.HandleError(fmt.Errorf("Failed to get unready cluster list: %v", err)) + return err + } + for _, cluster := range unreadyClusters { + lbIngress := existingServiceIngress.GetClusterLoadBalancerIngresses(cluster.Name) + newServiceIngress.AddClusterLoadBalancerIngresses(cluster.Name, lbIngress) + glog.V(5).Infof("Cluster %s is Offline, Preserving previously available status for Service %s", cluster.Name, key) + } + + // Update federated service status and/or ingress annotations if changed + sort.Sort(newServiceIngress) + if !reflect.DeepEqual(existingServiceIngress.Items, newServiceIngress.Items) { + fedService = UpdateIngressAnnotation(fedService, newServiceIngress) + glog.V(3).Infof("Federated service loadbalancer ingress updated for %s: existing: %#v, desired: %#v", key, existingServiceIngress, newServiceIngress) + needUpdate = true + } + + if needUpdate { + var err error + fedService, err = s.federationClient.Core().Services(fedService.Namespace).UpdateStatus(fedService) + if err != nil { + runtime.HandleError(fmt.Errorf("Error updating the federation service object %s: %v", key, err)) + return err + } + } + + // Ensure DNS records based on Annotations in federated service for all federated clusters + if needUpdate && wantsDNSRecords(fedService) { + for _, ingress := range newServiceIngress.Items { + err := s.ensureDnsRecords(ingress.Cluster, fedService) + if err != nil { + runtime.HandleError(fmt.Errorf("Error ensuring DNS Records for service %s on cluster %q: %v", key, ingress.Cluster, err)) + return err + } + glog.V(4).Infof("Ensured DNS records for Service %s in cluster %q", key, ingress.Cluster) + } + } + return nil +} + +// Equivalent Checks if cluster-independent, user provided data in two given services are equal. If in the future the +// services structure is expanded then any field that is not populated by the api server should be included here. +func Equivalent(s1, s2 *v1.Service) bool { + // TODO: should also check for all annotations except FederationServiceIngressAnnotation + return s1.Name == s2.Name && s1.Namespace == s2.Namespace && + (reflect.DeepEqual(s1.Labels, s2.Labels) || (len(s1.Labels) == 0 && len(s2.Labels) == 0)) && + reflect.DeepEqual(s1.Spec, s2.Spec) +} + +type loadbalancerStatus struct { + v1.LoadBalancerStatus +} + +func newLoadbalancerStatus() *loadbalancerStatus { + return &loadbalancerStatus{} +} + +func (lbs loadbalancerStatus) Len() int { + return len(lbs.Ingress) +} + +func (lbs loadbalancerStatus) Less(i, j int) bool { + ipComparison := strings.Compare(lbs.Ingress[i].IP, lbs.Ingress[j].IP) + hostnameComparison := strings.Compare(lbs.Ingress[i].Hostname, lbs.Ingress[j].Hostname) + if ipComparison < 0 || (ipComparison == 0 && hostnameComparison < 0) { + return true + } + return false +} + +func (lbs loadbalancerStatus) Swap(i, j int) { + lbs.Ingress[i].IP, lbs.Ingress[j].IP = lbs.Ingress[j].IP, lbs.Ingress[i].IP + lbs.Ingress[i].Hostname, lbs.Ingress[j].Hostname = lbs.Ingress[j].Hostname, lbs.Ingress[i].Hostname +} diff --git a/federation/pkg/federation-controller/util/test/test_helper.go b/federation/pkg/federation-controller/util/test/test_helper.go index 0dd86f156c1..b7c545be074 100644 --- a/federation/pkg/federation-controller/util/test/test_helper.go +++ b/federation/pkg/federation-controller/util/test/test_helper.go @@ -367,6 +367,8 @@ func NewCluster(name string, readyStatus apiv1.ConditionStatus) *federationapi.C Conditions: []federationapi.ClusterCondition{ {Type: federationapi.ClusterReady, Status: readyStatus}, }, + Zones: []string{"foozone"}, + Region: "fooregion", }, } } From 721224d72a08d15842b1d0a1162bada333c7b506 Mon Sep 17 00:00:00 2001 From: shashidharatd Date: Wed, 15 Feb 2017 22:28:37 +0530 Subject: [PATCH 5/8] Add new unit tests for federated service controller --- .../service/servicecontroller_test.go | 284 ++++++++++++++++++ 1 file changed, 284 insertions(+) diff --git a/federation/pkg/federation-controller/service/servicecontroller_test.go b/federation/pkg/federation-controller/service/servicecontroller_test.go index f67995eb8f2..93a560da309 100644 --- a/federation/pkg/federation-controller/service/servicecontroller_test.go +++ b/federation/pkg/federation-controller/service/servicecontroller_test.go @@ -17,13 +17,30 @@ limitations under the License. package service import ( + "fmt" + "reflect" + "strings" "sync" "testing" + "time" + "github.com/golang/glog" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/federation/apis/federation/v1beta1" + fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake" "k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes. + fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" + . "k8s.io/kubernetes/federation/pkg/federation-controller/util/test" "k8s.io/kubernetes/pkg/api/v1" + kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + fakekubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" + corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" ) func TestGetClusterConditionPredicate(t *testing.T) { @@ -83,3 +100,270 @@ func TestGetClusterConditionPredicate(t *testing.T) { } } } + +const ( + retryInterval = 100 * time.Millisecond + + clusters string = "clusters" + services string = "services" + endpoints string = "endpoints" + + lbIngress1 = "10.20.30.40" + lbIngress2 = "10.20.30.50" + serviceEndpoint1 = "192.168.0.1" + serviceEndpoint2 = "192.168.1.1" +) + +func TestServiceController(t *testing.T) { + glog.Infof("Creating fake infrastructure") + fedClient := &fakefedclientset.Clientset{} + cluster1 := NewClusterWithRegionZone("cluster1", v1.ConditionTrue, "region1", "zone1") + cluster2 := NewClusterWithRegionZone("cluster2", v1.ConditionTrue, "region2", "zone2") + + RegisterFakeClusterGet(&fedClient.Fake, &v1beta1.ClusterList{Items: []v1beta1.Cluster{*cluster1, *cluster2}}) + RegisterFakeList(clusters, &fedClient.Fake, &v1beta1.ClusterList{Items: []v1beta1.Cluster{*cluster1, *cluster2}}) + fedclusterWatch := RegisterFakeWatch(clusters, &fedClient.Fake) + RegisterFakeList(services, &fedClient.Fake, &v1.ServiceList{Items: []v1.Service{}}) + fedServiceWatch := RegisterFakeWatch(services, &fedClient.Fake) + RegisterFakeOnCreate(clusters, &fedClient.Fake, fedclusterWatch) + RegisterFakeOnUpdate(clusters, &fedClient.Fake, fedclusterWatch) + RegisterFakeOnCreate(services, &fedClient.Fake, fedServiceWatch) + RegisterFakeOnUpdate(services, &fedClient.Fake, fedServiceWatch) + + cluster1Client := &fakekubeclientset.Clientset{} + RegisterFakeList(services, &cluster1Client.Fake, &v1.ServiceList{Items: []v1.Service{}}) + c1ServiceWatch := RegisterFakeWatch(services, &cluster1Client.Fake) + RegisterFakeList(endpoints, &cluster1Client.Fake, &v1.EndpointsList{Items: []v1.Endpoints{}}) + c1EndpointWatch := RegisterFakeWatch(endpoints, &cluster1Client.Fake) + RegisterFakeOnCreate(services, &cluster1Client.Fake, c1ServiceWatch) + RegisterFakeOnUpdate(services, &cluster1Client.Fake, c1ServiceWatch) + RegisterFakeOnCreate(endpoints, &cluster1Client.Fake, c1EndpointWatch) + RegisterFakeOnUpdate(endpoints, &cluster1Client.Fake, c1EndpointWatch) + + cluster2Client := &fakekubeclientset.Clientset{} + RegisterFakeList(services, &cluster2Client.Fake, &v1.ServiceList{Items: []v1.Service{}}) + c2ServiceWatch := RegisterFakeWatch(services, &cluster2Client.Fake) + RegisterFakeList(endpoints, &cluster2Client.Fake, &v1.EndpointsList{Items: []v1.Endpoints{}}) + c2EndpointWatch := RegisterFakeWatch(endpoints, &cluster2Client.Fake) + RegisterFakeOnCreate(services, &cluster2Client.Fake, c2ServiceWatch) + RegisterFakeOnUpdate(services, &cluster2Client.Fake, c2ServiceWatch) + RegisterFakeOnCreate(endpoints, &cluster2Client.Fake, c2EndpointWatch) + RegisterFakeOnUpdate(endpoints, &cluster2Client.Fake, c2EndpointWatch) + + fedInformerClientFactory := func(cluster *v1beta1.Cluster) (kubeclientset.Interface, error) { + switch cluster.Name { + case cluster1.Name: + return cluster1Client, nil + case cluster2.Name: + return cluster2Client, nil + default: + return nil, fmt.Errorf("Unknown cluster: %v", cluster.Name) + } + } + + fakedns, _ := clouddns.NewFakeInterface() + sc := New(fedClient, fakedns, "myfederation", "federation.example.com", "example.com", "") + ToFederatedInformerForTestOnly(sc.federatedInformer).SetClientFactory(fedInformerClientFactory) + ToFederatedInformerForTestOnly(sc.endpointFederatedInformer).SetClientFactory(fedInformerClientFactory) + sc.clusterAvailableDelay = 100 * time.Millisecond + sc.reviewDelay = 50 * time.Millisecond + sc.updateTimeout = 5 * time.Second + + stop := make(chan struct{}) + glog.Infof("Running Service Controller") + go sc.Run(5, stop) + + glog.Infof("Adding cluster 1") + fedclusterWatch.Add(cluster1) + + service := NewService("test-service-1", 80) + + // Test add federated service. + glog.Infof("Adding federated service") + fedServiceWatch.Add(service) + key := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}.String() + + glog.Infof("Test service was correctly created in cluster 1") + require.NoError(t, WaitForClusterService(t, sc.federatedInformer.GetTargetStore(), cluster1.Name, + key, service, wait.ForeverTestTimeout)) + + glog.Infof("Adding cluster 2") + fedclusterWatch.Add(cluster2) + + glog.Infof("Test service was correctly created in cluster 2") + require.NoError(t, WaitForClusterService(t, sc.federatedInformer.GetTargetStore(), cluster2.Name, + key, service, wait.ForeverTestTimeout)) + + glog.Infof("Test federation service is updated when cluster1 service status is updated") + service.Status = v1.ServiceStatus{ + LoadBalancer: v1.LoadBalancerStatus{ + Ingress: []v1.LoadBalancerIngress{ + {IP: lbIngress1}, + }}} + + desiredStatus := service.Status + desiredService := &v1.Service{Status: desiredStatus} + + c1ServiceWatch.Modify(service) + require.NoError(t, WaitForClusterService(t, sc.federatedInformer.GetTargetStore(), cluster1.Name, + key, service, wait.ForeverTestTimeout)) + require.NoError(t, WaitForFederatedServiceUpdate(t, sc.serviceStore, + key, desiredService, serviceStatusCompare, wait.ForeverTestTimeout)) + + glog.Infof("Test federation service is updated when cluster1 endpoint for the service is created") + desiredIngressAnnotation := NewFederatedServiceIngress(). + AddEndpoints("cluster1", []string{lbIngress1}). + String() + desiredService = &v1.Service{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{FederatedServiceIngressAnnotation: desiredIngressAnnotation}}} + c1EndpointWatch.Add(NewEndpoint("test-service-1", serviceEndpoint1)) + require.NoError(t, WaitForFederatedServiceUpdate(t, sc.serviceStore, + key, desiredService, serviceIngressCompare, wait.ForeverTestTimeout)) + + glog.Infof("Test federation service is updated when cluster2 service status is updated") + service.Status = v1.ServiceStatus{ + LoadBalancer: v1.LoadBalancerStatus{ + Ingress: []v1.LoadBalancerIngress{ + {IP: lbIngress2}, + }}} + desiredStatus.LoadBalancer.Ingress = append(desiredStatus.LoadBalancer.Ingress, v1.LoadBalancerIngress{IP: lbIngress2}) + desiredService = &v1.Service{Status: desiredStatus} + + c2ServiceWatch.Modify(service) + require.NoError(t, WaitForClusterService(t, sc.federatedInformer.GetTargetStore(), cluster2.Name, + key, service, wait.ForeverTestTimeout)) + require.NoError(t, WaitForFederatedServiceUpdate(t, sc.serviceStore, + key, desiredService, serviceStatusCompare, wait.ForeverTestTimeout)) + + glog.Infof("Test federation service is updated when cluster2 endpoint for the service is created") + desiredIngressAnnotation = NewFederatedServiceIngress(). + AddEndpoints("cluster1", []string{lbIngress1}). + AddEndpoints("cluster2", []string{lbIngress2}). + String() + desiredService = &v1.Service{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{FederatedServiceIngressAnnotation: desiredIngressAnnotation}}} + c2EndpointWatch.Add(NewEndpoint("test-service-1", serviceEndpoint2)) + require.NoError(t, WaitForFederatedServiceUpdate(t, sc.serviceStore, + key, desiredService, serviceIngressCompare, wait.ForeverTestTimeout)) + + glog.Infof("Test federation service is updated when cluster1 endpoint for the service is deleted") + desiredIngressAnnotation = NewFederatedServiceIngress(). + AddEndpoints("cluster2", []string{lbIngress2}). + String() + desiredService = &v1.Service{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{FederatedServiceIngressAnnotation: desiredIngressAnnotation}}} + c1EndpointWatch.Delete(NewEndpoint("test-service-1", serviceEndpoint1)) + require.NoError(t, WaitForFederatedServiceUpdate(t, sc.serviceStore, + key, desiredService, serviceIngressCompare, wait.ForeverTestTimeout)) + + // Test update federated service. + glog.Infof("Test modifying federated service by changing the port") + service.Spec.Ports[0].Port = 9090 + fedServiceWatch.Modify(service) + require.NoError(t, WaitForClusterService(t, sc.federatedInformer.GetTargetStore(), cluster1.Name, + key, service, wait.ForeverTestTimeout)) + + // Test cluster service is recreated when deleted. + glog.Infof("Test cluster service is recreated when deleted") + c1ServiceWatch.Delete(service) + require.NoError(t, WaitForClusterService(t, sc.federatedInformer.GetTargetStore(), cluster1.Name, + key, service, wait.ForeverTestTimeout)) + + close(stop) +} + +func NewService(name string, port int32) *v1.Service { + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: v1.NamespaceDefault, + SelfLink: "/api/v1/namespaces/default/services/" + name, + Labels: map[string]string{"app": name}, + }, + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{{Port: port}}, + Type: v1.ServiceTypeLoadBalancer, + }, + } +} + +func NewEndpoint(name, ip string) *v1.Endpoints { + return &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: v1.NamespaceDefault, + SelfLink: "/api/v1/namespaces/default/endpoints/" + name, + Labels: map[string]string{"app": name}, + }, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{ + IP: ip, + }}}, + }, + } +} + +// NewClusterWithRegionZone builds a new cluster object with given region and zone attributes. +func NewClusterWithRegionZone(name string, readyStatus v1.ConditionStatus, region, zone string) *v1beta1.Cluster { + cluster := NewCluster(name, readyStatus) + cluster.Status.Zones = []string{zone} + cluster.Status.Region = region + return cluster +} + +// WaitForClusterService waits for the cluster service to be created matching the desiredService. +func WaitForClusterService(t *testing.T, store fedutil.FederatedReadOnlyStore, clusterName, key string, desiredService *v1.Service, timeout time.Duration) error { + err := wait.PollImmediate(retryInterval, timeout, func() (bool, error) { + obj, found, err := store.GetByKey(clusterName, key) + if !found || err != nil { + return false, err + } + service := obj.(*v1.Service) + if !Equivalent(service, desiredService) { + glog.V(5).Infof("Waiting for clustered service, Desired: %v, Current: %v", desiredService, service) + return false, nil + } + glog.V(5).Infof("Clustered service is up to date: %v", service) + return true, nil + }) + return err +} + +type serviceCompare func(current, desired *v1.Service) (match bool) + +func serviceStatusCompare(current, desired *v1.Service) bool { + if !reflect.DeepEqual(current.Status.LoadBalancer, desired.Status.LoadBalancer) { + glog.V(5).Infof("Waiting for loadbalancer status, Current: %v, Desired: %v", current.Status.LoadBalancer, desired.Status.LoadBalancer) + return false + } + glog.V(5).Infof("Loadbalancer status match: %v", current.Status.LoadBalancer) + return true +} + +func serviceIngressCompare(current, desired *v1.Service) bool { + if strings.Compare(current.Annotations[FederatedServiceIngressAnnotation], desired.Annotations[FederatedServiceIngressAnnotation]) != 0 { + glog.V(5).Infof("Waiting for loadbalancer ingress, Current: %v, Desired: %v", current.Annotations[FederatedServiceIngressAnnotation], desired.Annotations[FederatedServiceIngressAnnotation]) + return false + } + glog.V(5).Infof("Loadbalancer ingress match: %v", current.Annotations[FederatedServiceIngressAnnotation]) + return true +} + +// WaitForFederatedServiceUpdate waits for federated service updates to match the desiredService. +func WaitForFederatedServiceUpdate(t *testing.T, store corelisters.ServiceLister, key string, desiredService *v1.Service, match serviceCompare, timeout time.Duration) error { + err := wait.PollImmediate(retryInterval, timeout, func() (bool, error) { + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return false, err + } + service, err := store.Services(namespace).Get(name) + switch { + case errors.IsNotFound(err): + return false, nil + case err != nil: + return false, err + case !match(service, desiredService): + return false, nil + default: + return true, nil + } + }) + return err +} From 3cb4d69c0b6d142e5137c847af608ebb5b300760 Mon Sep 17 00:00:00 2001 From: shashidharatd Date: Wed, 15 Feb 2017 23:27:41 +0530 Subject: [PATCH 6/8] Add new unit tests for federated service dns --- .../federation-controller/service/dns_test.go | 144 +++++++++++++++--- 1 file changed, 123 insertions(+), 21 deletions(-) diff --git a/federation/pkg/federation-controller/service/dns_test.go b/federation/pkg/federation-controller/service/dns_test.go index 2153cfff040..d057b8f3a52 100644 --- a/federation/pkg/federation-controller/service/dns_test.go +++ b/federation/pkg/federation-controller/service/dns_test.go @@ -17,12 +17,12 @@ limitations under the License. package service import ( - "sync" - "testing" - "fmt" "reflect" "sort" + "sync" + "testing" + "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" @@ -34,7 +34,15 @@ import ( ) func TestServiceController_ensureDnsRecords(t *testing.T) { - clusterName := "testcluster" + cluster1Name := "c1" + cluster2Name := "c2" + cluster1 := NewClusterWithRegionZone(cluster1Name, v1.ConditionTrue, "fooregion", "foozone") + cluster2 := NewClusterWithRegionZone(cluster2Name, v1.ConditionTrue, "barregion", "barzone") + globalDNSName := "servicename.servicenamespace.myfederation.svc.federation.example.com" + fooRegionDNSName := "servicename.servicenamespace.myfederation.svc.fooregion.federation.example.com" + fooZoneDNSName := "servicename.servicenamespace.myfederation.svc.foozone.fooregion.federation.example.com" + barRegionDNSName := "servicename.servicenamespace.myfederation.svc.barregion.federation.example.com" + barZoneDNSName := "servicename.servicenamespace.myfederation.svc.barzone.barregion.federation.example.com" tests := []struct { name string @@ -43,22 +51,24 @@ func TestServiceController_ensureDnsRecords(t *testing.T) { serviceStatus v1.LoadBalancerStatus }{ { - name: "withip", + name: "ServiceWithSingleLBIngress", service: v1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: "servicename", Namespace: "servicenamespace", Annotations: map[string]string{ FederatedServiceIngressAnnotation: NewFederatedServiceIngress(). - AddEndpoints(clusterName, []string{"198.51.100.1"}). + AddEndpoints(cluster1Name, []string{"198.51.100.1"}). String()}, }, }, serviceStatus: buildServiceStatus([][]string{{"198.51.100.1", ""}}), expected: []string{ - "example.com:servicename.servicenamespace.myfederation.svc.federation.example.com:A:180:[198.51.100.1]", - "example.com:servicename.servicenamespace.myfederation.svc.fooregion.federation.example.com:A:180:[198.51.100.1]", - "example.com:servicename.servicenamespace.myfederation.svc.foozone.fooregion.federation.example.com:A:180:[198.51.100.1]", + "example.com:" + globalDNSName + ":A:180:[198.51.100.1]", + "example.com:" + fooRegionDNSName + ":A:180:[198.51.100.1]", + "example.com:" + fooZoneDNSName + ":A:180:[198.51.100.1]", + "example.com:" + barRegionDNSName + ":CNAME:180:[" + globalDNSName + "]", + "example.com:" + barZoneDNSName + ":CNAME:180:[" + barRegionDNSName + "]", }, }, /* @@ -74,14 +84,14 @@ func TestServiceController_ensureDnsRecords(t *testing.T) { }, serviceStatus: buildServiceStatus([][]string{{"", "randomstring.amazonelb.example.com"}}), expected: []string{ - "example.com:servicename.servicenamespace.myfederation.svc.federation.example.com:A:180:[198.51.100.1]", - "example.com:servicename.servicenamespace.myfederation.svc.fooregion.federation.example.com:A:180:[198.51.100.1]", - "example.com:servicename.servicenamespace.myfederation.svc.foozone.fooregion.federation.example.com:A:180:[198.51.100.1]", + "example.com:"+globalDNSName+":A:180:[198.51.100.1]", + "example.com:"+fooRegionDNSName+":A:180:[198.51.100.1]", + "example.com:"+fooZoneDNSName+":A:180:[198.51.100.1]", }, }, */ { - name: "noendpoints", + name: "ServiceWithNoLBIngress", service: v1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: "servicename", @@ -89,8 +99,96 @@ func TestServiceController_ensureDnsRecords(t *testing.T) { }, }, expected: []string{ - "example.com:servicename.servicenamespace.myfederation.svc.fooregion.federation.example.com:CNAME:180:[servicename.servicenamespace.myfederation.svc.federation.example.com]", - "example.com:servicename.servicenamespace.myfederation.svc.foozone.fooregion.federation.example.com:CNAME:180:[servicename.servicenamespace.myfederation.svc.fooregion.federation.example.com]", + "example.com:" + fooRegionDNSName + ":CNAME:180:[" + globalDNSName + "]", + "example.com:" + fooZoneDNSName + ":CNAME:180:[" + fooRegionDNSName + "]", + "example.com:" + barRegionDNSName + ":CNAME:180:[" + globalDNSName + "]", + "example.com:" + barZoneDNSName + ":CNAME:180:[" + barRegionDNSName + "]", + }, + }, + { + name: "ServiceWithMultipleLBIngress", + service: v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "servicename", + Namespace: "servicenamespace", + Annotations: map[string]string{ + FederatedServiceIngressAnnotation: NewFederatedServiceIngress(). + AddEndpoints(cluster1Name, []string{"198.51.100.1"}). + AddEndpoints(cluster2Name, []string{"198.51.200.1"}). + String()}, + }, + }, + expected: []string{ + "example.com:" + globalDNSName + ":A:180:[198.51.100.1 198.51.200.1]", + "example.com:" + fooRegionDNSName + ":A:180:[198.51.100.1]", + "example.com:" + fooZoneDNSName + ":A:180:[198.51.100.1]", + "example.com:" + barRegionDNSName + ":A:180:[198.51.200.1]", + "example.com:" + barZoneDNSName + ":A:180:[198.51.200.1]", + }, + }, + { + name: "ServiceWithLBIngressAndServiceDeleted", + service: v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "servicename", + Namespace: "servicenamespace", + DeletionTimestamp: &metav1.Time{Time: time.Now()}, + Annotations: map[string]string{ + FederatedServiceIngressAnnotation: NewFederatedServiceIngress(). + AddEndpoints(cluster1Name, []string{"198.51.100.1"}). + String()}, + }, + }, + expected: []string{ + // TODO: Ideally we should expect that there are no DNS records when federated service is deleted. Need to remove these leaks in future + "example.com:" + fooRegionDNSName + ":CNAME:180:[" + globalDNSName + "]", + "example.com:" + fooZoneDNSName + ":CNAME:180:[" + fooRegionDNSName + "]", + "example.com:" + barRegionDNSName + ":CNAME:180:[" + globalDNSName + "]", + "example.com:" + barZoneDNSName + ":CNAME:180:[" + barRegionDNSName + "]", + }, + }, + { + name: "ServiceWithMultipleLBIngressAndOneLBIngressGettingRemoved", + service: v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "servicename", + Namespace: "servicenamespace", + Annotations: map[string]string{ + FederatedServiceIngressAnnotation: NewFederatedServiceIngress(). + AddEndpoints(cluster1Name, []string{"198.51.100.1"}). + AddEndpoints(cluster2Name, []string{"198.51.200.1"}). + RemoveEndpoint(cluster2Name, "198.51.200.1"). + String()}, + }, + }, + expected: []string{ + "example.com:" + globalDNSName + ":A:180:[198.51.100.1]", + "example.com:" + fooRegionDNSName + ":A:180:[198.51.100.1]", + "example.com:" + fooZoneDNSName + ":A:180:[198.51.100.1]", + "example.com:" + barRegionDNSName + ":CNAME:180:[" + globalDNSName + "]", + "example.com:" + barZoneDNSName + ":CNAME:180:[" + barRegionDNSName + "]", + }, + }, + { + name: "ServiceWithMultipleLBIngressAndAllLBIngressGettingRemoved", + service: v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "servicename", + Namespace: "servicenamespace", + Annotations: map[string]string{ + FederatedServiceIngressAnnotation: NewFederatedServiceIngress(). + AddEndpoints(cluster1Name, []string{"198.51.100.1"}). + AddEndpoints(cluster2Name, []string{"198.51.200.1"}). + RemoveEndpoint(cluster1Name, "198.51.100.1"). + RemoveEndpoint(cluster2Name, "198.51.200.1"). + String()}, + }, + }, + expected: []string{ + "example.com:" + fooRegionDNSName + ":CNAME:180:[" + globalDNSName + "]", + "example.com:" + fooZoneDNSName + ":CNAME:180:[" + fooRegionDNSName + "]", + "example.com:" + barRegionDNSName + ":CNAME:180:[" + globalDNSName + "]", + "example.com:" + barZoneDNSName + ":CNAME:180:[" + barRegionDNSName + "]", }, }, } @@ -101,7 +199,7 @@ func TestServiceController_ensureDnsRecords(t *testing.T) { t.Error("Unable to fetch zones") } fakeClient := &fakefedclientset.Clientset{} - RegisterFakeClusterGet(&fakeClient.Fake, &v1beta1.ClusterList{Items: []v1beta1.Cluster{*NewCluster(clusterName, v1.ConditionTrue)}}) + RegisterFakeClusterGet(&fakeClient.Fake, &v1beta1.ClusterList{Items: []v1beta1.Cluster{*cluster1, *cluster2}}) serviceController := ServiceController{ federationClient: fakeClient, dns: fakedns, @@ -117,7 +215,7 @@ func TestServiceController_ensureDnsRecords(t *testing.T) { knownClusterSet: make(sets.String), } - serviceController.clusterCache.clientMap[clusterName] = &clusterCache{ + serviceController.clusterCache.clientMap[cluster1Name] = &clusterCache{ cluster: &v1beta1.Cluster{ Status: v1beta1.ClusterStatus{ Zones: []string{"foozone"}, @@ -131,12 +229,16 @@ func TestServiceController_ensureDnsRecords(t *testing.T) { endpointMap: make(map[string]int), serviceStatusMap: make(map[string]v1.LoadBalancerStatus), } - cachedService.endpointMap[clusterName] = 1 + cachedService.endpointMap[cluster1Name] = 1 if !reflect.DeepEqual(&test.serviceStatus, &v1.LoadBalancerStatus{}) { - cachedService.serviceStatusMap[clusterName] = test.serviceStatus + cachedService.serviceStatusMap[cluster1Name] = test.serviceStatus } - err := serviceController.ensureDnsRecords(clusterName, &test.service) + err := serviceController.ensureDnsRecords(cluster1Name, &test.service) + if err != nil { + t.Errorf("Test failed for %s, unexpected error %v", test.name, err) + } + err = serviceController.ensureDnsRecords(cluster2Name, &test.service) if err != nil { t.Errorf("Test failed for %s, unexpected error %v", test.name, err) } @@ -147,7 +249,7 @@ func TestServiceController_ensureDnsRecords(t *testing.T) { } // Dump every record to a testable-by-string-comparison form - var records []string + records := []string{} for _, z := range zones { zoneName := z.Name() From 36e7ed4cae6d4b01079de89bfd4cf17afd923887 Mon Sep 17 00:00:00 2001 From: shashidharatd Date: Mon, 13 Feb 2017 19:38:41 +0530 Subject: [PATCH 7/8] Auto generated code --- federation/apis/federation/BUILD | 1 + .../apis/federation/zz_generated.deepcopy.go | 35 +++++++++++++++++++ .../pkg/federation-controller/service/BUILD | 17 +++++++++ 3 files changed, 53 insertions(+) diff --git a/federation/apis/federation/BUILD b/federation/apis/federation/BUILD index 62d770c6c42..1eb9a802392 100644 --- a/federation/apis/federation/BUILD +++ b/federation/apis/federation/BUILD @@ -19,6 +19,7 @@ go_library( tags = ["automanaged"], deps = [ "//pkg/api:go_default_library", + "//pkg/api/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/conversion:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", diff --git a/federation/apis/federation/zz_generated.deepcopy.go b/federation/apis/federation/zz_generated.deepcopy.go index 16e0011dec3..6f691c45366 100644 --- a/federation/apis/federation/zz_generated.deepcopy.go +++ b/federation/apis/federation/zz_generated.deepcopy.go @@ -25,6 +25,7 @@ import ( conversion "k8s.io/apimachinery/pkg/conversion" runtime "k8s.io/apimachinery/pkg/runtime" api "k8s.io/kubernetes/pkg/api" + api_v1 "k8s.io/kubernetes/pkg/api/v1" reflect "reflect" ) @@ -40,9 +41,11 @@ func RegisterDeepCopies(scheme *runtime.Scheme) error { conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ClusterCondition, InType: reflect.TypeOf(&ClusterCondition{})}, conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ClusterList, InType: reflect.TypeOf(&ClusterList{})}, conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ClusterReplicaSetPreferences, InType: reflect.TypeOf(&ClusterReplicaSetPreferences{})}, + conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ClusterServiceIngress, InType: reflect.TypeOf(&ClusterServiceIngress{})}, conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ClusterSpec, InType: reflect.TypeOf(&ClusterSpec{})}, conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ClusterStatus, InType: reflect.TypeOf(&ClusterStatus{})}, conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_FederatedReplicaSetPreferences, InType: reflect.TypeOf(&FederatedReplicaSetPreferences{})}, + conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_FederatedServiceIngress, InType: reflect.TypeOf(&FederatedServiceIngress{})}, conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ServerAddressByClientCIDR, InType: reflect.TypeOf(&ServerAddressByClientCIDR{})}, ) } @@ -110,6 +113,20 @@ func DeepCopy_federation_ClusterReplicaSetPreferences(in interface{}, out interf } } +func DeepCopy_federation_ClusterServiceIngress(in interface{}, out interface{}, c *conversion.Cloner) error { + { + in := in.(*ClusterServiceIngress) + out := out.(*ClusterServiceIngress) + *out = *in + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]api_v1.LoadBalancerIngress, len(*in)) + copy(*out, *in) + } + return nil + } +} + func DeepCopy_federation_ClusterSpec(in interface{}, out interface{}, c *conversion.Cloner) error { { in := in.(*ClusterSpec) @@ -172,6 +189,24 @@ func DeepCopy_federation_FederatedReplicaSetPreferences(in interface{}, out inte } } +func DeepCopy_federation_FederatedServiceIngress(in interface{}, out interface{}, c *conversion.Cloner) error { + { + in := in.(*FederatedServiceIngress) + out := out.(*FederatedServiceIngress) + *out = *in + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]ClusterServiceIngress, len(*in)) + for i := range *in { + if err := DeepCopy_federation_ClusterServiceIngress(&(*in)[i], &(*out)[i], c); err != nil { + return err + } + } + } + return nil + } +} + func DeepCopy_federation_ServerAddressByClientCIDR(in interface{}, out interface{}, c *conversion.Cloner) error { { in := in.(*ServerAddressByClientCIDR) diff --git a/federation/pkg/federation-controller/service/BUILD b/federation/pkg/federation-controller/service/BUILD index 147d78c1ef9..4dab5063811 100644 --- a/federation/pkg/federation-controller/service/BUILD +++ b/federation/pkg/federation-controller/service/BUILD @@ -15,11 +15,13 @@ go_library( "dns.go", "doc.go", "endpoint_helper.go", + "ingress.go", "service_helper.go", "servicecontroller.go", ], tags = ["automanaged"], deps = [ + "//federation/apis/federation:go_default_library", "//federation/apis/federation/v1beta1:go_default_library", "//federation/client/cache:go_default_library", "//federation/client/clientset_generated/federation_clientset:go_default_library", @@ -36,8 +38,10 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/conversion:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", @@ -46,6 +50,7 @@ go_library( "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", + "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", "//vendor/k8s.io/client-go/util/workqueue:go_default_library", ], ) @@ -62,10 +67,22 @@ go_test( tags = ["automanaged"], deps = [ "//federation/apis/federation/v1beta1:go_default_library", + "//federation/client/clientset_generated/federation_clientset/fake:go_default_library", "//federation/pkg/dnsprovider/providers/google/clouddns:go_default_library", + "//federation/pkg/federation-controller/util:go_default_library", + "//federation/pkg/federation-controller/util/test:go_default_library", "//pkg/api/v1:go_default_library", + "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/client/clientset_generated/clientset/fake:go_default_library", + "//pkg/client/listers/core/v1:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/github.com/stretchr/testify/require:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/client-go/tools/cache:go_default_library", ], ) From 950db8e0a1813791992bf5006215f0da8dde3a34 Mon Sep 17 00:00:00 2001 From: shashidharatd Date: Mon, 24 Apr 2017 10:45:14 +0530 Subject: [PATCH 8/8] Handle review comments --- .../service/servicecontroller.go | 62 ++++++++++--------- 1 file changed, 34 insertions(+), 28 deletions(-) diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go index 8617b5f3192..ee07c237474 100644 --- a/federation/pkg/federation-controller/service/servicecontroller.go +++ b/federation/pkg/federation-controller/service/servicecontroller.go @@ -479,9 +479,10 @@ func (s *ServiceController) init() error { type reconciliationStatus string const ( - statusAllOk = reconciliationStatus("ALL_OK") - statusError = reconciliationStatus("ERROR") - statusNotSynced = reconciliationStatus("NOSYNC") + statusAllOk = reconciliationStatus("ALL_OK") + statusRecoverableError = reconciliationStatus("RECOVERABLE_ERROR") + statusNonRecoverableError = reconciliationStatus("NON_RECOVERABLE_ERROR") + statusNotSynced = reconciliationStatus("NOSYNC") ) // fedServiceWorker runs a worker thread that just dequeues items, processes them, and marks them done. @@ -494,19 +495,19 @@ func (s *ServiceController) fedServiceWorker() { } defer s.queue.Done(key) service := key.(string) - status, err := s.reconcileService(service) + status := s.reconcileService(service) switch status { case statusAllOk: break - case statusError: - runtime.HandleError(fmt.Errorf("Error reconciling service %q: %v, delivering again", service, err)) - s.deliverService(service, 0, true) case statusNotSynced: glog.V(5).Infof("Delivering notification for %q after clusterAvailableDelay", service) s.deliverService(service, s.clusterAvailableDelay, false) + case statusRecoverableError: + s.deliverService(service, 0, true) + case statusNonRecoverableError: + // error is already logged, do nothing default: - runtime.HandleError(fmt.Errorf("Unhandled reconciliation status for %q: %s, delivering again", service, status)) - s.deliverService(service, s.reviewDelay, false) + // unreachable } }() } @@ -1207,34 +1208,39 @@ func (s *ServiceController) isSynced() bool { // reconcileService triggers reconciliation of a federated service with corresponding services in federated clusters. // This function is called on service Addition/Deletion/Updation either in federated cluster or in federation. -func (s *ServiceController) reconcileService(key string) (reconciliationStatus, error) { +func (s *ServiceController) reconcileService(key string) reconciliationStatus { if !s.isSynced() { glog.V(4).Infof("Data store not synced, delaying reconcilation: %v", key) - return statusNotSynced, nil + return statusNotSynced } namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - runtime.HandleError(fmt.Errorf("Failed to retrieve federated service %q from store: %v", key, err)) - return statusError, err + runtime.HandleError(fmt.Errorf("Invalid key %q recieved, unable to split key to namespace and name, err: %v", key, err)) + return statusNonRecoverableError } service, err := s.serviceStore.Services(namespace).Get(name) if errors.IsNotFound(err) { // Not a federated service, ignoring. - return statusAllOk, nil + return statusAllOk } else if err != nil { - return statusError, err + runtime.HandleError(fmt.Errorf("Failed to retrieve federated service %q from store: %v", key, err)) + return statusRecoverableError } glog.V(3).Infof("Reconciling federated service: %s", key) // Create a copy before modifying the service to prevent race condition with other readers of service from store fedServiceObj, err := api.Scheme.DeepCopy(service) + if err != nil { + runtime.HandleError(fmt.Errorf("Error in copying obj: %s, %v", key, err)) + return statusNonRecoverableError + } fedService, ok := fedServiceObj.(*v1.Service) if err != nil || !ok { - runtime.HandleError(fmt.Errorf("Error in retrieving obj from store: %s, %v", key, err)) - return statusError, err + runtime.HandleError(fmt.Errorf("Unknown obj recieved from store: %#v, %v", fedServiceObj, err)) + return statusNonRecoverableError } // Handle deletion of federated service @@ -1242,19 +1248,19 @@ func (s *ServiceController) reconcileService(key string) (reconciliationStatus, if err := s.delete(fedService); err != nil { runtime.HandleError(fmt.Errorf("Failed to delete %s: %v", key, err)) s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "DeleteFailed", "Deleting service failed: %v", err) - return statusError, err + return statusRecoverableError } glog.V(3).Infof("Deleting federated service succeeded: %s", key) s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "DeleteSucceed", "Deleting service succeeded") - return statusAllOk, nil + return statusAllOk } // Add the required finalizers before creating a service in underlying clusters. This ensures that the // dependent services in underlying clusters are deleted when the federated service is deleted. updatedServiceObj, err := s.deletionHelper.EnsureFinalizers(fedService) if err != nil { - glog.Warningf("Failed to ensure setting finalizer for service %s: %v", key, err) - return statusError, err + runtime.HandleError(fmt.Errorf("Failed to ensure setting finalizer for service %s: %v", key, err)) + return statusRecoverableError } fedService = updatedServiceObj.(*v1.Service) @@ -1262,7 +1268,7 @@ func (s *ServiceController) reconcileService(key string) (reconciliationStatus, clusters, err := s.federatedInformer.GetReadyClusters() if err != nil { runtime.HandleError(fmt.Errorf("Failed to get ready cluster list: %v", err)) - return statusError, err + return statusRecoverableError } newLBStatus := newLoadbalancerStatus() @@ -1272,7 +1278,7 @@ func (s *ServiceController) reconcileService(key string) (reconciliationStatus, // Aggregate all operations to perform on all federated clusters operation, err := s.getOperationsToPerformOnCluster(cluster, fedService) if err != nil { - return statusError, err + return statusRecoverableError } if operation != nil { operations = append(operations, *operation) @@ -1281,7 +1287,7 @@ func (s *ServiceController) reconcileService(key string) (reconciliationStatus, // Aggregate LoadBalancerStatus from all services in federated clusters to update status in federated service lbStatus, err := s.getServiceStatusInCluster(cluster, key) if err != nil { - return statusError, err + return statusRecoverableError } if len(lbStatus.Ingress) > 0 { newLBStatus.Ingress = append(newLBStatus.Ingress, lbStatus.Ingress...) @@ -1289,7 +1295,7 @@ func (s *ServiceController) reconcileService(key string) (reconciliationStatus, // Add/Update federated service ingress only if there are reachable endpoints backing the lb service endpoints, err := s.getServiceEndpointsInCluster(cluster, key) if err != nil { - return statusError, err + return statusRecoverableError } // if there are no endpoints created for the service then the loadbalancer ingress // is not reachable, so do not consider such loadbalancer ingresses for federated @@ -1313,7 +1319,7 @@ func (s *ServiceController) reconcileService(key string) (reconciliationStatus, if err != nil { if !errors.IsAlreadyExists(err) { runtime.HandleError(fmt.Errorf("Failed to execute updates for %s: %v", key, err)) - return statusError, err + return statusRecoverableError } } } @@ -1321,11 +1327,11 @@ func (s *ServiceController) reconcileService(key string) (reconciliationStatus, // Update the federated service if there are any updates in clustered service (status/endpoints) err = s.updateFederatedService(fedService, newLBStatus, newServiceIngress) if err != nil { - return statusError, err + return statusRecoverableError } glog.V(5).Infof("Everything is in order in federated clusters for service %s", key) - return statusAllOk, nil + return statusAllOk } // getOperationsToPerformOnCluster returns the operations to be performed so that clustered service is in sync with federated service