Merge pull request #27252 from mfanjie/fix-ensuredns

Automatic merge from submit-queue

federation: fix dns provider initialization issues

This PR is based on the integration test with Google DNS API. This is the first time of full integration test.
So multiple issues was found and I combined all of them in this single PR

1. add dns provider initialization and add ensureDns call when removing federation service.
2. add new flags federation-name and zone-name to controller manager, both are used as part of the dns record name
3. fix assertion failure at rrsets.go#L61, which will cause panic
4. change getFederationDNSZoneName to get zoneName from config instead of hard code
5. change logic of ensureDnsRrsets, only add new dns record when endpointReachable(set to true when ready address is catched) is true
6. fix bug in processEndpointUpdate, only call ensuredns when ready address is caught
7. change behavior of syncService, there is cases that endpoint is created before ingress IP assignment, so before there is defect for this case, ensureDns was not called when service being updated, so if Ingress IP is assigned after endpoint ready address is caught, the corresponding A records can not be created
8. add a checking before update federation service

@nikhiljindal , can you help to add 1.3 milestone when @quinton-hoole is on leave?
Thanks.

[![Analytics](https://kubernetes-site.appspot.com/UA-36037335-10/GitHub/.github/PULL_REQUEST_TEMPLATE.md?pixel)]()
This commit is contained in:
k8s-merge-robot 2016-06-14 03:20:13 -07:00 committed by GitHub
commit c7e3b2f676
11 changed files with 169 additions and 64 deletions

View File

@ -94,6 +94,7 @@ function create-federation-api-objects {
export FEDERATION_API_NODEPORT=32111
export FEDERATION_NAMESPACE
export FEDERATION_NAME="${FEDERATION_NAME:-federation}"
export DNS_ZONE_NAME="${DNS_ZONE_NAME:-example.com}"
template="go run ${KUBE_ROOT}/federation/cluster/template.go"

View File

@ -121,7 +121,7 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err
glog.Fatalf("Cloud provider could not be initialized: %v", err)
}
scClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, servicecontroller.UserAgentName))
servicecontroller := servicecontroller.New(scClientset, dns)
servicecontroller := servicecontroller.New(scClientset, dns, s.FederationName, s.ZoneName)
if err := servicecontroller.Run(s.ConcurrentServiceSyncs, wait.NeverStop); err != nil {
glog.Errorf("Failed to start service controller: %v", err)
}

View File

@ -35,6 +35,10 @@ type ControllerManagerConfiguration struct {
Port int `json:"port"`
// address is the IP address to serve on (set to 0.0.0.0 for all interfaces).
Address string `json:"address"`
// federation name.
FederationName string `json:"federationName"`
// zone name, like example.com.
ZoneName string `json:"zoneName"`
// dnsProvider is the provider for dns services.
DnsProvider string `json:"dnsProvider"`
// dnsConfigFile is the path to the dns provider configuration file.
@ -90,6 +94,8 @@ func NewCMServer() *CMServer {
func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&s.Port, "port", s.Port, "The port that the controller-manager's http service runs on")
fs.Var(componentconfig.IPVar{Val: &s.Address}, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)")
fs.StringVar(&s.FederationName, "federation-name", s.FederationName, "Federation name.")
fs.StringVar(&s.ZoneName, "zone-name", s.ZoneName, "Zone name, like example.com.")
fs.IntVar(&s.ConcurrentServiceSyncs, "concurrent-service-syncs", s.ConcurrentServiceSyncs, "The number of service syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load")
fs.DurationVar(&s.ClusterMonitorPeriod.Duration, "cluster-monitor-period", s.ClusterMonitorPeriod.Duration, "The period for syncing ClusterStatus in ClusterController.")
fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")

View File

@ -21,6 +21,8 @@ spec:
- --master=https://{{.FEDERATION_APISERVER_DEPLOYMENT_NAME}}:443
- --dns-provider={{.FEDERATION_DNS_PROVIDER}}
- --dns-provider-config={{.FEDERATION_DNS_PROVIDER_CONFIG}}
- --federation-name={{.FEDERATION_NAME}}
- --zone-name={{.DNS_ZONE_NAME}}
ports:
- containerPort: 443
name: https

View File

@ -36,7 +36,7 @@ func (rrsets ResourceRecordSets) List() ([]dnsprovider.ResourceRecordSet, error)
}
list := make([]dnsprovider.ResourceRecordSet, len(response.Rrsets()))
for i, rrset := range response.Rrsets() {
list[i] = &ResourceRecordSet{rrset, &rrsets}
list[i] = ResourceRecordSet{rrset, &rrsets}
}
return list, nil
}

