mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Remove unused code in federation service controller
This commit is contained in:
parent
2d79d53fb2
commit
d2462c79bd
@ -1,212 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2016 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package service
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
||||||
pkgruntime "k8s.io/apimachinery/pkg/runtime"
|
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
|
||||||
restclient "k8s.io/client-go/rest"
|
|
||||||
cache "k8s.io/client-go/tools/cache"
|
|
||||||
"k8s.io/client-go/util/workqueue"
|
|
||||||
v1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
|
|
||||||
v1 "k8s.io/kubernetes/pkg/api/v1"
|
|
||||||
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
|
||||||
|
|
||||||
"reflect"
|
|
||||||
|
|
||||||
"github.com/golang/glog"
|
|
||||||
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
|
|
||||||
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
|
|
||||||
)
|
|
||||||
|
|
||||||
type clusterCache struct {
|
|
||||||
clientset *kubeclientset.Clientset
|
|
||||||
cluster *v1beta1.Cluster
|
|
||||||
// A store of services, populated by the serviceController
|
|
||||||
serviceStore corelisters.ServiceLister
|
|
||||||
// Watches changes to all services
|
|
||||||
serviceController cache.Controller
|
|
||||||
// A store of endpoint, populated by the serviceController
|
|
||||||
endpointStore corelisters.EndpointsLister
|
|
||||||
// Watches changes to all endpoints
|
|
||||||
endpointController cache.Controller
|
|
||||||
// services that need to be synced
|
|
||||||
serviceQueue *workqueue.Type
|
|
||||||
// endpoints that need to be synced
|
|
||||||
endpointQueue *workqueue.Type
|
|
||||||
}
|
|
||||||
|
|
||||||
type clusterClientCache struct {
|
|
||||||
rwlock sync.Mutex // protects serviceMap
|
|
||||||
clientMap map[string]*clusterCache
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cc *clusterClientCache) startClusterLW(cluster *v1beta1.Cluster, clusterName string) {
|
|
||||||
cachedClusterClient, ok := cc.clientMap[clusterName]
|
|
||||||
// only create when no existing cachedClusterClient
|
|
||||||
if ok {
|
|
||||||
if !reflect.DeepEqual(cachedClusterClient.cluster.Spec, cluster.Spec) {
|
|
||||||
//rebuild clientset when cluster spec is changed
|
|
||||||
clientset, err := newClusterClientset(cluster)
|
|
||||||
if err != nil || clientset == nil {
|
|
||||||
glog.Errorf("Failed to create corresponding restclient of kubernetes cluster: %v", err)
|
|
||||||
}
|
|
||||||
glog.V(4).Infof("Cluster spec changed, rebuild clientset for cluster %s", clusterName)
|
|
||||||
cachedClusterClient.clientset = clientset
|
|
||||||
go cachedClusterClient.serviceController.Run(wait.NeverStop)
|
|
||||||
go cachedClusterClient.endpointController.Run(wait.NeverStop)
|
|
||||||
glog.V(2).Infof("Start watching services and endpoints on cluster %s", clusterName)
|
|
||||||
} else {
|
|
||||||
// do nothing when there is no spec change
|
|
||||||
glog.V(4).Infof("Keep clientset for cluster %s", clusterName)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
glog.V(4).Infof("No client cache for cluster %s, building new", clusterName)
|
|
||||||
clientset, err := newClusterClientset(cluster)
|
|
||||||
if err != nil || clientset == nil {
|
|
||||||
glog.Errorf("Failed to create corresponding restclient of kubernetes cluster: %v", err)
|
|
||||||
}
|
|
||||||
cachedClusterClient = &clusterCache{
|
|
||||||
cluster: cluster,
|
|
||||||
clientset: clientset,
|
|
||||||
serviceQueue: workqueue.New(),
|
|
||||||
endpointQueue: workqueue.New(),
|
|
||||||
}
|
|
||||||
var endpointIndexer cache.Indexer
|
|
||||||
endpointIndexer, cachedClusterClient.endpointController = cache.NewIndexerInformer(
|
|
||||||
&cache.ListWatch{
|
|
||||||
ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
|
|
||||||
return clientset.Core().Endpoints(metav1.NamespaceAll).List(options)
|
|
||||||
},
|
|
||||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
|
||||||
return clientset.Core().Endpoints(metav1.NamespaceAll).Watch(options)
|
|
||||||
},
|
|
||||||
},
|
|
||||||
&v1.Endpoints{},
|
|
||||||
serviceSyncPeriod,
|
|
||||||
cache.ResourceEventHandlerFuncs{
|
|
||||||
AddFunc: func(obj interface{}) {
|
|
||||||
cc.enqueueEndpoint(obj, clusterName)
|
|
||||||
},
|
|
||||||
UpdateFunc: func(old, cur interface{}) {
|
|
||||||
cc.enqueueEndpoint(cur, clusterName)
|
|
||||||
},
|
|
||||||
DeleteFunc: func(obj interface{}) {
|
|
||||||
cc.enqueueEndpoint(obj, clusterName)
|
|
||||||
},
|
|
||||||
},
|
|
||||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
|
||||||
)
|
|
||||||
cachedClusterClient.endpointStore = corelisters.NewEndpointsLister(endpointIndexer)
|
|
||||||
|
|
||||||
var serviceIndexer cache.Indexer
|
|
||||||
serviceIndexer, cachedClusterClient.serviceController = cache.NewIndexerInformer(
|
|
||||||
&cache.ListWatch{
|
|
||||||
ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
|
|
||||||
return clientset.Core().Services(metav1.NamespaceAll).List(options)
|
|
||||||
},
|
|
||||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
|
||||||
return clientset.Core().Services(metav1.NamespaceAll).Watch(options)
|
|
||||||
},
|
|
||||||
},
|
|
||||||
&v1.Service{},
|
|
||||||
serviceSyncPeriod,
|
|
||||||
cache.ResourceEventHandlerFuncs{
|
|
||||||
AddFunc: func(obj interface{}) {
|
|
||||||
cc.enqueueService(obj, clusterName)
|
|
||||||
},
|
|
||||||
UpdateFunc: func(old, cur interface{}) {
|
|
||||||
oldService, ok := old.(*v1.Service)
|
|
||||||
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
curService, ok := cur.(*v1.Service)
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(oldService.Status.LoadBalancer, curService.Status.LoadBalancer) {
|
|
||||||
cc.enqueueService(cur, clusterName)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
DeleteFunc: func(obj interface{}) {
|
|
||||||
service, _ := obj.(*v1.Service)
|
|
||||||
cc.enqueueService(obj, clusterName)
|
|
||||||
glog.V(2).Infof("Service %s/%s deletion found and enqueue to service store %s", service.Namespace, service.Name, clusterName)
|
|
||||||
},
|
|
||||||
},
|
|
||||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
|
||||||
)
|
|
||||||
cachedClusterClient.serviceStore = corelisters.NewServiceLister(serviceIndexer)
|
|
||||||
cc.clientMap[clusterName] = cachedClusterClient
|
|
||||||
go cachedClusterClient.serviceController.Run(wait.NeverStop)
|
|
||||||
go cachedClusterClient.endpointController.Run(wait.NeverStop)
|
|
||||||
glog.V(2).Infof("Start watching services and endpoints on cluster %s", clusterName)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
//TODO: copied from cluster controller, to make this as common function in pass 2
|
|
||||||
// delFromClusterSet delete a cluster from clusterSet and
|
|
||||||
// delete the corresponding restclient from the map clusterKubeClientMap
|
|
||||||
func (cc *clusterClientCache) delFromClusterSet(obj interface{}) {
|
|
||||||
cluster, ok := obj.(*v1beta1.Cluster)
|
|
||||||
cc.rwlock.Lock()
|
|
||||||
defer cc.rwlock.Unlock()
|
|
||||||
if ok {
|
|
||||||
delete(cc.clientMap, cluster.Name)
|
|
||||||
} else {
|
|
||||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
|
||||||
if !ok {
|
|
||||||
glog.Infof("Object contained wasn't a cluster or a deleted key: %+v", obj)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
glog.Infof("Found tombstone for %v", obj)
|
|
||||||
delete(cc.clientMap, tombstone.Key)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// addToClusterSet inserts the new cluster to clusterSet and creates a corresponding
|
|
||||||
// restclient to map clusterKubeClientMap
|
|
||||||
func (cc *clusterClientCache) addToClientMap(obj interface{}) {
|
|
||||||
cc.rwlock.Lock()
|
|
||||||
defer cc.rwlock.Unlock()
|
|
||||||
cluster, ok := obj.(*v1beta1.Cluster)
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
pred := getClusterConditionPredicate()
|
|
||||||
// check status
|
|
||||||
// skip if not ready
|
|
||||||
if pred(*cluster) {
|
|
||||||
cc.startClusterLW(cluster, cluster.Name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newClusterClientset(c *v1beta1.Cluster) (*kubeclientset.Clientset, error) {
|
|
||||||
clusterConfig, err := util.BuildClusterConfig(c)
|
|
||||||
if clusterConfig != nil {
|
|
||||||
clientset := kubeclientset.NewForConfigOrDie(restclient.AddUserAgent(clusterConfig, UserAgentName))
|
|
||||||
return clientset, nil
|
|
||||||
}
|
|
||||||
return nil, err
|
|
||||||
}
|
|
@ -20,12 +20,10 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
|
||||||
"k8s.io/kubernetes/federation/apis/federation/v1beta1"
|
"k8s.io/kubernetes/federation/apis/federation/v1beta1"
|
||||||
fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake"
|
fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake"
|
||||||
"k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes.
|
"k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes.
|
||||||
@ -45,10 +43,9 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
|
|||||||
barZoneDNSName := "servicename.servicenamespace.myfederation.svc.barzone.barregion.federation.example.com"
|
barZoneDNSName := "servicename.servicenamespace.myfederation.svc.barzone.barregion.federation.example.com"
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
service v1.Service
|
service v1.Service
|
||||||
expected []string
|
expected []string
|
||||||
serviceStatus v1.LoadBalancerStatus
|
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "ServiceWithSingleLBIngress",
|
name: "ServiceWithSingleLBIngress",
|
||||||
@ -62,7 +59,6 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
|
|||||||
String()},
|
String()},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
serviceStatus: buildServiceStatus([][]string{{"198.51.100.1", ""}}),
|
|
||||||
expected: []string{
|
expected: []string{
|
||||||
"example.com:" + globalDNSName + ":A:180:[198.51.100.1]",
|
"example.com:" + globalDNSName + ":A:180:[198.51.100.1]",
|
||||||
"example.com:" + fooRegionDNSName + ":A:180:[198.51.100.1]",
|
"example.com:" + fooRegionDNSName + ":A:180:[198.51.100.1]",
|
||||||
@ -82,7 +78,6 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
|
|||||||
Namespace: "servicenamespace",
|
Namespace: "servicenamespace",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
serviceStatus: buildServiceStatus([][]string{{"", "randomstring.amazonelb.example.com"}}),
|
|
||||||
expected: []string{
|
expected: []string{
|
||||||
"example.com:"+globalDNSName+":A:180:[198.51.100.1]",
|
"example.com:"+globalDNSName+":A:180:[198.51.100.1]",
|
||||||
"example.com:"+fooRegionDNSName+":A:180:[198.51.100.1]",
|
"example.com:"+fooRegionDNSName+":A:180:[198.51.100.1]",
|
||||||
@ -207,31 +202,6 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
|
|||||||
serviceDnsSuffix: "federation.example.com",
|
serviceDnsSuffix: "federation.example.com",
|
||||||
zoneName: "example.com",
|
zoneName: "example.com",
|
||||||
federationName: "myfederation",
|
federationName: "myfederation",
|
||||||
serviceCache: &serviceCache{fedServiceMap: make(map[string]*cachedService)},
|
|
||||||
clusterCache: &clusterClientCache{
|
|
||||||
rwlock: sync.Mutex{},
|
|
||||||
clientMap: make(map[string]*clusterCache),
|
|
||||||
},
|
|
||||||
knownClusterSet: make(sets.String),
|
|
||||||
}
|
|
||||||
|
|
||||||
serviceController.clusterCache.clientMap[cluster1Name] = &clusterCache{
|
|
||||||
cluster: &v1beta1.Cluster{
|
|
||||||
Status: v1beta1.ClusterStatus{
|
|
||||||
Zones: []string{"foozone"},
|
|
||||||
Region: "fooregion",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
cachedService := &cachedService{
|
|
||||||
lastState: &test.service,
|
|
||||||
endpointMap: make(map[string]int),
|
|
||||||
serviceStatusMap: make(map[string]v1.LoadBalancerStatus),
|
|
||||||
}
|
|
||||||
cachedService.endpointMap[cluster1Name] = 1
|
|
||||||
if !reflect.DeepEqual(&test.serviceStatus, &v1.LoadBalancerStatus{}) {
|
|
||||||
cachedService.serviceStatusMap[cluster1Name] = test.serviceStatus
|
|
||||||
}
|
}
|
||||||
|
|
||||||
err := serviceController.ensureDnsRecords(cluster1Name, &test.service)
|
err := serviceController.ensureDnsRecords(cluster1Name, &test.service)
|
||||||
|
@ -1,19 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2016 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
// Package service contains code for syncing Kubernetes services,
|
|
||||||
// and cloud DNS servers with the federated service registry.
|
|
||||||
package service // import "k8s.io/kubernetes/federation/pkg/federation-controller/service"
|
|
@ -1,202 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2016 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package service
|
|
||||||
|
|
||||||
import (
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
|
||||||
cache "k8s.io/client-go/tools/cache"
|
|
||||||
fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
|
|
||||||
v1 "k8s.io/kubernetes/pkg/api/v1"
|
|
||||||
"k8s.io/kubernetes/pkg/controller"
|
|
||||||
|
|
||||||
"github.com/golang/glog"
|
|
||||||
)
|
|
||||||
|
|
||||||
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
|
|
||||||
// It enforces that the syncHandler is never invoked concurrently with the same key.
|
|
||||||
func (sc *ServiceController) clusterEndpointWorker() {
|
|
||||||
// process all pending events in endpointWorkerDoneChan
|
|
||||||
ForLoop:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case clusterName := <-sc.endpointWorkerDoneChan:
|
|
||||||
sc.endpointWorkerMap[clusterName] = false
|
|
||||||
default:
|
|
||||||
// non-blocking, comes here if all existing events are processed
|
|
||||||
break ForLoop
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for clusterName, cache := range sc.clusterCache.clientMap {
|
|
||||||
workerExist, found := sc.endpointWorkerMap[clusterName]
|
|
||||||
if found && workerExist {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// create a worker only if the previous worker has finished and gone out of scope
|
|
||||||
go func(cache *clusterCache, clusterName string) {
|
|
||||||
fedClient := sc.federationClient
|
|
||||||
for {
|
|
||||||
func() {
|
|
||||||
key, quit := cache.endpointQueue.Get()
|
|
||||||
// update endpoint cache
|
|
||||||
if quit {
|
|
||||||
// send signal that current worker has finished tasks and is going out of scope
|
|
||||||
sc.endpointWorkerDoneChan <- clusterName
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer cache.endpointQueue.Done(key)
|
|
||||||
err := sc.clusterCache.syncEndpoint(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc)
|
|
||||||
if err != nil {
|
|
||||||
glog.V(2).Infof("Failed to sync endpoint: %+v", err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
}(cache, clusterName)
|
|
||||||
sc.endpointWorkerMap[clusterName] = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Whenever there is change on endpoint, the federation service should be updated
|
|
||||||
// key is the namespaced name of endpoint
|
|
||||||
func (cc *clusterClientCache) syncEndpoint(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient fedclientset.Interface, serviceController *ServiceController) error {
|
|
||||||
cachedService, ok := serviceCache.get(key)
|
|
||||||
if !ok {
|
|
||||||
// here we filtered all non-federation services
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Did not successfully get %v from store: %v, will retry later", key, err)
|
|
||||||
clusterCache.endpointQueue.Add(key)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
endpoint, err := clusterCache.endpointStore.Endpoints(namespace).Get(name)
|
|
||||||
switch {
|
|
||||||
case errors.IsNotFound(err):
|
|
||||||
// service absence in store means watcher caught the deletion, ensure LB info is cleaned
|
|
||||||
glog.Infof("Can not get endpoint %v for cluster %s from endpointStore", key, clusterName)
|
|
||||||
err = cc.processEndpointDeletion(cachedService, clusterName, serviceController)
|
|
||||||
case err != nil:
|
|
||||||
glog.Errorf("Did not successfully get %v from store: %v, will retry later", key, err)
|
|
||||||
clusterCache.endpointQueue.Add(key)
|
|
||||||
return err
|
|
||||||
default:
|
|
||||||
glog.V(4).Infof("Found endpoint for federation service %s/%s from cluster %s", endpoint.Namespace, endpoint.Name, clusterName)
|
|
||||||
err = cc.processEndpointUpdate(cachedService, endpoint, clusterName, serviceController)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Failed to sync service: %+v, put back to service queue", err)
|
|
||||||
clusterCache.endpointQueue.Add(key)
|
|
||||||
}
|
|
||||||
cachedService.resetDNSUpdateDelay()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedService, clusterName string, serviceController *ServiceController) error {
|
|
||||||
glog.V(4).Infof("Processing endpoint deletion for %s/%s, cluster %s", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
|
|
||||||
var err error
|
|
||||||
cachedService.rwlock.Lock()
|
|
||||||
defer cachedService.rwlock.Unlock()
|
|
||||||
_, ok := cachedService.endpointMap[clusterName]
|
|
||||||
// TODO remove ok checking? if service controller is restarted, then endpointMap for the cluster does not exist
|
|
||||||
// need to query dns info from dnsprovider and make sure of if deletion is needed
|
|
||||||
if ok {
|
|
||||||
// endpoints lost, clean dns record
|
|
||||||
glog.V(4).Infof("Cached endpoint was found for %s/%s, cluster %s, removing", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
|
|
||||||
delete(cachedService.endpointMap, clusterName)
|
|
||||||
for i := 0; i < clientRetryCount; i++ {
|
|
||||||
err := serviceController.ensureDnsRecords(clusterName, cachedService.lastState)
|
|
||||||
if err == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
glog.V(4).Infof("Error ensuring DNS Records: %v", err)
|
|
||||||
time.Sleep(cachedService.nextDNSUpdateDelay())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update dns info when endpoint update event received
|
|
||||||
// We do not care about the endpoint info, what we need to make sure here is len(endpoints.subsets)>0
|
|
||||||
func (cc *clusterClientCache) processEndpointUpdate(cachedService *cachedService, endpoint *v1.Endpoints, clusterName string, serviceController *ServiceController) error {
|
|
||||||
glog.V(4).Infof("Processing endpoint update for %s/%s, cluster %s", endpoint.Namespace, endpoint.Name, clusterName)
|
|
||||||
var err error
|
|
||||||
cachedService.rwlock.Lock()
|
|
||||||
var reachable bool
|
|
||||||
defer cachedService.rwlock.Unlock()
|
|
||||||
_, ok := cachedService.endpointMap[clusterName]
|
|
||||||
if !ok {
|
|
||||||
for _, subset := range endpoint.Subsets {
|
|
||||||
if len(subset.Addresses) > 0 {
|
|
||||||
reachable = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if reachable {
|
|
||||||
// first time get endpoints, update dns record
|
|
||||||
glog.V(4).Infof("Reachable endpoint was found for %s/%s, cluster %s, building endpointMap", endpoint.Namespace, endpoint.Name, clusterName)
|
|
||||||
cachedService.endpointMap[clusterName] = 1
|
|
||||||
for i := 0; i < clientRetryCount; i++ {
|
|
||||||
err := serviceController.ensureDnsRecords(clusterName, cachedService.lastState)
|
|
||||||
if err == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
glog.V(4).Infof("Error ensuring DNS Records: %v", err)
|
|
||||||
time.Sleep(cachedService.nextDNSUpdateDelay())
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
for _, subset := range endpoint.Subsets {
|
|
||||||
if len(subset.Addresses) > 0 {
|
|
||||||
reachable = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !reachable {
|
|
||||||
// first time get endpoints, update dns record
|
|
||||||
glog.V(4).Infof("Reachable endpoint was lost for %s/%s, cluster %s, deleting endpointMap", endpoint.Namespace, endpoint.Name, clusterName)
|
|
||||||
delete(cachedService.endpointMap, clusterName)
|
|
||||||
for i := 0; i < clientRetryCount; i++ {
|
|
||||||
err := serviceController.ensureDnsRecords(clusterName, cachedService.lastState)
|
|
||||||
if err == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
glog.V(4).Infof("Error ensuring DNS Records: %v", err)
|
|
||||||
time.Sleep(cachedService.nextDNSUpdateDelay())
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// obj could be an *api.Endpoints, or a DeletionFinalStateUnknown marker item.
|
|
||||||
func (cc *clusterClientCache) enqueueEndpoint(obj interface{}, clusterName string) {
|
|
||||||
key, err := controller.KeyFunc(obj)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
_, ok := cc.clientMap[clusterName]
|
|
||||||
if ok {
|
|
||||||
cc.clientMap[clusterName].endpointQueue.Add(key)
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,167 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2016 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package service
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
|
||||||
"k8s.io/kubernetes/federation/apis/federation/v1beta1"
|
|
||||||
fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake"
|
|
||||||
"k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes.
|
|
||||||
. "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
|
|
||||||
v1 "k8s.io/kubernetes/pkg/api/v1"
|
|
||||||
)
|
|
||||||
|
|
||||||
var fakeDns, _ = clouddns.NewFakeInterface() // No need to check for unsupported interfaces, as the fake interface supports everything that's required.
|
|
||||||
var fakeDnsZones, _ = fakeDns.Zones()
|
|
||||||
var fakeClient = &fakefedclientset.Clientset{}
|
|
||||||
|
|
||||||
var fakeServiceController = ServiceController{
|
|
||||||
federationClient: fakeClient,
|
|
||||||
dns: fakeDns,
|
|
||||||
dnsZones: fakeDnsZones,
|
|
||||||
federationName: "fed1",
|
|
||||||
zoneName: "example.com",
|
|
||||||
serviceDnsSuffix: "federation.example.com",
|
|
||||||
serviceCache: &serviceCache{fedServiceMap: make(map[string]*cachedService)},
|
|
||||||
clusterCache: &clusterClientCache{
|
|
||||||
clientMap: make(map[string]*clusterCache),
|
|
||||||
},
|
|
||||||
knownClusterSet: make(sets.String),
|
|
||||||
}
|
|
||||||
|
|
||||||
func buildEndpoint(subsets [][]string) *v1.Endpoints {
|
|
||||||
endpoint := &v1.Endpoints{
|
|
||||||
Subsets: []v1.EndpointSubset{
|
|
||||||
{Addresses: []v1.EndpointAddress{}},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
for _, element := range subsets {
|
|
||||||
address := v1.EndpointAddress{IP: element[0], Hostname: element[1], TargetRef: nil}
|
|
||||||
endpoint.Subsets[0].Addresses = append(endpoint.Subsets[0].Addresses, address)
|
|
||||||
}
|
|
||||||
return endpoint
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestProcessEndpointUpdate(t *testing.T) {
|
|
||||||
clusterName := "foo"
|
|
||||||
cc := clusterClientCache{
|
|
||||||
clientMap: map[string]*clusterCache{
|
|
||||||
clusterName: {
|
|
||||||
cluster: &v1beta1.Cluster{
|
|
||||||
Status: v1beta1.ClusterStatus{
|
|
||||||
Zones: []string{"foozone"},
|
|
||||||
Region: "fooregion",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
RegisterFakeClusterGet(&fakeClient.Fake, &v1beta1.ClusterList{Items: []v1beta1.Cluster{*NewCluster(clusterName, v1.ConditionTrue)}})
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
cachedService *cachedService
|
|
||||||
endpoint *v1.Endpoints
|
|
||||||
clusterName string
|
|
||||||
expectResult int
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
"no-cache",
|
|
||||||
&cachedService{
|
|
||||||
lastState: &v1.Service{},
|
|
||||||
endpointMap: make(map[string]int),
|
|
||||||
},
|
|
||||||
buildEndpoint([][]string{{"ip1", ""}}),
|
|
||||||
clusterName,
|
|
||||||
1,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"has-cache",
|
|
||||||
&cachedService{
|
|
||||||
lastState: &v1.Service{},
|
|
||||||
endpointMap: map[string]int{
|
|
||||||
"foo": 1,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
buildEndpoint([][]string{{"ip1", ""}}),
|
|
||||||
clusterName,
|
|
||||||
1,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
fakeServiceController.clusterCache = &cc
|
|
||||||
for _, test := range tests {
|
|
||||||
cc.processEndpointUpdate(test.cachedService, test.endpoint, test.clusterName, &fakeServiceController)
|
|
||||||
if test.expectResult != test.cachedService.endpointMap[test.clusterName] {
|
|
||||||
t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectResult, test.cachedService.endpointMap[test.clusterName])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestProcessEndpointDeletion(t *testing.T) {
|
|
||||||
clusterName := "foo"
|
|
||||||
cc := clusterClientCache{
|
|
||||||
clientMap: map[string]*clusterCache{
|
|
||||||
clusterName: {
|
|
||||||
cluster: &v1beta1.Cluster{
|
|
||||||
Status: v1beta1.ClusterStatus{
|
|
||||||
Zones: []string{"foozone"},
|
|
||||||
Region: "fooregion",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
RegisterFakeClusterGet(&fakeClient.Fake, &v1beta1.ClusterList{Items: []v1beta1.Cluster{*NewCluster(clusterName, v1.ConditionTrue)}})
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
cachedService *cachedService
|
|
||||||
endpoint *v1.Endpoints
|
|
||||||
clusterName string
|
|
||||||
expectResult int
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
"no-cache",
|
|
||||||
&cachedService{
|
|
||||||
lastState: &v1.Service{},
|
|
||||||
endpointMap: make(map[string]int),
|
|
||||||
},
|
|
||||||
buildEndpoint([][]string{{"ip1", ""}}),
|
|
||||||
clusterName,
|
|
||||||
0,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"has-cache",
|
|
||||||
&cachedService{
|
|
||||||
lastState: &v1.Service{},
|
|
||||||
endpointMap: map[string]int{
|
|
||||||
clusterName: 1,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
buildEndpoint([][]string{{"ip1", ""}}),
|
|
||||||
clusterName,
|
|
||||||
0,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
fakeServiceController.clusterCache = &cc
|
|
||||||
for _, test := range tests {
|
|
||||||
cc.processEndpointDeletion(test.cachedService, test.clusterName, &fakeServiceController)
|
|
||||||
if test.expectResult != test.cachedService.endpointMap[test.clusterName] {
|
|
||||||
t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectResult, test.cachedService.endpointMap[test.clusterName])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,302 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2016 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package service
|
|
||||||
|
|
||||||
import (
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
||||||
cache "k8s.io/client-go/tools/cache"
|
|
||||||
fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
|
|
||||||
v1 "k8s.io/kubernetes/pkg/api/v1"
|
|
||||||
"k8s.io/kubernetes/pkg/controller"
|
|
||||||
|
|
||||||
"reflect"
|
|
||||||
"sort"
|
|
||||||
|
|
||||||
"github.com/golang/glog"
|
|
||||||
)
|
|
||||||
|
|
||||||
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
|
|
||||||
// It enforces that the syncHandler is never invoked concurrently with the same key.
|
|
||||||
func (sc *ServiceController) clusterServiceWorker() {
|
|
||||||
// process all pending events in serviceWorkerDoneChan
|
|
||||||
ForLoop:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case clusterName := <-sc.serviceWorkerDoneChan:
|
|
||||||
sc.serviceWorkerMap[clusterName] = false
|
|
||||||
default:
|
|
||||||
// non-blocking, comes here if all existing events are processed
|
|
||||||
break ForLoop
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for clusterName, cache := range sc.clusterCache.clientMap {
|
|
||||||
workerExist, found := sc.serviceWorkerMap[clusterName]
|
|
||||||
if found && workerExist {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// create a worker only if the previous worker has finished and gone out of scope
|
|
||||||
go func(cache *clusterCache, clusterName string) {
|
|
||||||
fedClient := sc.federationClient
|
|
||||||
for {
|
|
||||||
func() {
|
|
||||||
key, quit := cache.serviceQueue.Get()
|
|
||||||
if quit {
|
|
||||||
// send signal that current worker has finished tasks and is going out of scope
|
|
||||||
sc.serviceWorkerDoneChan <- clusterName
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer cache.serviceQueue.Done(key)
|
|
||||||
err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Failed to sync service: %+v", err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
}(cache, clusterName)
|
|
||||||
sc.serviceWorkerMap[clusterName] = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Whenever there is change on service, the federation service should be updated
|
|
||||||
func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient fedclientset.Interface, sc *ServiceController) error {
|
|
||||||
// obj holds the latest service info from apiserver, return if there is no federation cache for the service
|
|
||||||
cachedService, ok := serviceCache.get(key)
|
|
||||||
if !ok {
|
|
||||||
// if serviceCache does not exists, that means the service is not created by federation, we should skip it
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Did not successfully get %v from store: %v, will retry later", key, err)
|
|
||||||
clusterCache.serviceQueue.Add(key)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
var needUpdate, isDeletion bool
|
|
||||||
service, err := clusterCache.serviceStore.Services(namespace).Get(name)
|
|
||||||
switch {
|
|
||||||
case errors.IsNotFound(err):
|
|
||||||
glog.Infof("Can not get service %v for cluster %s from serviceStore", key, clusterName)
|
|
||||||
needUpdate = cc.processServiceDeletion(cachedService, clusterName)
|
|
||||||
isDeletion = true
|
|
||||||
case err != nil:
|
|
||||||
glog.Errorf("Did not successfully get %v from store: %v, will retry later", key, err)
|
|
||||||
clusterCache.serviceQueue.Add(key)
|
|
||||||
return err
|
|
||||||
default:
|
|
||||||
glog.V(4).Infof("Found service for federation service %s/%s from cluster %s", service.Namespace, service.Name, clusterName)
|
|
||||||
needUpdate = cc.processServiceUpdate(cachedService, service, clusterName)
|
|
||||||
}
|
|
||||||
|
|
||||||
if needUpdate {
|
|
||||||
for i := 0; i < clientRetryCount; i++ {
|
|
||||||
err := sc.ensureDnsRecords(clusterName, service)
|
|
||||||
if err == nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
glog.V(4).Infof("Error ensuring DNS Records for service %s on cluster %s: %v", key, clusterName, err)
|
|
||||||
time.Sleep(cachedService.nextDNSUpdateDelay())
|
|
||||||
clusterCache.serviceQueue.Add(key)
|
|
||||||
// did not retry here as we still want to persist federation apiserver even ensure dns records fails
|
|
||||||
}
|
|
||||||
err := cc.persistFedServiceUpdate(cachedService, fedClient)
|
|
||||||
if err == nil {
|
|
||||||
cachedService.appliedState = cachedService.lastState
|
|
||||||
cachedService.resetFedUpdateDelay()
|
|
||||||
} else {
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Failed to sync service: %+v, put back to service queue", err)
|
|
||||||
clusterCache.serviceQueue.Add(key)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if isDeletion {
|
|
||||||
// cachedService is not reliable here as
|
|
||||||
// deleting cache is the last step of federation service deletion
|
|
||||||
service, err := fedClient.Core().Services(cachedService.lastState.Namespace).Get(cachedService.lastState.Name, metav1.GetOptions{})
|
|
||||||
// rebuild service if federation service still exists
|
|
||||||
if err == nil || !errors.IsNotFound(err) {
|
|
||||||
if err == nil && service.DeletionTimestamp != nil {
|
|
||||||
glog.V(4).Infof("Skipping sync of service %v in underlying clusters as it has already been marked for deletion", name)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return sc.ensureClusterService(cachedService, clusterName, cachedService.appliedState, clusterCache.clientset)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// processServiceDeletion is triggered when a service is delete from underlying k8s cluster
|
|
||||||
// the deletion function will wip out the cached ingress info of the service from federation service ingress
|
|
||||||
// the function returns a bool to indicate if actual update happened on federation service cache
|
|
||||||
// and if the federation service cache is updated, the updated info should be post to federation apiserver
|
|
||||||
func (cc *clusterClientCache) processServiceDeletion(cachedService *cachedService, clusterName string) bool {
|
|
||||||
cachedService.rwlock.Lock()
|
|
||||||
defer cachedService.rwlock.Unlock()
|
|
||||||
cachedStatus, ok := cachedService.serviceStatusMap[clusterName]
|
|
||||||
// cached status found, remove ingress info from federation service cache
|
|
||||||
if ok {
|
|
||||||
cachedFedServiceStatus := cachedService.lastState.Status.LoadBalancer
|
|
||||||
removeIndexes := []int{}
|
|
||||||
for i, fed := range cachedFedServiceStatus.Ingress {
|
|
||||||
for _, new := range cachedStatus.Ingress {
|
|
||||||
// remove if same ingress record found
|
|
||||||
if new.IP == fed.IP && new.Hostname == fed.Hostname {
|
|
||||||
removeIndexes = append(removeIndexes, i)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
sort.Ints(removeIndexes)
|
|
||||||
for i := len(removeIndexes) - 1; i >= 0; i-- {
|
|
||||||
cachedFedServiceStatus.Ingress = append(cachedFedServiceStatus.Ingress[:removeIndexes[i]], cachedFedServiceStatus.Ingress[removeIndexes[i]+1:]...)
|
|
||||||
glog.V(4).Infof("Remove old ingress %d for service %s/%s", removeIndexes[i], cachedService.lastState.Namespace, cachedService.lastState.Name)
|
|
||||||
}
|
|
||||||
delete(cachedService.serviceStatusMap, clusterName)
|
|
||||||
delete(cachedService.endpointMap, clusterName)
|
|
||||||
cachedService.lastState.Status.LoadBalancer = cachedFedServiceStatus
|
|
||||||
return true
|
|
||||||
} else {
|
|
||||||
glog.V(4).Infof("Service removal %s/%s from cluster %s observed.", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// processServiceUpdate Update ingress info when service updated
|
|
||||||
// the function returns a bool to indicate if actual update happened on federation service cache
|
|
||||||
// and if the federation service cache is updated, the updated info should be post to federation apiserver
|
|
||||||
func (cc *clusterClientCache) processServiceUpdate(cachedService *cachedService, service *v1.Service, clusterName string) bool {
|
|
||||||
glog.V(4).Infof("Processing service update for %s/%s, cluster %s", service.Namespace, service.Name, clusterName)
|
|
||||||
cachedService.rwlock.Lock()
|
|
||||||
defer cachedService.rwlock.Unlock()
|
|
||||||
var needUpdate bool
|
|
||||||
newServiceLB := service.Status.LoadBalancer
|
|
||||||
cachedFedServiceStatus := cachedService.lastState.Status.LoadBalancer
|
|
||||||
if len(newServiceLB.Ingress) == 0 {
|
|
||||||
// not yet get LB IP
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
cachedStatus, ok := cachedService.serviceStatusMap[clusterName]
|
|
||||||
if ok {
|
|
||||||
if reflect.DeepEqual(cachedStatus, newServiceLB) {
|
|
||||||
glog.V(4).Infof("Same ingress info observed for service %s/%s: %+v ", service.Namespace, service.Name, cachedStatus.Ingress)
|
|
||||||
} else {
|
|
||||||
glog.V(4).Infof("Ingress info was changed for service %s/%s: cache: %+v, new: %+v ",
|
|
||||||
service.Namespace, service.Name, cachedStatus.Ingress, newServiceLB)
|
|
||||||
needUpdate = true
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
glog.V(4).Infof("Cached service status was not found for %s/%s, cluster %s, building one", service.Namespace, service.Name, clusterName)
|
|
||||||
|
|
||||||
// cache is not always reliable(cache will be cleaned when service controller restart)
|
|
||||||
// two cases will run into this branch:
|
|
||||||
// 1. new service loadbalancer info received -> no info in cache, and no in federation service
|
|
||||||
// 2. service controller being restarted -> no info in cache, but it is in federation service
|
|
||||||
|
|
||||||
// check if the lb info is already in federation service
|
|
||||||
|
|
||||||
cachedService.serviceStatusMap[clusterName] = newServiceLB
|
|
||||||
needUpdate = false
|
|
||||||
// iterate service ingress info
|
|
||||||
for _, new := range newServiceLB.Ingress {
|
|
||||||
var found bool
|
|
||||||
// if it is known by federation service
|
|
||||||
for _, fed := range cachedFedServiceStatus.Ingress {
|
|
||||||
if new.IP == fed.IP && new.Hostname == fed.Hostname {
|
|
||||||
found = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !found {
|
|
||||||
needUpdate = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if needUpdate {
|
|
||||||
// new status = cached federation status - cached status + new status from k8s cluster
|
|
||||||
|
|
||||||
removeIndexes := []int{}
|
|
||||||
for i, fed := range cachedFedServiceStatus.Ingress {
|
|
||||||
for _, new := range cachedStatus.Ingress {
|
|
||||||
// remove if same ingress record found
|
|
||||||
if new.IP == fed.IP && new.Hostname == fed.Hostname {
|
|
||||||
removeIndexes = append(removeIndexes, i)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
sort.Ints(removeIndexes)
|
|
||||||
for i := len(removeIndexes) - 1; i >= 0; i-- {
|
|
||||||
cachedFedServiceStatus.Ingress = append(cachedFedServiceStatus.Ingress[:removeIndexes[i]], cachedFedServiceStatus.Ingress[removeIndexes[i]+1:]...)
|
|
||||||
}
|
|
||||||
cachedFedServiceStatus.Ingress = append(cachedFedServiceStatus.Ingress, service.Status.LoadBalancer.Ingress...)
|
|
||||||
cachedService.lastState.Status.LoadBalancer = cachedFedServiceStatus
|
|
||||||
glog.V(4).Infof("Add new ingress info %+v for service %s/%s", service.Status.LoadBalancer, service.Namespace, service.Name)
|
|
||||||
} else {
|
|
||||||
glog.V(4).Infof("Same ingress info found for %s/%s, cluster %s", service.Namespace, service.Name, clusterName)
|
|
||||||
}
|
|
||||||
return needUpdate
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cc *clusterClientCache) persistFedServiceUpdate(cachedService *cachedService, fedClient fedclientset.Interface) error {
|
|
||||||
service := cachedService.lastState
|
|
||||||
glog.V(5).Infof("Persist federation service status %s/%s", service.Namespace, service.Name)
|
|
||||||
var err error
|
|
||||||
for i := 0; i < clientRetryCount; i++ {
|
|
||||||
_, err := fedClient.Core().Services(service.Namespace).Get(service.Name, metav1.GetOptions{})
|
|
||||||
if errors.IsNotFound(err) {
|
|
||||||
glog.Infof("Not persisting update to service '%s/%s' that no longer exists: %v",
|
|
||||||
service.Namespace, service.Name, err)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
_, err = fedClient.Core().Services(service.Namespace).UpdateStatus(service)
|
|
||||||
if err == nil {
|
|
||||||
glog.V(2).Infof("Successfully update service %s/%s to federation apiserver", service.Namespace, service.Name)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if errors.IsNotFound(err) {
|
|
||||||
glog.Infof("Not persisting update to service '%s/%s' that no longer exists: %v",
|
|
||||||
service.Namespace, service.Name, err)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if errors.IsConflict(err) {
|
|
||||||
glog.V(4).Infof("Not persisting update to service '%s/%s' that has been changed since we received it: %v",
|
|
||||||
service.Namespace, service.Name, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
time.Sleep(cachedService.nextFedUpdateDelay())
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// obj could be an *api.Service, or a DeletionFinalStateUnknown marker item.
|
|
||||||
func (cc *clusterClientCache) enqueueService(obj interface{}, clusterName string) {
|
|
||||||
key, err := controller.KeyFunc(obj)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
_, ok := cc.clientMap[clusterName]
|
|
||||||
if ok {
|
|
||||||
cc.clientMap[clusterName].serviceQueue.Add(key)
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,163 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2016 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package service
|
|
||||||
|
|
||||||
import (
|
|
||||||
"reflect"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
||||||
"k8s.io/kubernetes/pkg/api/v1"
|
|
||||||
)
|
|
||||||
|
|
||||||
func buildServiceStatus(ingresses [][]string) v1.LoadBalancerStatus {
|
|
||||||
status := v1.LoadBalancerStatus{
|
|
||||||
Ingress: []v1.LoadBalancerIngress{},
|
|
||||||
}
|
|
||||||
for _, element := range ingresses {
|
|
||||||
ingress := v1.LoadBalancerIngress{IP: element[0], Hostname: element[1]}
|
|
||||||
status.Ingress = append(status.Ingress, ingress)
|
|
||||||
}
|
|
||||||
return status
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestProcessServiceUpdate(t *testing.T) {
|
|
||||||
cc := clusterClientCache{
|
|
||||||
clientMap: make(map[string]*clusterCache),
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
cachedService *cachedService
|
|
||||||
service *v1.Service
|
|
||||||
clusterName string
|
|
||||||
expectNeedUpdate bool
|
|
||||||
expectStatus v1.LoadBalancerStatus
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
"no-cache",
|
|
||||||
&cachedService{
|
|
||||||
lastState: &v1.Service{},
|
|
||||||
serviceStatusMap: make(map[string]v1.LoadBalancerStatus),
|
|
||||||
},
|
|
||||||
&v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
|
|
||||||
"foo",
|
|
||||||
true,
|
|
||||||
buildServiceStatus([][]string{{"ip1", ""}}),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"same-ingress",
|
|
||||||
&cachedService{
|
|
||||||
lastState: &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
|
|
||||||
serviceStatusMap: map[string]v1.LoadBalancerStatus{
|
|
||||||
"foo1": {Ingress: []v1.LoadBalancerIngress{{IP: "ip1", Hostname: ""}}},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
&v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
|
|
||||||
"foo1",
|
|
||||||
false,
|
|
||||||
buildServiceStatus([][]string{{"ip1", ""}}),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"diff-cluster",
|
|
||||||
&cachedService{
|
|
||||||
lastState: &v1.Service{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{Name: "bar1"},
|
|
||||||
},
|
|
||||||
serviceStatusMap: map[string]v1.LoadBalancerStatus{
|
|
||||||
"foo2": {Ingress: []v1.LoadBalancerIngress{{IP: "ip1", Hostname: ""}}},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
&v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
|
|
||||||
"foo1",
|
|
||||||
true,
|
|
||||||
buildServiceStatus([][]string{{"ip1", ""}}),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"diff-ingress",
|
|
||||||
&cachedService{
|
|
||||||
lastState: &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}})}},
|
|
||||||
serviceStatusMap: map[string]v1.LoadBalancerStatus{
|
|
||||||
"foo1": buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}}),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
&v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip2", ""}, {"ip3", ""}, {"ip5", ""}})}},
|
|
||||||
"foo1",
|
|
||||||
true,
|
|
||||||
buildServiceStatus([][]string{{"ip2", ""}, {"ip3", ""}, {"ip5", ""}}),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
for _, test := range tests {
|
|
||||||
result := cc.processServiceUpdate(test.cachedService, test.service, test.clusterName)
|
|
||||||
if test.expectNeedUpdate != result {
|
|
||||||
t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectNeedUpdate, result)
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(test.expectStatus, test.cachedService.lastState.Status.LoadBalancer) {
|
|
||||||
t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectStatus, test.cachedService.lastState.Status.LoadBalancer)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestProcessServiceDeletion(t *testing.T) {
|
|
||||||
cc := clusterClientCache{
|
|
||||||
clientMap: make(map[string]*clusterCache),
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
cachedService *cachedService
|
|
||||||
service *v1.Service
|
|
||||||
clusterName string
|
|
||||||
expectNeedUpdate bool
|
|
||||||
expectStatus v1.LoadBalancerStatus
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
"same-ingress",
|
|
||||||
&cachedService{
|
|
||||||
lastState: &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
|
|
||||||
serviceStatusMap: map[string]v1.LoadBalancerStatus{
|
|
||||||
"foo1": {Ingress: []v1.LoadBalancerIngress{{IP: "ip1", Hostname: ""}}},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
&v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
|
|
||||||
"foo1",
|
|
||||||
true,
|
|
||||||
buildServiceStatus([][]string{}),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"diff-ingress",
|
|
||||||
&cachedService{
|
|
||||||
lastState: &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}, {"ip3", ""}, {"ip5", ""}, {"ip6", ""}, {"ip8", ""}})}},
|
|
||||||
serviceStatusMap: map[string]v1.LoadBalancerStatus{
|
|
||||||
"foo1": buildServiceStatus([][]string{{"ip1", ""}, {"ip2", ""}, {"ip3", ""}}),
|
|
||||||
"foo2": buildServiceStatus([][]string{{"ip5", ""}, {"ip6", ""}, {"ip8", ""}}),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
&v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}, {"ip2", ""}, {"ip3", ""}})}},
|
|
||||||
"foo1",
|
|
||||||
true,
|
|
||||||
buildServiceStatus([][]string{{"ip4", ""}, {"ip5", ""}, {"ip6", ""}, {"ip8", ""}}),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
for _, test := range tests {
|
|
||||||
result := cc.processServiceDeletion(test.cachedService, test.clusterName)
|
|
||||||
if test.expectNeedUpdate != result {
|
|
||||||
t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectNeedUpdate, result)
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(test.expectStatus, test.cachedService.lastState.Status.LoadBalancer) {
|
|
||||||
t.Errorf("Test failed for %s, expected %+v, saw %+v", test.name, test.expectStatus, test.cachedService.lastState.Status.LoadBalancer)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -18,23 +18,19 @@ package service
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"reflect"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"reflect"
|
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/conversion"
|
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
pkgruntime "k8s.io/apimachinery/pkg/runtime"
|
pkgruntime "k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/apimachinery/pkg/util/runtime"
|
"k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
clientv1 "k8s.io/client-go/pkg/api/v1"
|
clientv1 "k8s.io/client-go/pkg/api/v1"
|
||||||
@ -59,27 +55,9 @@ import (
|
|||||||
|
|
||||||
const (
|
const (
|
||||||
serviceSyncPeriod = 10 * time.Minute
|
serviceSyncPeriod = 10 * time.Minute
|
||||||
clusterSyncPeriod = 10 * time.Minute
|
|
||||||
|
|
||||||
// How long to wait before retrying the processing of a service change.
|
|
||||||
// If this changes, the sleep in hack/jenkins/e2e.sh before downing a cluster
|
|
||||||
// should be changed appropriately.
|
|
||||||
minRetryDelay = 5 * time.Second
|
|
||||||
maxRetryDelay = 300 * time.Second
|
|
||||||
|
|
||||||
// client retry count and interval is when accessing a remote kube-apiserver or federation apiserver
|
|
||||||
// how many times should be attempted and how long it should sleep when failure occurs
|
|
||||||
// the retry should be in short time so no exponential backoff
|
|
||||||
clientRetryCount = 5
|
|
||||||
|
|
||||||
retryable = true
|
|
||||||
|
|
||||||
doNotRetry = time.Duration(0)
|
|
||||||
|
|
||||||
UserAgentName = "federation-service-controller"
|
UserAgentName = "federation-service-controller"
|
||||||
|
|
||||||
maxNoOfClusters = 100
|
|
||||||
|
|
||||||
reviewDelay = 10 * time.Second
|
reviewDelay = 10 * time.Second
|
||||||
updateTimeout = 30 * time.Second
|
updateTimeout = 30 * time.Second
|
||||||
allClustersKey = "ALL_CLUSTERS"
|
allClustersKey = "ALL_CLUSTERS"
|
||||||
@ -91,33 +69,6 @@ var (
|
|||||||
RequiredResources = []schema.GroupVersionResource{v1.SchemeGroupVersion.WithResource("services")}
|
RequiredResources = []schema.GroupVersionResource{v1.SchemeGroupVersion.WithResource("services")}
|
||||||
)
|
)
|
||||||
|
|
||||||
type cachedService struct {
|
|
||||||
lastState *v1.Service
|
|
||||||
// The state as successfully applied to the DNS server
|
|
||||||
appliedState *v1.Service
|
|
||||||
// cluster endpoint map hold subset info from kubernetes clusters
|
|
||||||
// key clusterName
|
|
||||||
// value is a flag that if there is ready address, 1 means there is ready address
|
|
||||||
endpointMap map[string]int
|
|
||||||
// serviceStatusMap map holds service status info from kubernetes clusters, keyed on clusterName
|
|
||||||
serviceStatusMap map[string]v1.LoadBalancerStatus
|
|
||||||
// Ensures only one goroutine can operate on this service at any given time.
|
|
||||||
rwlock sync.Mutex
|
|
||||||
// Controls error back-off for proceeding federation service to k8s clusters
|
|
||||||
lastRetryDelay time.Duration
|
|
||||||
// Controls error back-off for updating federation service back to federation apiserver
|
|
||||||
lastFedUpdateDelay time.Duration
|
|
||||||
// Controls error back-off for dns record update
|
|
||||||
lastDNSUpdateDelay time.Duration
|
|
||||||
}
|
|
||||||
|
|
||||||
type serviceCache struct {
|
|
||||||
rwlock sync.Mutex // protects serviceMap
|
|
||||||
// federation service map contains all service received from federation apiserver
|
|
||||||
// key serviceName
|
|
||||||
fedServiceMap map[string]*cachedService
|
|
||||||
}
|
|
||||||
|
|
||||||
type ServiceController struct {
|
type ServiceController struct {
|
||||||
dns dnsprovider.Interface
|
dns dnsprovider.Interface
|
||||||
federationClient fedclientset.Interface
|
federationClient fedclientset.Interface
|
||||||
@ -128,9 +79,7 @@ type ServiceController struct {
|
|||||||
zoneName string
|
zoneName string
|
||||||
zoneID string
|
zoneID string
|
||||||
// each federation should be configured with a single zone (e.g. "mycompany.com")
|
// each federation should be configured with a single zone (e.g. "mycompany.com")
|
||||||
dnsZones dnsprovider.Zones
|
dnsZones dnsprovider.Zones
|
||||||
serviceCache *serviceCache
|
|
||||||
clusterCache *clusterClientCache
|
|
||||||
// A store of services, populated by the serviceController
|
// A store of services, populated by the serviceController
|
||||||
serviceStore corelisters.ServiceLister
|
serviceStore corelisters.ServiceLister
|
||||||
// Watches changes to all services
|
// Watches changes to all services
|
||||||
@ -143,18 +92,7 @@ type ServiceController struct {
|
|||||||
eventBroadcaster record.EventBroadcaster
|
eventBroadcaster record.EventBroadcaster
|
||||||
eventRecorder record.EventRecorder
|
eventRecorder record.EventRecorder
|
||||||
// services that need to be synced
|
// services that need to be synced
|
||||||
queue *workqueue.Type
|
queue *workqueue.Type
|
||||||
knownClusterSet sets.String
|
|
||||||
// endpoint worker map contains all the clusters registered with an indication that worker exist
|
|
||||||
// key clusterName
|
|
||||||
endpointWorkerMap map[string]bool
|
|
||||||
// channel for worker to signal that it is going out of existence
|
|
||||||
endpointWorkerDoneChan chan string
|
|
||||||
// service worker map contains all the clusters registered with an indication that worker exist
|
|
||||||
// key clusterName
|
|
||||||
serviceWorkerMap map[string]bool
|
|
||||||
// channel for worker to signal that it is going out of existence
|
|
||||||
serviceWorkerDoneChan chan string
|
|
||||||
|
|
||||||
// For triggering all services reconciliation. This is used when
|
// For triggering all services reconciliation. This is used when
|
||||||
// a new cluster becomes available.
|
// a new cluster becomes available.
|
||||||
@ -174,7 +112,6 @@ type ServiceController struct {
|
|||||||
|
|
||||||
// New returns a new service controller to keep DNS provider service resources
|
// New returns a new service controller to keep DNS provider service resources
|
||||||
// (like Kubernetes Services and DNS server records for service discovery) in sync with the registry.
|
// (like Kubernetes Services and DNS server records for service discovery) in sync with the registry.
|
||||||
|
|
||||||
func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
|
func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
|
||||||
federationName, serviceDnsSuffix, zoneName string, zoneID string) *ServiceController {
|
federationName, serviceDnsSuffix, zoneName string, zoneID string) *ServiceController {
|
||||||
broadcaster := record.NewBroadcaster()
|
broadcaster := record.NewBroadcaster()
|
||||||
@ -183,21 +120,15 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
|
|||||||
recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: UserAgentName})
|
recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: UserAgentName})
|
||||||
|
|
||||||
s := &ServiceController{
|
s := &ServiceController{
|
||||||
dns: dns,
|
dns: dns,
|
||||||
federationClient: federationClient,
|
federationClient: federationClient,
|
||||||
federationName: federationName,
|
federationName: federationName,
|
||||||
serviceDnsSuffix: serviceDnsSuffix,
|
serviceDnsSuffix: serviceDnsSuffix,
|
||||||
zoneName: zoneName,
|
zoneName: zoneName,
|
||||||
zoneID: zoneID,
|
zoneID: zoneID,
|
||||||
serviceCache: &serviceCache{fedServiceMap: make(map[string]*cachedService)},
|
|
||||||
clusterCache: &clusterClientCache{
|
|
||||||
rwlock: sync.Mutex{},
|
|
||||||
clientMap: make(map[string]*clusterCache),
|
|
||||||
},
|
|
||||||
eventBroadcaster: broadcaster,
|
eventBroadcaster: broadcaster,
|
||||||
eventRecorder: recorder,
|
eventRecorder: recorder,
|
||||||
queue: workqueue.New(),
|
queue: workqueue.New(),
|
||||||
knownClusterSet: make(sets.String),
|
|
||||||
reviewDelay: reviewDelay,
|
reviewDelay: reviewDelay,
|
||||||
clusterAvailableDelay: clusterAvailableDelay,
|
clusterAvailableDelay: clusterAvailableDelay,
|
||||||
updateTimeout: updateTimeout,
|
updateTimeout: updateTimeout,
|
||||||
@ -317,10 +248,6 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
|
|||||||
s.federatedUpdater,
|
s.federatedUpdater,
|
||||||
)
|
)
|
||||||
|
|
||||||
s.endpointWorkerMap = make(map[string]bool)
|
|
||||||
s.serviceWorkerMap = make(map[string]bool)
|
|
||||||
s.endpointWorkerDoneChan = make(chan string, maxNoOfClusters)
|
|
||||||
s.serviceWorkerDoneChan = make(chan string, maxNoOfClusters)
|
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -331,16 +258,6 @@ func (s *ServiceController) updateService(obj pkgruntime.Object) (pkgruntime.Obj
|
|||||||
return s.federationClient.Core().Services(service.Namespace).Update(service)
|
return s.federationClient.Core().Services(service.Namespace).Update(service)
|
||||||
}
|
}
|
||||||
|
|
||||||
// obj could be an *api.Service, or a DeletionFinalStateUnknown marker item.
|
|
||||||
func (s *ServiceController) enqueueService(obj interface{}) {
|
|
||||||
key, err := controller.KeyFunc(obj)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
s.queue.Add(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Run starts a background goroutine that watches for changes to federation services
|
// Run starts a background goroutine that watches for changes to federation services
|
||||||
// and ensures that they have Kubernetes services created, updated or deleted appropriately.
|
// and ensures that they have Kubernetes services created, updated or deleted appropriately.
|
||||||
// federationSyncPeriod controls how often we check the federation's services to
|
// federationSyncPeriod controls how often we check the federation's services to
|
||||||
@ -472,579 +389,6 @@ func wantsDNSRecords(service *v1.Service) bool {
|
|||||||
return service.Spec.Type == v1.ServiceTypeLoadBalancer
|
return service.Spec.Type == v1.ServiceTypeLoadBalancer
|
||||||
}
|
}
|
||||||
|
|
||||||
// processServiceForCluster creates or updates service to all registered running clusters,
|
|
||||||
// update DNS records and update the service info with DNS entries to federation apiserver.
|
|
||||||
// the function returns any error caught
|
|
||||||
func (s *ServiceController) processServiceForCluster(cachedService *cachedService, clusterName string, service *v1.Service, client *kubeclientset.Clientset) error {
|
|
||||||
if service.DeletionTimestamp != nil {
|
|
||||||
glog.V(4).Infof("Service has already been marked for deletion %v", service.Name)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
glog.V(4).Infof("Process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName)
|
|
||||||
// Create or Update k8s Service
|
|
||||||
err := s.ensureClusterService(cachedService, clusterName, service, client)
|
|
||||||
if err != nil {
|
|
||||||
glog.V(4).Infof("Failed to process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
glog.V(4).Infof("Successfully process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// updateFederationService Returns whatever error occurred along with a boolean indicator of whether it
|
|
||||||
// should be retried.
|
|
||||||
func (s *ServiceController) updateFederationService(key string, cachedService *cachedService) (error, bool) {
|
|
||||||
// Clone federation service, and create them in underlying k8s cluster
|
|
||||||
desiredService := &v1.Service{
|
|
||||||
ObjectMeta: util.DeepCopyRelevantObjectMeta(cachedService.lastState.ObjectMeta),
|
|
||||||
Spec: *(util.DeepCopyApiTypeOrPanic(&cachedService.lastState.Spec).(*v1.ServiceSpec)),
|
|
||||||
}
|
|
||||||
|
|
||||||
// handle available clusters one by one
|
|
||||||
hasErr := false
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
for clusterName, cache := range s.clusterCache.clientMap {
|
|
||||||
wg.Add(1)
|
|
||||||
go func(cache *clusterCache, clusterName string) {
|
|
||||||
defer wg.Done()
|
|
||||||
err := s.processServiceForCluster(cachedService, clusterName, desiredService, cache.clientset)
|
|
||||||
if err != nil {
|
|
||||||
hasErr = true
|
|
||||||
}
|
|
||||||
}(cache, clusterName)
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
if hasErr {
|
|
||||||
// detail error has been dumped inside the loop
|
|
||||||
return fmt.Errorf("Service %s/%s was not successfully updated to all clusters", desiredService.Namespace, desiredService.Name), retryable
|
|
||||||
}
|
|
||||||
return nil, !retryable
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *ServiceController) ensureClusterService(cachedService *cachedService, clusterName string, service *v1.Service, client *kubeclientset.Clientset) error {
|
|
||||||
var err error
|
|
||||||
var needUpdate bool
|
|
||||||
for i := 0; i < clientRetryCount; i++ {
|
|
||||||
svc, err := client.Core().Services(service.Namespace).Get(service.Name, metav1.GetOptions{})
|
|
||||||
if err == nil {
|
|
||||||
// service exists
|
|
||||||
glog.V(5).Infof("Found service %s/%s from cluster %s", service.Namespace, service.Name, clusterName)
|
|
||||||
//reserve immutable fields
|
|
||||||
service.Spec.ClusterIP = svc.Spec.ClusterIP
|
|
||||||
|
|
||||||
//reserve auto assigned field
|
|
||||||
for i, oldPort := range svc.Spec.Ports {
|
|
||||||
for _, port := range service.Spec.Ports {
|
|
||||||
if port.NodePort == 0 {
|
|
||||||
if !portEqualExcludeNodePort(&oldPort, &port) {
|
|
||||||
svc.Spec.Ports[i] = port
|
|
||||||
needUpdate = true
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if !portEqualForLB(&oldPort, &port) {
|
|
||||||
svc.Spec.Ports[i] = port
|
|
||||||
needUpdate = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if needUpdate {
|
|
||||||
// we only apply spec update
|
|
||||||
svc.Spec = service.Spec
|
|
||||||
_, err = client.Core().Services(svc.Namespace).Update(svc)
|
|
||||||
if err == nil {
|
|
||||||
glog.V(5).Infof("Service %s/%s successfully updated to cluster %s", svc.Namespace, svc.Name, clusterName)
|
|
||||||
return nil
|
|
||||||
} else {
|
|
||||||
glog.V(4).Infof("Failed to update %+v", err)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
glog.V(5).Infof("Service %s/%s is not updated to cluster %s as the spec are identical", svc.Namespace, svc.Name, clusterName)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
} else if errors.IsNotFound(err) {
|
|
||||||
// Create service if it is not found
|
|
||||||
glog.Infof("Service '%s/%s' is not found in cluster %s, trying to create new",
|
|
||||||
service.Namespace, service.Name, clusterName)
|
|
||||||
service.ResourceVersion = ""
|
|
||||||
_, err = client.Core().Services(service.Namespace).Create(service)
|
|
||||||
if err == nil {
|
|
||||||
glog.V(5).Infof("Service %s/%s successfully created to cluster %s", service.Namespace, service.Name, clusterName)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
glog.V(4).Infof("Failed to create %+v", err)
|
|
||||||
if errors.IsAlreadyExists(err) {
|
|
||||||
glog.V(5).Infof("service %s/%s already exists in cluster %s", service.Namespace, service.Name, clusterName)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if errors.IsConflict(err) {
|
|
||||||
glog.V(4).Infof("Not persisting update to service '%s/%s' that has been changed since we received it: %v",
|
|
||||||
service.Namespace, service.Name, err)
|
|
||||||
}
|
|
||||||
// should we reuse same retry delay for all clusters?
|
|
||||||
time.Sleep(cachedService.nextRetryDelay())
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *serviceCache) allServices() []*cachedService {
|
|
||||||
s.rwlock.Lock()
|
|
||||||
defer s.rwlock.Unlock()
|
|
||||||
services := make([]*cachedService, 0, len(s.fedServiceMap))
|
|
||||||
for _, v := range s.fedServiceMap {
|
|
||||||
services = append(services, v)
|
|
||||||
}
|
|
||||||
return services
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *serviceCache) get(serviceName string) (*cachedService, bool) {
|
|
||||||
s.rwlock.Lock()
|
|
||||||
defer s.rwlock.Unlock()
|
|
||||||
service, ok := s.fedServiceMap[serviceName]
|
|
||||||
return service, ok
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *serviceCache) getOrCreate(serviceName string) *cachedService {
|
|
||||||
s.rwlock.Lock()
|
|
||||||
defer s.rwlock.Unlock()
|
|
||||||
service, ok := s.fedServiceMap[serviceName]
|
|
||||||
if !ok {
|
|
||||||
service = &cachedService{
|
|
||||||
endpointMap: make(map[string]int),
|
|
||||||
serviceStatusMap: make(map[string]v1.LoadBalancerStatus),
|
|
||||||
}
|
|
||||||
s.fedServiceMap[serviceName] = service
|
|
||||||
}
|
|
||||||
return service
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *serviceCache) set(serviceName string, service *cachedService) {
|
|
||||||
s.rwlock.Lock()
|
|
||||||
defer s.rwlock.Unlock()
|
|
||||||
s.fedServiceMap[serviceName] = service
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *serviceCache) delete(serviceName string) {
|
|
||||||
s.rwlock.Lock()
|
|
||||||
defer s.rwlock.Unlock()
|
|
||||||
delete(s.fedServiceMap, serviceName)
|
|
||||||
}
|
|
||||||
|
|
||||||
// needsUpdateDNS check if the dns records of the given service should be updated
|
|
||||||
func (s *ServiceController) needsUpdateDNS(oldService *v1.Service, newService *v1.Service) bool {
|
|
||||||
if !wantsDNSRecords(oldService) && !wantsDNSRecords(newService) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if wantsDNSRecords(oldService) != wantsDNSRecords(newService) {
|
|
||||||
s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "Type", "%v -> %v",
|
|
||||||
oldService.Spec.Type, newService.Spec.Type)
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
if !portsEqualForLB(oldService, newService) || oldService.Spec.SessionAffinity != newService.Spec.SessionAffinity {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
if !LoadBalancerIPsAreEqual(oldService, newService) {
|
|
||||||
s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "LoadbalancerIP", "%v -> %v",
|
|
||||||
oldService.Spec.LoadBalancerIP, newService.Spec.LoadBalancerIP)
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
if len(oldService.Spec.ExternalIPs) != len(newService.Spec.ExternalIPs) {
|
|
||||||
s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Count: %v -> %v",
|
|
||||||
len(oldService.Spec.ExternalIPs), len(newService.Spec.ExternalIPs))
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
for i := range oldService.Spec.ExternalIPs {
|
|
||||||
if oldService.Spec.ExternalIPs[i] != newService.Spec.ExternalIPs[i] {
|
|
||||||
s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Added: %v",
|
|
||||||
newService.Spec.ExternalIPs[i])
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(oldService.Annotations, newService.Annotations) {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
if oldService.UID != newService.UID {
|
|
||||||
s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "UID", "%v -> %v",
|
|
||||||
oldService.UID, newService.UID)
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func getPortsForLB(service *v1.Service) ([]*v1.ServicePort, error) {
|
|
||||||
// TODO: quinton: Probably applies for DNS SVC records. Come back to this.
|
|
||||||
//var protocol api.Protocol
|
|
||||||
|
|
||||||
ports := []*v1.ServicePort{}
|
|
||||||
for i := range service.Spec.Ports {
|
|
||||||
sp := &service.Spec.Ports[i]
|
|
||||||
// The check on protocol was removed here. The DNS provider itself is now responsible for all protocol validation
|
|
||||||
ports = append(ports, sp)
|
|
||||||
}
|
|
||||||
return ports, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func portsEqualForLB(x, y *v1.Service) bool {
|
|
||||||
xPorts, err := getPortsForLB(x)
|
|
||||||
if err != nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
yPorts, err := getPortsForLB(y)
|
|
||||||
if err != nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return portSlicesEqualForLB(xPorts, yPorts)
|
|
||||||
}
|
|
||||||
|
|
||||||
func portSlicesEqualForLB(x, y []*v1.ServicePort) bool {
|
|
||||||
if len(x) != len(y) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := range x {
|
|
||||||
if !portEqualForLB(x[i], y[i]) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func portEqualForLB(x, y *v1.ServicePort) bool {
|
|
||||||
// TODO: Should we check name? (In theory, an LB could expose it)
|
|
||||||
if x.Name != y.Name {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
if x.Protocol != y.Protocol {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
if x.Port != y.Port {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if x.NodePort != y.NodePort {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func portEqualExcludeNodePort(x, y *v1.ServicePort) bool {
|
|
||||||
// TODO: Should we check name? (In theory, an LB could expose it)
|
|
||||||
if x.Name != y.Name {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
if x.Protocol != y.Protocol {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
if x.Port != y.Port {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func clustersFromList(list *v1beta1.ClusterList) []string {
|
|
||||||
result := []string{}
|
|
||||||
for ix := range list.Items {
|
|
||||||
result = append(result, list.Items[ix].Name)
|
|
||||||
}
|
|
||||||
return result
|
|
||||||
}
|
|
||||||
|
|
||||||
// getClusterConditionPredicate filter all clusters meet condition of
|
|
||||||
// condition.type=Ready and condition.status=true
|
|
||||||
func getClusterConditionPredicate() federationcache.ClusterConditionPredicate {
|
|
||||||
return func(cluster v1beta1.Cluster) bool {
|
|
||||||
// If we have no info, don't accept
|
|
||||||
if len(cluster.Status.Conditions) == 0 {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
for _, cond := range cluster.Status.Conditions {
|
|
||||||
//We consider the cluster for load balancing only when its ClusterReady condition status
|
|
||||||
//is ConditionTrue
|
|
||||||
if cond.Type == v1beta1.ClusterReady && cond.Status != v1.ConditionTrue {
|
|
||||||
glog.V(4).Infof("Ignoring cluster %v with %v condition status %v", cluster.Name, cond.Type, cond.Status)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// clusterSyncLoop observes running clusters changes, and apply all services to new added cluster
|
|
||||||
// and add dns records for the changes
|
|
||||||
func (s *ServiceController) clusterSyncLoop() {
|
|
||||||
var servicesToUpdate []*cachedService
|
|
||||||
// should we remove cache for cluster from ready to not ready? should remove the condition predicate if no
|
|
||||||
clusters, err := s.clusterStore.ClusterCondition(getClusterConditionPredicate()).List()
|
|
||||||
if err != nil {
|
|
||||||
glog.Infof("Fail to get cluster list")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
newClusters := clustersFromList(&clusters)
|
|
||||||
var newSet, increase sets.String
|
|
||||||
newSet = sets.NewString(newClusters...)
|
|
||||||
if newSet.Equal(s.knownClusterSet) {
|
|
||||||
// The set of cluster names in the services in the federation hasn't changed, but we can retry
|
|
||||||
// updating any services that we failed to update last time around.
|
|
||||||
servicesToUpdate = s.updateDNSRecords(servicesToUpdate, newClusters)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
glog.Infof("Detected change in list of cluster names. New set: %v, Old set: %v", newSet, s.knownClusterSet)
|
|
||||||
increase = newSet.Difference(s.knownClusterSet)
|
|
||||||
// do nothing when cluster is removed.
|
|
||||||
if increase != nil {
|
|
||||||
// Try updating all services, and save the ones that fail to try again next
|
|
||||||
// round.
|
|
||||||
servicesToUpdate = s.serviceCache.allServices()
|
|
||||||
numServices := len(servicesToUpdate)
|
|
||||||
for newCluster := range increase {
|
|
||||||
glog.Infof("New cluster observed %s", newCluster)
|
|
||||||
s.updateAllServicesToCluster(servicesToUpdate, newCluster)
|
|
||||||
}
|
|
||||||
servicesToUpdate = s.updateDNSRecords(servicesToUpdate, newClusters)
|
|
||||||
glog.Infof("Successfully updated %d out of %d DNS records to direct traffic to the updated cluster",
|
|
||||||
numServices-len(servicesToUpdate), numServices)
|
|
||||||
}
|
|
||||||
s.knownClusterSet = newSet
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *ServiceController) updateAllServicesToCluster(services []*cachedService, clusterName string) {
|
|
||||||
cluster, ok := s.clusterCache.clientMap[clusterName]
|
|
||||||
if ok {
|
|
||||||
for _, cachedService := range services {
|
|
||||||
appliedState := cachedService.lastState
|
|
||||||
s.processServiceForCluster(cachedService, clusterName, appliedState, cluster.clientset)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// updateDNSRecords updates all existing federation service DNS Records so that
|
|
||||||
// they will match the list of cluster names provided.
|
|
||||||
// Returns the list of services that couldn't be updated.
|
|
||||||
func (s *ServiceController) updateDNSRecords(services []*cachedService, clusters []string) (servicesToRetry []*cachedService) {
|
|
||||||
for _, service := range services {
|
|
||||||
func() {
|
|
||||||
service.rwlock.Lock()
|
|
||||||
defer service.rwlock.Unlock()
|
|
||||||
// If the applied state is nil, that means it hasn't yet been successfully dealt
|
|
||||||
// with by the DNS Record reconciler. We can trust the DNS Record
|
|
||||||
// reconciler to ensure the federation service's DNS records are created to target
|
|
||||||
// the correct backend service IP's
|
|
||||||
if service.appliedState == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := s.lockedUpdateDNSRecords(service, clusters); err != nil {
|
|
||||||
glog.Errorf("External error while updating DNS Records: %v.", err)
|
|
||||||
servicesToRetry = append(servicesToRetry, service)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
return servicesToRetry
|
|
||||||
}
|
|
||||||
|
|
||||||
// lockedUpdateDNSRecords Updates the DNS records of a service, assuming we hold the mutex
|
|
||||||
// associated with the service.
|
|
||||||
func (s *ServiceController) lockedUpdateDNSRecords(service *cachedService, clusterNames []string) error {
|
|
||||||
if !wantsDNSRecords(service.appliedState) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
ensuredCount := 0
|
|
||||||
unensuredCount := 0
|
|
||||||
for key := range s.clusterCache.clientMap {
|
|
||||||
for _, clusterName := range clusterNames {
|
|
||||||
if key == clusterName {
|
|
||||||
err := s.ensureDnsRecords(clusterName, service.lastState)
|
|
||||||
if err != nil {
|
|
||||||
unensuredCount += 1
|
|
||||||
glog.V(4).Infof("Failed to update DNS records for service %v from cluster %s: %v", service, clusterName, err)
|
|
||||||
} else {
|
|
||||||
ensuredCount += 1
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
missedCount := len(clusterNames) - ensuredCount - unensuredCount
|
|
||||||
if missedCount > 0 || unensuredCount > 0 {
|
|
||||||
return fmt.Errorf("Failed to update DNS records for %d clusters for service %v due to missing clients [missed count: %d] and/or failing to ensure DNS records [unensured count: %d]",
|
|
||||||
len(clusterNames), service, missedCount, unensuredCount)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func LoadBalancerIPsAreEqual(oldService, newService *v1.Service) bool {
|
|
||||||
return oldService.Spec.LoadBalancerIP == newService.Spec.LoadBalancerIP
|
|
||||||
}
|
|
||||||
|
|
||||||
// Computes the next retry, using exponential backoff
|
|
||||||
// mutex must be held.
|
|
||||||
func (s *cachedService) nextRetryDelay() time.Duration {
|
|
||||||
s.lastRetryDelay = s.lastRetryDelay * 2
|
|
||||||
if s.lastRetryDelay < minRetryDelay {
|
|
||||||
s.lastRetryDelay = minRetryDelay
|
|
||||||
}
|
|
||||||
if s.lastRetryDelay > maxRetryDelay {
|
|
||||||
s.lastRetryDelay = maxRetryDelay
|
|
||||||
}
|
|
||||||
return s.lastRetryDelay
|
|
||||||
}
|
|
||||||
|
|
||||||
// resetRetryDelay Resets the retry exponential backoff. mutex must be held.
|
|
||||||
func (s *cachedService) resetRetryDelay() {
|
|
||||||
s.lastRetryDelay = time.Duration(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Computes the next retry, using exponential backoff
|
|
||||||
// mutex must be held.
|
|
||||||
func (s *cachedService) nextFedUpdateDelay() time.Duration {
|
|
||||||
s.lastFedUpdateDelay = s.lastFedUpdateDelay * 2
|
|
||||||
if s.lastFedUpdateDelay < minRetryDelay {
|
|
||||||
s.lastFedUpdateDelay = minRetryDelay
|
|
||||||
}
|
|
||||||
if s.lastFedUpdateDelay > maxRetryDelay {
|
|
||||||
s.lastFedUpdateDelay = maxRetryDelay
|
|
||||||
}
|
|
||||||
return s.lastFedUpdateDelay
|
|
||||||
}
|
|
||||||
|
|
||||||
// resetRetryDelay Resets the retry exponential backoff. mutex must be held.
|
|
||||||
func (s *cachedService) resetFedUpdateDelay() {
|
|
||||||
s.lastFedUpdateDelay = time.Duration(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Computes the next retry, using exponential backoff
|
|
||||||
// mutex must be held.
|
|
||||||
func (s *cachedService) nextDNSUpdateDelay() time.Duration {
|
|
||||||
s.lastDNSUpdateDelay = s.lastDNSUpdateDelay * 2
|
|
||||||
if s.lastDNSUpdateDelay < minRetryDelay {
|
|
||||||
s.lastDNSUpdateDelay = minRetryDelay
|
|
||||||
}
|
|
||||||
if s.lastDNSUpdateDelay > maxRetryDelay {
|
|
||||||
s.lastDNSUpdateDelay = maxRetryDelay
|
|
||||||
}
|
|
||||||
return s.lastDNSUpdateDelay
|
|
||||||
}
|
|
||||||
|
|
||||||
// resetRetryDelay Resets the retry exponential backoff. mutex must be held.
|
|
||||||
func (s *cachedService) resetDNSUpdateDelay() {
|
|
||||||
s.lastDNSUpdateDelay = time.Duration(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
// syncService will sync the Service with the given key if it has had its expectations fulfilled,
|
|
||||||
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
|
|
||||||
// invoked concurrently with the same key.
|
|
||||||
func (s *ServiceController) syncService(key string) error {
|
|
||||||
startTime := time.Now()
|
|
||||||
var cachedService *cachedService
|
|
||||||
var retryDelay time.Duration
|
|
||||||
defer func() {
|
|
||||||
glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime))
|
|
||||||
}()
|
|
||||||
|
|
||||||
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Unable to retrieve service %v from store: %v", key, err)
|
|
||||||
s.queue.Add(key)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
service, err := s.serviceStore.Services(namespace).Get(name)
|
|
||||||
switch {
|
|
||||||
case errors.IsNotFound(err):
|
|
||||||
// service absence in store means watcher caught the deletion, ensure LB info is cleaned
|
|
||||||
glog.Infof("Service has been deleted %v", key)
|
|
||||||
err, retryDelay = s.processServiceDeletion(key)
|
|
||||||
case err != nil:
|
|
||||||
glog.Errorf("Unable to retrieve service %v from store: %v", key, err)
|
|
||||||
s.queue.Add(key)
|
|
||||||
return err
|
|
||||||
default:
|
|
||||||
// Create a copy before modifying the obj to prevent race condition with
|
|
||||||
// other readers of obj from store.
|
|
||||||
copy, err := conversion.NewCloner().DeepCopy(service)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Error in deep copying service %v retrieved from store: %v", key, err)
|
|
||||||
s.queue.Add(key)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
service := copy.(*v1.Service)
|
|
||||||
cachedService = s.serviceCache.getOrCreate(key)
|
|
||||||
err, retryDelay = s.processServiceUpdate(cachedService, service, key)
|
|
||||||
}
|
|
||||||
|
|
||||||
if retryDelay != 0 {
|
|
||||||
s.enqueueService(service)
|
|
||||||
} else if err != nil {
|
|
||||||
runtime.HandleError(fmt.Errorf("Failed to process service. Not retrying: %v", err))
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// processServiceUpdate returns an error if processing the service update failed, along with a time.Duration
|
|
||||||
// indicating whether processing should be retried; zero means no-retry; otherwise
|
|
||||||
// we should retry in that Duration.
|
|
||||||
func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *v1.Service, key string) (error, time.Duration) {
|
|
||||||
// Ensure that no other goroutine will interfere with our processing of the
|
|
||||||
// service.
|
|
||||||
cachedService.rwlock.Lock()
|
|
||||||
defer cachedService.rwlock.Unlock()
|
|
||||||
|
|
||||||
if service.DeletionTimestamp != nil {
|
|
||||||
if err := s.delete(service); err != nil {
|
|
||||||
glog.Errorf("Failed to delete %s: %v", service, err)
|
|
||||||
s.eventRecorder.Eventf(service, api.EventTypeWarning, "DeleteFailed",
|
|
||||||
"Service delete failed: %v", err)
|
|
||||||
return err, cachedService.nextRetryDelay()
|
|
||||||
}
|
|
||||||
return nil, doNotRetry
|
|
||||||
}
|
|
||||||
|
|
||||||
glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for service: %s",
|
|
||||||
service.Name)
|
|
||||||
// Add the required finalizers before creating a service in underlying clusters.
|
|
||||||
updatedServiceObj, err := s.deletionHelper.EnsureFinalizers(service)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in service %s: %v",
|
|
||||||
service.Name, err)
|
|
||||||
return err, cachedService.nextRetryDelay()
|
|
||||||
}
|
|
||||||
service = updatedServiceObj.(*v1.Service)
|
|
||||||
|
|
||||||
glog.V(3).Infof("Syncing service %s in underlying clusters", service.Name)
|
|
||||||
|
|
||||||
// Update the cached service (used above for populating synthetic deletes)
|
|
||||||
// alway trust service, which is retrieve from serviceStore, which keeps the latest service info getting from apiserver
|
|
||||||
// if the same service is changed before this go routine finished, there will be another queue entry to handle that.
|
|
||||||
cachedService.lastState = service
|
|
||||||
err, retry := s.updateFederationService(key, cachedService)
|
|
||||||
if err != nil {
|
|
||||||
message := "Error occurs when updating service to all clusters"
|
|
||||||
if retry {
|
|
||||||
message += " (will retry): "
|
|
||||||
} else {
|
|
||||||
message += " (will not retry): "
|
|
||||||
}
|
|
||||||
message += err.Error()
|
|
||||||
s.eventRecorder.Event(service, v1.EventTypeWarning, "UpdateServiceFail", message)
|
|
||||||
return err, cachedService.nextRetryDelay()
|
|
||||||
}
|
|
||||||
// Always update the cache upon success.
|
|
||||||
// NOTE: Since we update the cached service if and only if we successfully
|
|
||||||
// processed it, a cached service being nil implies that it hasn't yet
|
|
||||||
// been successfully processed.
|
|
||||||
|
|
||||||
cachedService.appliedState = service
|
|
||||||
s.serviceCache.set(key, cachedService)
|
|
||||||
glog.V(4).Infof("Successfully proceeded services %s", key)
|
|
||||||
cachedService.resetRetryDelay()
|
|
||||||
return nil, doNotRetry
|
|
||||||
}
|
|
||||||
|
|
||||||
// delete deletes the given service or returns error if the deletion was not complete.
|
// delete deletes the given service or returns error if the deletion was not complete.
|
||||||
func (s *ServiceController) delete(service *v1.Service) error {
|
func (s *ServiceController) delete(service *v1.Service) error {
|
||||||
glog.V(3).Infof("Handling deletion of service: %v", *service)
|
glog.V(3).Infof("Handling deletion of service: %v", *service)
|
||||||
@ -1083,15 +427,6 @@ func (s *ServiceController) delete(service *v1.Service) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// processServiceDeletion returns an error if processing the service deletion failed, along with a time.Duration
|
|
||||||
// indicating whether processing should be retried; zero means no-retry; otherwise
|
|
||||||
// we should retry in that Duration.
|
|
||||||
func (s *ServiceController) processServiceDeletion(key string) (error, time.Duration) {
|
|
||||||
glog.V(2).Infof("Process service deletion for %v", key)
|
|
||||||
s.serviceCache.delete(key)
|
|
||||||
return nil, doNotRetry
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *ServiceController) deliverServicesOnClusterChange() {
|
func (s *ServiceController) deliverServicesOnClusterChange() {
|
||||||
if !s.isSynced() {
|
if !s.isSynced() {
|
||||||
s.clusterDeliverer.DeliverAfter(allClustersKey, nil, s.clusterAvailableDelay)
|
s.clusterDeliverer.DeliverAfter(allClustersKey, nil, s.clusterAvailableDelay)
|
||||||
|
@ -20,7 +20,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -29,7 +28,6 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
"k8s.io/kubernetes/federation/apis/federation/v1beta1"
|
"k8s.io/kubernetes/federation/apis/federation/v1beta1"
|
||||||
@ -43,64 +41,6 @@ import (
|
|||||||
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
|
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestGetClusterConditionPredicate(t *testing.T) {
|
|
||||||
fakedns, _ := clouddns.NewFakeInterface() // No need to check for unsupported interfaces, as the fake interface supports everything that's required.
|
|
||||||
serviceController := ServiceController{
|
|
||||||
dns: fakedns,
|
|
||||||
serviceCache: &serviceCache{fedServiceMap: make(map[string]*cachedService)},
|
|
||||||
clusterCache: &clusterClientCache{
|
|
||||||
rwlock: sync.Mutex{},
|
|
||||||
clientMap: make(map[string]*clusterCache),
|
|
||||||
},
|
|
||||||
knownClusterSet: make(sets.String),
|
|
||||||
}
|
|
||||||
|
|
||||||
tests := []struct {
|
|
||||||
cluster v1beta1.Cluster
|
|
||||||
expectAccept bool
|
|
||||||
name string
|
|
||||||
serviceController *ServiceController
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
cluster: v1beta1.Cluster{},
|
|
||||||
expectAccept: false,
|
|
||||||
name: "empty",
|
|
||||||
serviceController: &serviceController,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
cluster: v1beta1.Cluster{
|
|
||||||
Status: v1beta1.ClusterStatus{
|
|
||||||
Conditions: []v1beta1.ClusterCondition{
|
|
||||||
{Type: v1beta1.ClusterReady, Status: v1.ConditionTrue},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
expectAccept: true,
|
|
||||||
name: "basic",
|
|
||||||
serviceController: &serviceController,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
cluster: v1beta1.Cluster{
|
|
||||||
Status: v1beta1.ClusterStatus{
|
|
||||||
Conditions: []v1beta1.ClusterCondition{
|
|
||||||
{Type: v1beta1.ClusterReady, Status: v1.ConditionFalse},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
expectAccept: false,
|
|
||||||
name: "notready",
|
|
||||||
serviceController: &serviceController,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
pred := getClusterConditionPredicate()
|
|
||||||
for _, test := range tests {
|
|
||||||
accept := pred(test.cluster)
|
|
||||||
if accept != test.expectAccept {
|
|
||||||
t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectAccept, accept)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const (
|
const (
|
||||||
retryInterval = 100 * time.Millisecond
|
retryInterval = 100 * time.Millisecond
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user