Merge pull request #46519 from shashidharatd/fed-dc-ut

Automatic merge from submit-queue (batch tested with PRs 46519, 49794, 49720, 49692, 49821)

Federation: Add delaying deliverer to dns controller

**What this PR does / why we need it**:
- if `ensureDNSRecords` returned an error there was no retry, so now introduced an delaying deliverer which would reattempt to do ensureDNSRecords.
- ~~Revamped unit test cases of DNS controller. Added more test cases and increased the test coverage.
This was a leftover job from earlier refactoring PR's.~~

```release-note
NONE
```

/assign @quinton-hoole 
cc @marun @madhusudancs @kubernetes/sig-federation-pr-reviews
This commit is contained in:
Kubernetes Submit Queue 2017-08-01 03:04:38 -07:00 committed by GitHub
commit 3e53afa6bc
2 changed files with 43 additions and 18 deletions

View File

@ -44,6 +44,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
)

View File

@ -22,8 +22,6 @@ import (
"strings"
"time"
"github.com/golang/glog"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
@ -33,12 +31,15 @@ import (
"k8s.io/apimachinery/pkg/watch"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/util/workqueue"
fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
"k8s.io/kubernetes/federation/pkg/dnsprovider"
"k8s.io/kubernetes/federation/pkg/dnsprovider/rrstype"
"k8s.io/kubernetes/federation/pkg/federation-controller/service/ingress"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
"github.com/golang/glog"
)
const (
@ -68,8 +69,10 @@ type ServiceDNSController struct {
// Informer Store for federated services
serviceStore corelisters.ServiceLister
// Informer controller for federated services
serviceController cache.Controller
workQueue workqueue.Interface
serviceController cache.Controller
workQueue workqueue.Interface
objectDeliverer *util.DelayingDeliverer
flowcontrolBackoff *flowcontrol.Backoff
}
// NewServiceDNSController returns a new service dns controller to manage DNS records for federated services
@ -81,13 +84,15 @@ func NewServiceDNSController(client fedclientset.Interface, dnsProvider, dnsProv
return nil, err
}
d := &ServiceDNSController{
federationClient: client,
dns: dns,
federationName: federationName,
serviceDNSSuffix: serviceDNSSuffix,
zoneName: zoneName,
zoneID: zoneID,
workQueue: workqueue.New(),
federationClient: client,
dns: dns,
federationName: federationName,
serviceDNSSuffix: serviceDNSSuffix,
zoneName: zoneName,
zoneID: zoneID,
workQueue: workqueue.New(),
objectDeliverer: util.NewDelayingDeliverer(),
flowcontrolBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute),
}
if err := d.validateConfig(); err != nil {
runtime.HandleError(fmt.Errorf("Invalid configuration passed to DNS provider: %v", err))
@ -124,8 +129,13 @@ func (s *ServiceDNSController) DNSControllerRun(workers int, stopCh <-chan struc
defer s.workQueue.ShutDown()
glog.Infof("Starting federation service dns controller")
defer glog.Infof("Stopping federation service dns controller")
s.objectDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) {
s.workQueue.Add(item.Value.(*v1.Service))
})
defer s.objectDeliverer.Stop()
util.StartBackoffGC(s.flowcontrolBackoff, stopCh)
go s.serviceController.Run(stopCh)
for i := 0; i < workers; i++ {
@ -133,6 +143,18 @@ func (s *ServiceDNSController) DNSControllerRun(workers int, stopCh <-chan struc
}
<-stopCh
glog.Infof("Stopping federation service dns controller")
}
// Adds backoff to delay if this delivery is related to some failure. Resets backoff if there was no failure.
func (s *ServiceDNSController) deliverService(service *v1.Service, delay time.Duration, failed bool) {
if failed {
s.flowcontrolBackoff.Next(service.String(), time.Now())
delay = delay + s.flowcontrolBackoff.Get(service.String())
} else {
s.flowcontrolBackoff.Reset(service.String())
}
s.objectDeliverer.DeliverAfter(service.String(), service, delay)
}
func wantsDNSRecords(service *v1.Service) bool {
@ -158,7 +180,10 @@ func (s *ServiceDNSController) workerFunction() bool {
return false
}
for _, clusterIngress := range ingress.Items {
s.ensureDNSRecords(clusterIngress.Cluster, service)
err = s.ensureDNSRecords(clusterIngress.Cluster, service)
if err != nil {
s.deliverService(service, 0, true)
}
}
return false
}
@ -500,10 +525,6 @@ func (s *ServiceDNSController) ensureDNSRecords(clusterName string, service *v1.
if zoneNames == nil {
return fmt.Errorf("failed to get cluster zone names")
}
zoneEndpoints, regionEndpoints, globalEndpoints, err := s.getHealthyEndpoints(clusterName, service)
if err != nil {
return err
}
commonPrefix := serviceName + "." + namespaceName + "." + s.federationName + ".svc"
// dnsNames is the path up the DNS search tree, starting at the leaf
dnsNames := []string{
@ -513,8 +534,11 @@ func (s *ServiceDNSController) ensureDNSRecords(clusterName string, service *v1.
"", // nowhere to go up from global level
}
zoneEndpoints, regionEndpoints, globalEndpoints, err := s.getHealthyEndpoints(clusterName, service)
if err != nil {
return err
}
endpoints := [][]string{zoneEndpoints, regionEndpoints, globalEndpoints}
for i, endpoint := range endpoints {
if err = s.ensureDNSRrsets(s.dnsZone, dnsNames[i], endpoint, dnsNames[i+1]); err != nil {
return err