diff --git a/federation/pkg/dnsprovider/providers/google/clouddns/rrsets.go b/federation/pkg/dnsprovider/providers/google/clouddns/rrsets.go index 1fae25bb506..2ec6cddf6bc 100644 --- a/federation/pkg/dnsprovider/providers/google/clouddns/rrsets.go +++ b/federation/pkg/dnsprovider/providers/google/clouddns/rrsets.go @@ -36,7 +36,7 @@ func (rrsets ResourceRecordSets) List() ([]dnsprovider.ResourceRecordSet, error) } list := make([]dnsprovider.ResourceRecordSet, len(response.Rrsets())) for i, rrset := range response.Rrsets() { - list[i] = &ResourceRecordSet{rrset, &rrsets} + list[i] = ResourceRecordSet{rrset, &rrsets} } return list, nil } @@ -58,7 +58,7 @@ func (rrsets ResourceRecordSets) Add(rrset dnsprovider.ResourceRecordSet) (dnspr func (rrsets ResourceRecordSets) Remove(rrset dnsprovider.ResourceRecordSet) error { service := rrsets.zone.zones.interface_.service.Changes() - deletions := []interfaces.ResourceRecordSet{rrset.(*ResourceRecordSet).impl} + deletions := []interfaces.ResourceRecordSet{rrset.(ResourceRecordSet).impl} change := service.NewChange([]interfaces.ResourceRecordSet{}, deletions) newChange, err := service.Create(rrsets.project(), rrsets.zone.impl.Name(), change).Do() if err != nil { diff --git a/federation/pkg/federation-controller/service/dns.go b/federation/pkg/federation-controller/service/dns.go index f3d4e378454..03051e55353 100644 --- a/federation/pkg/federation-controller/service/dns.go +++ b/federation/pkg/federation-controller/service/dns.go @@ -146,7 +146,7 @@ func getResolvedEndpoints(endpoints []string) ([]string, error) { /* ensureDnsRrsets ensures (idempotently, and with minimum mutations) that all of the DNS resource record sets for dnsName are consistent with endpoints. if endpoints is nil or empty, a CNAME record to uplevelCname is ensured. */ -func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoints []string, uplevelCname string) error { +func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoints []string, uplevelCname string, endpointReachable bool) error { dnsZone, err := getDnsZone(dnsZoneName, s.dnsZones) if err != nil { return err @@ -160,30 +160,32 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin return err } if rrset == nil { - // It doesn't exist yet, so create it, if we indeed have healthy endpoints - if len(endpoints) < 1 { - // There are no endpoint addresses at this level, so CNAME to uplevel, if provided - if uplevelCname != "" { - newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDnsTtl, rrstype.CNAME) + if endpointReachable { + // It doesn't exist yet, so create it, if we indeed have healthy endpoints + if len(endpoints) < 1 { + // There are no endpoint addresses at this level, so CNAME to uplevel, if provided + if uplevelCname != "" { + newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDnsTtl, rrstype.CNAME) + rrset, err = rrsets.Add(newRrset) + if err != nil { + return err + } + } + // else we want no record, and we have no record, so we're all good. + } else { + // We have valid endpoint addresses, so just add them as A records. + // But first resolve DNS names, as some cloud providers (like AWS) expose + // load balancers behind DNS names, not IP addresses. + resolvedEndpoints, err := getResolvedEndpoints(endpoints) + if err != nil { + return err // TODO: We could potentially add the ones we did get back, even if some of them failed to resolve. + } + newRrset := rrsets.New(dnsName, resolvedEndpoints, minDnsTtl, rrstype.A) rrset, err = rrsets.Add(newRrset) if err != nil { return err } } - // else we want no record, and we have no record, so we're all good. - } else { - // We have valid endpoint addresses, so just add them as A records. - // But first resolve DNS names, as some cloud providers (like AWS) expose - // load balancers behind DNS names, not IP addresses. - resolvedEndpoints, err := getResolvedEndpoints(endpoints) - if err != nil { - return err // TODO: We could potentially add the ones we did get back, even if some of them failed to resolve. - } - newRrset := rrsets.New(dnsName, resolvedEndpoints, minDnsTtl, rrstype.A) - rrset, err = rrsets.Add(newRrset) - if err != nil { - return err - } } } else { // the rrset already exists, so make it right. @@ -191,6 +193,11 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin // Need an appropriate CNAME record. Check that we have it. newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDnsTtl, rrstype.CNAME) if rrset == newRrset { + if !endpointReachable { + if err = rrsets.Remove(rrset); err != nil { + return err + } + } // The existing rrset is equal to the required one - our work is done here return nil } else { @@ -199,7 +206,7 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin if err = rrsets.Remove(rrset); err != nil { return err } - if uplevelCname != "" { + if uplevelCname != "" && endpointReachable { if _, err = rrsets.Add(newRrset); err != nil { return err } @@ -214,6 +221,11 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin } newRrset := rrsets.New(dnsName, resolvedEndpoints, minDnsTtl, rrstype.A) if rrset == newRrset { + if !endpointReachable { + if err = rrsets.Remove(rrset); err != nil { + return err + } + } // The existing rrset is equal to the required one - our work is done here // TODO: We could be more thorough about checking for equivalence to avoid unnecessary updates, but in the // worst case we'll just replace what's there with an equivalent, if not exactly identical record set. @@ -224,8 +236,10 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin if err = rrsets.Remove(rrset); err != nil { return err } - if _, err = rrsets.Add(newRrset); err != nil { - return err + if endpointReachable { + if _, err = rrsets.Add(newRrset); err != nil { + return err + } } } } @@ -279,7 +293,7 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService * if err != nil { return err } - + _, endpointReachable := cachedService.endpointMap[clusterName] commonPrefix := serviceName + "." + namespaceName + "." + s.federationName + ".svc" // dnsNames is the path up the DNS search tree, starting at the leaf dnsNames := []string{ @@ -292,7 +306,7 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService * endpoints := [][]string{zoneEndpoints, regionEndpoints, globalEndpoints} for i, endpoint := range endpoints { - if err = s.ensureDnsRrsets(dnsZoneName, dnsNames[i], endpoint, dnsNames[i+1]); err != nil { + if err = s.ensureDnsRrsets(dnsZoneName, dnsNames[i], endpoint, dnsNames[i+1], endpointReachable); err != nil { return err } } diff --git a/federation/pkg/federation-controller/service/endpoint_helper.go b/federation/pkg/federation-controller/service/endpoint_helper.go index 96ecf08375d..b4773a974d3 100644 --- a/federation/pkg/federation-controller/service/endpoint_helper.go +++ b/federation/pkg/federation-controller/service/endpoint_helper.go @@ -93,7 +93,7 @@ func (cc *clusterClientCache) syncEndpoint(key, clusterName string, clusterCache } func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedService, clusterName string, serviceController *ServiceController) error { - glog.V(4).Infof("Processing endpoint update for %s/%s, cluster %s", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName) + glog.V(4).Infof("Processing endpoint deletion for %s/%s, cluster %s", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName) var err error cachedService.rwlock.Lock() defer cachedService.rwlock.Unlock() @@ -102,10 +102,10 @@ func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedServi // need to query dns info from dnsprovider and make sure of if deletion is needed if ok { // endpoints lost, clean dns record - glog.V(4).Infof("Cached endpoint was not found for %s/%s, cluster %s, building one", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName) + glog.V(4).Infof("Cached endpoint was found for %s/%s, cluster %s, removing", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName) + delete(cachedService.endpointMap, clusterName) for i := 0; i < clientRetryCount; i++ { if err := serviceController.ensureDnsRecords(clusterName, cachedService); err == nil { - delete(cachedService.endpointMap, clusterName) return nil } glog.V(4).Infof("Error ensuring DNS Records: %v", err) diff --git a/federation/pkg/federation-controller/service/endpoint_helper_test.go b/federation/pkg/federation-controller/service/endpoint_helper_test.go index 9815f19b5d3..c9421d819ba 100644 --- a/federation/pkg/federation-controller/service/endpoint_helper_test.go +++ b/federation/pkg/federation-controller/service/endpoint_helper_test.go @@ -29,9 +29,11 @@ var fakeDns, _ = clouddns.NewFakeInterface() // No need to check for unsupported var fakeDnsZones, _ = fakeDns.Zones() var fakeServiceController = ServiceController{ - dns: fakeDns, - dnsZones: fakeDnsZones, - serviceCache: &serviceCache{fedServiceMap: make(map[string]*cachedService)}, + dns: fakeDns, + dnsZones: fakeDnsZones, + federationName: "fed1", + zoneName: "example.com", + serviceCache: &serviceCache{fedServiceMap: make(map[string]*cachedService)}, clusterCache: &clusterClientCache{ clientMap: make(map[string]*clusterCache), }, @@ -52,8 +54,18 @@ func buildEndpoint(subsets [][]string) *v1.Endpoints { } func TestProcessEndpointUpdate(t *testing.T) { + clusterName := "foo" cc := clusterClientCache{ - clientMap: make(map[string]*clusterCache), + clientMap: map[string]*clusterCache{ + clusterName: { + cluster: &v1alpha1.Cluster{ + Status: v1alpha1.ClusterStatus{ + Zones: []string{"foozone"}, + Region: "fooregion", + }, + }, + }, + }, } tests := []struct { name string @@ -69,7 +81,7 @@ func TestProcessEndpointUpdate(t *testing.T) { endpointMap: make(map[string]int), }, buildEndpoint([][]string{{"ip1", ""}}), - "foo", + clusterName, 1, }, { @@ -81,11 +93,11 @@ func TestProcessEndpointUpdate(t *testing.T) { }, }, buildEndpoint([][]string{{"ip1", ""}}), - "foo", + clusterName, 1, }, } - + fakeServiceController.clusterCache = &cc for _, test := range tests { cc.processEndpointUpdate(test.cachedService, test.endpoint, test.clusterName, &fakeServiceController) if test.expectResult != test.cachedService.endpointMap[test.clusterName] { diff --git a/federation/pkg/federation-controller/service/service_helper.go b/federation/pkg/federation-controller/service/service_helper.go index 40ee628f6f5..27fc5faa93e 100644 --- a/federation/pkg/federation-controller/service/service_helper.go +++ b/federation/pkg/federation-controller/service/service_helper.go @@ -44,7 +44,7 @@ func (sc *ServiceController) clusterServiceWorker() { if quit { return } - err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient) + err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc) if err != nil { glog.Errorf("Failed to sync service: %+v", err) } @@ -55,7 +55,7 @@ func (sc *ServiceController) clusterServiceWorker() { } // Whenever there is change on service, the federation service should be updated -func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient federation_release_1_3.Interface) error { +func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient federation_release_1_3.Interface, sc *ServiceController) error { // obj holds the latest service info from apiserver, return if there is no federation cache for the service cachedService, ok := serviceCache.get(key) if !ok { @@ -88,6 +88,15 @@ func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache } if needUpdate { + for i := 0; i < clientRetryCount; i++ { + if err := sc.ensureDnsRecords(clusterName, cachedService); err == nil { + break + } + glog.V(4).Infof("Error ensuring DNS Records for service %s on cluster %s: %v", key, clusterName, err) + time.Sleep(cachedService.nextDNSUpdateDelay()) + clusterCache.serviceQueue.Add(key) + // did not retry here as we still want to persist federation apiserver even ensure dns records fails + } err := cc.persistFedServiceUpdate(cachedService, fedClient) if err == nil { cachedService.appliedState = cachedService.lastState @@ -219,7 +228,13 @@ func (cc *clusterClientCache) persistFedServiceUpdate(cachedService *cachedServi glog.V(5).Infof("Persist federation service status %s/%s", service.Namespace, service.Name) var err error for i := 0; i < clientRetryCount; i++ { - _, err := fedClient.Core().Services(service.Namespace).UpdateStatus(service) + _, err := fedClient.Core().Services(service.Namespace).Get(service.Name) + if errors.IsNotFound(err) { + glog.Infof("Not persisting update to service '%s/%s' that no longer exists: %v", + service.Namespace, service.Name, err) + return nil + } + _, err = fedClient.Core().Services(service.Namespace).UpdateStatus(service) if err == nil { glog.V(2).Infof("Successfully update service %s/%s to federation apiserver", service.Namespace, service.Name) return nil diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go index 48882094f1e..3320177f8f1 100644 --- a/federation/pkg/federation-controller/service/servicecontroller.go +++ b/federation/pkg/federation-controller/service/servicecontroller.go @@ -360,6 +360,7 @@ func (s *ServiceController) deleteClusterService(clusterName string, cachedServi err = clientset.Core().Services(service.Namespace).Delete(service.Name, &api.DeleteOptions{}) if err == nil || errors.IsNotFound(err) { glog.V(4).Infof("Service %s/%s deleted from cluster %s", service.Namespace, service.Name, clusterName) + delete(cachedService.endpointMap, clusterName) return nil } time.Sleep(cachedService.nextRetryDelay())