Use the federated informer framework and store loadbalancer ingress in annotations

shashidharatd 2017-02-13 19:30:01 +05:30
parent bacd7b7454
commit d00eca48da
7 changed files with 581 additions and 100 deletions

View File

@ -24,8 +24,10 @@ import (
"strings"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/federation/pkg/dnsprovider"
"k8s.io/kubernetes/federation/pkg/dnsprovider/rrstype"
"k8s.io/kubernetes/pkg/api/v1"
)
const (
@ -34,7 +36,7 @@ const (
)
// getHealthyEndpoints returns the hostnames and/or IP addresses of healthy endpoints for the service, at a zone, region and global level (or an error)
func (s *ServiceController) getHealthyEndpoints(clusterName string, cachedService *cachedService) (zoneEndpoints, regionEndpoints, globalEndpoints []string, err error) {
func (s *ServiceController) getHealthyEndpoints(clusterName string, service *v1.Service) (zoneEndpoints, regionEndpoints, globalEndpoints []string, err error) {
var (
zoneNames []string
regionName string
@ -42,16 +44,24 @@ func (s *ServiceController) getHealthyEndpoints(clusterName string, cachedServic
if zoneNames, regionName, err = s.getClusterZoneNames(clusterName); err != nil {
return nil, nil, nil, err
}
for lbClusterName, lbStatus := range cachedService.serviceStatusMap {
// If federated service is deleted, return empty endpoints, so that DNS records are removed
if service.DeletionTimestamp != nil {
return zoneEndpoints, regionEndpoints, globalEndpoints, nil
}
serviceIngress, err := ParseFederatedServiceIngress(service)
if err != nil {
return nil, nil, nil, err
}
for _, lbClusterIngress := range serviceIngress.Items {
lbClusterName := lbClusterIngress.Cluster
lbZoneNames, lbRegionName, err := s.getClusterZoneNames(lbClusterName)
if err != nil {
return nil, nil, nil, err
}
for _, ingress := range lbStatus.Ingress {
readyEndpoints, ok := cachedService.endpointMap[lbClusterName]
if !ok || readyEndpoints == 0 {
continue
}
for _, ingress := range lbClusterIngress.Items {
var address string
// We should get either an IP address or a hostname - use whichever one we get
if ingress.IP != "" {
@ -61,7 +71,7 @@ func (s *ServiceController) getHealthyEndpoints(clusterName string, cachedServic
}
if len(address) <= 0 {
return nil, nil, nil, fmt.Errorf("Service %s/%s in cluster %s has neither LoadBalancerStatus.ingress.ip nor LoadBalancerStatus.ingress.hostname. Cannot use it as endpoint for federated service.",
cachedService.lastState.Name, cachedService.lastState.Namespace, clusterName)
service.Name, service.Namespace, clusterName)
}
for _, lbZoneName := range lbZoneNames {
for _, zoneName := range zoneNames {
@ -80,15 +90,12 @@ func (s *ServiceController) getHealthyEndpoints(clusterName string, cachedServic
}
// getClusterZoneNames returns the name of the zones (and the region) where the specified cluster exists (e.g. zones "us-east1-c" on GCE, or "us-east-1b" on AWS)
func (s *ServiceController) getClusterZoneNames(clusterName string) (zones []string, region string, err error) {
client, ok := s.clusterCache.clientMap[clusterName]
if !ok {
return nil, "", fmt.Errorf("Cluster cache does not contain entry for cluster %s", clusterName)
func (s *ServiceController) getClusterZoneNames(clusterName string) ([]string, string, error) {
cluster, err := s.federationClient.Federation().Clusters().Get(clusterName, metav1.GetOptions{})
if err != nil {
return nil, "", err
}
if client.cluster == nil {
return nil, "", fmt.Errorf("Cluster cache entry for cluster %s is nil", clusterName)
}
return client.cluster.Status.Zones, client.cluster.Status.Region, nil
return cluster.Status.Zones, cluster.Status.Region, nil
}
// getServiceDnsSuffix returns the DNS suffix to use when creating federated-service DNS records
@ -284,7 +291,7 @@ given the current state of that service in that cluster. This should be called
(or vice versa). Only shards of the service which have both a loadbalancer ingress IP address or hostname AND at least one healthy backend endpoint
are included in DNS records for that service (at all of zone, region and global levels). All other addresses are removed. Also, if no shards exist
in the zone or region of the cluster, a CNAME reference to the next higher level is ensured to exist. */
func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *cachedService) error {
func (s *ServiceController) ensureDnsRecords(clusterName string, service *v1.Service) error {
// Quinton: Pseudocode....
// See https://github.com/kubernetes/kubernetes/pull/25107#issuecomment-218026648
// For each service we need the following DNS names:
@ -298,21 +305,21 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *
// - a set of A records to IP addresses of all healthy shards in all regions, if one or more of these exist.
// - no record (NXRECORD response) if no healthy shards exist in any regions
//
// For each cached service, cachedService.lastState tracks the current known state of the service, while cachedService.appliedState contains
// the state of the service when we last successfully synced its DNS records.
// So this time around we only need to patch that (add new records, remove deleted records, and update changed records).
// Each service has the current known state of loadbalancer ingress for the federated cluster stored in annotations.
// So generate the DNS records based on the current state and ensure those desired DNS records match the
// actual DNS records (add new records, remove deleted records, and update changed records).
//
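// As a rough illustration (record shapes assumed from the federated-service DNS naming scheme, using the
// zone, region and DNS suffix values from the updated unit test), the three per-service names look like:
//   servicename.servicenamespace.<federation>.svc.foozone.fooregion.federation.example.com  (zone level)
//   servicename.servicenamespace.<federation>.svc.fooregion.federation.example.com          (region level)
//   servicename.servicenamespace.<federation>.svc.federation.example.com                    (global level)
//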
if s == nil {
return fmt.Errorf("nil ServiceController passed to ServiceController.ensureDnsRecords(clusterName: %s, cachedService: %v)", clusterName, cachedService)
return fmt.Errorf("nil ServiceController passed to ServiceController.ensureDnsRecords(clusterName: %s, service: %v)", clusterName, service)
}
if s.dns == nil {
return nil
}
if cachedService == nil {
return fmt.Errorf("nil cachedService passed to ServiceController.ensureDnsRecords(clusterName: %s, cachedService: %v)", clusterName, cachedService)
if service == nil {
return fmt.Errorf("nil service passed to ServiceController.ensureDnsRecords(clusterName: %s, service: %v)", clusterName, service)
}
serviceName := cachedService.lastState.Name
namespaceName := cachedService.lastState.Namespace
serviceName := service.Name
namespaceName := service.Namespace
zoneNames, regionName, err := s.getClusterZoneNames(clusterName)
if err != nil {
return err
@ -324,7 +331,7 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *
if err != nil {
return err
}
zoneEndpoints, regionEndpoints, globalEndpoints, err := s.getHealthyEndpoints(clusterName, cachedService)
zoneEndpoints, regionEndpoints, globalEndpoints, err := s.getHealthyEndpoints(clusterName, service)
if err != nil {
return err
}
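For illustration only (not part of this commit): a minimal test-style sketch, assumed to live in the same package as the service controller, of how the new loadbalancer ingress annotation round-trips through the helpers this diff relies on (NewFederatedServiceIngress, AddEndpoints, String, ParseFederatedServiceIngress). The annotation key constant and its serialized layout come from the package's ingress helpers and are taken as given here; the test name is hypothetical.

package service

import (
	"testing"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/api/v1"
)

func TestFederatedServiceIngressRoundTrip(t *testing.T) {
	// Build a federated service carrying the per-cluster ingress annotation, the same way dns_test.go does.
	svc := &v1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "servicename",
			Namespace: "servicenamespace",
			Annotations: map[string]string{
				FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
					AddEndpoints("testcluster", []string{"198.51.100.1"}).
					String(),
			},
		},
	}
	// getHealthyEndpoints parses the same annotation to discover per-cluster loadbalancer ingresses.
	ingress, err := ParseFederatedServiceIngress(svc)
	if err != nil {
		t.Fatalf("unexpected parse error: %v", err)
	}
	for _, clusterIngress := range ingress.Items {
		// clusterIngress.Cluster names the cluster; clusterIngress.Items holds its v1.LoadBalancerIngress entries.
		if clusterIngress.Cluster != "testcluster" {
			t.Errorf("unexpected cluster in ingress: %q", clusterIngress.Cluster)
		}
		for _, lb := range clusterIngress.Items {
			t.Logf("cluster %s exposes loadbalancer ingress %s%s", clusterIngress.Cluster, lb.IP, lb.Hostname)
		}
	}
}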

View File

@ -27,11 +27,15 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/federation/apis/federation/v1beta1"
fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake"
"k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes.
. "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
"k8s.io/kubernetes/pkg/api/v1"
)
func TestServiceController_ensureDnsRecords(t *testing.T) {
clusterName := "testcluster"
tests := []struct {
name string
service v1.Service
@ -44,6 +48,10 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "servicename",
Namespace: "servicenamespace",
Annotations: map[string]string{
FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
AddEndpoints(clusterName, []string{"198.51.100.1"}).
String()},
},
},
serviceStatus: buildServiceStatus([][]string{{"198.51.100.1", ""}}),
@ -92,7 +100,10 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
if !ok {
t.Error("Unable to fetch zones")
}
fakeClient := &fakefedclientset.Clientset{}
RegisterFakeClusterGet(&fakeClient.Fake, &v1beta1.ClusterList{Items: []v1beta1.Cluster{*NewCluster(clusterName, v1.ConditionTrue)}})
serviceController := ServiceController{
federationClient: fakeClient,
dns: fakedns,
dnsZones: fakednsZones,
serviceDnsSuffix: "federation.example.com",
@ -106,8 +117,6 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
knownClusterSet: make(sets.String),
}
clusterName := "testcluster"
serviceController.clusterCache.clientMap[clusterName] = &clusterCache{
cluster: &v1beta1.Cluster{
Status: v1beta1.ClusterStatus{
@ -127,7 +136,7 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
cachedService.serviceStatusMap[clusterName] = test.serviceStatus
}
err := serviceController.ensureDnsRecords(clusterName, cachedService)
err := serviceController.ensureDnsRecords(clusterName, &test.service)
if err != nil {
t.Errorf("Test failed for %s, unexpected error %v", test.name, err)
}

View File

@ -122,7 +122,7 @@ func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedServi
glog.V(4).Infof("Cached endpoint was found for %s/%s, cluster %s, removing", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
delete(cachedService.endpointMap, clusterName)
for i := 0; i < clientRetryCount; i++ {
err := serviceController.ensureDnsRecords(clusterName, cachedService)
err := serviceController.ensureDnsRecords(clusterName, cachedService.lastState)
if err == nil {
return nil
}
@ -154,7 +154,7 @@ func (cc *clusterClientCache) processEndpointUpdate(cachedService *cachedService
glog.V(4).Infof("Reachable endpoint was found for %s/%s, cluster %s, building endpointMap", endpoint.Namespace, endpoint.Name, clusterName)
cachedService.endpointMap[clusterName] = 1
for i := 0; i < clientRetryCount; i++ {
err := serviceController.ensureDnsRecords(clusterName, cachedService)
err := serviceController.ensureDnsRecords(clusterName, cachedService.lastState)
if err == nil {
return nil
}
@ -175,7 +175,7 @@ func (cc *clusterClientCache) processEndpointUpdate(cachedService *cachedService
glog.V(4).Infof("Reachable endpoint was lost for %s/%s, cluster %s, deleting endpointMap", endpoint.Namespace, endpoint.Name, clusterName)
delete(cachedService.endpointMap, clusterName)
for i := 0; i < clientRetryCount; i++ {
err := serviceController.ensureDnsRecords(clusterName, cachedService)
err := serviceController.ensureDnsRecords(clusterName, cachedService.lastState)
if err == nil {
return nil
}

View File

@ -21,14 +21,18 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/federation/apis/federation/v1beta1"
fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake"
"k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes.
. "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
v1 "k8s.io/kubernetes/pkg/api/v1"
)
var fakeDns, _ = clouddns.NewFakeInterface() // No need to check for unsupported interfaces, as the fake interface supports everything that's required.
var fakeDnsZones, _ = fakeDns.Zones()
var fakeClient = &fakefedclientset.Clientset{}
var fakeServiceController = ServiceController{
federationClient: fakeClient,
dns: fakeDns,
dnsZones: fakeDnsZones,
federationName: "fed1",
@ -68,6 +72,7 @@ func TestProcessEndpointUpdate(t *testing.T) {
},
},
}
RegisterFakeClusterGet(&fakeClient.Fake, &v1beta1.ClusterList{Items: []v1beta1.Cluster{*NewCluster(clusterName, v1.ConditionTrue)}})
tests := []struct {
name string
cachedService *cachedService
@ -121,6 +126,7 @@ func TestProcessEndpointDeletion(t *testing.T) {
},
},
}
RegisterFakeClusterGet(&fakeClient.Fake, &v1beta1.ClusterList{Items: []v1beta1.Cluster{*NewCluster(clusterName, v1.ConditionTrue)}})
tests := []struct {
name string
cachedService *cachedService

View File

@ -108,7 +108,7 @@ func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache
if needUpdate {
for i := 0; i < clientRetryCount; i++ {
err := sc.ensureDnsRecords(clusterName, cachedService)
err := sc.ensureDnsRecords(clusterName, service)
if err == nil {
break
}

View File

@ -18,6 +18,8 @@ package service
import (
"fmt"
"sort"
"strings"
"sync"
"time"
@ -27,8 +29,10 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/labels"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
@ -36,7 +40,9 @@ import (
clientv1 "k8s.io/client-go/pkg/api/v1"
cache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/util/workqueue"
fedapi "k8s.io/kubernetes/federation/apis/federation"
v1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
federationcache "k8s.io/kubernetes/federation/client/cache"
fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
@ -76,6 +82,7 @@ const (
maxNoOfClusters = 100
reviewDelay = 10 * time.Second
updateTimeout = 30 * time.Second
allClustersKey = "ALL_CLUSTERS"
clusterAvailableDelay = time.Second * 20
@ -156,6 +163,15 @@ type ServiceController struct {
clusterDeliverer *util.DelayingDeliverer
deletionHelper *deletionhelper.DeletionHelper
reviewDelay time.Duration
clusterAvailableDelay time.Duration
updateTimeout time.Duration
endpointFederatedInformer fedutil.FederatedInformer
federatedUpdater fedutil.FederatedUpdater
objectDeliverer *util.DelayingDeliverer
flowcontrolBackoff *flowcontrol.Backoff
}
// New returns a new service controller to keep DNS provider service resources
@ -180,11 +196,16 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
rwlock: sync.Mutex{},
clientMap: make(map[string]*clusterCache),
},
eventBroadcaster: broadcaster,
eventRecorder: recorder,
queue: workqueue.New(),
knownClusterSet: make(sets.String),
eventBroadcaster: broadcaster,
eventRecorder: recorder,
queue: workqueue.New(),
knownClusterSet: make(sets.String),
reviewDelay: reviewDelay,
clusterAvailableDelay: clusterAvailableDelay,
updateTimeout: updateTimeout,
flowcontrolBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute),
}
s.objectDeliverer = util.NewDelayingDeliverer()
s.clusterDeliverer = util.NewDelayingDeliverer()
var serviceIndexer cache.Indexer
serviceIndexer, s.serviceController = cache.NewIndexerInformer(
@ -198,57 +219,13 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
},
&v1.Service{},
serviceSyncPeriod,
cache.ResourceEventHandlerFuncs{
AddFunc: s.enqueueService,
UpdateFunc: func(old, cur interface{}) {
// there is case that old and new are equals but we still catch the event now.
if !reflect.DeepEqual(old, cur) {
s.enqueueService(cur)
}
},
DeleteFunc: s.enqueueService,
},
util.NewTriggerOnAllChanges(func(obj pkgruntime.Object) {
glog.V(5).Infof("Delivering notification from federation: %v", obj)
s.deliverObject(obj, 0, false)
}),
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
s.serviceStore = corelisters.NewServiceLister(serviceIndexer)
s.clusterStore.Store, s.clusterController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
return s.federationClient.Federation().Clusters().List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return s.federationClient.Federation().Clusters().Watch(options)
},
},
&v1beta1.Cluster{},
clusterSyncPeriod,
cache.ResourceEventHandlerFuncs{
DeleteFunc: s.clusterCache.delFromClusterSet,
AddFunc: s.clusterCache.addToClientMap,
UpdateFunc: func(old, cur interface{}) {
oldCluster, ok := old.(*v1beta1.Cluster)
if !ok {
return
}
curCluster, ok := cur.(*v1beta1.Cluster)
if !ok {
return
}
if !reflect.DeepEqual(oldCluster.Spec, curCluster.Spec) {
// update when spec is changed
s.clusterCache.addToClientMap(cur)
}
pred := getClusterConditionPredicate()
// only update when condition changed to ready from not-ready
if !pred(*oldCluster) && pred(*curCluster) {
s.clusterCache.addToClientMap(cur)
}
// did not handle ready -> not-ready
// how could we stop a controller?
},
},
)
clusterLifecycle := fedutil.ClusterLifecycleHandlerFuncs{
ClusterAvailable: func(cluster *v1beta1.Cluster) {
@ -271,14 +248,15 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
// would be just confirmation that some service operation succeeded.
util.NewTriggerOnAllChanges(
func(obj pkgruntime.Object) {
// TODO: Use this to enque services.
glog.V(5).Infof("Delivering service notification from federated cluster %s: %v", cluster.Name, obj)
s.deliverObject(obj, s.reviewDelay, false)
},
))
}
s.federatedInformer = fedutil.NewFederatedInformer(federationClient, fedInformerFactory, &clusterLifecycle)
federatedUpdater := fedutil.NewFederatedUpdater(s.federatedInformer,
s.federatedUpdater = fedutil.NewFederatedUpdater(s.federatedInformer,
func(client kubeclientset.Interface, obj pkgruntime.Object) error {
svc := obj.(*v1.Service)
_, err := client.Core().Services(svc.Namespace).Create(svc)
@ -293,9 +271,41 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
svc := obj.(*v1.Service)
orphanDependents := false
err := client.Core().Services(svc.Namespace).Delete(svc.Name, &metav1.DeleteOptions{OrphanDependents: &orphanDependents})
// IsNotFound error is fine since that means the object is deleted already.
if errors.IsNotFound(err) {
return nil
}
return err
})
// Federated informers on endpoints in federated clusters.
// This enables checking whether service ingress endpoints in federated clusters are reachable
s.endpointFederatedInformer = fedutil.NewFederatedInformer(
federationClient,
func(cluster *v1beta1.Cluster, targetClient kubeclientset.Interface) (
cache.Store, cache.Controller) {
return cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
return targetClient.Core().Endpoints(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return targetClient.Core().Endpoints(metav1.NamespaceAll).Watch(options)
},
},
&v1.Endpoints{},
controller.NoResyncPeriodFunc(),
fedutil.NewTriggerOnMetaAndFieldChanges(
"Subsets",
func(obj pkgruntime.Object) {
glog.V(5).Infof("Delivering endpoint notification from federated cluster %s :%v", cluster.Name, obj)
s.deliverObject(obj, s.reviewDelay, false)
},
))
},
&fedutil.ClusterLifecycleHandlerFuncs{},
)
s.deletionHelper = deletionhelper.NewDeletionHelper(
s.hasFinalizerFunc,
s.removeFinalizerFunc,
@ -308,7 +318,7 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
updateTimeout,
s.eventRecorder,
s.federatedInformer,
federatedUpdater,
s.federatedUpdater,
)
s.endpointWorkerMap = make(map[string]bool)
@ -393,22 +403,27 @@ func (s *ServiceController) Run(workers int, stopCh <-chan struct{}) error {
}
defer runtime.HandleCrash()
s.federatedInformer.Start()
s.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) {
// TODO: Use this new clusterDeliverer to reconcile services in new clusters.
s.endpointFederatedInformer.Start()
s.objectDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) {
s.queue.Add(item.Value.(string))
})
s.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) {
s.deliverServicesOnClusterChange()
})
fedutil.StartBackoffGC(s.flowcontrolBackoff, stopCh)
go s.serviceController.Run(stopCh)
go s.clusterController.Run(stopCh)
for i := 0; i < workers; i++ {
go wait.Until(s.fedServiceWorker, time.Second, stopCh)
}
go wait.Until(s.clusterEndpointWorker, time.Second, stopCh)
go wait.Until(s.clusterServiceWorker, time.Second, stopCh)
go wait.Until(s.clusterSyncLoop, time.Second, stopCh)
go func() {
<-stopCh
glog.Infof("Shutting down Federation Service Controller")
s.queue.ShutDown()
s.federatedInformer.Stop()
s.endpointFederatedInformer.Stop()
s.objectDeliverer.Stop()
s.clusterDeliverer.Stop()
}()
return nil
}
@ -461,8 +476,15 @@ func (s *ServiceController) init() error {
return nil
}
type reconciliationStatus string
const (
statusAllOk = reconciliationStatus("ALL_OK")
statusError = reconciliationStatus("ERROR")
statusNotSynced = reconciliationStatus("NOSYNC")
)
// fedServiceWorker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncService is never invoked concurrently with the same key.
func (s *ServiceController) fedServiceWorker() {
for {
func() {
@ -470,11 +492,21 @@ func (s *ServiceController) fedServiceWorker() {
if quit {
return
}
defer s.queue.Done(key)
err := s.syncService(key.(string))
if err != nil {
glog.Errorf("Error syncing service: %v", err)
service := key.(string)
status, err := s.reconcileService(service)
switch status {
case statusAllOk:
break
case statusError:
runtime.HandleError(fmt.Errorf("Error reconciling service %q: %v, delivering again", service, err))
s.deliverService(service, 0, true)
case statusNotSynced:
glog.V(5).Infof("Delivering notification for %q after clusterAvailableDelay", service)
s.deliverService(service, s.clusterAvailableDelay, false)
default:
runtime.HandleError(fmt.Errorf("Unhandled reconciliation status for %q: %s, delivering again", service, status))
s.deliverService(service, s.reviewDelay, false)
}
}()
}
@ -871,7 +903,7 @@ func (s *ServiceController) lockedUpdateDNSRecords(service *cachedService, clust
for key := range s.clusterCache.clientMap {
for _, clusterName := range clusterNames {
if key == clusterName {
err := s.ensureDnsRecords(clusterName, service)
err := s.ensureDnsRecords(clusterName, service.lastState)
if err != nil {
unensuredCount += 1
glog.V(4).Infof("Failed to update DNS records for service %v from cluster %s: %v", service, clusterName, err)
@ -1065,6 +1097,24 @@ func (s *ServiceController) delete(service *v1.Service) error {
return err
}
// Ensure DNS records are removed for service
if wantsDNSRecords(service) {
key := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
serviceIngress, err := ParseFederatedServiceIngress(service)
if err != nil {
runtime.HandleError(fmt.Errorf("Failed to parse endpoint annotations for service %s: %v", key, err))
return err
}
for _, ingress := range serviceIngress.Items {
err := s.ensureDnsRecords(ingress.Cluster, service)
if err != nil {
glog.V(4).Infof("Error ensuring DNS Records for service %s on cluster %s: %v", key, ingress.Cluster, err)
return err
}
glog.V(4).Infof("Ensured DNS records for Service %s in cluster %q", key, ingress.Cluster)
}
}
err = s.federationClient.Core().Services(service.Namespace).Delete(service.Name, nil)
if err != nil {
// It's all good if the error is a NotFound error. That means it is already deleted and we do not have to do anything.
@ -1085,3 +1135,410 @@ func (s *ServiceController) processServiceDeletion(key string) (error, time.Dura
s.serviceCache.delete(key)
return nil, doNotRetry
}
func (s *ServiceController) deliverServicesOnClusterChange() {
if !s.isSynced() {
s.clusterDeliverer.DeliverAfter(allClustersKey, nil, s.clusterAvailableDelay)
}
glog.V(5).Infof("Delivering all service as cluster status changed")
serviceList, err := s.serviceStore.List(labels.Everything())
if err != nil {
runtime.HandleError(fmt.Errorf("error listing federated services: %v", err))
s.clusterDeliverer.DeliverAfter(allClustersKey, nil, 0)
}
for _, service := range serviceList {
s.deliverObject(service, 0, false)
}
}
func (s *ServiceController) deliverObject(object interface{}, delay time.Duration, failed bool) {
switch value := object.(type) {
case *v1.Service:
s.deliverService(types.NamespacedName{Namespace: value.Namespace, Name: value.Name}.String(), delay, failed)
case *v1.Endpoints:
s.deliverService(types.NamespacedName{Namespace: value.Namespace, Name: value.Name}.String(), delay, failed)
default:
glog.Warningf("Unknown object received: %v", object)
}
}
// Adds backoff to delay if this delivery is related to some failure. Resets backoff if there was no failure.
func (s *ServiceController) deliverService(key string, delay time.Duration, failed bool) {
if failed {
s.flowcontrolBackoff.Next(key, time.Now())
delay = delay + s.flowcontrolBackoff.Get(key)
} else {
s.flowcontrolBackoff.Reset(key)
}
s.objectDeliverer.DeliverAfter(key, key, delay)
}
// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet synced with
// the corresponding api server.
func (s *ServiceController) isSynced() bool {
if !s.federatedInformer.ClustersSynced() {
glog.V(2).Infof("Cluster list not synced")
return false
}
serviceClusters, err := s.federatedInformer.GetReadyClusters()
if err != nil {
runtime.HandleError(fmt.Errorf("Failed to get ready clusters: %v", err))
return false
}
if !s.federatedInformer.GetTargetStore().ClustersSynced(serviceClusters) {
return false
}
if !s.endpointFederatedInformer.ClustersSynced() {
glog.V(2).Infof("Cluster list not synced")
return false
}
endpointClusters, err := s.endpointFederatedInformer.GetReadyClusters()
if err != nil {
runtime.HandleError(fmt.Errorf("Failed to get ready clusters: %v", err))
return false
}
if !s.endpointFederatedInformer.GetTargetStore().ClustersSynced(endpointClusters) {
return false
}
return true
}
// reconcileService triggers reconciliation of a federated service with corresponding services in federated clusters.
// This function is called on service addition, deletion, or update, either in a federated cluster or in the federation.
func (s *ServiceController) reconcileService(key string) (reconciliationStatus, error) {
if !s.isSynced() {
glog.V(4).Infof("Data store not synced, delaying reconcilation: %v", key)
return statusNotSynced, nil
}
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
runtime.HandleError(fmt.Errorf("Failed to retrieve federated service %q from store: %v", key, err))
return statusError, err
}
service, err := s.serviceStore.Services(namespace).Get(name)
if errors.IsNotFound(err) {
// Not a federated service, ignoring.
return statusAllOk, nil
} else if err != nil {
return statusError, err
}
glog.V(3).Infof("Reconciling federated service: %s", key)
// Create a copy before modifying the service, to avoid racing with other readers of the service from the store
fedServiceObj, err := api.Scheme.DeepCopy(service)
fedService, ok := fedServiceObj.(*v1.Service)
if err != nil || !ok {
runtime.HandleError(fmt.Errorf("Error in retrieving obj from store: %s, %v", key, err))
return statusError, err
}
// Handle deletion of federated service
if fedService.DeletionTimestamp != nil {
if err := s.delete(fedService); err != nil {
runtime.HandleError(fmt.Errorf("Failed to delete %s: %v", key, err))
s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "DeleteFailed", "Deleting service failed: %v", err)
return statusError, err
}
glog.V(3).Infof("Deleting federated service succeeded: %s", key)
s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "DeleteSucceed", "Deleting service succeeded")
return statusAllOk, nil
}
// Add the required finalizers before creating a service in underlying clusters. This ensures that the
// dependent services in underlying clusters are deleted when the federated service is deleted.
updatedServiceObj, err := s.deletionHelper.EnsureFinalizers(fedService)
if err != nil {
glog.Warningf("Failed to ensure setting finalizer for service %s: %v", key, err)
return statusError, err
}
fedService = updatedServiceObj.(*v1.Service)
// Synchronize the federated service in all underlying ready clusters.
clusters, err := s.federatedInformer.GetReadyClusters()
if err != nil {
runtime.HandleError(fmt.Errorf("Failed to get ready cluster list: %v", err))
return statusError, err
}
newLBStatus := newLoadbalancerStatus()
newServiceIngress := NewFederatedServiceIngress()
operations := make([]fedutil.FederatedOperation, 0)
for _, cluster := range clusters {
// Aggregate all operations to perform on all federated clusters
operation, err := s.getOperationsToPerformOnCluster(cluster, fedService)
if err != nil {
return statusError, err
}
if operation != nil {
operations = append(operations, *operation)
}
// Aggregate LoadBalancerStatus from all services in federated clusters to update status in federated service
lbStatus, err := s.getServiceStatusInCluster(cluster, key)
if err != nil {
return statusError, err
}
if len(lbStatus.Ingress) > 0 {
newLBStatus.Ingress = append(newLBStatus.Ingress, lbStatus.Ingress...)
// Add/Update federated service ingress only if there are reachable endpoints backing the lb service
endpoints, err := s.getServiceEndpointsInCluster(cluster, key)
if err != nil {
return statusError, err
}
// If no endpoints have been created for the service, its loadbalancer ingress is not
// reachable, so do not include such loadbalancer ingresses in the federated service ingress.
if len(endpoints) > 0 {
clusterIngress := fedapi.ClusterServiceIngress{
Cluster: cluster.Name,
Items: lbStatus.Ingress,
}
newServiceIngress.Items = append(newServiceIngress.Items, clusterIngress)
}
}
}
if len(operations) != 0 {
err = s.federatedUpdater.UpdateWithOnError(operations, s.updateTimeout,
func(op fedutil.FederatedOperation, operror error) {
runtime.HandleError(fmt.Errorf("Service update in cluster %s failed: %v", op.ClusterName, operror))
s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "UpdateInClusterFailed", "Service update in cluster %s failed: %v", op.ClusterName, operror)
})
if err != nil {
if !errors.IsAlreadyExists(err) {
runtime.HandleError(fmt.Errorf("Failed to execute updates for %s: %v", key, err))
return statusError, err
}
}
}
// Update the federated service if there are any updates in clustered service (status/endpoints)
err = s.updateFederatedService(fedService, newLBStatus, newServiceIngress)
if err != nil {
return statusError, err
}
glog.V(5).Infof("Everything is in order in federated clusters for service %s", key)
return statusAllOk, nil
}
// getOperationsToPerformOnCluster returns the operations to be performed so that clustered service is in sync with federated service
func (s *ServiceController) getOperationsToPerformOnCluster(cluster *v1beta1.Cluster, fedService *v1.Service) (*fedutil.FederatedOperation, error) {
var operation *fedutil.FederatedOperation
key := types.NamespacedName{Namespace: fedService.Namespace, Name: fedService.Name}.String()
clusterServiceObj, serviceFound, err := s.federatedInformer.GetTargetStore().GetByKey(cluster.Name, key)
if err != nil {
runtime.HandleError(fmt.Errorf("Failed to get %s service from %s: %v", key, cluster.Name, err))
return nil, err
}
if !serviceFound {
desiredService := &v1.Service{
ObjectMeta: fedutil.DeepCopyRelevantObjectMeta(fedService.ObjectMeta),
Spec: *(fedutil.DeepCopyApiTypeOrPanic(&fedService.Spec).(*v1.ServiceSpec)),
}
desiredService.ResourceVersion = ""
glog.V(4).Infof("Creating service in underlying cluster %s: %+v", cluster.Name, desiredService)
s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "CreateInCluster", "Creating service in cluster %s", cluster.Name)
operation = &fedutil.FederatedOperation{
Type: fedutil.OperationTypeAdd,
Obj: desiredService,
ClusterName: cluster.Name,
}
} else {
clusterService, ok := clusterServiceObj.(*v1.Service)
if !ok {
runtime.HandleError(fmt.Errorf("Unexpected error for %q: %v", key, err))
return nil, err
}
desiredService := &v1.Service{
ObjectMeta: fedutil.DeepCopyRelevantObjectMeta(clusterService.ObjectMeta),
Spec: *(fedutil.DeepCopyApiTypeOrPanic(&fedService.Spec).(*v1.ServiceSpec)),
}
// ClusterIP and NodePort are allocated to Service by cluster, so retain the same if any while updating
desiredService.Spec.ClusterIP = clusterService.Spec.ClusterIP
for _, cPort := range clusterService.Spec.Ports {
for i, fPort := range desiredService.Spec.Ports {
if fPort.Name == cPort.Name && fPort.Protocol == cPort.Protocol && fPort.Port == cPort.Port {
desiredService.Spec.Ports[i].NodePort = cPort.NodePort
}
}
}
// Update existing service, if needed.
if !Equivalent(desiredService, clusterService) {
glog.V(4).Infof("Service in underlying cluster %s does not match, Desired: %+v, Existing: %+v", cluster.Name, desiredService, clusterService)
s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "UpdateInCluster", "Updating service in cluster %s. Desired: %+v\n Actual: %+v\n", cluster.Name, desiredService, clusterService)
// ResourceVersion of cluster service can be different from federated service,
// so do not update ResourceVersion while updating cluster service
desiredService.ResourceVersion = clusterService.ResourceVersion
operation = &fedutil.FederatedOperation{
Type: fedutil.OperationTypeUpdate,
Obj: desiredService,
ClusterName: cluster.Name,
}
} else {
glog.V(5).Infof("Service in underlying cluster %s is up to date: %+v", cluster.Name, desiredService)
}
}
return operation, nil
}
// getServiceStatusInCluster returns service status in federated cluster
func (s *ServiceController) getServiceStatusInCluster(cluster *v1beta1.Cluster, key string) (*v1.LoadBalancerStatus, error) {
lbStatus := &v1.LoadBalancerStatus{}
clusterServiceObj, serviceFound, err := s.federatedInformer.GetTargetStore().GetByKey(cluster.Name, key)
if err != nil {
runtime.HandleError(fmt.Errorf("Failed to get %s service from %s: %v", key, cluster.Name, err))
return lbStatus, err
}
if serviceFound {
clusterService, ok := clusterServiceObj.(*v1.Service)
if !ok {
err = fmt.Errorf("Unknown object received: %v", clusterServiceObj)
runtime.HandleError(err)
return lbStatus, err
}
lbStatus = &clusterService.Status.LoadBalancer
newLbStatus := &loadbalancerStatus{*lbStatus}
sort.Sort(newLbStatus)
}
return lbStatus, nil
}
// getServiceEndpointsInCluster returns ready endpoints corresponding to the service in a federated cluster
func (s *ServiceController) getServiceEndpointsInCluster(cluster *v1beta1.Cluster, key string) ([]v1.EndpointAddress, error) {
addresses := []v1.EndpointAddress{}
clusterEndpointsObj, endpointsFound, err := s.endpointFederatedInformer.GetTargetStore().GetByKey(cluster.Name, key)
if err != nil {
runtime.HandleError(fmt.Errorf("Failed to get %s endpoint from %s: %v", key, cluster.Name, err))
return addresses, err
}
if endpointsFound {
clusterEndpoints, ok := clusterEndpointsObj.(*v1.Endpoints)
if !ok {
glog.Warningf("Unknown object received: %v", clusterEndpointsObj)
return addresses, fmt.Errorf("Unknown object received: %v", clusterEndpointsObj)
}
for _, subset := range clusterEndpoints.Subsets {
if len(subset.Addresses) > 0 {
addresses = append(addresses, subset.Addresses...)
}
}
}
return addresses, nil
}
// updateFederatedService updates the federated service with aggregated lbStatus and serviceIngresses
// and also updates the dns records as needed
func (s *ServiceController) updateFederatedService(fedService *v1.Service, newLBStatus *loadbalancerStatus, newServiceIngress *FederatedServiceIngress) error {
key := types.NamespacedName{Namespace: fedService.Namespace, Name: fedService.Name}.String()
needUpdate := false
// Sort the endpoints so that we can compare
sort.Sort(newLBStatus)
if !reflect.DeepEqual(fedService.Status.LoadBalancer.Ingress, newLBStatus.Ingress) {
fedService.Status.LoadBalancer.Ingress = newLBStatus.Ingress
glog.V(3).Infof("Federated service loadbalancer status updated for %s: %v", key, newLBStatus.Ingress)
needUpdate = true
}
existingServiceIngress, err := ParseFederatedServiceIngress(fedService)
if err != nil {
runtime.HandleError(fmt.Errorf("Failed to parse endpoint annotations for service %s: %v", key, err))
return err
}
// TODO: We should have a reliable cluster health check (it should consider quorum) to detect that a cluster is not
// reachable and remove DNS records for it. Until a reliable cluster health check is available, the code below is
// a workaround that avoids removing existing DNS records created before the cluster went offline.
unreadyClusters, err := s.federatedInformer.GetUnreadyClusters()
if err != nil {
runtime.HandleError(fmt.Errorf("Failed to get unready cluster list: %v", err))
return err
}
for _, cluster := range unreadyClusters {
lbIngress := existingServiceIngress.GetClusterLoadBalancerIngresses(cluster.Name)
newServiceIngress.AddClusterLoadBalancerIngresses(cluster.Name, lbIngress)
glog.V(5).Infof("Cluster %s is Offline, Preserving previously available status for Service %s", cluster.Name, key)
}
// Update federated service status and/or ingress annotations if changed
sort.Sort(newServiceIngress)
if !reflect.DeepEqual(existingServiceIngress.Items, newServiceIngress.Items) {
fedService = UpdateIngressAnnotation(fedService, newServiceIngress)
glog.V(3).Infof("Federated service loadbalancer ingress updated for %s: existing: %#v, desired: %#v", key, existingServiceIngress, newServiceIngress)
needUpdate = true
}
if needUpdate {
var err error
fedService, err = s.federationClient.Core().Services(fedService.Namespace).UpdateStatus(fedService)
if err != nil {
runtime.HandleError(fmt.Errorf("Error updating the federation service object %s: %v", key, err))
return err
}
}
// Ensure DNS records based on Annotations in federated service for all federated clusters
if needUpdate && wantsDNSRecords(fedService) {
for _, ingress := range newServiceIngress.Items {
err := s.ensureDnsRecords(ingress.Cluster, fedService)
if err != nil {
runtime.HandleError(fmt.Errorf("Error ensuring DNS Records for service %s on cluster %q: %v", key, ingress.Cluster, err))
return err
}
glog.V(4).Infof("Ensured DNS records for Service %s in cluster %q", key, ingress.Cluster)
}
}
return nil
}
// Equivalent Checks if cluster-independent, user provided data in two given services are equal. If in the future the
// services structure is expanded then any field that is not populated by the api server should be included here.
func Equivalent(s1, s2 *v1.Service) bool {
// TODO: should also check for all annotations except FederatedServiceIngressAnnotation
return s1.Name == s2.Name && s1.Namespace == s2.Namespace &&
(reflect.DeepEqual(s1.Labels, s2.Labels) || (len(s1.Labels) == 0 && len(s2.Labels) == 0)) &&
reflect.DeepEqual(s1.Spec, s2.Spec)
}
type loadbalancerStatus struct {
v1.LoadBalancerStatus
}
func newLoadbalancerStatus() *loadbalancerStatus {
return &loadbalancerStatus{}
}
func (lbs loadbalancerStatus) Len() int {
return len(lbs.Ingress)
}
func (lbs loadbalancerStatus) Less(i, j int) bool {
ipComparison := strings.Compare(lbs.Ingress[i].IP, lbs.Ingress[j].IP)
hostnameComparison := strings.Compare(lbs.Ingress[i].Hostname, lbs.Ingress[j].Hostname)
if ipComparison < 0 || (ipComparison == 0 && hostnameComparison < 0) {
return true
}
return false
}
func (lbs loadbalancerStatus) Swap(i, j int) {
lbs.Ingress[i].IP, lbs.Ingress[j].IP = lbs.Ingress[j].IP, lbs.Ingress[i].IP
lbs.Ingress[i].Hostname, lbs.Ingress[j].Hostname = lbs.Ingress[j].Hostname, lbs.Ingress[i].Hostname
}
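A short illustrative sketch (same package assumed, not part of the commit; the function name is hypothetical) of why the controller sorts before comparing: clusters may report identical LoadBalancerIngress entries in different orders, and sorting by IP and then Hostname makes the reflect.DeepEqual checks in updateFederatedService order-independent.

func exampleSortedIngressComparison() bool {
	// Two aggregations of the same ingresses, reported in different orders.
	a := &loadbalancerStatus{v1.LoadBalancerStatus{Ingress: []v1.LoadBalancerIngress{{IP: "198.51.100.2"}, {IP: "198.51.100.1"}}}}
	b := &loadbalancerStatus{v1.LoadBalancerStatus{Ingress: []v1.LoadBalancerIngress{{IP: "198.51.100.1"}, {IP: "198.51.100.2"}}}}
	sort.Sort(a)
	sort.Sort(b)
	return reflect.DeepEqual(a.Ingress, b.Ingress) // true once both sides are sorted
}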

View File

@ -367,6 +367,8 @@ func NewCluster(name string, readyStatus apiv1.ConditionStatus) *federationapi.C
Conditions: []federationapi.ClusterCondition{
{Type: federationapi.ClusterReady, Status: readyStatus},
},
Zones: []string{"foozone"},
Region: "fooregion",
},
}
}
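For context, a hypothetical test-style fragment (assumed to sit alongside the service controller tests, which dot-import this test util package) showing why NewCluster gains Zones and Region defaults: getClusterZoneNames now resolves zones and region from the cluster object fetched via the federation client rather than from the local cluster cache.

func TestGetClusterZoneNamesSketch(t *testing.T) {
	// Wire a fake federation client so getClusterZoneNames can fetch the test cluster.
	fakeClient := &fakefedclientset.Clientset{}
	RegisterFakeClusterGet(&fakeClient.Fake, &v1beta1.ClusterList{
		Items: []v1beta1.Cluster{*NewCluster("testcluster", v1.ConditionTrue)},
	})
	sc := ServiceController{federationClient: fakeClient}
	zones, region, err := sc.getClusterZoneNames("testcluster")
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	// With the Zones/Region defaults added above, the cluster reports zone "foozone" in region "fooregion".
	if len(zones) != 1 || zones[0] != "foozone" || region != "fooregion" {
		t.Errorf("unexpected zones/region: %v, %q", zones, region)
	}
}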