Integrate federated service DNS record management

Quinton Hoole 2016-05-20 13:21:33 -07:00
parent 06f742a5ea
commit 65e1fecab5
7 changed files with 364 additions and 67 deletions

View File

@@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/federation/pkg/dnsprovider"
"k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns/internal"
"k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns/internal/stubs"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
)
@@ -99,3 +100,17 @@ func CreateInterface(projectID string, tokenSource oauth2.TokenSource) (*Interfa
glog.Infof("Successfully got DNS service: %v\n", service)
return newInterfaceWithStub(projectID, internal.NewService(service)), nil
}
// NewFakeInterface returns a fake clouddns interface, useful for unit testing purposes.
func NewFakeInterface() (dnsprovider.Interface, error) {
service := stubs.NewService()
interface_ := newInterfaceWithStub("", service)
zones := service.ManagedZones_
// Add a fake zone to test against.
zone := &stubs.ManagedZone{zones, "example.com", []stubs.ResourceRecordSet{}}
call := zones.Create(interface_.project(), zone)
if _, err := call.Do(); err != nil {
return nil, err
}
return interface_, nil
}
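For illustration, a minimal test sketch (not part of this commit, function name hypothetical) exercising NewFakeInterface; it assumes Zones() returns a (dnsprovider.Zones, bool) pair, as it is used elsewhere in this change:

func TestNewFakeInterfacePrepopulatesZone(t *testing.T) {
	iface, err := NewFakeInterface()
	if err != nil {
		t.Fatalf("NewFakeInterface failed: %v", err)
	}
	zones, supported := iface.Zones() // second value indicates Zones support
	if !supported {
		t.Fatal("fake interface does not support Zones")
	}
	zoneList, err := zones.List()
	if err != nil {
		t.Fatalf("failed to list zones: %v", err)
	}
	for _, zone := range zoneList {
		if zone.Name() == "example.com" {
			return // the fake zone added above is present
		}
	}
	t.Error("expected pre-populated zone example.com")
}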

View File

@@ -23,7 +23,6 @@ import (
"testing"
"k8s.io/kubernetes/federation/pkg/dnsprovider"
"k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns/internal/stubs"
"k8s.io/kubernetes/federation/pkg/dnsprovider/rrstype"
)
@@ -31,21 +30,7 @@ func newTestInterface() (dnsprovider.Interface, error) {
// Use this to test the real cloud service - insert appropriate project-id. Default token source will be used. See
// https://github.com/golang/oauth2/blob/master/google/default.go for details.
// i, err := dnsprovider.GetDnsProvider(ProviderName, strings.NewReader("\n[global]\nproject-id = federation0-cluster00"))
return newFakeInterface() // Use this to stub out the entire cloud service
}
func newFakeInterface() (dnsprovider.Interface, error) {
service := stubs.NewService()
interface_ := newInterfaceWithStub("", service)
zones := service.ManagedZones_
// Add a fake zone to test against.
zone := &stubs.ManagedZone{zones, "example.com", []stubs.ResourceRecordSet{}}
call := zones.Create(interface_.project(), zone)
_, err := call.Do()
if err != nil {
return nil, err
}
return interface_, nil
return NewFakeInterface() // Use this to stub out the entire cloud service
}
var interface_ dnsprovider.Interface

View File

@@ -16,25 +16,280 @@ limitations under the License.
package service
// getClusterZoneName returns the name of the zone where the specified cluster exists (e.g. "us-east1-c" on GCE, or "us-east-1b" on AWS)
func getClusterZoneName(clusterName string) string {
// TODO: quinton: Get this from the cluster API object - from the annotation on a node in the cluster - it doesn't contain this yet.
return "zone-of-cluster-" + clusterName
import (
"fmt"
"net"
"k8s.io/kubernetes/federation/pkg/dnsprovider"
"k8s.io/kubernetes/federation/pkg/dnsprovider/rrstype"
)
const (
// minDnsTtl is the minimum safe DNS TTL value to use (in seconds). We use this as the TTL for all DNS records.
minDnsTtl = 180
)
// getHealthyEndpoints returns the hostnames and/or IP addresses of healthy endpoints for the service, at a zone, region and global level (or an error)
func (s *ServiceController) getHealthyEndpoints(clusterName string, cachedService *cachedService) (zoneEndpoints, regionEndpoints, globalEndpoints []string, err error) {
var (
zoneNames []string
regionName string
)
if zoneNames, regionName, err = s.getClusterZoneNames(clusterName); err != nil {
return nil, nil, nil, err
}
for lbClusterName, lbStatus := range cachedService.serviceStatusMap {
lbZoneNames, lbRegionName, err := s.getClusterZoneNames(lbClusterName)
if err != nil {
return nil, nil, nil, err
}
for _, ingress := range lbStatus.Ingress {
var address string
// We should get either an IP address or a hostname - use whichever one we get
if ingress.IP != "" {
address = ingress.IP
} else if ingress.Hostname != "" {
address = ingress.Hostname
}
if len(address) <= 0 {
return nil, nil, nil, fmt.Errorf("Service %s/%s in cluster %s has neither LoadBalancerStatus.ingress.ip nor LoadBalancerStatus.ingress.hostname. Cannot use it as endpoint for federated service.",
cachedService.lastState.Name, cachedService.lastState.Namespace, clusterName)
}
for _, lbZoneName := range lbZoneNames {
for _, zoneName := range zoneNames {
if lbZoneName == zoneName {
zoneEndpoints = append(zoneEndpoints, address)
}
}
}
if lbRegionName == regionName {
regionEndpoints = append(regionEndpoints, address)
}
globalEndpoints = append(globalEndpoints, address)
}
}
return zoneEndpoints, regionEndpoints, globalEndpoints, nil
}
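For reference, a sketch (values hypothetical) of the per-cluster input iterated over above: serviceStatusMap holds one api.LoadBalancerStatus per cluster shard, and each Ingress entry carries either an IP (e.g. GCE) or a Hostname (e.g. AWS):

cachedService.serviceStatusMap["cluster-a"] = api.LoadBalancerStatus{
	Ingress: []api.LoadBalancerIngress{
		{IP: "203.0.113.10"},         // IP-based load balancer (e.g. GCE)
		{Hostname: "lb.example.com"}, // hostname-based load balancer (e.g. AWS)
	},
}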
// getClusterRegionName returns the name of the region where the specified cluster exists (e.g. us-east1 on GCE, or "us-east-1" on AWS)
func getClusterRegionName(clusterName string) string {
// TODO: quinton: Get this from the cluster API object - from the annotation on a node in the cluster - it doesn't contain this yet.
return "region-of-cluster-" + clusterName
// getClusterZoneNames returns the names of the zones (and the region) where the specified cluster exists (e.g. zones "us-east1-c" on GCE, or "us-east-1b" on AWS)
func (s *ServiceController) getClusterZoneNames(clusterName string) (zones []string, region string, err error) {
client, ok := s.clusterCache.clientMap[clusterName]
if !ok {
return nil, "", fmt.Errorf("Cluster cache does not contain entry for cluster %s", clusterName)
}
if client.cluster == nil {
return nil, "", fmt.Errorf("Cluster cache entry for cluster %s is nil", clusterName)
}
return client.cluster.Status.Zones, client.cluster.Status.Region, nil
}
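A sketch of the cluster cache entry this method reads, mirroring the test fixture added later in this commit:

s.clusterCache.clientMap["foo"] = &clusterCache{
	cluster: &federation.Cluster{
		Status: federation.ClusterStatus{
			Zones:  []string{"foozone"},
			Region: "fooregion",
		},
	},
}
zones, region, err := s.getClusterZoneNames("foo")
// zones == []string{"foozone"}, region == "fooregion", err == nil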
// getFederationDNSZoneName returns the name of the managed DNS Zone configured for this federation
func getFederationDNSZoneName() string {
return "mydomain.com" // TODO: quinton: Get this from the federation configuration.
func (s *ServiceController) getFederationDNSZoneName() (string, error) {
return "example.com", nil // TODO: quinton: Get this from the federation configuration.
// Note: For unit testing this must match the domain populated in the test/stub dnsprovider.
}
func ensureDNSRecords(clusterName string, cachedService *cachedService) error {
// Quinton: Pseudocode....
// getDnsZone is a hack around the fact that dnsprovider does not yet support a Get() method, only a List() method. TODO: Fix that.
func getDnsZone(dnsZoneName string, dnsZonesInterface dnsprovider.Zones) (dnsprovider.Zone, error) {
dnsZones, err := dnsZonesInterface.List()
if err != nil {
return nil, err
}
for _, dnsZone := range dnsZones {
if dnsZone.Name() == dnsZoneName {
return dnsZone, nil
}
}
return nil, fmt.Errorf("DNS zone %s not found.", dnsZoneName)
}
/* getRrset is a hack around the fact that dnsprovider.ResourceRecordSets interface does not yet include a Get() method, only a List() method. TODO: Fix that.
Note that if the named resource record set does not exist, but no error occurred, the returned set and error are both nil
*/
func getRrset(dnsName string, rrsetsInterface dnsprovider.ResourceRecordSets) (dnsprovider.ResourceRecordSet, error) {
var returnVal dnsprovider.ResourceRecordSet
rrsets, err := rrsetsInterface.List()
if err != nil {
return nil, err
}
for _, rrset := range rrsets {
if rrset.Name() == dnsName {
returnVal = rrset
break
}
}
return returnVal, nil
}
/* getResolvedEndpoints performs DNS resolution on the provided slice of endpoints (which might be DNS names or IPv4 addresses)
and returns a list of IPv4 addresses. If any of the endpoints is neither a valid IPv4 address nor a resolvable DNS name, a
non-nil error is also returned (possibly along with a partially complete list of resolved endpoints).
*/
func getResolvedEndpoints(endpoints []string) ([]string, error) {
resolvedEndpoints := make([]string, 0, len(endpoints))
for _, endpoint := range endpoints {
if net.ParseIP(endpoint) == nil {
// It's not a valid IP address, so assume it's a DNS name, and try to resolve it,
// replacing its DNS name with its IP addresses in resolvedEndpoints
ipAddrs, err := net.LookupHost(endpoint)
if err != nil {
return resolvedEndpoints, err
}
resolvedEndpoints = append(resolvedEndpoints, ipAddrs...)
} else {
resolvedEndpoints = append(resolvedEndpoints, endpoint)
}
}
return resolvedEndpoints, nil
}
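Illustrative usage (addresses hypothetical; hostname resolution requires working DNS at run time): IP literals pass through unchanged, while hostnames are expanded to whatever net.LookupHost returns:

resolved, err := getResolvedEndpoints([]string{"203.0.113.10", "lb.example.com"})
// resolved: ["203.0.113.10", <addresses of lb.example.com>...]
// err: non-nil if lb.example.com fails to resolve (resolved may then be partial)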
/* ensureDnsRrsets ensures (idempotently, and with minimum mutations) that all of the DNS resource record sets for dnsName are consistent with endpoints.
If endpoints is nil or empty, a CNAME record to uplevelCname is ensured.
*/
func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoints []string, uplevelCname string) error {
dnsZone, err := getDnsZone(dnsZoneName, s.dnsZones)
if err != nil {
return err
}
rrsets, supported := dnsZone.ResourceRecordSets()
if !supported {
return fmt.Errorf("Failed to ensure DNS records for %s. DNS provider does not support the ResourceRecordSets interface.", dnsName)
}
rrset, err := getRrset(dnsName, rrsets) // TODO: rrsets.Get(dnsName)
if err != nil {
return err
}
if rrset == nil {
// It doesn't exist yet, so create it, if we indeed have healthy endpoints
if len(endpoints) < 1 {
// There are no endpoint addresses at this level, so CNAME to uplevel, if provided
if uplevelCname != "" {
newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDnsTtl, rrstype.CNAME)
rrset, err = rrsets.Add(newRrset)
if err != nil {
return err
}
}
// else we want no record, and we have no record, so we're all good.
} else {
// We have valid endpoint addresses, so just add them as A records.
// But first resolve DNS names, as some cloud providers (like AWS) expose
// load balancers behind DNS names, not IP addresses.
resolvedEndpoints, err := getResolvedEndpoints(endpoints)
if err != nil {
return err // TODO: We could potentially add the ones we did get back, even if some of them failed to resolve.
}
newRrset := rrsets.New(dnsName, resolvedEndpoints, minDnsTtl, rrstype.A)
rrset, err = rrsets.Add(newRrset)
if err != nil {
return err
}
}
} else {
// the rrset already exists, so make it right.
if len(endpoints) < 1 {
// Need an appropriate CNAME record. Check that we have it.
newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDnsTtl, rrstype.CNAME)
if rrset == newRrset {
// The existing rrset is equal to the required one - our work is done here
return nil
} else {
// Need to replace the existing one with a better one (or just remove it if we have no healthy endpoints).
// TODO: Ideally do these inside a transaction, or do an atomic update, but dnsprovider interface doesn't support that yet.
if err = rrsets.Remove(rrset); err != nil {
return err
}
if uplevelCname != "" {
if _, err = rrsets.Add(newRrset); err != nil {
return err
}
}
}
} else {
// We have an rrset in DNS, possibly with some missing addresses and some unwanted addresses.
// And we have healthy endpoints. Just replace what's there with the healthy endpoints, if it's not already correct.
resolvedEndpoints, err := getResolvedEndpoints(endpoints)
if err != nil { // Some invalid addresses or otherwise unresolvable DNS names.
return err // TODO: We could potentially add the ones we did get back, even if some of them failed to resolve.
}
newRrset := rrsets.New(dnsName, resolvedEndpoints, minDnsTtl, rrstype.A)
if rrset == newRrset {
// The existing rrset is equal to the required one - our work is done here
// TODO: We could be more thorough about checking for equivalence to avoid unnecessary updates, but in the
// worst case we'll just replace what's there with an equivalent, if not exactly identical record set.
return nil
} else {
// Need to replace the existing one with a better one
// TODO: Ideally do these inside a transaction, or do an atomic update, but dnsprovider interface doesn't support that yet.
if err = rrsets.Remove(rrset); err != nil {
return err
}
if _, err = rrsets.Add(newRrset); err != nil {
return err
}
}
}
}
return nil
}
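Tying the four branches together, a hedged usage sketch with hypothetical names that follow the naming scheme described in ensureDnsRecords below:

err := s.ensureDnsRrsets(
	"example.com", // dnsZoneName
	"mysvc.myns.myfed.svc.us-central1-a.us-central1.example.com", // zone-level dnsName
	[]string{"203.0.113.10"}, // healthy endpoints; an empty slice would take the CNAME path
	"mysvc.myns.myfed.svc.us-central1.example.com", // uplevelCname, one level up
)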
/* ensureDnsRecords ensures (idempotently, and with minimum mutations) that all of the DNS records for a service in a given cluster
are correct, given the current state of that service in that cluster. This should be called every time the state of a service might
have changed (either w.r.t. its loadbalancer address, or if the number of healthy backend endpoints for that service transitioned
from zero to non-zero, or vice versa). Only shards of the service which have both a loadbalancer ingress IP address or hostname AND
at least one healthy backend endpoint are included in DNS records for that service (at all of zone, region and global levels). All
other addresses are removed. Also, if no shards exist in the zone or region of the cluster, a CNAME reference to the next higher
level is ensured to exist.
*/
func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *cachedService) error {
// Quinton: Pseudocode....
// See https://github.com/kubernetes/kubernetes/pull/25107#issuecomment-218026648
// For each service we need the following DNS names:
// mysvc.myns.myfed.svc.z1.r1.mydomain.com (for zone z1 in region r1)
// - an A record to IP address of specific shard in that zone (if that shard exists and has healthy endpoints)
// - OR a CNAME record to the next level up, i.e. mysvc.myns.myfed.svc.r1.mydomain.com (if a healthy shard does not exist in zone z1)
// mysvc.myns.myfed.svc.r1.mydomain.com (for region r1)
// - a set of A records to IP addresses of all healthy shards in region r1, if one or more of these exist
// - OR a CNAME record to the next level up, i.e. mysvc.myns.myfed.svc.mydomain.com (if no healthy shards exist in region r1)
// mysvc.myns.myfed.svc.mydomain.com (global)
// - a set of A records to IP addresses of all healthy shards in all regions, if one or more of these exist.
// - no record (NXDOMAIN response) if no healthy shards exist in any region
//
// For each cached service, cachedService.lastState tracks the current known state of the service, while cachedService.appliedState contains
// the state of the service when we last successfully sync'd its DNS records.
// So this time around we only need to patch that (add new records, remove deleted records, and update changed records).
//
if s == nil {
return fmt.Errorf("nil ServiceController passed to ServiceController.ensureDnsRecords(clusterName: %s, cachedService: %v)", clusterName, cachedService)
}
if cachedService == nil {
return fmt.Errorf("nil cachedService passed to ServiceController.ensureDnsRecords(clusterName: %s, cachedService: %v)", clusterName, cachedService)
}
serviceName := cachedService.lastState.Name
namespaceName := cachedService.lastState.Namespace
zoneNames, regionName, err := s.getClusterZoneNames(clusterName)
if err != nil {
return err
}
dnsZoneName, err := s.getFederationDNSZoneName()
if err != nil {
return err
}
zoneEndpoints, regionEndpoints, globalEndpoints, err := s.getHealthyEndpoints(clusterName, cachedService)
if err != nil {
return err
}
commonPrefix := serviceName + "." + namespaceName + "." + s.federationName + ".svc"
// dnsNames is the path up the DNS search tree, starting at the leaf
dnsNames := []string{
commonPrefix + "." + zoneNames[0] + "." + regionName + "." + dnsZoneName, // zone level - TODO might need other zone names for multi-zone clusters
commonPrefix + "." + regionName + "." + dnsZoneName, // region level, one up from zone level
commonPrefix + "." + dnsZoneName, // global level, one up from region level
"", // nowhere to go up from global level
}
endpoints := [][]string{zoneEndpoints, regionEndpoints, globalEndpoints}
for i, endpoint := range endpoints {
if err = s.ensureDnsRrsets(dnsZoneName, dnsNames[i], endpoint, dnsNames[i+1]); err != nil {
return err
}
}
return nil
}
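A self-contained sketch (values hypothetical) of the name construction above, showing the three-level hierarchy that results:

package main

import "fmt"

func main() {
	commonPrefix := "mysvc.myns.myfed.svc"
	zoneName, regionName, dnsZoneName := "us-central1-a", "us-central1", "example.com"
	dnsNames := []string{
		commonPrefix + "." + zoneName + "." + regionName + "." + dnsZoneName, // zone level
		commonPrefix + "." + regionName + "." + dnsZoneName,                  // region level
		commonPrefix + "." + dnsZoneName,                                     // global level
	}
	for _, name := range dnsNames {
		fmt.Println(name)
	}
	// Output:
	// mysvc.myns.myfed.svc.us-central1-a.us-central1.example.com
	// mysvc.myns.myfed.svc.us-central1.example.com
	// mysvc.myns.myfed.svc.example.com
}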

View File

@@ -42,7 +42,7 @@ func (sc *ServiceController) clusterEndpointWorker() {
return
}
defer cache.endpointQueue.Done(key)
err := sc.clusterCache.syncEndpoint(key.(string), clusterName, cache, sc.serviceCache, fedClient)
err := sc.clusterCache.syncEndpoint(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc)
if err != nil {
glog.V(2).Infof("Failed to sync endpoint: %+v", err)
}
@@ -54,7 +54,7 @@ func (sc *ServiceController) clusterEndpointWorker() {
// Whenever there is change on endpoint, the federation service should be updated
// key is the namespaced name of endpoint
func (cc *clusterClientCache) syncEndpoint(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient federationclientset.Interface) error {
func (cc *clusterClientCache) syncEndpoint(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient federationclientset.Interface, serviceController *ServiceController) error {
cachedService, ok := serviceCache.get(key)
if !ok {
// here we filtered all non-federation services
@@ -70,19 +70,19 @@ func (cc *clusterClientCache) syncEndpoint(key, clusterName string, clusterCache
endpoint, ok := endpointInterface.(*api.Endpoints)
if ok {
glog.V(4).Infof("Found endpoint for federation service %s/%s from cluster %s", endpoint.Namespace, endpoint.Name, clusterName)
err = cc.processEndpointUpdate(cachedService, endpoint, clusterName)
err = cc.processEndpointUpdate(cachedService, endpoint, clusterName, serviceController)
} else {
_, ok := endpointInterface.(cache.DeletedFinalStateUnknown)
if !ok {
return fmt.Errorf("Object contained wasn't a service or a deleted key: %+v", endpointInterface)
}
glog.Infof("Found tombstone for %v", key)
err = cc.processEndpointDeletion(cachedService, clusterName)
err = cc.processEndpointDeletion(cachedService, clusterName, serviceController)
}
} else {
// service absence in store means watcher caught the deletion, ensure LB info is cleaned
glog.Infof("Can not get endpoint %v for cluster %s from endpointStore", key, clusterName)
err = cc.processEndpointDeletion(cachedService, clusterName)
err = cc.processEndpointDeletion(cachedService, clusterName, serviceController)
}
if err != nil {
glog.Errorf("Failed to sync service: %+v, put back to service queue", err)
@@ -92,7 +92,7 @@ func (cc *clusterClientCache) syncEndpoint(key, clusterName string, clusterCache
return nil
}
func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedService, clusterName string) error {
func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedService, clusterName string, serviceController *ServiceController) error {
glog.V(4).Infof("Processing endpoint update for %s/%s, cluster %s", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
var err error
cachedService.rwlock.Lock()
@@ -103,13 +103,12 @@ func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedServi
if ok {
// endpoints lost, clean dns record
glog.V(4).Infof("Cached endpoint was not found for %s/%s, cluster %s, building one", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
// TODO: need to integrate with dns.go:ensureDNSRecords
for i := 0; i < clientRetryCount; i++ {
err := ensureDNSRecords(clusterName, cachedService)
if err == nil {
if err := serviceController.ensureDnsRecords(clusterName, cachedService); err == nil {
delete(cachedService.endpointMap, clusterName)
return nil
}
glog.V(4).Infof("Error ensuring DNS Records: %v", err)
time.Sleep(cachedService.nextDNSUpdateDelay())
}
}
@@ -118,7 +117,7 @@ func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedServi
// Update dns info when endpoint update event received
// We do not care about the endpoint info, what we need to make sure here is len(endpoints.subsets)>0
func (cc *clusterClientCache) processEndpointUpdate(cachedService *cachedService, endpoint *api.Endpoints, clusterName string) error {
func (cc *clusterClientCache) processEndpointUpdate(cachedService *cachedService, endpoint *api.Endpoints, clusterName string, serviceController *ServiceController) error {
glog.V(4).Infof("Processing endpoint update for %s/%s, cluster %s", endpoint.Namespace, endpoint.Name, clusterName)
cachedService.rwlock.Lock()
defer cachedService.rwlock.Unlock()
@@ -132,12 +131,11 @@ func (cc *clusterClientCache) processEndpointUpdate(cachedService *cachedService
// first time get endpoints, update dns record
glog.V(4).Infof("Cached endpoint was not found for %s/%s, cluster %s, building one", endpoint.Namespace, endpoint.Name, clusterName)
cachedService.endpointMap[clusterName] = 1
err := ensureDNSRecords(clusterName, cachedService)
if err != nil {
// TODO: need to integrate with dns.go:ensureDNSRecords
if err := serviceController.ensureDnsRecords(clusterName, cachedService); err != nil {
glog.V(4).Infof("Error ensuring DNS Records: %v", err)
for i := 0; i < clientRetryCount; i++ {
time.Sleep(cachedService.nextDNSUpdateDelay())
err := ensureDNSRecords(clusterName, cachedService)
err := serviceController.ensureDnsRecords(clusterName, cachedService)
if err == nil {
return nil
}

View File

@@ -19,9 +19,25 @@ package service
import (
"testing"
"k8s.io/kubernetes/federation/apis/federation"
"k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes.
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util/sets"
)
var fakeDns, _ = clouddns.NewFakeInterface() // No need to check for unsupported interfaces, as the fake interface supports everything that's required.
var fakeDnsZones, _ = fakeDns.Zones()
var fakeServiceController = ServiceController{
dns: fakeDns,
dnsZones: fakeDnsZones,
serviceCache: &serviceCache{fedServiceMap: make(map[string]*cachedService)},
clusterCache: &clusterClientCache{
clientMap: make(map[string]*clusterCache),
},
knownClusterSet: make(sets.String),
}
func buildEndpoint(subsets [][]string) *api.Endpoints {
endpoint := &api.Endpoints{
Subsets: []api.EndpointSubset{
@@ -69,8 +85,9 @@ func TestProcessEndpointUpdate(t *testing.T) {
1,
},
}
for _, test := range tests {
cc.processEndpointUpdate(test.cachedService, test.endpoint, test.clusterName)
cc.processEndpointUpdate(test.cachedService, test.endpoint, test.clusterName, &fakeServiceController)
if test.expectResult != test.cachedService.endpointMap[test.clusterName] {
t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectResult, test.cachedService.endpointMap[test.clusterName])
}
@@ -78,8 +95,18 @@ }
}
func TestProcessEndpointDeletion(t *testing.T) {
clusterName := "foo"
cc := clusterClientCache{
clientMap: make(map[string]*clusterCache),
clientMap: map[string]*clusterCache{
clusterName: {
cluster: &federation.Cluster{
Status: federation.ClusterStatus{
Zones: []string{"foozone"},
Region: "fooregion",
},
},
},
},
}
tests := []struct {
name string
@@ -95,7 +122,7 @@ func TestProcessEndpointDeletion(t *testing.T) {
endpointMap: make(map[string]int),
},
buildEndpoint([][]string{{"ip1", ""}}),
"foo",
clusterName,
0,
},
{
@@ -103,16 +130,17 @@ func TestProcessEndpointDeletion(t *testing.T) {
&cachedService{
lastState: &api.Service{},
endpointMap: map[string]int{
"foo": 1,
clusterName: 1,
},
},
buildEndpoint([][]string{{"ip1", ""}}),
"foo",
clusterName,
0,
},
}
fakeServiceController.clusterCache = &cc
for _, test := range tests {
cc.processEndpointDeletion(test.cachedService, test.clusterName)
cc.processEndpointDeletion(test.cachedService, test.clusterName, &fakeServiceController)
if test.expectResult != test.cachedService.endpointMap[test.clusterName] {
t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectResult, test.cachedService.endpointMap[test.clusterName])
}

View File

@@ -78,14 +78,10 @@ type cachedService struct {
// key clusterName
// value is a flag that if there is ready address, 1 means there is ready address, 0 means no ready address
endpointMap map[string]int
// cluster service map hold serivice status info from kubernetes clusters
// key clusterName
// serviceStatusMap map holds service status info from kubernetes clusters, keyed on clusterName
serviceStatusMap map[string]api.LoadBalancerStatus
// Ensures only one goroutine can operate on this service at any given time.
rwlock sync.Mutex
// Controls error back-off for propagating the federation service to k8s clusters
lastRetryDelay time.Duration
// Controls error back-off for updating federation service back to federation apiserver
@@ -104,9 +100,11 @@ type serviceCache struct {
type ServiceController struct {
dns dnsprovider.Interface
federationClient federationclientset.Interface
zones []dnsprovider.Zone
serviceCache *serviceCache
clusterCache *clusterClientCache
federationName string
// Each federation should be configured with a single zone (e.g. "mycompany.com")
dnsZones dnsprovider.Zones
serviceCache *serviceCache
clusterCache *clusterClientCache
// A store of services, populated by the serviceController
serviceStore cache.StoreToServiceLister
// Watches changes to all services
@@ -690,7 +688,7 @@ func (s *ServiceController) lockedUpdateDNSRecords(service *cachedService, clust
for key := range s.clusterCache.clientMap {
for _, clusterName := range clusterNames {
if key == clusterName {
ensureDNSRecords(clusterName, service)
s.ensureDnsRecords(clusterName, service)
}
}
}

View File

@@ -17,22 +17,38 @@ limitations under the License.
package service
import (
"sync"
"testing"
"k8s.io/kubernetes/federation/apis/federation"
"k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes.
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util/sets"
)
func TestGetClusterConditionPredicate(t *testing.T) {
fakedns, _ := clouddns.NewFakeInterface() // No need to check for unsupported interfaces, as the fake interface supports everything that's required.
serviceController := ServiceController{
dns: fakedns,
serviceCache: &serviceCache{fedServiceMap: make(map[string]*cachedService)},
clusterCache: &clusterClientCache{
rwlock: sync.Mutex{},
clientMap: make(map[string]*clusterCache),
},
knownClusterSet: make(sets.String),
}
tests := []struct {
cluster federation.Cluster
expectAccept bool
name string
cluster federation.Cluster
expectAccept bool
name string
serviceController *ServiceController
}{
{
cluster: federation.Cluster{},
expectAccept: false,
name: "empty",
cluster: federation.Cluster{},
expectAccept: false,
name: "empty",
serviceController: &serviceController,
},
{
cluster: federation.Cluster{
@@ -42,8 +58,9 @@ func TestGetClusterConditionPredicate(t *testing.T) {
},
},
},
expectAccept: true,
name: "basic",
expectAccept: true,
name: "basic",
serviceController: &serviceController,
},
{
cluster: federation.Cluster{
@@ -53,8 +70,9 @@ func TestGetClusterConditionPredicate(t *testing.T) {
},
},
},
expectAccept: false,
name: "notready",
expectAccept: false,
name: "notready",
serviceController: &serviceController,
},
}
pred := getClusterConditionPredicate()