View File

@ -85,8 +85,7 @@ func (s *ServiceController) getClusterZoneNames(clusterName string) (zones []str
// getFederationDNSZoneName returns the name of the managed DNS Zone configured for this federation
func (s *ServiceController) getFederationDNSZoneName() (string, error) {
return "example.com", nil // TODO: quinton: Get this from the federation configuration.
// Note: For unit testing this must match the domain populated in the test/stub dnsprovider.
return s.zoneName, nil
}
// getDnsZone is a hack around the fact that dnsprovider does not yet support a Get() method, only a List() method. TODO: Fix that.
@ -147,7 +146,7 @@ func getResolvedEndpoints(endpoints []string) ([]string, error) {
/* ensureDnsRrsets ensures (idempotently, and with minimum mutations) that all of the DNS resource record sets for dnsName are consistent with endpoints.
if endpoints is nil or empty, a CNAME record to uplevelCname is ensured.
*/
func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoints []string, uplevelCname string) error {
func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoints []string, uplevelCname string, endpointReachable bool) error {
dnsZone, err := getDnsZone(dnsZoneName, s.dnsZones)
if err != nil {
return err
@ -161,30 +160,32 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin
return err
}
if rrset == nil {
// It doesn't exist yet, so create it, if we indeed have healthy endpoints
if len(endpoints) < 1 {
// There are no endpoint addresses at this level, so CNAME to uplevel, if provided
if uplevelCname != "" {
newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDnsTtl, rrstype.CNAME)
if endpointReachable {
// It doesn't exist yet, so create it, if we indeed have healthy endpoints
if len(endpoints) < 1 {
// There are no endpoint addresses at this level, so CNAME to uplevel, if provided
if uplevelCname != "" {
newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDnsTtl, rrstype.CNAME)
rrset, err = rrsets.Add(newRrset)
if err != nil {
return err
}
}
// else we want no record, and we have no record, so we're all good.
} else {
// We have valid endpoint addresses, so just add them as A records.
// But first resolve DNS names, as some cloud providers (like AWS) expose
// load balancers behind DNS names, not IP addresses.
resolvedEndpoints, err := getResolvedEndpoints(endpoints)
if err != nil {
return err // TODO: We could potentially add the ones we did get back, even if some of them failed to resolve.
}
newRrset := rrsets.New(dnsName, resolvedEndpoints, minDnsTtl, rrstype.A)
rrset, err = rrsets.Add(newRrset)
if err != nil {
return err
}
}
// else we want no record, and we have no record, so we're all good.
} else {
// We have valid endpoint addresses, so just add them as A records.
// But first resolve DNS names, as some cloud providers (like AWS) expose
// load balancers behind DNS names, not IP addresses.
resolvedEndpoints, err := getResolvedEndpoints(endpoints)
if err != nil {
return err // TODO: We could potentially add the ones we did get back, even if some of them failed to resolve.
}
newRrset := rrsets.New(dnsName, resolvedEndpoints, minDnsTtl, rrstype.A)
rrset, err = rrsets.Add(newRrset)
if err != nil {
return err
}
}
} else {
// the rrset already exists, so make it right.
@ -192,6 +193,11 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin
// Need an appropriate CNAME record. Check that we have it.
newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDnsTtl, rrstype.CNAME)
if rrset == newRrset {
if !endpointReachable {
if err = rrsets.Remove(rrset); err != nil {
return err
}
}
// The existing rrset is equal to the required one - our work is done here
return nil
} else {
@ -200,7 +206,7 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin
if err = rrsets.Remove(rrset); err != nil {
return err
}
if uplevelCname != "" {
if uplevelCname != "" && endpointReachable {
if _, err = rrsets.Add(newRrset); err != nil {
return err
}
@ -215,6 +221,11 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin
}
newRrset := rrsets.New(dnsName, resolvedEndpoints, minDnsTtl, rrstype.A)
if rrset == newRrset {
if !endpointReachable {
if err = rrsets.Remove(rrset); err != nil {
return err
}
}
// The existing rrset is equal to the required one - our work is done here
// TODO: We could be more thorough about checking for equivalence to avoid unnecessary updates, but in the
// worst case we'll just replace what's there with an equivalent, if not exactly identical record set.
@ -225,8 +236,10 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin
if err = rrsets.Remove(rrset); err != nil {
return err
}
if _, err = rrsets.Add(newRrset); err != nil {
return err
if endpointReachable {
if _, err = rrsets.Add(newRrset); err != nil {
return err
}
}
}
}
@ -254,12 +267,12 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *
// the state of the service when we last successfully sync'd it's DNS records.
// So this time around we only need to patch that (add new records, remove deleted records, and update changed records.
//
if s.dns == nil {
return nil
}
if s == nil {
return fmt.Errorf("nil ServiceController passed to ServiceController.ensureDnsRecords(clusterName: %s, cachedService: %v)", clusterName, cachedService)
}
if s.dns == nil {
return nil
}
if cachedService == nil {
return fmt.Errorf("nil cachedService passed to ServiceController.ensureDnsRecords(clusterName: %s, cachedService: %v)", clusterName, cachedService)
}
@ -280,7 +293,7 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *
if err != nil {
return err
}
_, endpointReachable := cachedService.endpointMap[clusterName]
commonPrefix := serviceName + "." + namespaceName + "." + s.federationName + ".svc"
// dnsNames is the path up the DNS search tree, starting at the leaf
dnsNames := []string{
@ -293,7 +306,7 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *
endpoints := [][]string{zoneEndpoints, regionEndpoints, globalEndpoints}
for i, endpoint := range endpoints {
if err = s.ensureDnsRrsets(dnsZoneName, dnsNames[i], endpoint, dnsNames[i+1]); err != nil {
if err = s.ensureDnsRrsets(dnsZoneName, dnsNames[i], endpoint, dnsNames[i+1], endpointReachable); err != nil {
return err
}
}

View File

@ -93,7 +93,7 @@ func (cc *clusterClientCache) syncEndpoint(key, clusterName string, clusterCache
}
func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedService, clusterName string, serviceController *ServiceController) error {
glog.V(4).Infof("Processing endpoint update for %s/%s, cluster %s", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
glog.V(4).Infof("Processing endpoint deletion for %s/%s, cluster %s", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
var err error
cachedService.rwlock.Lock()
defer cachedService.rwlock.Unlock()
@ -102,10 +102,10 @@ func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedServi
// need to query dns info from dnsprovider and make sure of if deletion is needed
if ok {
// endpoints lost, clean dns record
glog.V(4).Infof("Cached endpoint was not found for %s/%s, cluster %s, building one", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
glog.V(4).Infof("Cached endpoint was found for %s/%s, cluster %s, removing", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
delete(cachedService.endpointMap, clusterName)
for i := 0; i < clientRetryCount; i++ {
if err := serviceController.ensureDnsRecords(clusterName, cachedService); err == nil {
delete(cachedService.endpointMap, clusterName)
return nil
}
glog.V(4).Infof("Error ensuring DNS Records: %v", err)
@ -120,27 +120,54 @@ func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedServi
func (cc *clusterClientCache) processEndpointUpdate(cachedService *cachedService, endpoint *v1.Endpoints, clusterName string, serviceController *ServiceController) error {
glog.V(4).Infof("Processing endpoint update for %s/%s, cluster %s", endpoint.Namespace, endpoint.Name, clusterName)
cachedService.rwlock.Lock()
var reachable bool
defer cachedService.rwlock.Unlock()
for _, subset := range endpoint.Subsets {
if len(subset.Addresses) > 0 {
cachedService.endpointMap[clusterName] = 1
}
}
_, ok := cachedService.endpointMap[clusterName]
if !ok {
// first time get endpoints, update dns record
glog.V(4).Infof("Cached endpoint was not found for %s/%s, cluster %s, building one", endpoint.Namespace, endpoint.Name, clusterName)
cachedService.endpointMap[clusterName] = 1
if err := serviceController.ensureDnsRecords(clusterName, cachedService); err != nil {
glog.V(4).Infof("Error ensuring DNS Records: %v", err)
for i := 0; i < clientRetryCount; i++ {
time.Sleep(cachedService.nextDNSUpdateDelay())
err := serviceController.ensureDnsRecords(clusterName, cachedService)
if err == nil {
return nil
}
for _, subset := range endpoint.Subsets {
if len(subset.Addresses) > 0 {
reachable = true
break
}
}
if reachable {
// first time get endpoints, update dns record
glog.V(4).Infof("Reachable endpoint was found for %s/%s, cluster %s, building endpointMap", endpoint.Namespace, endpoint.Name, clusterName)
cachedService.endpointMap[clusterName] = 1
if err := serviceController.ensureDnsRecords(clusterName, cachedService); err != nil {
glog.V(4).Infof("Error ensuring DNS Records: %v", err)
for i := 0; i < clientRetryCount; i++ {
time.Sleep(cachedService.nextDNSUpdateDelay())
err := serviceController.ensureDnsRecords(clusterName, cachedService)
if err == nil {
return nil
}
}
return err
}
}
} else {
for _, subset := range endpoint.Subsets {
if len(subset.Addresses) > 0 {
reachable = true
break
}
}
if !reachable {
// first time get endpoints, update dns record
glog.V(4).Infof("Reachable endpoint was lost for %s/%s, cluster %s, deleting endpointMap", endpoint.Namespace, endpoint.Name, clusterName)
delete(cachedService.endpointMap, clusterName)
if err := serviceController.ensureDnsRecords(clusterName, cachedService); err != nil {
glog.V(4).Infof("Error ensuring DNS Records: %v", err)
for i := 0; i < clientRetryCount; i++ {
time.Sleep(cachedService.nextDNSUpdateDelay())
err := serviceController.ensureDnsRecords(clusterName, cachedService)
if err == nil {
return nil
}
}
return err
}
return err
}
}
return nil

View File

@ -29,9 +29,11 @@ var fakeDns, _ = clouddns.NewFakeInterface() // No need to check for unsupported
var fakeDnsZones, _ = fakeDns.Zones()
var fakeServiceController = ServiceController{
dns: fakeDns,
dnsZones: fakeDnsZones,
serviceCache: &serviceCache{fedServiceMap: make(map[string]*cachedService)},
dns: fakeDns,
dnsZones: fakeDnsZones,
federationName: "fed1",
zoneName: "example.com",
serviceCache: &serviceCache{fedServiceMap: make(map[string]*cachedService)},
clusterCache: &clusterClientCache{
clientMap: make(map[string]*clusterCache),
},
@ -52,8 +54,18 @@ func buildEndpoint(subsets [][]string) *v1.Endpoints {
}
func TestProcessEndpointUpdate(t *testing.T) {
clusterName := "foo"
cc := clusterClientCache{
clientMap: make(map[string]*clusterCache),
clientMap: map[string]*clusterCache{
clusterName: {
cluster: &v1alpha1.Cluster{
Status: v1alpha1.ClusterStatus{
Zones: []string{"foozone"},
Region: "fooregion",
},
},
},
},
}
tests := []struct {
name string
@ -69,7 +81,7 @@ func TestProcessEndpointUpdate(t *testing.T) {
endpointMap: make(map[string]int),
},
buildEndpoint([][]string{{"ip1", ""}}),
"foo",
clusterName,
1,
},
{
@ -81,11 +93,11 @@ func TestProcessEndpointUpdate(t *testing.T) {
},
},
buildEndpoint([][]string{{"ip1", ""}}),
"foo",
clusterName,
1,
},
}
fakeServiceController.clusterCache = &cc
for _, test := range tests {
cc.processEndpointUpdate(test.cachedService, test.endpoint, test.clusterName, &fakeServiceController)
if test.expectResult != test.cachedService.endpointMap[test.clusterName] {

View File

@ -44,7 +44,7 @@ func (sc *ServiceController) clusterServiceWorker() {
if quit {
return
}
err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient)
err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc)
if err != nil {
glog.Errorf("Failed to sync service: %+v", err)
}
@ -55,7 +55,7 @@ func (sc *ServiceController) clusterServiceWorker() {
}
// Whenever there is change on service, the federation service should be updated
func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient federation_release_1_3.Interface) error {
func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient federation_release_1_3.Interface, sc *ServiceController) error {
// obj holds the latest service info from apiserver, return if there is no federation cache for the service
cachedService, ok := serviceCache.get(key)
if !ok {
@ -88,6 +88,15 @@ func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache
}
if needUpdate {
for i := 0; i < clientRetryCount; i++ {
if err := sc.ensureDnsRecords(clusterName, cachedService); err == nil {
break
}
glog.V(4).Infof("Error ensuring DNS Records for service %s on cluster %s: %v", key, clusterName, err)
time.Sleep(cachedService.nextDNSUpdateDelay())
clusterCache.serviceQueue.Add(key)
// did not retry here as we still want to persist federation apiserver even ensure dns records fails
}
err := cc.persistFedServiceUpdate(cachedService, fedClient)
if err == nil {
cachedService.appliedState = cachedService.lastState
@ -219,7 +228,13 @@ func (cc *clusterClientCache) persistFedServiceUpdate(cachedService *cachedServi
glog.V(5).Infof("Persist federation service status %s/%s", service.Namespace, service.Name)
var err error
for i := 0; i < clientRetryCount; i++ {
_, err := fedClient.Core().Services(service.Namespace).UpdateStatus(service)
_, err := fedClient.Core().Services(service.Namespace).Get(service.Name)
if errors.IsNotFound(err) {
glog.Infof("Not persisting update to service '%s/%s' that no longer exists: %v",
service.Namespace, service.Name, err)
return nil
}
_, err = fedClient.Core().Services(service.Namespace).UpdateStatus(service)
if err == nil {
glog.V(2).Infof("Successfully update service %s/%s to federation apiserver", service.Namespace, service.Name)
return nil

View File

@ -76,7 +76,7 @@ type cachedService struct {
appliedState *v1.Service
// cluster endpoint map hold subset info from kubernetes clusters
// key clusterName
// value is a flag that if there is ready address, 1 means there is ready address, 0 means no ready address
// value is a flag that if there is ready address, 1 means there is ready address
endpointMap map[string]int
// serviceStatusMap map holds service status info from kubernetes clusters, keyed on clusterName
serviceStatusMap map[string]v1.LoadBalancerStatus
@ -101,6 +101,7 @@ type ServiceController struct {
dns dnsprovider.Interface
federationClient federation_release_1_3.Interface
federationName string
zoneName string
// each federation should be configured with a single zone (e.g. "mycompany.com")
dnsZones dnsprovider.Zones
serviceCache *serviceCache
@ -123,7 +124,7 @@ type ServiceController struct {
// New returns a new service controller to keep DNS provider service resources
// (like Kubernetes Services and DNS server records for service discovery) in sync with the registry.
func New(federationClient federation_release_1_3.Interface, dns dnsprovider.Interface) *ServiceController {
func New(federationClient federation_release_1_3.Interface, dns dnsprovider.Interface, federationName, zoneName string) *ServiceController {
broadcaster := record.NewBroadcaster()
// federationClient event is not supported yet
// broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
@ -132,6 +133,8 @@ func New(federationClient federation_release_1_3.Interface, dns dnsprovider.Inte
s := &ServiceController{
dns: dns,
federationClient: federationClient,
federationName: federationName,
zoneName: zoneName,
serviceCache: &serviceCache{fedServiceMap: make(map[string]*cachedService)},
clusterCache: &clusterClientCache{
rwlock: sync.Mutex{},
@ -227,6 +230,9 @@ func (s *ServiceController) enqueueService(obj interface{}) {
// It's an error to call Run() more than once for a given ServiceController
// object.
func (s *ServiceController) Run(workers int, stopCh <-chan struct{}) error {
if err := s.init(); err != nil {
return err
}
defer runtime.HandleCrash()
go s.serviceController.Run(stopCh)
go s.clusterController.Run(stopCh)
@ -242,6 +248,24 @@ func (s *ServiceController) Run(workers int, stopCh <-chan struct{}) error {
return nil
}
func (s *ServiceController) init() error {
if s.federationName == "" {
return fmt.Errorf("ServiceController should not be run without federationName.")
}
if s.zoneName == "" {
return fmt.Errorf("ServiceController should not be run without zoneName.")
}
if s.dns == nil {
return fmt.Errorf("ServiceController should not be run without a dnsprovider.")
}
zones, ok := s.dns.Zones()
if !ok {
return fmt.Errorf("the dns provider does not support zone enumeration, which is required for creating dns records.")
}
s.dnsZones = zones
return nil
}
// fedServiceWorker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncService is never invoked concurrently with the same key.
func (s *ServiceController) fedServiceWorker() {
@ -317,6 +341,8 @@ func (s *ServiceController) deleteFederationService(cachedService *cachedService
err := s.deleteClusterService(clusterName, cachedService, cluster.clientset)
if err != nil {
hasErr = true
} else if err := s.ensureDnsRecords(clusterName, cachedService); err != nil {
hasErr = true
}
}
if hasErr {
@ -334,6 +360,7 @@ func (s *ServiceController) deleteClusterService(clusterName string, cachedServi
err = clientset.Core().Services(service.Namespace).Delete(service.Name, &api.DeleteOptions{})
if err == nil || errors.IsNotFound(err) {
glog.V(4).Infof("Service %s/%s deleted from cluster %s", service.Namespace, service.Name, clusterName)
delete(cachedService.endpointMap, clusterName)
return nil
}
time.Sleep(cachedService.nextRetryDelay())

View File

@ -158,6 +158,7 @@ fake-clientset
federated-api-burst
federated-api-qps
federated-kube-context
federation-name
file-check-frequency
file-suffix
file_content_in_loop
@ -473,3 +474,4 @@ watch-only
whitelist-override-label
windows-line-endings
www-prefix
zone-name