mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 04:33:26 +00:00
Merge pull request #45034 from shashidharatd/federation-service-controller-3
Automatic merge from submit-queue (batch tested with PRs 45247, 45810, 45034, 45898, 45899) [Federation] Segregate DNS related code to separate controller **What this PR does / why we need it**: This is the continuation of service controller re-factor work as outlined in #41253 This PR segregates DNS related code from service controller to another controller `service-dns controller` which manages the DNS records on the configured DNS provider. `service-dns controller` monitors the federated services for the ingress annotations and create/update/delete DNS records accordingly. `service-dns controller` can be optionally disabled and DNS record management could be done by third party components by monitoring the ingress annotations on federated services. (This would enable something like federation middleware for CoreDNS where federation api server could be used as a backend to CoreDNS eliminating the need for etcd storage.) **Special notes for your reviewer**: **Release note**: ``` Federation: A new controller for managing DNS records is introduced which can be optionally disabled to enable third party components to manage DNS records for federated services. ``` cc @kubernetes/sig-federation-pr-reviews
This commit is contained in:
commit
b8f084a6c5
@ -18,7 +18,6 @@ go_library(
|
||||
deps = [
|
||||
"//federation/client/clientset_generated/federation_clientset:go_default_library",
|
||||
"//federation/cmd/federation-controller-manager/app/options:go_default_library",
|
||||
"//federation/pkg/dnsprovider:go_default_library",
|
||||
"//federation/pkg/dnsprovider/providers/aws/route53:go_default_library",
|
||||
"//federation/pkg/dnsprovider/providers/coredns:go_default_library",
|
||||
"//federation/pkg/dnsprovider/providers/google/clouddns:go_default_library",
|
||||
|
@ -35,7 +35,6 @@ import (
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
|
||||
"k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options"
|
||||
"k8s.io/kubernetes/federation/pkg/dnsprovider"
|
||||
"k8s.io/kubernetes/federation/pkg/federatedtypes"
|
||||
clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster"
|
||||
deploymentcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/deployment"
|
||||
@ -137,17 +136,20 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err
|
||||
clustercontroller.StartClusterController(restClientCfg, stopChan, s.ClusterMonitorPeriod.Duration)
|
||||
|
||||
if controllerEnabled(s.Controllers, serverResources, servicecontroller.ControllerName, servicecontroller.RequiredResources, true) {
|
||||
dns, err := dnsprovider.InitDnsProvider(s.DnsProvider, s.DnsConfigFile)
|
||||
if err != nil {
|
||||
glog.Fatalf("Cloud provider could not be initialized: %v", err)
|
||||
if controllerEnabled(s.Controllers, serverResources, servicecontroller.DNSControllerName, servicecontroller.RequiredResources, true) {
|
||||
serviceDNScontrollerClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, servicecontroller.DNSUserAgentName))
|
||||
serviceDNSController, err := servicecontroller.NewServiceDNSController(serviceDNScontrollerClientset, s.DnsProvider, s.DnsConfigFile, s.FederationName, s.ServiceDnsSuffix, s.ZoneName, s.ZoneID)
|
||||
if err != nil {
|
||||
glog.Fatalf("Failed to start service dns controller: %v", err)
|
||||
} else {
|
||||
go serviceDNSController.DNSControllerRun(s.ConcurrentServiceSyncs, wait.NeverStop)
|
||||
}
|
||||
}
|
||||
|
||||
glog.V(3).Infof("Loading client config for service controller %q", servicecontroller.UserAgentName)
|
||||
scClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, servicecontroller.UserAgentName))
|
||||
servicecontroller := servicecontroller.New(scClientset, dns, s.FederationName, s.ServiceDnsSuffix, s.ZoneName, s.ZoneID)
|
||||
glog.V(3).Infof("Running service controller")
|
||||
if err := servicecontroller.Run(s.ConcurrentServiceSyncs, wait.NeverStop); err != nil {
|
||||
glog.Fatalf("Failed to start service controller: %v", err)
|
||||
}
|
||||
serviceController := servicecontroller.New(scClientset)
|
||||
go serviceController.Run(s.ConcurrentServiceSyncs, wait.NeverStop)
|
||||
}
|
||||
|
||||
if controllerEnabled(s.Controllers, serverResources, namespacecontroller.ControllerName, namespacecontroller.RequiredResources, true) {
|
||||
|
@ -19,7 +19,6 @@ go_library(
|
||||
deps = [
|
||||
"//federation/apis/federation:go_default_library",
|
||||
"//federation/apis/federation/v1beta1:go_default_library",
|
||||
"//federation/client/cache:go_default_library",
|
||||
"//federation/client/clientset_generated/federation_clientset:go_default_library",
|
||||
"//federation/pkg/dnsprovider:go_default_library",
|
||||
"//federation/pkg/dnsprovider/rrstype:go_default_library",
|
||||
|
@ -19,24 +19,210 @@ package service
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
"strings"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
pkgruntime "k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
|
||||
"k8s.io/kubernetes/federation/pkg/dnsprovider"
|
||||
"k8s.io/kubernetes/federation/pkg/dnsprovider/rrstype"
|
||||
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
|
||||
)
|
||||
|
||||
const (
|
||||
// minDnsTtl is the minimum safe DNS TTL value to use (in seconds). We use this as the TTL for all DNS records.
|
||||
minDnsTtl = 180
|
||||
DNSControllerName = "service-dns"
|
||||
|
||||
DNSUserAgentName = "federation-service-dns-controller"
|
||||
|
||||
// minDNSTTL is the minimum safe DNS TTL value to use (in seconds). We use this as the TTL for all DNS records.
|
||||
minDNSTTL = 180
|
||||
)
|
||||
|
||||
type ServiceDNSController struct {
|
||||
// Client to federation api server
|
||||
federationClient fedclientset.Interface
|
||||
dns dnsprovider.Interface
|
||||
federationName string
|
||||
// serviceDNSSuffix is the DNS suffix we use when publishing service DNS names
|
||||
serviceDNSSuffix string
|
||||
// zoneName and zoneID are used to identify the zone in which to put records
|
||||
zoneName string
|
||||
zoneID string
|
||||
dnsZones dnsprovider.Zones
|
||||
// each federation should be configured with a single zone (e.g. "mycompany.com")
|
||||
dnsZone dnsprovider.Zone
|
||||
// Informer Store for federated services
|
||||
serviceStore corelisters.ServiceLister
|
||||
// Informer controller for federated services
|
||||
serviceController cache.Controller
|
||||
workQueue workqueue.Interface
|
||||
}
|
||||
|
||||
// NewServiceDNSController returns a new service dns controller to manage DNS records for federated services
|
||||
func NewServiceDNSController(client fedclientset.Interface, dnsProvider, dnsProviderConfig, federationName,
|
||||
serviceDNSSuffix, zoneName, zoneID string) (*ServiceDNSController, error) {
|
||||
dns, err := dnsprovider.InitDnsProvider(dnsProvider, dnsProviderConfig)
|
||||
if err != nil {
|
||||
runtime.HandleError(fmt.Errorf("DNS provider could not be initialized: %v", err))
|
||||
return nil, err
|
||||
}
|
||||
d := &ServiceDNSController{
|
||||
federationClient: client,
|
||||
dns: dns,
|
||||
federationName: federationName,
|
||||
serviceDNSSuffix: serviceDNSSuffix,
|
||||
zoneName: zoneName,
|
||||
zoneID: zoneID,
|
||||
workQueue: workqueue.New(),
|
||||
}
|
||||
if err := d.validateConfig(); err != nil {
|
||||
runtime.HandleError(fmt.Errorf("Invalid configuration passed to DNS provider: %v", err))
|
||||
return nil, err
|
||||
}
|
||||
if err := d.retrieveOrCreateDNSZone(); err != nil {
|
||||
runtime.HandleError(fmt.Errorf("Failed to retrieve DNS zone: %v", err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Start informer in federated API servers on federated services
|
||||
var serviceIndexer cache.Indexer
|
||||
serviceIndexer, d.serviceController = cache.NewIndexerInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
|
||||
return client.Core().Services(metav1.NamespaceAll).List(options)
|
||||
},
|
||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
return client.Core().Services(metav1.NamespaceAll).Watch(options)
|
||||
},
|
||||
},
|
||||
&v1.Service{},
|
||||
serviceSyncPeriod,
|
||||
util.NewTriggerOnAllChanges(func(obj pkgruntime.Object) { d.workQueue.Add(obj) }),
|
||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
)
|
||||
d.serviceStore = corelisters.NewServiceLister(serviceIndexer)
|
||||
|
||||
return d, nil
|
||||
}
|
||||
|
||||
func (s *ServiceDNSController) DNSControllerRun(workers int, stopCh <-chan struct{}) {
|
||||
defer runtime.HandleCrash()
|
||||
defer s.workQueue.ShutDown()
|
||||
|
||||
glog.Infof("Starting federation service dns controller")
|
||||
defer glog.Infof("Stopping federation service dns controller")
|
||||
|
||||
go s.serviceController.Run(stopCh)
|
||||
|
||||
for i := 0; i < workers; i++ {
|
||||
go wait.Until(s.worker, time.Second, stopCh)
|
||||
}
|
||||
|
||||
<-stopCh
|
||||
}
|
||||
|
||||
func wantsDNSRecords(service *v1.Service) bool {
|
||||
return service.Spec.Type == v1.ServiceTypeLoadBalancer
|
||||
}
|
||||
|
||||
func (s *ServiceDNSController) workerFunction() bool {
|
||||
item, quit := s.workQueue.Get()
|
||||
if quit {
|
||||
return true
|
||||
}
|
||||
defer s.workQueue.Done(item)
|
||||
|
||||
service := item.(*v1.Service)
|
||||
|
||||
if !wantsDNSRecords(service) {
|
||||
return false
|
||||
}
|
||||
|
||||
ingress, err := ParseFederatedServiceIngress(service)
|
||||
if err != nil {
|
||||
runtime.HandleError(fmt.Errorf("Error in parsing lb ingress for service %s/%s: %v", service.Namespace, service.Name, err))
|
||||
return false
|
||||
}
|
||||
for _, clusterIngress := range ingress.Items {
|
||||
s.ensureDNSRecords(clusterIngress.Cluster, service)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *ServiceDNSController) worker() {
|
||||
for {
|
||||
if quit := s.workerFunction(); quit {
|
||||
glog.Infof("service dns controller worker queue shutting down")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *ServiceDNSController) validateConfig() error {
|
||||
if s.federationName == "" {
|
||||
return fmt.Errorf("DNSController should not be run without federationName")
|
||||
}
|
||||
if s.zoneName == "" && s.zoneID == "" {
|
||||
return fmt.Errorf("DNSController must be run with either zoneName or zoneID")
|
||||
}
|
||||
if s.serviceDNSSuffix == "" {
|
||||
if s.zoneName == "" {
|
||||
return fmt.Errorf("DNSController must be run with zoneName, if serviceDnsSuffix is not set")
|
||||
}
|
||||
s.serviceDNSSuffix = s.zoneName
|
||||
}
|
||||
if s.dns == nil {
|
||||
return fmt.Errorf("DNSController should not be run without a dnsprovider")
|
||||
}
|
||||
zones, ok := s.dns.Zones()
|
||||
if !ok {
|
||||
return fmt.Errorf("the dns provider does not support zone enumeration, which is required for creating dns records")
|
||||
}
|
||||
s.dnsZones = zones
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *ServiceDNSController) retrieveOrCreateDNSZone() error {
|
||||
matchingZones, err := getDNSZones(s.zoneName, s.zoneID, s.dnsZones)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error querying for DNS zones: %v", err)
|
||||
}
|
||||
switch len(matchingZones) {
|
||||
case 0: // No matching zones for s.zoneName, so create one
|
||||
if s.zoneName == "" {
|
||||
return fmt.Errorf("DNSController must be run with zoneName to create zone automatically")
|
||||
}
|
||||
glog.Infof("DNS zone %q not found. Creating DNS zone %q.", s.zoneName, s.zoneName)
|
||||
managedZone, err := s.dnsZones.New(s.zoneName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
zone, err := s.dnsZones.Add(managedZone)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
glog.Infof("DNS zone %q successfully created. Note that DNS resolution will not work until you have registered this name with "+
|
||||
"a DNS registrar and they have changed the authoritative name servers for your domain to point to your DNS provider", zone.Name())
|
||||
case 1: // s.zoneName matches exactly one DNS zone
|
||||
s.dnsZone = matchingZones[0]
|
||||
default: // s.zoneName matches more than one DNS zone
|
||||
return fmt.Errorf("Multiple matching DNS zones found for %q; please specify zoneID", s.zoneName)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// getHealthyEndpoints returns the hostnames and/or IP addresses of healthy endpoints for the service, at a zone, region and global level (or an error)
|
||||
func (s *ServiceController) getHealthyEndpoints(clusterName string, service *v1.Service) (zoneEndpoints, regionEndpoints, globalEndpoints []string, err error) {
|
||||
func (s *ServiceDNSController) getHealthyEndpoints(clusterName string, service *v1.Service) (zoneEndpoints, regionEndpoints, globalEndpoints []string, err error) {
|
||||
var (
|
||||
zoneNames []string
|
||||
regionName string
|
||||
@ -70,7 +256,7 @@ func (s *ServiceController) getHealthyEndpoints(clusterName string, service *v1.
|
||||
address = ingress.Hostname
|
||||
}
|
||||
if len(address) <= 0 {
|
||||
return nil, nil, nil, fmt.Errorf("Service %s/%s in cluster %s has neither LoadBalancerStatus.ingress.ip nor LoadBalancerStatus.ingress.hostname. Cannot use it as endpoint for federated service.",
|
||||
return nil, nil, nil, fmt.Errorf("Service %s/%s in cluster %s has neither LoadBalancerStatus.ingress.ip nor LoadBalancerStatus.ingress.hostname. Cannot use it as endpoint for federated service",
|
||||
service.Name, service.Namespace, clusterName)
|
||||
}
|
||||
for _, lbZoneName := range lbZoneNames {
|
||||
@ -90,7 +276,7 @@ func (s *ServiceController) getHealthyEndpoints(clusterName string, service *v1.
|
||||
}
|
||||
|
||||
// getClusterZoneNames returns the name of the zones (and the region) where the specified cluster exists (e.g. zones "us-east1-c" on GCE, or "us-east-1b" on AWS)
|
||||
func (s *ServiceController) getClusterZoneNames(clusterName string) ([]string, string, error) {
|
||||
func (s *ServiceDNSController) getClusterZoneNames(clusterName string) ([]string, string, error) {
|
||||
cluster, err := s.federationClient.Federation().Clusters().Get(clusterName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
@ -98,13 +284,8 @@ func (s *ServiceController) getClusterZoneNames(clusterName string) ([]string, s
|
||||
return cluster.Status.Zones, cluster.Status.Region, nil
|
||||
}
|
||||
|
||||
// getServiceDnsSuffix returns the DNS suffix to use when creating federated-service DNS records
|
||||
func (s *ServiceController) getServiceDnsSuffix() (string, error) {
|
||||
return s.serviceDnsSuffix, nil
|
||||
}
|
||||
|
||||
// getDnsZones returns the DNS zones matching dnsZoneName and dnsZoneID (if specified)
|
||||
func getDnsZones(dnsZoneName string, dnsZoneID string, dnsZonesInterface dnsprovider.Zones) ([]dnsprovider.Zone, error) {
|
||||
// getDNSZones returns the DNS zones matching dnsZoneName and dnsZoneID (if specified)
|
||||
func getDNSZones(dnsZoneName string, dnsZoneID string, dnsZonesInterface dnsprovider.Zones) ([]dnsprovider.Zone, error) {
|
||||
// TODO: We need query-by-name and query-by-id functions
|
||||
dnsZones, err := dnsZonesInterface.List()
|
||||
if err != nil {
|
||||
@ -130,30 +311,6 @@ func getDnsZones(dnsZoneName string, dnsZoneID string, dnsZonesInterface dnsprov
|
||||
return matches, nil
|
||||
}
|
||||
|
||||
// getDnsZone returns the DNS zone, as identified by dnsZoneName and dnsZoneID
|
||||
// This is similar to getDnsZones, but returns an error if there are zero or multiple matching zones.
|
||||
func getDnsZone(dnsZoneName string, dnsZoneID string, dnsZonesInterface dnsprovider.Zones) (dnsprovider.Zone, error) {
|
||||
dnsZones, err := getDnsZones(dnsZoneName, dnsZoneID, dnsZonesInterface)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(dnsZones) == 1 {
|
||||
return dnsZones[0], nil
|
||||
}
|
||||
|
||||
name := dnsZoneName
|
||||
if dnsZoneID != "" {
|
||||
name += "/" + dnsZoneID
|
||||
}
|
||||
|
||||
if len(dnsZones) == 0 {
|
||||
return nil, fmt.Errorf("DNS zone %s not found", name)
|
||||
} else {
|
||||
return nil, fmt.Errorf("DNS zone %s is ambiguous (please specify zoneID)", name)
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: that if the named resource record set does not exist, but no
|
||||
// error occurred, the returned list will be empty, and the error will
|
||||
// be nil
|
||||
@ -193,13 +350,13 @@ func getResolvedEndpoints(endpoints []string) ([]string, error) {
|
||||
return resolvedEndpoints, nil
|
||||
}
|
||||
|
||||
/* ensureDnsRrsets ensures (idempotently, and with minimum mutations) that all of the DNS resource record sets for dnsName are consistent with endpoints.
|
||||
/* ensureDNSRrsets ensures (idempotently, and with minimum mutations) that all of the DNS resource record sets for dnsName are consistent with endpoints.
|
||||
if endpoints is nil or empty, a CNAME record to uplevelCname is ensured.
|
||||
*/
|
||||
func (s *ServiceController) ensureDnsRrsets(dnsZone dnsprovider.Zone, dnsName string, endpoints []string, uplevelCname string) error {
|
||||
func (s *ServiceDNSController) ensureDNSRrsets(dnsZone dnsprovider.Zone, dnsName string, endpoints []string, uplevelCname string) error {
|
||||
rrsets, supported := dnsZone.ResourceRecordSets()
|
||||
if !supported {
|
||||
return fmt.Errorf("Failed to ensure DNS records for %s. DNS provider does not support the ResourceRecordSets interface.", dnsName)
|
||||
return fmt.Errorf("Failed to ensure DNS records for %s. DNS provider does not support the ResourceRecordSets interface", dnsName)
|
||||
}
|
||||
rrsetList, err := getRrset(dnsName, rrsets) // TODO: rrsets.Get(dnsName)
|
||||
if err != nil {
|
||||
@ -211,7 +368,7 @@ func (s *ServiceController) ensureDnsRrsets(dnsZone dnsprovider.Zone, dnsName st
|
||||
glog.V(4).Infof("There are no healthy endpoint addresses at level %q, so CNAME to %q, if provided", dnsName, uplevelCname)
|
||||
if uplevelCname != "" {
|
||||
glog.V(4).Infof("Creating CNAME to %q for %q", uplevelCname, dnsName)
|
||||
newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDnsTtl, rrstype.CNAME)
|
||||
newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDNSTTL, rrstype.CNAME)
|
||||
glog.V(4).Infof("Adding recordset %v", newRrset)
|
||||
err = rrsets.StartChangeset().Add(newRrset).Apply()
|
||||
if err != nil {
|
||||
@ -230,7 +387,7 @@ func (s *ServiceController) ensureDnsRrsets(dnsZone dnsprovider.Zone, dnsName st
|
||||
if err != nil {
|
||||
return err // TODO: We could potentially add the ones we did get back, even if some of them failed to resolve.
|
||||
}
|
||||
newRrset := rrsets.New(dnsName, resolvedEndpoints, minDnsTtl, rrstype.A)
|
||||
newRrset := rrsets.New(dnsName, resolvedEndpoints, minDNSTTL, rrstype.A)
|
||||
glog.V(4).Infof("Adding recordset %v", newRrset)
|
||||
err = rrsets.StartChangeset().Add(newRrset).Apply()
|
||||
if err != nil {
|
||||
@ -243,7 +400,7 @@ func (s *ServiceController) ensureDnsRrsets(dnsZone dnsprovider.Zone, dnsName st
|
||||
glog.V(4).Infof("Recordset %v already exists. Ensuring that it is correct.", rrsetList)
|
||||
if len(endpoints) < 1 {
|
||||
// Need an appropriate CNAME record. Check that we have it.
|
||||
newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDnsTtl, rrstype.CNAME)
|
||||
newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDNSTTL, rrstype.CNAME)
|
||||
glog.V(4).Infof("No healthy endpoints for %s. Have recordsets %v. Need recordset %v", dnsName, rrsetList, newRrset)
|
||||
found := findRrset(rrsetList, newRrset)
|
||||
if found != nil {
|
||||
@ -279,7 +436,7 @@ func (s *ServiceController) ensureDnsRrsets(dnsZone dnsprovider.Zone, dnsName st
|
||||
if err != nil { // Some invalid addresses or otherwise unresolvable DNS names.
|
||||
return err // TODO: We could potentially add the ones we did get back, even if some of them failed to resolve.
|
||||
}
|
||||
newRrset := rrsets.New(dnsName, resolvedEndpoints, minDnsTtl, rrstype.A)
|
||||
newRrset := rrsets.New(dnsName, resolvedEndpoints, minDNSTTL, rrstype.A)
|
||||
glog.V(4).Infof("Have recordset %v. Need recordset %v", rrsetList, newRrset)
|
||||
found := findRrset(rrsetList, newRrset)
|
||||
if found != nil {
|
||||
@ -305,13 +462,13 @@ func (s *ServiceController) ensureDnsRrsets(dnsZone dnsprovider.Zone, dnsName st
|
||||
return nil
|
||||
}
|
||||
|
||||
/* ensureDnsRecords ensures (idempotently, and with minimum mutations) that all of the DNS records for a service in a given cluster are correct,
|
||||
/* ensureDNSRecords ensures (idempotently, and with minimum mutations) that all of the DNS records for a service in a given cluster are correct,
|
||||
given the current state of that service in that cluster. This should be called every time the state of a service might have changed
|
||||
(either w.r.t. its loadbalancer address, or if the number of healthy backend endpoints for that service transitioned from zero to non-zero
|
||||
(or vice versa). Only shards of the service which have both a loadbalancer ingress IP address or hostname AND at least one healthy backend endpoint
|
||||
are included in DNS records for that service (at all of zone, region and global levels). All other addresses are removed. Also, if no shards exist
|
||||
in the zone or region of the cluster, a CNAME reference to the next higher level is ensured to exist. */
|
||||
func (s *ServiceController) ensureDnsRecords(clusterName string, service *v1.Service) error {
|
||||
func (s *ServiceDNSController) ensureDNSRecords(clusterName string, service *v1.Service) error {
|
||||
// Quinton: Pseudocode....
|
||||
// See https://github.com/kubernetes/kubernetes/pull/25107#issuecomment-218026648
|
||||
// For each service we need the following DNS names:
|
||||
@ -329,15 +486,6 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, service *v1.Ser
|
||||
// So generate the DNS records based on the current state and ensure those desired DNS records match the
|
||||
// actual DNS records (add new records, remove deleted records, and update changed records).
|
||||
//
|
||||
if s == nil {
|
||||
return fmt.Errorf("nil ServiceController passed to ServiceController.ensureDnsRecords(clusterName: %s, service: %v)", clusterName, service)
|
||||
}
|
||||
if s.dns == nil {
|
||||
return nil
|
||||
}
|
||||
if service == nil {
|
||||
return fmt.Errorf("nil service passed to ServiceController.ensureDnsRecords(clusterName: %s, service: %v)", clusterName, service)
|
||||
}
|
||||
serviceName := service.Name
|
||||
namespaceName := service.Namespace
|
||||
zoneNames, regionName, err := s.getClusterZoneNames(clusterName)
|
||||
@ -347,10 +495,6 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, service *v1.Ser
|
||||
if zoneNames == nil {
|
||||
return fmt.Errorf("failed to get cluster zone names")
|
||||
}
|
||||
serviceDnsSuffix, err := s.getServiceDnsSuffix()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
zoneEndpoints, regionEndpoints, globalEndpoints, err := s.getHealthyEndpoints(clusterName, service)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -358,21 +502,16 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, service *v1.Ser
|
||||
commonPrefix := serviceName + "." + namespaceName + "." + s.federationName + ".svc"
|
||||
// dnsNames is the path up the DNS search tree, starting at the leaf
|
||||
dnsNames := []string{
|
||||
commonPrefix + "." + zoneNames[0] + "." + regionName + "." + serviceDnsSuffix, // zone level - TODO might need other zone names for multi-zone clusters
|
||||
commonPrefix + "." + regionName + "." + serviceDnsSuffix, // region level, one up from zone level
|
||||
commonPrefix + "." + serviceDnsSuffix, // global level, one up from region level
|
||||
strings.Join([]string{commonPrefix, zoneNames[0], regionName, s.serviceDNSSuffix}, "."), // zone level - TODO might need other zone names for multi-zone clusters
|
||||
strings.Join([]string{commonPrefix, regionName, s.serviceDNSSuffix}, "."), // region level, one up from zone level
|
||||
strings.Join([]string{commonPrefix, s.serviceDNSSuffix}, "."), // global level, one up from region level
|
||||
"", // nowhere to go up from global level
|
||||
}
|
||||
|
||||
endpoints := [][]string{zoneEndpoints, regionEndpoints, globalEndpoints}
|
||||
|
||||
dnsZone, err := getDnsZone(s.zoneName, s.zoneID, s.dnsZones)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for i, endpoint := range endpoints {
|
||||
if err = s.ensureDnsRrsets(dnsZone, dnsNames[i], endpoint, dnsNames[i+1]); err != nil {
|
||||
if err = s.ensureDNSRrsets(s.dnsZone, dnsNames[i], endpoint, dnsNames[i+1]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -50,13 +50,11 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
|
||||
{
|
||||
name: "ServiceWithSingleLBIngress",
|
||||
service: v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "servicename",
|
||||
Namespace: "servicenamespace",
|
||||
Annotations: map[string]string{
|
||||
FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
|
||||
AddEndpoints(cluster1Name, []string{"198.51.100.1"}).
|
||||
String()},
|
||||
ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{
|
||||
FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
|
||||
AddEndpoints(cluster1Name, []string{"198.51.100.1"}).
|
||||
AddEndpoints(cluster2Name, []string{}).
|
||||
String()},
|
||||
},
|
||||
},
|
||||
expected: []string{
|
||||
@ -73,10 +71,7 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
|
||||
{
|
||||
name: "withname",
|
||||
service: v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "servicename",
|
||||
Namespace: "servicenamespace",
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{},
|
||||
},
|
||||
expected: []string{
|
||||
"example.com:"+globalDNSName+":A:180:[198.51.100.1]",
|
||||
@ -88,9 +83,11 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
|
||||
{
|
||||
name: "ServiceWithNoLBIngress",
|
||||
service: v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "servicename",
|
||||
Namespace: "servicenamespace",
|
||||
ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{
|
||||
FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
|
||||
AddEndpoints(cluster1Name, []string{}).
|
||||
AddEndpoints(cluster2Name, []string{}).
|
||||
String()},
|
||||
},
|
||||
},
|
||||
expected: []string{
|
||||
@ -103,14 +100,11 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
|
||||
{
|
||||
name: "ServiceWithMultipleLBIngress",
|
||||
service: v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "servicename",
|
||||
Namespace: "servicenamespace",
|
||||
Annotations: map[string]string{
|
||||
FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
|
||||
AddEndpoints(cluster1Name, []string{"198.51.100.1"}).
|
||||
AddEndpoints(cluster2Name, []string{"198.51.200.1"}).
|
||||
String()},
|
||||
ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{
|
||||
FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
|
||||
AddEndpoints(cluster1Name, []string{"198.51.100.1"}).
|
||||
AddEndpoints(cluster2Name, []string{"198.51.200.1"}).
|
||||
String()},
|
||||
},
|
||||
},
|
||||
expected: []string{
|
||||
@ -124,14 +118,12 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
|
||||
{
|
||||
name: "ServiceWithLBIngressAndServiceDeleted",
|
||||
service: v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "servicename",
|
||||
Namespace: "servicenamespace",
|
||||
ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{
|
||||
FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
|
||||
AddEndpoints(cluster1Name, []string{"198.51.100.1"}).
|
||||
AddEndpoints(cluster2Name, []string{"198.51.200.1"}).
|
||||
String()},
|
||||
DeletionTimestamp: &metav1.Time{Time: time.Now()},
|
||||
Annotations: map[string]string{
|
||||
FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
|
||||
AddEndpoints(cluster1Name, []string{"198.51.100.1"}).
|
||||
String()},
|
||||
},
|
||||
},
|
||||
expected: []string{
|
||||
@ -145,15 +137,12 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
|
||||
{
|
||||
name: "ServiceWithMultipleLBIngressAndOneLBIngressGettingRemoved",
|
||||
service: v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "servicename",
|
||||
Namespace: "servicenamespace",
|
||||
Annotations: map[string]string{
|
||||
FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
|
||||
AddEndpoints(cluster1Name, []string{"198.51.100.1"}).
|
||||
AddEndpoints(cluster2Name, []string{"198.51.200.1"}).
|
||||
RemoveEndpoint(cluster2Name, "198.51.200.1").
|
||||
String()},
|
||||
ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{
|
||||
FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
|
||||
AddEndpoints(cluster1Name, []string{"198.51.100.1"}).
|
||||
AddEndpoints(cluster2Name, []string{"198.51.200.1"}).
|
||||
RemoveEndpoint(cluster2Name, "198.51.200.1").
|
||||
String()},
|
||||
},
|
||||
},
|
||||
expected: []string{
|
||||
@ -167,16 +156,13 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
|
||||
{
|
||||
name: "ServiceWithMultipleLBIngressAndAllLBIngressGettingRemoved",
|
||||
service: v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "servicename",
|
||||
Namespace: "servicenamespace",
|
||||
Annotations: map[string]string{
|
||||
FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
|
||||
AddEndpoints(cluster1Name, []string{"198.51.100.1"}).
|
||||
AddEndpoints(cluster2Name, []string{"198.51.200.1"}).
|
||||
RemoveEndpoint(cluster1Name, "198.51.100.1").
|
||||
RemoveEndpoint(cluster2Name, "198.51.200.1").
|
||||
String()},
|
||||
ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{
|
||||
FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
|
||||
AddEndpoints(cluster1Name, []string{"198.51.100.1"}).
|
||||
AddEndpoints(cluster2Name, []string{"198.51.200.1"}).
|
||||
RemoveEndpoint(cluster1Name, "198.51.100.1").
|
||||
RemoveEndpoint(cluster2Name, "198.51.200.1").
|
||||
String()},
|
||||
},
|
||||
},
|
||||
expected: []string{
|
||||
@ -195,22 +181,30 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
|
||||
}
|
||||
fakeClient := &fakefedclientset.Clientset{}
|
||||
RegisterFakeClusterGet(&fakeClient.Fake, &v1beta1.ClusterList{Items: []v1beta1.Cluster{*cluster1, *cluster2}})
|
||||
serviceController := ServiceController{
|
||||
d := ServiceDNSController{
|
||||
federationClient: fakeClient,
|
||||
dns: fakedns,
|
||||
dnsZones: fakednsZones,
|
||||
serviceDnsSuffix: "federation.example.com",
|
||||
serviceDNSSuffix: "federation.example.com",
|
||||
zoneName: "example.com",
|
||||
federationName: "myfederation",
|
||||
}
|
||||
|
||||
err := serviceController.ensureDnsRecords(cluster1Name, &test.service)
|
||||
dnsZones, err := getDNSZones(d.zoneName, d.zoneID, d.dnsZones)
|
||||
if err != nil {
|
||||
t.Errorf("Test failed for %s, unexpected error %v", test.name, err)
|
||||
t.Errorf("Test failed for %s, Get DNS Zones failed: %v", test.name, err)
|
||||
}
|
||||
err = serviceController.ensureDnsRecords(cluster2Name, &test.service)
|
||||
d.dnsZone = dnsZones[0]
|
||||
test.service.Name = "servicename"
|
||||
test.service.Namespace = "servicenamespace"
|
||||
|
||||
ingress, err := ParseFederatedServiceIngress(&test.service)
|
||||
if err != nil {
|
||||
t.Errorf("Test failed for %s, unexpected error %v", test.name, err)
|
||||
t.Errorf("Error in parsing lb ingress for service %s/%s: %v", test.service.Namespace, test.service.Name, err)
|
||||
return
|
||||
}
|
||||
for _, clusterIngress := range ingress.Items {
|
||||
d.ensureDNSRecords(clusterIngress.Cluster, &test.service)
|
||||
}
|
||||
|
||||
zones, err := fakednsZones.List()
|
||||
|
@ -40,10 +40,7 @@ import (
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
fedapi "k8s.io/kubernetes/federation/apis/federation"
|
||||
v1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
|
||||
federationcache "k8s.io/kubernetes/federation/client/cache"
|
||||
fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
|
||||
"k8s.io/kubernetes/federation/pkg/dnsprovider"
|
||||
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
|
||||
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
|
||||
"k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
@ -70,25 +67,12 @@ var (
|
||||
)
|
||||
|
||||
type ServiceController struct {
|
||||
dns dnsprovider.Interface
|
||||
federationClient fedclientset.Interface
|
||||
federationName string
|
||||
// serviceDnsSuffix is the DNS suffix we use when publishing service DNS names
|
||||
serviceDnsSuffix string
|
||||
// zoneName and zoneID are used to identify the zone in which to put records
|
||||
zoneName string
|
||||
zoneID string
|
||||
// each federation should be configured with a single zone (e.g. "mycompany.com")
|
||||
dnsZones dnsprovider.Zones
|
||||
// A store of services, populated by the serviceController
|
||||
serviceStore corelisters.ServiceLister
|
||||
// Watches changes to all services
|
||||
serviceController cache.Controller
|
||||
federatedInformer fedutil.FederatedInformer
|
||||
// A store of services, populated by the serviceController
|
||||
clusterStore federationcache.StoreToClusterLister
|
||||
// Watches changes to all services
|
||||
clusterController cache.Controller
|
||||
eventBroadcaster record.EventBroadcaster
|
||||
eventRecorder record.EventRecorder
|
||||
// services that need to be synced
|
||||
@ -96,7 +80,7 @@ type ServiceController struct {
|
||||
|
||||
// For triggering all services reconciliation. This is used when
|
||||
// a new cluster becomes available.
|
||||
clusterDeliverer *util.DelayingDeliverer
|
||||
clusterDeliverer *fedutil.DelayingDeliverer
|
||||
|
||||
deletionHelper *deletionhelper.DeletionHelper
|
||||
|
||||
@ -106,26 +90,20 @@ type ServiceController struct {
|
||||
|
||||
endpointFederatedInformer fedutil.FederatedInformer
|
||||
federatedUpdater fedutil.FederatedUpdater
|
||||
objectDeliverer *util.DelayingDeliverer
|
||||
objectDeliverer *fedutil.DelayingDeliverer
|
||||
flowcontrolBackoff *flowcontrol.Backoff
|
||||
}
|
||||
|
||||
// New returns a new service controller to keep DNS provider service resources
|
||||
// (like Kubernetes Services and DNS server records for service discovery) in sync with the registry.
|
||||
func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
|
||||
federationName, serviceDnsSuffix, zoneName string, zoneID string) *ServiceController {
|
||||
func New(federationClient fedclientset.Interface) *ServiceController {
|
||||
broadcaster := record.NewBroadcaster()
|
||||
// federationClient event is not supported yet
|
||||
// broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
|
||||
recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: UserAgentName})
|
||||
|
||||
s := &ServiceController{
|
||||
dns: dns,
|
||||
federationClient: federationClient,
|
||||
federationName: federationName,
|
||||
serviceDnsSuffix: serviceDnsSuffix,
|
||||
zoneName: zoneName,
|
||||
zoneID: zoneID,
|
||||
eventBroadcaster: broadcaster,
|
||||
eventRecorder: recorder,
|
||||
queue: workqueue.New(),
|
||||
@ -134,8 +112,8 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
|
||||
updateTimeout: updateTimeout,
|
||||
flowcontrolBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute),
|
||||
}
|
||||
s.objectDeliverer = util.NewDelayingDeliverer()
|
||||
s.clusterDeliverer = util.NewDelayingDeliverer()
|
||||
s.objectDeliverer = fedutil.NewDelayingDeliverer()
|
||||
s.clusterDeliverer = fedutil.NewDelayingDeliverer()
|
||||
var serviceIndexer cache.Indexer
|
||||
serviceIndexer, s.serviceController = cache.NewIndexerInformer(
|
||||
&cache.ListWatch{
|
||||
@ -148,7 +126,7 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
|
||||
},
|
||||
&v1.Service{},
|
||||
serviceSyncPeriod,
|
||||
util.NewTriggerOnAllChanges(func(obj pkgruntime.Object) {
|
||||
fedutil.NewTriggerOnAllChanges(func(obj pkgruntime.Object) {
|
||||
glog.V(5).Infof("Delivering notification from federation: %v", obj)
|
||||
s.deliverObject(obj, 0, false)
|
||||
}),
|
||||
@ -175,7 +153,7 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
|
||||
controller.NoResyncPeriodFunc(),
|
||||
// Trigger reconciliation whenever something in federated cluster is changed. In most cases it
|
||||
// would be just confirmation that some service operation succeeded.
|
||||
util.NewTriggerOnAllChanges(
|
||||
fedutil.NewTriggerOnAllChanges(
|
||||
func(obj pkgruntime.Object) {
|
||||
glog.V(5).Infof("Delivering service notification from federated cluster %s: %v", cluster.Name, obj)
|
||||
s.deliverObject(obj, s.reviewDelay, false)
|
||||
@ -267,17 +245,16 @@ func (s *ServiceController) updateService(obj pkgruntime.Object) (pkgruntime.Obj
|
||||
|
||||
// It's an error to call Run() more than once for a given ServiceController
|
||||
// object.
|
||||
func (s *ServiceController) Run(workers int, stopCh <-chan struct{}) error {
|
||||
if err := s.init(); err != nil {
|
||||
return err
|
||||
}
|
||||
func (s *ServiceController) Run(workers int, stopCh <-chan struct{}) {
|
||||
glog.Infof("Starting federation service controller")
|
||||
|
||||
defer runtime.HandleCrash()
|
||||
s.federatedInformer.Start()
|
||||
s.endpointFederatedInformer.Start()
|
||||
s.objectDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) {
|
||||
s.objectDeliverer.StartWithHandler(func(item *fedutil.DelayingDelivererItem) {
|
||||
s.queue.Add(item.Value.(string))
|
||||
})
|
||||
s.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) {
|
||||
s.clusterDeliverer.StartWithHandler(func(_ *fedutil.DelayingDelivererItem) {
|
||||
s.deliverServicesOnClusterChange()
|
||||
})
|
||||
fedutil.StartBackoffGC(s.flowcontrolBackoff, stopCh)
|
||||
@ -295,55 +272,6 @@ func (s *ServiceController) Run(workers int, stopCh <-chan struct{}) error {
|
||||
s.objectDeliverer.Stop()
|
||||
s.clusterDeliverer.Stop()
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *ServiceController) init() error {
|
||||
if s.federationName == "" {
|
||||
return fmt.Errorf("ServiceController should not be run without federationName.")
|
||||
}
|
||||
if s.zoneName == "" && s.zoneID == "" {
|
||||
return fmt.Errorf("ServiceController must be run with either zoneName or zoneID.")
|
||||
}
|
||||
if s.serviceDnsSuffix == "" {
|
||||
// TODO: Is this the right place to do defaulting?
|
||||
if s.zoneName == "" {
|
||||
return fmt.Errorf("ServiceController must be run with zoneName, if serviceDnsSuffix is not set.")
|
||||
}
|
||||
s.serviceDnsSuffix = s.zoneName
|
||||
}
|
||||
if s.dns == nil {
|
||||
return fmt.Errorf("ServiceController should not be run without a dnsprovider.")
|
||||
}
|
||||
zones, ok := s.dns.Zones()
|
||||
if !ok {
|
||||
return fmt.Errorf("the dns provider does not support zone enumeration, which is required for creating dns records")
|
||||
}
|
||||
s.dnsZones = zones
|
||||
matchingZones, err := getDnsZones(s.zoneName, s.zoneID, s.dnsZones)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error querying for DNS zones: %v", err)
|
||||
}
|
||||
if len(matchingZones) == 0 {
|
||||
if s.zoneName == "" {
|
||||
return fmt.Errorf("ServiceController must be run with zoneName to create zone automatically.")
|
||||
}
|
||||
glog.Infof("DNS zone %q not found. Creating DNS zone %q.", s.zoneName, s.zoneName)
|
||||
managedZone, err := s.dnsZones.New(s.zoneName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
zone, err := s.dnsZones.Add(managedZone)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
glog.Infof("DNS zone %q successfully created. Note that DNS resolution will not work until you have registered this name with "+
|
||||
"a DNS registrar and they have changed the authoritative name servers for your domain to point to your DNS provider.", zone.Name())
|
||||
}
|
||||
if len(matchingZones) > 1 {
|
||||
return fmt.Errorf("Multiple matching DNS zones found for %q; please specify zoneID", s.zoneName)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type reconciliationStatus string
|
||||
@ -355,38 +283,39 @@ const (
|
||||
statusNotSynced = reconciliationStatus("NOSYNC")
|
||||
)
|
||||
|
||||
func (s *ServiceController) workerFunction() bool {
|
||||
key, quit := s.queue.Get()
|
||||
if quit {
|
||||
return true
|
||||
}
|
||||
defer s.queue.Done(key)
|
||||
|
||||
service := key.(string)
|
||||
status := s.reconcileService(service)
|
||||
switch status {
|
||||
case statusAllOk:
|
||||
// do nothing, reconcile is successful.
|
||||
case statusNotSynced:
|
||||
glog.V(5).Infof("Delivering notification for %q after clusterAvailableDelay", service)
|
||||
s.deliverService(service, s.clusterAvailableDelay, false)
|
||||
case statusRecoverableError:
|
||||
s.deliverService(service, 0, true)
|
||||
case statusNonRecoverableError:
|
||||
// do nothing, error is already logged.
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// fedServiceWorker runs a worker thread that just dequeues items, processes them, and marks them done.
|
||||
func (s *ServiceController) fedServiceWorker() {
|
||||
for {
|
||||
func() {
|
||||
key, quit := s.queue.Get()
|
||||
if quit {
|
||||
return
|
||||
}
|
||||
defer s.queue.Done(key)
|
||||
service := key.(string)
|
||||
status := s.reconcileService(service)
|
||||
switch status {
|
||||
case statusAllOk:
|
||||
break
|
||||
case statusNotSynced:
|
||||
glog.V(5).Infof("Delivering notification for %q after clusterAvailableDelay", service)
|
||||
s.deliverService(service, s.clusterAvailableDelay, false)
|
||||
case statusRecoverableError:
|
||||
s.deliverService(service, 0, true)
|
||||
case statusNonRecoverableError:
|
||||
// error is already logged, do nothing
|
||||
default:
|
||||
// unreachable
|
||||
}
|
||||
}()
|
||||
if quit := s.workerFunction(); quit {
|
||||
glog.Infof("service controller worker queue shutting down")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func wantsDNSRecords(service *v1.Service) bool {
|
||||
return service.Spec.Type == v1.ServiceTypeLoadBalancer
|
||||
}
|
||||
|
||||
// delete deletes the given service or returns error if the deletion was not complete.
|
||||
func (s *ServiceController) delete(service *v1.Service) error {
|
||||
glog.V(3).Infof("Handling deletion of service: %v", *service)
|
||||
@ -395,24 +324,6 @@ func (s *ServiceController) delete(service *v1.Service) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Ensure DNS records are removed for service
|
||||
if wantsDNSRecords(service) {
|
||||
key := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
||||
serviceIngress, err := ParseFederatedServiceIngress(service)
|
||||
if err != nil {
|
||||
runtime.HandleError(fmt.Errorf("Failed to parse endpoint annotations for service %s: %v", key, err))
|
||||
return err
|
||||
}
|
||||
for _, ingress := range serviceIngress.Items {
|
||||
err := s.ensureDnsRecords(ingress.Cluster, service)
|
||||
if err != nil {
|
||||
glog.V(4).Infof("Error ensuring DNS Records for service %s on cluster %s: %v", key, ingress.Cluster, err)
|
||||
return err
|
||||
}
|
||||
glog.V(4).Infof("Ensured DNS records for Service %s in cluster %q", key, ingress.Cluster)
|
||||
}
|
||||
}
|
||||
|
||||
err = s.federationClient.Core().Services(service.Namespace).Delete(service.Name, nil)
|
||||
if err != nil {
|
||||
// Its all good if the error is not found error. That means it is deleted already and we do not have to do anything.
|
||||
@ -789,17 +700,6 @@ func (s *ServiceController) updateFederatedService(fedService *v1.Service, newLB
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure DNS records based on Annotations in federated service for all federated clusters
|
||||
if needUpdate && wantsDNSRecords(fedService) {
|
||||
for _, ingress := range newServiceIngress.Items {
|
||||
err := s.ensureDnsRecords(ingress.Cluster, fedService)
|
||||
if err != nil {
|
||||
runtime.HandleError(fmt.Errorf("Error ensuring DNS Records for service %s on cluster %q: %v", key, ingress.Cluster, err))
|
||||
return err
|
||||
}
|
||||
glog.V(4).Infof("Ensured DNS records for Service %s in cluster %q", key, ingress.Cluster)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -32,7 +32,6 @@ import (
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/kubernetes/federation/apis/federation/v1beta1"
|
||||
fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake"
|
||||
"k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes.
|
||||
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
|
||||
. "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
@ -101,8 +100,7 @@ func TestServiceController(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
fakedns, _ := clouddns.NewFakeInterface()
|
||||
sc := New(fedClient, fakedns, "myfederation", "federation.example.com", "example.com", "")
|
||||
sc := New(fedClient)
|
||||
ToFederatedInformerForTestOnly(sc.federatedInformer).SetClientFactory(fedInformerClientFactory)
|
||||
ToFederatedInformerForTestOnly(sc.endpointFederatedInformer).SetClientFactory(fedInformerClientFactory)
|
||||
sc.clusterAvailableDelay = 100 * time.Millisecond
|
||||
|
Loading…
Reference in New Issue
Block a user