Merge pull request #44960 from shashidharatd/federation-service-controller-2

Automatic merge from submit-queue (batch tested with PRs 43395, 44960)

[Federation] Cleanup unused code in service controller

After merging PR #41258, a lot of unused code was left behind in the federation service controller. Those changes were split out into this PR so that the original PR stayed small and manageable.

**Release note**:
```
NONE
```

cc @kubernetes/sig-federation-pr-reviews
Commit ed0adbf395 by Kubernetes Submit Queue, 2017-04-26 14:29:44 -07:00 (committed by GitHub).
10 changed files with 12 additions and 1842 deletions

federation/pkg/federation-controller/service/BUILD

@@ -11,12 +11,8 @@ load(
go_library(
name = "go_default_library",
srcs = [
"cluster_helper.go",
"dns.go",
"doc.go",
"endpoint_helper.go",
"ingress.go",
"service_helper.go",
"servicecontroller.go",
],
tags = ["automanaged"],
@@ -37,17 +33,14 @@ go_library(
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/conversion:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/pkg/api/v1:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
@@ -59,8 +52,6 @@ go_test(
name = "go_default_test",
srcs = [
"dns_test.go",
"endpoint_helper_test.go",
"service_helper_test.go",
"servicecontroller_test.go",
],
library = ":go_default_library",
@@ -80,7 +71,6 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
],

federation/pkg/federation-controller/service/cluster_helper.go

@@ -1,212 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package service
import (
"sync"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
restclient "k8s.io/client-go/rest"
cache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
v1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
v1 "k8s.io/kubernetes/pkg/api/v1"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"reflect"
"github.com/golang/glog"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
)
type clusterCache struct {
clientset *kubeclientset.Clientset
cluster *v1beta1.Cluster
// A store of services, populated by the serviceController
serviceStore corelisters.ServiceLister
// Watches changes to all services
serviceController cache.Controller
// A store of endpoint, populated by the serviceController
endpointStore corelisters.EndpointsLister
// Watches changes to all endpoints
endpointController cache.Controller
// services that need to be synced
serviceQueue *workqueue.Type
// endpoints that need to be synced
endpointQueue *workqueue.Type
}
type clusterClientCache struct {
rwlock sync.Mutex // protects serviceMap
clientMap map[string]*clusterCache
}
func (cc *clusterClientCache) startClusterLW(cluster *v1beta1.Cluster, clusterName string) {
cachedClusterClient, ok := cc.clientMap[clusterName]
// only create when no existing cachedClusterClient
if ok {
if !reflect.DeepEqual(cachedClusterClient.cluster.Spec, cluster.Spec) {
//rebuild clientset when cluster spec is changed
clientset, err := newClusterClientset(cluster)
if err != nil || clientset == nil {
glog.Errorf("Failed to create corresponding restclient of kubernetes cluster: %v", err)
}
glog.V(4).Infof("Cluster spec changed, rebuild clientset for cluster %s", clusterName)
cachedClusterClient.clientset = clientset
go cachedClusterClient.serviceController.Run(wait.NeverStop)
go cachedClusterClient.endpointController.Run(wait.NeverStop)
glog.V(2).Infof("Start watching services and endpoints on cluster %s", clusterName)
} else {
// do nothing when there is no spec change
glog.V(4).Infof("Keep clientset for cluster %s", clusterName)
return
}
} else {
glog.V(4).Infof("No client cache for cluster %s, building new", clusterName)
clientset, err := newClusterClientset(cluster)
if err != nil || clientset == nil {
glog.Errorf("Failed to create corresponding restclient of kubernetes cluster: %v", err)
}
cachedClusterClient = &clusterCache{
cluster: cluster,
clientset: clientset,
serviceQueue: workqueue.New(),
endpointQueue: workqueue.New(),
}
var endpointIndexer cache.Indexer
endpointIndexer, cachedClusterClient.endpointController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
return clientset.Core().Endpoints(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return clientset.Core().Endpoints(metav1.NamespaceAll).Watch(options)
},
},
&v1.Endpoints{},
serviceSyncPeriod,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cc.enqueueEndpoint(obj, clusterName)
},
UpdateFunc: func(old, cur interface{}) {
cc.enqueueEndpoint(cur, clusterName)
},
DeleteFunc: func(obj interface{}) {
cc.enqueueEndpoint(obj, clusterName)
},
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
cachedClusterClient.endpointStore = corelisters.NewEndpointsLister(endpointIndexer)
var serviceIndexer cache.Indexer
serviceIndexer, cachedClusterClient.serviceController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
return clientset.Core().Services(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return clientset.Core().Services(metav1.NamespaceAll).Watch(options)
},
},
&v1.Service{},
serviceSyncPeriod,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cc.enqueueService(obj, clusterName)
},
UpdateFunc: func(old, cur interface{}) {
oldService, ok := old.(*v1.Service)
if !ok {
return
}
curService, ok := cur.(*v1.Service)
if !ok {
return
}
if !reflect.DeepEqual(oldService.Status.LoadBalancer, curService.Status.LoadBalancer) {
cc.enqueueService(cur, clusterName)
}
},
DeleteFunc: func(obj interface{}) {
service, _ := obj.(*v1.Service)
cc.enqueueService(obj, clusterName)
glog.V(2).Infof("Service %s/%s deletion found and enqueue to service store %s", service.Namespace, service.Name, clusterName)
},
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
cachedClusterClient.serviceStore = corelisters.NewServiceLister(serviceIndexer)
cc.clientMap[clusterName] = cachedClusterClient
go cachedClusterClient.serviceController.Run(wait.NeverStop)
go cachedClusterClient.endpointController.Run(wait.NeverStop)
glog.V(2).Infof("Start watching services and endpoints on cluster %s", clusterName)
}
}
// TODO: copied from the cluster controller; make this a common function in pass 2
// delFromClusterSet delete a cluster from clusterSet and
// delete the corresponding restclient from the map clusterKubeClientMap
func (cc *clusterClientCache) delFromClusterSet(obj interface{}) {
cluster, ok := obj.(*v1beta1.Cluster)
cc.rwlock.Lock()
defer cc.rwlock.Unlock()
if ok {
delete(cc.clientMap, cluster.Name)
} else {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Infof("Object contained wasn't a cluster or a deleted key: %+v", obj)
return
}
glog.Infof("Found tombstone for %v", obj)
delete(cc.clientMap, tombstone.Key)
}
}
// addToClusterSet inserts the new cluster to clusterSet and creates a corresponding
// restclient to map clusterKubeClientMap
func (cc *clusterClientCache) addToClientMap(obj interface{}) {
cc.rwlock.Lock()
defer cc.rwlock.Unlock()
cluster, ok := obj.(*v1beta1.Cluster)
if !ok {
return
}
pred := getClusterConditionPredicate()
// check status
// skip if not ready
if pred(*cluster) {
cc.startClusterLW(cluster, cluster.Name)
}
}
func newClusterClientset(c *v1beta1.Cluster) (*kubeclientset.Clientset, error) {
clusterConfig, err := util.BuildClusterConfig(c)
if clusterConfig != nil {
clientset := kubeclientset.NewForConfigOrDie(restclient.AddUserAgent(clusterConfig, UserAgentName))
return clientset, nil
}
return nil, err
}
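To make the deleted wiring easier to follow: startClusterLW above built, by hand, one IndexerInformer pair per cluster for services and endpoints. Below is a minimal sketch of that pattern, kept to the same client-go vintage as this tree (the Core() accessor, context-free List/Watch); the helper name buildServiceInformer and the enqueue callback are illustrative stand-ins, not part of the original file.

package service

import (
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	pkgruntime "k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	cache "k8s.io/client-go/tools/cache"
	v1 "k8s.io/kubernetes/pkg/api/v1"
	kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
)

// buildServiceInformer mirrors the informer wiring from the deleted
// startClusterLW: list/watch Services across all namespaces, index them
// by namespace, and pass every event to a caller-supplied enqueue
// function. (Illustrative helper; not part of the original file.)
func buildServiceInformer(clientset *kubeclientset.Clientset, resync time.Duration,
	enqueue func(obj interface{})) (cache.Indexer, cache.Controller) {
	return cache.NewIndexerInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
				return clientset.Core().Services(metav1.NamespaceAll).List(options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				return clientset.Core().Services(metav1.NamespaceAll).Watch(options)
			},
		},
		&v1.Service{},
		resync,
		cache.ResourceEventHandlerFuncs{
			AddFunc:    enqueue,
			UpdateFunc: func(_, cur interface{}) { enqueue(cur) },
			DeleteFunc: enqueue,
		},
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	)
}

The returned indexer is what the deleted code wrapped in corelisters.NewServiceLister; the controller is what it ran under wait.NeverStop.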

federation/pkg/federation-controller/service/dns_test.go

@@ -20,12 +20,10 @@ import (
"fmt"
"reflect"
"sort"
"sync"
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/federation/apis/federation/v1beta1"
fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake"
"k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes.
@@ -45,10 +43,9 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
barZoneDNSName := "servicename.servicenamespace.myfederation.svc.barzone.barregion.federation.example.com"
tests := []struct {
name string
service v1.Service
expected []string
serviceStatus v1.LoadBalancerStatus
name string
service v1.Service
expected []string
}{
{
name: "ServiceWithSingleLBIngress",
@@ -62,7 +59,6 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
String()},
},
},
serviceStatus: buildServiceStatus([][]string{{"198.51.100.1", ""}}),
expected: []string{
"example.com:" + globalDNSName + ":A:180:[198.51.100.1]",
"example.com:" + fooRegionDNSName + ":A:180:[198.51.100.1]",
@@ -82,7 +78,6 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
Namespace: "servicenamespace",
},
},
serviceStatus: buildServiceStatus([][]string{{"", "randomstring.amazonelb.example.com"}}),
expected: []string{
"example.com:"+globalDNSName+":A:180:[198.51.100.1]",
"example.com:"+fooRegionDNSName+":A:180:[198.51.100.1]",
@@ -207,31 +202,6 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
serviceDnsSuffix: "federation.example.com",
zoneName: "example.com",
federationName: "myfederation",
serviceCache: &serviceCache{fedServiceMap: make(map[string]*cachedService)},
clusterCache: &clusterClientCache{
rwlock: sync.Mutex{},
clientMap: make(map[string]*clusterCache),
},
knownClusterSet: make(sets.String),
}
serviceController.clusterCache.clientMap[cluster1Name] = &clusterCache{
cluster: &v1beta1.Cluster{
Status: v1beta1.ClusterStatus{
Zones: []string{"foozone"},
Region: "fooregion",
},
},
}
cachedService := &cachedService{
lastState: &test.service,
endpointMap: make(map[string]int),
serviceStatusMap: make(map[string]v1.LoadBalancerStatus),
}
cachedService.endpointMap[cluster1Name] = 1
if !reflect.DeepEqual(&test.serviceStatus, &v1.LoadBalancerStatus{}) {
cachedService.serviceStatusMap[cluster1Name] = test.serviceStatus
}
err := serviceController.ensureDnsRecords(cluster1Name, &test.service)

federation/pkg/federation-controller/service/doc.go

@@ -1,19 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package service contains code for syncing Kubernetes services,
// and cloud DNS servers with the federated service registry.
package service // import "k8s.io/kubernetes/federation/pkg/federation-controller/service"

federation/pkg/federation-controller/service/endpoint_helper.go

@@ -1,202 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package service
import (
"time"
"k8s.io/apimachinery/pkg/api/errors"
cache "k8s.io/client-go/tools/cache"
fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
v1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/controller"
"github.com/golang/glog"
)
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (sc *ServiceController) clusterEndpointWorker() {
// process all pending events in endpointWorkerDoneChan
ForLoop:
for {
select {
case clusterName := <-sc.endpointWorkerDoneChan:
sc.endpointWorkerMap[clusterName] = false
default:
// non-blocking, comes here if all existing events are processed
break ForLoop
}
}
for clusterName, cache := range sc.clusterCache.clientMap {
workerExist, found := sc.endpointWorkerMap[clusterName]
if found && workerExist {
continue
}
// create a worker only if the previous worker has finished and gone out of scope
go func(cache *clusterCache, clusterName string) {
fedClient := sc.federationClient
for {
func() {
key, quit := cache.endpointQueue.Get()
// update endpoint cache
if quit {
// send signal that current worker has finished tasks and is going out of scope
sc.endpointWorkerDoneChan <- clusterName
return
}
defer cache.endpointQueue.Done(key)
err := sc.clusterCache.syncEndpoint(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc)
if err != nil {
glog.V(2).Infof("Failed to sync endpoint: %+v", err)
}
}()
}
}(cache, clusterName)
sc.endpointWorkerMap[clusterName] = true
}
}
// Whenever there is a change on an endpoint, the federation service should be updated;
// key is the namespaced name of the endpoint
func (cc *clusterClientCache) syncEndpoint(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient fedclientset.Interface, serviceController *ServiceController) error {
cachedService, ok := serviceCache.get(key)
if !ok {
// here we filter out all non-federation services
return nil
}
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
glog.Errorf("Did not successfully get %v from store: %v, will retry later", key, err)
clusterCache.endpointQueue.Add(key)
return err
}
endpoint, err := clusterCache.endpointStore.Endpoints(namespace).Get(name)
switch {
case errors.IsNotFound(err):
// endpoint absence in store means the watcher caught the deletion; ensure LB info is cleaned up
glog.Infof("Can not get endpoint %v for cluster %s from endpointStore", key, clusterName)
err = cc.processEndpointDeletion(cachedService, clusterName, serviceController)
case err != nil:
glog.Errorf("Did not successfully get %v from store: %v, will retry later", key, err)
clusterCache.endpointQueue.Add(key)
return err
default:
glog.V(4).Infof("Found endpoint for federation service %s/%s from cluster %s", endpoint.Namespace, endpoint.Name, clusterName)
err = cc.processEndpointUpdate(cachedService, endpoint, clusterName, serviceController)
}
if err != nil {
glog.Errorf("Failed to sync service: %+v, put back to service queue", err)
clusterCache.endpointQueue.Add(key)
}
cachedService.resetDNSUpdateDelay()
return nil
}
func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedService, clusterName string, serviceController *ServiceController) error {
glog.V(4).Infof("Processing endpoint deletion for %s/%s, cluster %s", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
var err error
cachedService.rwlock.Lock()
defer cachedService.rwlock.Unlock()
_, ok := cachedService.endpointMap[clusterName]
// TODO remove the ok check? if the service controller is restarted, the endpointMap for the cluster does not exist;
// we would need to query dns info from the dnsprovider to determine whether deletion is needed
if ok {
// endpoints lost, clean dns record
glog.V(4).Infof("Cached endpoint was found for %s/%s, cluster %s, removing", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
delete(cachedService.endpointMap, clusterName)
for i := 0; i < clientRetryCount; i++ {
err := serviceController.ensureDnsRecords(clusterName, cachedService.lastState)
if err == nil {
return nil
}
glog.V(4).Infof("Error ensuring DNS Records: %v", err)
time.Sleep(cachedService.nextDNSUpdateDelay())
}
}
return err
}
// Update dns info when an endpoint update event is received.
// We do not care about the endpoint details; all we need to verify here is that len(endpoints.Subsets) > 0
func (cc *clusterClientCache) processEndpointUpdate(cachedService *cachedService, endpoint *v1.Endpoints, clusterName string, serviceController *ServiceController) error {
glog.V(4).Infof("Processing endpoint update for %s/%s, cluster %s", endpoint.Namespace, endpoint.Name, clusterName)
var err error
cachedService.rwlock.Lock()
var reachable bool
defer cachedService.rwlock.Unlock()
_, ok := cachedService.endpointMap[clusterName]
if !ok {
for _, subset := range endpoint.Subsets {
if len(subset.Addresses) > 0 {
reachable = true
break
}
}
if reachable {
// first time get endpoints, update dns record
glog.V(4).Infof("Reachable endpoint was found for %s/%s, cluster %s, building endpointMap", endpoint.Namespace, endpoint.Name, clusterName)
cachedService.endpointMap[clusterName] = 1
for i := 0; i < clientRetryCount; i++ {
err := serviceController.ensureDnsRecords(clusterName, cachedService.lastState)
if err == nil {
return nil
}
glog.V(4).Infof("Error ensuring DNS Records: %v", err)
time.Sleep(cachedService.nextDNSUpdateDelay())
}
return err
}
} else {
for _, subset := range endpoint.Subsets {
if len(subset.Addresses) > 0 {
reachable = true
break
}
}
if !reachable {
// reachable endpoints were lost, clean up the dns record
glog.V(4).Infof("Reachable endpoint was lost for %s/%s, cluster %s, deleting endpointMap", endpoint.Namespace, endpoint.Name, clusterName)
delete(cachedService.endpointMap, clusterName)
for i := 0; i < clientRetryCount; i++ {
err := serviceController.ensureDnsRecords(clusterName, cachedService.lastState)
if err == nil {
return nil
}
glog.V(4).Infof("Error ensuring DNS Records: %v", err)
time.Sleep(cachedService.nextDNSUpdateDelay())
}
return err
}
}
return nil
}
// obj could be an *api.Endpoints, or a DeletionFinalStateUnknown marker item.
func (cc *clusterClientCache) enqueueEndpoint(obj interface{}, clusterName string) {
key, err := controller.KeyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
return
}
_, ok := cc.clientMap[clusterName]
if ok {
cc.clientMap[clusterName].endpointQueue.Add(key)
}
}
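The worker lifecycle removed here is subtle: clusterEndpointWorker first drained a done-channel (non-blocking) to learn which per-cluster workers had exited, then restarted workers only for clusters without one. A condensed, self-contained sketch of that bookkeeping follows; the queues/running/doneCh parameters are stand-ins for clusterCache.endpointQueue, sc.endpointWorkerMap, and sc.endpointWorkerDoneChan.

package service

import (
	"github.com/golang/glog"
	"k8s.io/client-go/util/workqueue"
)

// runClusterWorkers condenses the deleted clusterEndpointWorker: a
// non-blocking drain of doneCh marks workers that have exited, then one
// goroutine is started per cluster queue that is not already being served.
func runClusterWorkers(queues map[string]*workqueue.Type, running map[string]bool,
	doneCh chan string, sync func(key, clusterName string) error) {
Drain:
	for {
		select {
		case clusterName := <-doneCh:
			running[clusterName] = false
		default:
			break Drain // all completion signals processed
		}
	}
	for clusterName, queue := range queues {
		if running[clusterName] {
			continue // a worker for this cluster is still alive
		}
		go func(clusterName string, queue *workqueue.Type) {
			for {
				key, quit := queue.Get()
				if quit {
					doneCh <- clusterName // signal that this worker is going out of scope
					return
				}
				if err := sync(key.(string), clusterName); err != nil {
					glog.V(2).Infof("Failed to sync %v on cluster %s: %v", key, clusterName, err)
				}
				queue.Done(key)
			}
		}(clusterName, queue)
		running[clusterName] = true
	}
}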

federation/pkg/federation-controller/service/endpoint_helper_test.go

@@ -1,167 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package service
import (
"testing"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/federation/apis/federation/v1beta1"
fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake"
"k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes.
. "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
v1 "k8s.io/kubernetes/pkg/api/v1"
)
var fakeDns, _ = clouddns.NewFakeInterface() // No need to check for unsupported interfaces, as the fake interface supports everything that's required.
var fakeDnsZones, _ = fakeDns.Zones()
var fakeClient = &fakefedclientset.Clientset{}
var fakeServiceController = ServiceController{
federationClient: fakeClient,
dns: fakeDns,
dnsZones: fakeDnsZones,
federationName: "fed1",
zoneName: "example.com",
serviceDnsSuffix: "federation.example.com",
serviceCache: &serviceCache{fedServiceMap: make(map[string]*cachedService)},
clusterCache: &clusterClientCache{
clientMap: make(map[string]*clusterCache),
},
knownClusterSet: make(sets.String),
}
func buildEndpoint(subsets [][]string) *v1.Endpoints {
endpoint := &v1.Endpoints{
Subsets: []v1.EndpointSubset{
{Addresses: []v1.EndpointAddress{}},
},
}
for _, element := range subsets {
address := v1.EndpointAddress{IP: element[0], Hostname: element[1], TargetRef: nil}
endpoint.Subsets[0].Addresses = append(endpoint.Subsets[0].Addresses, address)
}
return endpoint
}
func TestProcessEndpointUpdate(t *testing.T) {
clusterName := "foo"
cc := clusterClientCache{
clientMap: map[string]*clusterCache{
clusterName: {
cluster: &v1beta1.Cluster{
Status: v1beta1.ClusterStatus{
Zones: []string{"foozone"},
Region: "fooregion",
},
},
},
},
}
RegisterFakeClusterGet(&fakeClient.Fake, &v1beta1.ClusterList{Items: []v1beta1.Cluster{*NewCluster(clusterName, v1.ConditionTrue)}})
tests := []struct {
name string
cachedService *cachedService
endpoint *v1.Endpoints
clusterName string
expectResult int
}{
{
"no-cache",
&cachedService{
lastState: &v1.Service{},
endpointMap: make(map[string]int),
},
buildEndpoint([][]string{{"ip1", ""}}),
clusterName,
1,
},
{
"has-cache",
&cachedService{
lastState: &v1.Service{},
endpointMap: map[string]int{
"foo": 1,
},
},
buildEndpoint([][]string{{"ip1", ""}}),
clusterName,
1,
},
}
fakeServiceController.clusterCache = &cc
for _, test := range tests {
cc.processEndpointUpdate(test.cachedService, test.endpoint, test.clusterName, &fakeServiceController)
if test.expectResult != test.cachedService.endpointMap[test.clusterName] {
t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectResult, test.cachedService.endpointMap[test.clusterName])
}
}
}
func TestProcessEndpointDeletion(t *testing.T) {
clusterName := "foo"
cc := clusterClientCache{
clientMap: map[string]*clusterCache{
clusterName: {
cluster: &v1beta1.Cluster{
Status: v1beta1.ClusterStatus{
Zones: []string{"foozone"},
Region: "fooregion",
},
},
},
},
}
RegisterFakeClusterGet(&fakeClient.Fake, &v1beta1.ClusterList{Items: []v1beta1.Cluster{*NewCluster(clusterName, v1.ConditionTrue)}})
tests := []struct {
name string
cachedService *cachedService
endpoint *v1.Endpoints
clusterName string
expectResult int
}{
{
"no-cache",
&cachedService{
lastState: &v1.Service{},
endpointMap: make(map[string]int),
},
buildEndpoint([][]string{{"ip1", ""}}),
clusterName,
0,
},
{
"has-cache",
&cachedService{
lastState: &v1.Service{},
endpointMap: map[string]int{
clusterName: 1,
},
},
buildEndpoint([][]string{{"ip1", ""}}),
clusterName,
0,
},
}
fakeServiceController.clusterCache = &cc
for _, test := range tests {
cc.processEndpointDeletion(test.cachedService, test.clusterName, &fakeServiceController)
if test.expectResult != test.cachedService.endpointMap[test.clusterName] {
t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectResult, test.cachedService.endpointMap[test.clusterName])
}
}
}

federation/pkg/federation-controller/service/service_helper.go

@@ -1,302 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package service
import (
"time"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
cache "k8s.io/client-go/tools/cache"
fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
v1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/controller"
"reflect"
"sort"
"github.com/golang/glog"
)
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (sc *ServiceController) clusterServiceWorker() {
// process all pending events in serviceWorkerDoneChan
ForLoop:
for {
select {
case clusterName := <-sc.serviceWorkerDoneChan:
sc.serviceWorkerMap[clusterName] = false
default:
// non-blocking, comes here if all existing events are processed
break ForLoop
}
}
for clusterName, cache := range sc.clusterCache.clientMap {
workerExist, found := sc.serviceWorkerMap[clusterName]
if found && workerExist {
continue
}
// create a worker only if the previous worker has finished and gone out of scope
go func(cache *clusterCache, clusterName string) {
fedClient := sc.federationClient
for {
func() {
key, quit := cache.serviceQueue.Get()
if quit {
// send signal that current worker has finished tasks and is going out of scope
sc.serviceWorkerDoneChan <- clusterName
return
}
defer cache.serviceQueue.Done(key)
err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc)
if err != nil {
glog.Errorf("Failed to sync service: %+v", err)
}
}()
}
}(cache, clusterName)
sc.serviceWorkerMap[clusterName] = true
}
}
// Whenever there is a change on a service, the federation service should be updated
func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient fedclientset.Interface, sc *ServiceController) error {
// obj holds the latest service info from apiserver, return if there is no federation cache for the service
cachedService, ok := serviceCache.get(key)
if !ok {
// if the key is not in serviceCache, the service was not created by the federation, so we skip it
return nil
}
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
glog.Errorf("Did not successfully get %v from store: %v, will retry later", key, err)
clusterCache.serviceQueue.Add(key)
return err
}
var needUpdate, isDeletion bool
service, err := clusterCache.serviceStore.Services(namespace).Get(name)
switch {
case errors.IsNotFound(err):
glog.Infof("Can not get service %v for cluster %s from serviceStore", key, clusterName)
needUpdate = cc.processServiceDeletion(cachedService, clusterName)
isDeletion = true
case err != nil:
glog.Errorf("Did not successfully get %v from store: %v, will retry later", key, err)
clusterCache.serviceQueue.Add(key)
return err
default:
glog.V(4).Infof("Found service for federation service %s/%s from cluster %s", service.Namespace, service.Name, clusterName)
needUpdate = cc.processServiceUpdate(cachedService, service, clusterName)
}
if needUpdate {
for i := 0; i < clientRetryCount; i++ {
err := sc.ensureDnsRecords(clusterName, service)
if err == nil {
break
}
glog.V(4).Infof("Error ensuring DNS Records for service %s on cluster %s: %v", key, clusterName, err)
time.Sleep(cachedService.nextDNSUpdateDelay())
clusterCache.serviceQueue.Add(key)
// no early return on failure here: we still want to persist to the federation apiserver even if ensuring dns records fails
}
err := cc.persistFedServiceUpdate(cachedService, fedClient)
if err == nil {
cachedService.appliedState = cachedService.lastState
cachedService.resetFedUpdateDelay()
} else {
if err != nil {
glog.Errorf("Failed to sync service: %+v, put back to service queue", err)
clusterCache.serviceQueue.Add(key)
}
}
}
if isDeletion {
// cachedService is not reliable here as
// deleting cache is the last step of federation service deletion
service, err := fedClient.Core().Services(cachedService.lastState.Namespace).Get(cachedService.lastState.Name, metav1.GetOptions{})
// rebuild service if federation service still exists
if err == nil || !errors.IsNotFound(err) {
if err == nil && service.DeletionTimestamp != nil {
glog.V(4).Infof("Skipping sync of service %v in underlying clusters as it has already been marked for deletion", name)
return nil
}
return sc.ensureClusterService(cachedService, clusterName, cachedService.appliedState, clusterCache.clientset)
}
}
return nil
}
// processServiceDeletion is triggered when a service is deleted from an underlying k8s cluster.
// It wipes out the cached ingress info of the service from the federation service ingress.
// The function returns a bool indicating whether an actual update happened on the federation service cache;
// if the federation service cache is updated, the updated info should be posted to the federation apiserver.
func (cc *clusterClientCache) processServiceDeletion(cachedService *cachedService, clusterName string) bool {
cachedService.rwlock.Lock()
defer cachedService.rwlock.Unlock()
cachedStatus, ok := cachedService.serviceStatusMap[clusterName]
// cached status found, remove ingress info from federation service cache
if ok {
cachedFedServiceStatus := cachedService.lastState.Status.LoadBalancer
removeIndexes := []int{}
for i, fed := range cachedFedServiceStatus.Ingress {
for _, new := range cachedStatus.Ingress {
// remove if same ingress record found
if new.IP == fed.IP && new.Hostname == fed.Hostname {
removeIndexes = append(removeIndexes, i)
}
}
}
sort.Ints(removeIndexes)
for i := len(removeIndexes) - 1; i >= 0; i-- {
cachedFedServiceStatus.Ingress = append(cachedFedServiceStatus.Ingress[:removeIndexes[i]], cachedFedServiceStatus.Ingress[removeIndexes[i]+1:]...)
glog.V(4).Infof("Remove old ingress %d for service %s/%s", removeIndexes[i], cachedService.lastState.Namespace, cachedService.lastState.Name)
}
delete(cachedService.serviceStatusMap, clusterName)
delete(cachedService.endpointMap, clusterName)
cachedService.lastState.Status.LoadBalancer = cachedFedServiceStatus
return true
} else {
glog.V(4).Infof("Service removal %s/%s from cluster %s observed.", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
}
return false
}
// processServiceUpdate updates ingress info when a service is updated.
// The function returns a bool indicating whether an actual update happened on the federation service cache;
// if the federation service cache is updated, the updated info should be posted to the federation apiserver.
func (cc *clusterClientCache) processServiceUpdate(cachedService *cachedService, service *v1.Service, clusterName string) bool {
glog.V(4).Infof("Processing service update for %s/%s, cluster %s", service.Namespace, service.Name, clusterName)
cachedService.rwlock.Lock()
defer cachedService.rwlock.Unlock()
var needUpdate bool
newServiceLB := service.Status.LoadBalancer
cachedFedServiceStatus := cachedService.lastState.Status.LoadBalancer
if len(newServiceLB.Ingress) == 0 {
// not yet get LB IP
return false
}
cachedStatus, ok := cachedService.serviceStatusMap[clusterName]
if ok {
if reflect.DeepEqual(cachedStatus, newServiceLB) {
glog.V(4).Infof("Same ingress info observed for service %s/%s: %+v ", service.Namespace, service.Name, cachedStatus.Ingress)
} else {
glog.V(4).Infof("Ingress info was changed for service %s/%s: cache: %+v, new: %+v ",
service.Namespace, service.Name, cachedStatus.Ingress, newServiceLB)
needUpdate = true
}
} else {
glog.V(4).Infof("Cached service status was not found for %s/%s, cluster %s, building one", service.Namespace, service.Name, clusterName)
// cache is not always reliable (it is cleared when the service controller restarts)
// two cases will run into this branch:
// 1. new service loadbalancer info received -> no info in cache, and none in the federation service
// 2. service controller restarted -> no info in cache, but it is in the federation service
// check if the lb info is already in the federation service
cachedService.serviceStatusMap[clusterName] = newServiceLB
needUpdate = false
// iterate service ingress info
for _, new := range newServiceLB.Ingress {
var found bool
// if it is known by federation service
for _, fed := range cachedFedServiceStatus.Ingress {
if new.IP == fed.IP && new.Hostname == fed.Hostname {
found = true
break
}
}
if !found {
needUpdate = true
break
}
}
}
if needUpdate {
// new status = cached federation status - cached status + new status from k8s cluster
removeIndexes := []int{}
for i, fed := range cachedFedServiceStatus.Ingress {
for _, new := range cachedStatus.Ingress {
// remove if same ingress record found
if new.IP == fed.IP && new.Hostname == fed.Hostname {
removeIndexes = append(removeIndexes, i)
}
}
}
sort.Ints(removeIndexes)
for i := len(removeIndexes) - 1; i >= 0; i-- {
cachedFedServiceStatus.Ingress = append(cachedFedServiceStatus.Ingress[:removeIndexes[i]], cachedFedServiceStatus.Ingress[removeIndexes[i]+1:]...)
}
cachedFedServiceStatus.Ingress = append(cachedFedServiceStatus.Ingress, service.Status.LoadBalancer.Ingress...)
cachedService.lastState.Status.LoadBalancer = cachedFedServiceStatus
glog.V(4).Infof("Add new ingress info %+v for service %s/%s", service.Status.LoadBalancer, service.Namespace, service.Name)
} else {
glog.V(4).Infof("Same ingress info found for %s/%s, cluster %s", service.Namespace, service.Name, clusterName)
}
return needUpdate
}
func (cc *clusterClientCache) persistFedServiceUpdate(cachedService *cachedService, fedClient fedclientset.Interface) error {
service := cachedService.lastState
glog.V(5).Infof("Persist federation service status %s/%s", service.Namespace, service.Name)
var err error
for i := 0; i < clientRetryCount; i++ {
_, err := fedClient.Core().Services(service.Namespace).Get(service.Name, metav1.GetOptions{})
if errors.IsNotFound(err) {
glog.Infof("Not persisting update to service '%s/%s' that no longer exists: %v",
service.Namespace, service.Name, err)
return nil
}
_, err = fedClient.Core().Services(service.Namespace).UpdateStatus(service)
if err == nil {
glog.V(2).Infof("Successfully update service %s/%s to federation apiserver", service.Namespace, service.Name)
return nil
}
if errors.IsNotFound(err) {
glog.Infof("Not persisting update to service '%s/%s' that no longer exists: %v",
service.Namespace, service.Name, err)
return nil
}
if errors.IsConflict(err) {
glog.V(4).Infof("Not persisting update to service '%s/%s' that has been changed since we received it: %v",
service.Namespace, service.Name, err)
return err
}
time.Sleep(cachedService.nextFedUpdateDelay())
}
return err
}
// obj could be an *api.Service, or a DeletionFinalStateUnknown marker item.
func (cc *clusterClientCache) enqueueService(obj interface{}, clusterName string) {
key, err := controller.KeyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
return
}
_, ok := cc.clientMap[clusterName]
if ok {
cc.clientMap[clusterName].serviceQueue.Add(key)
}
}
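The merge rule buried in processServiceUpdate and processServiceDeletion above ("new status = cached federation status - cached status + new status from k8s cluster") is easy to lose among the locking and logging. Here is a stripped-down sketch of just that set arithmetic, with a local struct standing in for v1.LoadBalancerIngress:

package main

import "fmt"

// ingress stands in for v1.LoadBalancerIngress; the deleted code compares
// entries by the IP/Hostname pair exactly as done here.
type ingress struct{ IP, Hostname string }

// mergeClusterIngress applies the deleted controller's rule:
// new federation status = cached federation status
//                       - status previously cached for this cluster
//                       + fresh status reported by this cluster
func mergeClusterIngress(fed, cachedForCluster, fresh []ingress) []ingress {
	merged := make([]ingress, 0, len(fed)+len(fresh))
	for _, f := range fed {
		owned := false
		for _, c := range cachedForCluster {
			if c.IP == f.IP && c.Hostname == f.Hostname {
				owned = true // entry was contributed by the updating cluster; drop it
				break
			}
		}
		if !owned {
			merged = append(merged, f)
		}
	}
	return append(merged, fresh...)
}

func main() {
	// Mirrors the "diff-ingress" case in service_helper_test.go below.
	fed := []ingress{{IP: "ip4"}, {IP: "ip1"}, {IP: "ip2"}}
	cached := []ingress{{IP: "ip4"}, {IP: "ip1"}, {IP: "ip2"}}
	fresh := []ingress{{IP: "ip2"}, {IP: "ip3"}, {IP: "ip5"}}
	fmt.Println(mergeClusterIngress(fed, cached, fresh)) // [{ip2 } {ip3 } {ip5 }]
}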

federation/pkg/federation-controller/service/service_helper_test.go

@@ -1,163 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package service
import (
"reflect"
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/api/v1"
)
func buildServiceStatus(ingresses [][]string) v1.LoadBalancerStatus {
status := v1.LoadBalancerStatus{
Ingress: []v1.LoadBalancerIngress{},
}
for _, element := range ingresses {
ingress := v1.LoadBalancerIngress{IP: element[0], Hostname: element[1]}
status.Ingress = append(status.Ingress, ingress)
}
return status
}
func TestProcessServiceUpdate(t *testing.T) {
cc := clusterClientCache{
clientMap: make(map[string]*clusterCache),
}
tests := []struct {
name string
cachedService *cachedService
service *v1.Service
clusterName string
expectNeedUpdate bool
expectStatus v1.LoadBalancerStatus
}{
{
"no-cache",
&cachedService{
lastState: &v1.Service{},
serviceStatusMap: make(map[string]v1.LoadBalancerStatus),
},
&v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
"foo",
true,
buildServiceStatus([][]string{{"ip1", ""}}),
},
{
"same-ingress",
&cachedService{
lastState: &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
serviceStatusMap: map[string]v1.LoadBalancerStatus{
"foo1": {Ingress: []v1.LoadBalancerIngress{{IP: "ip1", Hostname: ""}}},
},
},
&v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
"foo1",
false,
buildServiceStatus([][]string{{"ip1", ""}}),
},
{
"diff-cluster",
&cachedService{
lastState: &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "bar1"},
},
serviceStatusMap: map[string]v1.LoadBalancerStatus{
"foo2": {Ingress: []v1.LoadBalancerIngress{{IP: "ip1", Hostname: ""}}},
},
},
&v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
"foo1",
true,
buildServiceStatus([][]string{{"ip1", ""}}),
},
{
"diff-ingress",
&cachedService{
lastState: &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}})}},
serviceStatusMap: map[string]v1.LoadBalancerStatus{
"foo1": buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}}),
},
},
&v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip2", ""}, {"ip3", ""}, {"ip5", ""}})}},
"foo1",
true,
buildServiceStatus([][]string{{"ip2", ""}, {"ip3", ""}, {"ip5", ""}}),
},
}
for _, test := range tests {
result := cc.processServiceUpdate(test.cachedService, test.service, test.clusterName)
if test.expectNeedUpdate != result {
t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectNeedUpdate, result)
}
if !reflect.DeepEqual(test.expectStatus, test.cachedService.lastState.Status.LoadBalancer) {
t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectStatus, test.cachedService.lastState.Status.LoadBalancer)
}
}
}
func TestProcessServiceDeletion(t *testing.T) {
cc := clusterClientCache{
clientMap: make(map[string]*clusterCache),
}
tests := []struct {
name string
cachedService *cachedService
service *v1.Service
clusterName string
expectNeedUpdate bool
expectStatus v1.LoadBalancerStatus
}{
{
"same-ingress",
&cachedService{
lastState: &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
serviceStatusMap: map[string]v1.LoadBalancerStatus{
"foo1": {Ingress: []v1.LoadBalancerIngress{{IP: "ip1", Hostname: ""}}},
},
},
&v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
"foo1",
true,
buildServiceStatus([][]string{}),
},
{
"diff-ingress",
&cachedService{
lastState: &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}, {"ip3", ""}, {"ip5", ""}, {"ip6", ""}, {"ip8", ""}})}},
serviceStatusMap: map[string]v1.LoadBalancerStatus{
"foo1": buildServiceStatus([][]string{{"ip1", ""}, {"ip2", ""}, {"ip3", ""}}),
"foo2": buildServiceStatus([][]string{{"ip5", ""}, {"ip6", ""}, {"ip8", ""}}),
},
},
&v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}, {"ip2", ""}, {"ip3", ""}})}},
"foo1",
true,
buildServiceStatus([][]string{{"ip4", ""}, {"ip5", ""}, {"ip6", ""}, {"ip8", ""}}),
},
}
for _, test := range tests {
result := cc.processServiceDeletion(test.cachedService, test.clusterName)
if test.expectNeedUpdate != result {
t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectNeedUpdate, result)
}
if !reflect.DeepEqual(test.expectStatus, test.cachedService.lastState.Status.LoadBalancer) {
t.Errorf("Test failed for %s, expected %+v, saw %+v", test.name, test.expectStatus, test.cachedService.lastState.Status.LoadBalancer)
}
}
}

federation/pkg/federation-controller/service/servicecontroller.go

@@ -18,23 +18,19 @@ package service
import (
"fmt"
"reflect"
"sort"
"strings"
"sync"
"time"
"reflect"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/labels"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientv1 "k8s.io/client-go/pkg/api/v1"
@@ -59,27 +55,9 @@ import (
const (
serviceSyncPeriod = 10 * time.Minute
clusterSyncPeriod = 10 * time.Minute
// How long to wait before retrying the processing of a service change.
// If this changes, the sleep in hack/jenkins/e2e.sh before downing a cluster
// should be changed appropriately.
minRetryDelay = 5 * time.Second
maxRetryDelay = 300 * time.Second
// clientRetryCount and the retry interval control how many attempts are made
// when accessing a remote kube-apiserver or federation apiserver, and how long
// to sleep when a failure occurs; retries should finish quickly, so there is no exponential backoff
clientRetryCount = 5
retryable = true
doNotRetry = time.Duration(0)
UserAgentName = "federation-service-controller"
maxNoOfClusters = 100
reviewDelay = 10 * time.Second
updateTimeout = 30 * time.Second
allClustersKey = "ALL_CLUSTERS"
@@ -91,33 +69,6 @@ var (
RequiredResources = []schema.GroupVersionResource{v1.SchemeGroupVersion.WithResource("services")}
)
type cachedService struct {
lastState *v1.Service
// The state as successfully applied to the DNS server
appliedState *v1.Service
// endpointMap holds subset info from kubernetes clusters,
// keyed on clusterName; the value is a flag set to 1 when
// the cluster has a ready address
endpointMap map[string]int
// serviceStatusMap holds service status info from kubernetes clusters, keyed on clusterName
serviceStatusMap map[string]v1.LoadBalancerStatus
// Ensures only one goroutine can operate on this service at any given time.
rwlock sync.Mutex
// Controls error back-off when propagating the federation service to k8s clusters
lastRetryDelay time.Duration
// Controls error back-off for updating federation service back to federation apiserver
lastFedUpdateDelay time.Duration
// Controls error back-off for dns record update
lastDNSUpdateDelay time.Duration
}
type serviceCache struct {
rwlock sync.Mutex // protects serviceMap
// fedServiceMap contains all services received from the federation apiserver,
// keyed on service name
fedServiceMap map[string]*cachedService
}
type ServiceController struct {
dns dnsprovider.Interface
federationClient fedclientset.Interface
@@ -128,9 +79,7 @@ type ServiceController struct {
zoneName string
zoneID string
// each federation should be configured with a single zone (e.g. "mycompany.com")
dnsZones dnsprovider.Zones
serviceCache *serviceCache
clusterCache *clusterClientCache
dnsZones dnsprovider.Zones
// A store of services, populated by the serviceController
serviceStore corelisters.ServiceLister
// Watches changes to all services
@@ -143,18 +92,7 @@ type ServiceController struct {
eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder
// services that need to be synced
queue *workqueue.Type
knownClusterSet sets.String
// endpointWorkerMap contains all registered clusters with an indication of whether a worker exists
// key clusterName
endpointWorkerMap map[string]bool
// channel for worker to signal that it is going out of existence
endpointWorkerDoneChan chan string
// serviceWorkerMap contains all registered clusters with an indication of whether a worker exists
// key clusterName
serviceWorkerMap map[string]bool
// channel for worker to signal that it is going out of existence
serviceWorkerDoneChan chan string
queue *workqueue.Type
// For triggering all services reconciliation. This is used when
// a new cluster becomes available.
@@ -174,7 +112,6 @@ type ServiceController struct {
// New returns a new service controller to keep DNS provider service resources
// (like Kubernetes Services and DNS server records for service discovery) in sync with the registry.
func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
federationName, serviceDnsSuffix, zoneName string, zoneID string) *ServiceController {
broadcaster := record.NewBroadcaster()
@@ -183,21 +120,15 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: UserAgentName})
s := &ServiceController{
dns: dns,
federationClient: federationClient,
federationName: federationName,
serviceDnsSuffix: serviceDnsSuffix,
zoneName: zoneName,
zoneID: zoneID,
serviceCache: &serviceCache{fedServiceMap: make(map[string]*cachedService)},
clusterCache: &clusterClientCache{
rwlock: sync.Mutex{},
clientMap: make(map[string]*clusterCache),
},
dns: dns,
federationClient: federationClient,
federationName: federationName,
serviceDnsSuffix: serviceDnsSuffix,
zoneName: zoneName,
zoneID: zoneID,
eventBroadcaster: broadcaster,
eventRecorder: recorder,
queue: workqueue.New(),
knownClusterSet: make(sets.String),
reviewDelay: reviewDelay,
clusterAvailableDelay: clusterAvailableDelay,
updateTimeout: updateTimeout,
@@ -317,10 +248,6 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
s.federatedUpdater,
)
s.endpointWorkerMap = make(map[string]bool)
s.serviceWorkerMap = make(map[string]bool)
s.endpointWorkerDoneChan = make(chan string, maxNoOfClusters)
s.serviceWorkerDoneChan = make(chan string, maxNoOfClusters)
return s
}
@@ -331,16 +258,6 @@ func (s *ServiceController) updateService(obj pkgruntime.Object) (pkgruntime.Obj
return s.federationClient.Core().Services(service.Namespace).Update(service)
}
// obj could be an *api.Service, or a DeletionFinalStateUnknown marker item.
func (s *ServiceController) enqueueService(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
return
}
s.queue.Add(key)
}
// Run starts a background goroutine that watches for changes to federation services
// and ensures that they have Kubernetes services created, updated or deleted appropriately.
// federationSyncPeriod controls how often we check the federation's services to
@@ -472,579 +389,6 @@ func wantsDNSRecords(service *v1.Service) bool {
return service.Spec.Type == v1.ServiceTypeLoadBalancer
}
// processServiceForCluster creates or updates service to all registered running clusters,
// update DNS records and update the service info with DNS entries to federation apiserver.
// the function returns any error caught
func (s *ServiceController) processServiceForCluster(cachedService *cachedService, clusterName string, service *v1.Service, client *kubeclientset.Clientset) error {
if service.DeletionTimestamp != nil {
glog.V(4).Infof("Service has already been marked for deletion %v", service.Name)
return nil
}
glog.V(4).Infof("Process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName)
// Create or Update k8s Service
err := s.ensureClusterService(cachedService, clusterName, service, client)
if err != nil {
glog.V(4).Infof("Failed to process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName)
return err
}
glog.V(4).Infof("Successfully process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName)
return nil
}
// updateFederationService Returns whatever error occurred along with a boolean indicator of whether it
// should be retried.
func (s *ServiceController) updateFederationService(key string, cachedService *cachedService) (error, bool) {
// Clone federation service, and create them in underlying k8s cluster
desiredService := &v1.Service{
ObjectMeta: util.DeepCopyRelevantObjectMeta(cachedService.lastState.ObjectMeta),
Spec: *(util.DeepCopyApiTypeOrPanic(&cachedService.lastState.Spec).(*v1.ServiceSpec)),
}
// handle available clusters one by one
hasErr := false
var wg sync.WaitGroup
for clusterName, cache := range s.clusterCache.clientMap {
wg.Add(1)
go func(cache *clusterCache, clusterName string) {
defer wg.Done()
err := s.processServiceForCluster(cachedService, clusterName, desiredService, cache.clientset)
if err != nil {
hasErr = true
}
}(cache, clusterName)
}
wg.Wait()
if hasErr {
// detail error has been dumped inside the loop
return fmt.Errorf("Service %s/%s was not successfully updated to all clusters", desiredService.Namespace, desiredService.Name), retryable
}
return nil, !retryable
}
func (s *ServiceController) ensureClusterService(cachedService *cachedService, clusterName string, service *v1.Service, client *kubeclientset.Clientset) error {
var err error
var needUpdate bool
for i := 0; i < clientRetryCount; i++ {
svc, err := client.Core().Services(service.Namespace).Get(service.Name, metav1.GetOptions{})
if err == nil {
// service exists
glog.V(5).Infof("Found service %s/%s from cluster %s", service.Namespace, service.Name, clusterName)
// preserve immutable fields
service.Spec.ClusterIP = svc.Spec.ClusterIP
// preserve auto-assigned fields
for i, oldPort := range svc.Spec.Ports {
for _, port := range service.Spec.Ports {
if port.NodePort == 0 {
if !portEqualExcludeNodePort(&oldPort, &port) {
svc.Spec.Ports[i] = port
needUpdate = true
}
} else {
if !portEqualForLB(&oldPort, &port) {
svc.Spec.Ports[i] = port
needUpdate = true
}
}
}
}
if needUpdate {
// we only apply spec update
svc.Spec = service.Spec
_, err = client.Core().Services(svc.Namespace).Update(svc)
if err == nil {
glog.V(5).Infof("Service %s/%s successfully updated to cluster %s", svc.Namespace, svc.Name, clusterName)
return nil
} else {
glog.V(4).Infof("Failed to update %+v", err)
}
} else {
glog.V(5).Infof("Service %s/%s is not updated to cluster %s as the spec are identical", svc.Namespace, svc.Name, clusterName)
return nil
}
} else if errors.IsNotFound(err) {
// Create service if it is not found
glog.Infof("Service '%s/%s' is not found in cluster %s, trying to create new",
service.Namespace, service.Name, clusterName)
service.ResourceVersion = ""
_, err = client.Core().Services(service.Namespace).Create(service)
if err == nil {
glog.V(5).Infof("Service %s/%s successfully created to cluster %s", service.Namespace, service.Name, clusterName)
return nil
}
glog.V(4).Infof("Failed to create %+v", err)
if errors.IsAlreadyExists(err) {
glog.V(5).Infof("service %s/%s already exists in cluster %s", service.Namespace, service.Name, clusterName)
return nil
}
}
if errors.IsConflict(err) {
glog.V(4).Infof("Not persisting update to service '%s/%s' that has been changed since we received it: %v",
service.Namespace, service.Name, err)
}
// should we reuse same retry delay for all clusters?
time.Sleep(cachedService.nextRetryDelay())
}
return err
}
func (s *serviceCache) allServices() []*cachedService {
s.rwlock.Lock()
defer s.rwlock.Unlock()
services := make([]*cachedService, 0, len(s.fedServiceMap))
for _, v := range s.fedServiceMap {
services = append(services, v)
}
return services
}
func (s *serviceCache) get(serviceName string) (*cachedService, bool) {
s.rwlock.Lock()
defer s.rwlock.Unlock()
service, ok := s.fedServiceMap[serviceName]
return service, ok
}
func (s *serviceCache) getOrCreate(serviceName string) *cachedService {
s.rwlock.Lock()
defer s.rwlock.Unlock()
service, ok := s.fedServiceMap[serviceName]
if !ok {
service = &cachedService{
endpointMap: make(map[string]int),
serviceStatusMap: make(map[string]v1.LoadBalancerStatus),
}
s.fedServiceMap[serviceName] = service
}
return service
}
func (s *serviceCache) set(serviceName string, service *cachedService) {
s.rwlock.Lock()
defer s.rwlock.Unlock()
s.fedServiceMap[serviceName] = service
}
func (s *serviceCache) delete(serviceName string) {
s.rwlock.Lock()
defer s.rwlock.Unlock()
delete(s.fedServiceMap, serviceName)
}
// needsUpdateDNS check if the dns records of the given service should be updated
func (s *ServiceController) needsUpdateDNS(oldService *v1.Service, newService *v1.Service) bool {
if !wantsDNSRecords(oldService) && !wantsDNSRecords(newService) {
return false
}
if wantsDNSRecords(oldService) != wantsDNSRecords(newService) {
s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "Type", "%v -> %v",
oldService.Spec.Type, newService.Spec.Type)
return true
}
if !portsEqualForLB(oldService, newService) || oldService.Spec.SessionAffinity != newService.Spec.SessionAffinity {
return true
}
if !LoadBalancerIPsAreEqual(oldService, newService) {
s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "LoadbalancerIP", "%v -> %v",
oldService.Spec.LoadBalancerIP, newService.Spec.LoadBalancerIP)
return true
}
if len(oldService.Spec.ExternalIPs) != len(newService.Spec.ExternalIPs) {
s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Count: %v -> %v",
len(oldService.Spec.ExternalIPs), len(newService.Spec.ExternalIPs))
return true
}
for i := range oldService.Spec.ExternalIPs {
if oldService.Spec.ExternalIPs[i] != newService.Spec.ExternalIPs[i] {
s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Added: %v",
newService.Spec.ExternalIPs[i])
return true
}
}
if !reflect.DeepEqual(oldService.Annotations, newService.Annotations) {
return true
}
if oldService.UID != newService.UID {
s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "UID", "%v -> %v",
oldService.UID, newService.UID)
return true
}
return false
}
func getPortsForLB(service *v1.Service) ([]*v1.ServicePort, error) {
// TODO: quinton: Probably applies for DNS SVC records. Come back to this.
//var protocol api.Protocol
ports := []*v1.ServicePort{}
for i := range service.Spec.Ports {
sp := &service.Spec.Ports[i]
// The check on protocol was removed here. The DNS provider itself is now responsible for all protocol validation
ports = append(ports, sp)
}
return ports, nil
}
func portsEqualForLB(x, y *v1.Service) bool {
xPorts, err := getPortsForLB(x)
if err != nil {
return false
}
yPorts, err := getPortsForLB(y)
if err != nil {
return false
}
return portSlicesEqualForLB(xPorts, yPorts)
}
func portSlicesEqualForLB(x, y []*v1.ServicePort) bool {
if len(x) != len(y) {
return false
}
for i := range x {
if !portEqualForLB(x[i], y[i]) {
return false
}
}
return true
}
func portEqualForLB(x, y *v1.ServicePort) bool {
// TODO: Should we check name? (In theory, an LB could expose it)
if x.Name != y.Name {
return false
}
if x.Protocol != y.Protocol {
return false
}
if x.Port != y.Port {
return false
}
if x.NodePort != y.NodePort {
return false
}
return true
}
func portEqualExcludeNodePort(x, y *v1.ServicePort) bool {
// TODO: Should we check name? (In theory, an LB could expose it)
if x.Name != y.Name {
return false
}
if x.Protocol != y.Protocol {
return false
}
if x.Port != y.Port {
return false
}
return true
}
func clustersFromList(list *v1beta1.ClusterList) []string {
result := []string{}
for ix := range list.Items {
result = append(result, list.Items[ix].Name)
}
return result
}
// getClusterConditionPredicate filters to clusters whose ClusterReady
// condition has status ConditionTrue
func getClusterConditionPredicate() federationcache.ClusterConditionPredicate {
return func(cluster v1beta1.Cluster) bool {
// If we have no info, don't accept
if len(cluster.Status.Conditions) == 0 {
return false
}
for _, cond := range cluster.Status.Conditions {
//We consider the cluster for load balancing only when its ClusterReady condition status
//is ConditionTrue
if cond.Type == v1beta1.ClusterReady && cond.Status != v1.ConditionTrue {
glog.V(4).Infof("Ignoring cluster %v with %v condition status %v", cluster.Name, cond.Type, cond.Status)
return false
}
}
return true
}
}
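// exampleReadyClusters is an illustrative sketch (not part of the original
// change; the function name is made up) showing how the predicate is meant
// to be used: it filters a ClusterList down to the clusters eligible for
// load balancing.
func exampleReadyClusters(list *v1beta1.ClusterList) []v1beta1.Cluster {
    pred := getClusterConditionPredicate()
    ready := []v1beta1.Cluster{}
    for _, cluster := range list.Items {
        if pred(cluster) {
            ready = append(ready, cluster)
        }
    }
    return ready
}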
// clusterSyncLoop observes changes to the set of ready clusters, applies all
// services to newly added clusters, and updates DNS records for the changes.
func (s *ServiceController) clusterSyncLoop() {
var servicesToUpdate []*cachedService
// TODO: Should we drop the cached state for a cluster that goes from ready to
// not ready? If not, the condition predicate should be removed.
clusters, err := s.clusterStore.ClusterCondition(getClusterConditionPredicate()).List()
if err != nil {
glog.Infof("Fail to get cluster list")
return
}
newClusters := clustersFromList(&clusters)
newSet := sets.NewString(newClusters...)
if newSet.Equal(s.knownClusterSet) {
// The set of cluster names hasn't changed, but we can retry updating any
// services that we failed to update last time around.
servicesToUpdate = s.updateDNSRecords(servicesToUpdate, newClusters)
return
}
glog.Infof("Detected change in list of cluster names. New set: %v, Old set: %v", newSet, s.knownClusterSet)
increase := newSet.Difference(s.knownClusterSet)
// Do nothing when a cluster is removed.
if increase != nil {
// Try updating all services, and save the ones that fail to try again next
// round.
servicesToUpdate = s.serviceCache.allServices()
numServices := len(servicesToUpdate)
for newCluster := range increase {
glog.Infof("New cluster observed %s", newCluster)
s.updateAllServicesToCluster(servicesToUpdate, newCluster)
}
servicesToUpdate = s.updateDNSRecords(servicesToUpdate, newClusters)
glog.Infof("Successfully updated %d out of %d DNS records to direct traffic to the updated cluster",
numServices-len(servicesToUpdate), numServices)
}
s.knownClusterSet = newSet
}
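// exampleClusterDiff is an illustrative sketch (not part of the original
// change; the function and cluster names are made up) of the set arithmetic
// used in clusterSyncLoop: Difference yields only the newly observed
// cluster names.
func exampleClusterDiff() sets.String {
    known := sets.NewString("cluster-a", "cluster-b")
    current := sets.NewString("cluster-a", "cluster-b", "cluster-c")
    return current.Difference(known) // {"cluster-c"}
}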
func (s *ServiceController) updateAllServicesToCluster(services []*cachedService, clusterName string) {
cluster, ok := s.clusterCache.clientMap[clusterName]
if ok {
for _, cachedService := range services {
appliedState := cachedService.lastState
s.processServiceForCluster(cachedService, clusterName, appliedState, cluster.clientset)
}
}
}
// updateDNSRecords updates all existing federation service DNS Records so that
// they will match the list of cluster names provided.
// Returns the list of services that couldn't be updated.
func (s *ServiceController) updateDNSRecords(services []*cachedService, clusters []string) (servicesToRetry []*cachedService) {
for _, service := range services {
func() {
service.rwlock.Lock()
defer service.rwlock.Unlock()
// If the applied state is nil, the service hasn't yet been successfully
// processed by the DNS record reconciler; we can trust the reconciler to
// create the federation service's DNS records targeting the correct
// backend service IPs.
if service.appliedState == nil {
return
}
if err := s.lockedUpdateDNSRecords(service, clusters); err != nil {
glog.Errorf("External error while updating DNS Records: %v.", err)
servicesToRetry = append(servicesToRetry, service)
}
}()
}
return servicesToRetry
}
// lockedUpdateDNSRecords updates the DNS records of a service, assuming the
// caller holds the mutex associated with the service.
func (s *ServiceController) lockedUpdateDNSRecords(service *cachedService, clusterNames []string) error {
if !wantsDNSRecords(service.appliedState) {
return nil
}
ensuredCount := 0
unensuredCount := 0
for key := range s.clusterCache.clientMap {
for _, clusterName := range clusterNames {
if key == clusterName {
err := s.ensureDnsRecords(clusterName, service.lastState)
if err != nil {
unensuredCount++
glog.V(4).Infof("Failed to update DNS records for service %v from cluster %s: %v", service, clusterName, err)
} else {
ensuredCount++
}
}
}
}
missedCount := len(clusterNames) - ensuredCount - unensuredCount
if missedCount > 0 || unensuredCount > 0 {
return fmt.Errorf("Failed to update DNS records for %d clusters for service %v due to missing clients [missed count: %d] and/or failing to ensure DNS records [unensured count: %d]",
len(clusterNames), service, missedCount, unensuredCount)
}
return nil
}
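// Worked example of the accounting above (the numbers are assumptions for
// illustration): with three entries in clusterNames, clients present for
// only two of them, and one ensureDnsRecords call failing, we get
// ensuredCount == 1, unensuredCount == 1, and missedCount == 3-1-1 == 1,
// so an error is returned and the caller will retry the service.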
func LoadBalancerIPsAreEqual(oldService, newService *v1.Service) bool {
return oldService.Spec.LoadBalancerIP == newService.Spec.LoadBalancerIP
}
// nextRetryDelay computes the next retry delay, using exponential backoff.
// The service's mutex must be held.
func (s *cachedService) nextRetryDelay() time.Duration {
s.lastRetryDelay = s.lastRetryDelay * 2
if s.lastRetryDelay < minRetryDelay {
s.lastRetryDelay = minRetryDelay
}
if s.lastRetryDelay > maxRetryDelay {
s.lastRetryDelay = maxRetryDelay
}
return s.lastRetryDelay
}
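// Illustrative backoff sequence (assuming, purely for the example, that
// minRetryDelay is 5s and maxRetryDelay is 5m; the actual constants are
// defined elsewhere in this package): successive calls return
// 5s, 10s, 20s, 40s, ... capped at 5m, and resetRetryDelay below makes the
// next call start over at minRetryDelay.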
// resetRetryDelay resets the retry exponential backoff. The service's mutex must be held.
func (s *cachedService) resetRetryDelay() {
s.lastRetryDelay = time.Duration(0)
}
// nextFedUpdateDelay computes the next federation update delay, using
// exponential backoff. The service's mutex must be held.
func (s *cachedService) nextFedUpdateDelay() time.Duration {
s.lastFedUpdateDelay = s.lastFedUpdateDelay * 2
if s.lastFedUpdateDelay < minRetryDelay {
s.lastFedUpdateDelay = minRetryDelay
}
if s.lastFedUpdateDelay > maxRetryDelay {
s.lastFedUpdateDelay = maxRetryDelay
}
return s.lastFedUpdateDelay
}
// resetFedUpdateDelay resets the federation update exponential backoff. The service's mutex must be held.
func (s *cachedService) resetFedUpdateDelay() {
s.lastFedUpdateDelay = time.Duration(0)
}
// nextDNSUpdateDelay computes the next DNS update delay, using exponential
// backoff. The service's mutex must be held.
func (s *cachedService) nextDNSUpdateDelay() time.Duration {
s.lastDNSUpdateDelay = s.lastDNSUpdateDelay * 2
if s.lastDNSUpdateDelay < minRetryDelay {
s.lastDNSUpdateDelay = minRetryDelay
}
if s.lastDNSUpdateDelay > maxRetryDelay {
s.lastDNSUpdateDelay = maxRetryDelay
}
return s.lastDNSUpdateDelay
}
// resetDNSUpdateDelay resets the DNS update exponential backoff. The service's mutex must be held.
func (s *cachedService) resetDNSUpdateDelay() {
s.lastDNSUpdateDelay = time.Duration(0)
}
// syncService will sync the Service with the given key. This function is not
// meant to be invoked concurrently with the same key.
func (s *ServiceController) syncService(key string) error {
startTime := time.Now()
var cachedService *cachedService
var retryDelay time.Duration
defer func() {
glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime))
}()
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
glog.Errorf("Unable to retrieve service %v from store: %v", key, err)
s.queue.Add(key)
return err
}
service, err := s.serviceStore.Services(namespace).Get(name)
switch {
case errors.IsNotFound(err):
// The service's absence from the store means the watcher observed its deletion; ensure the LB info is cleaned up.
glog.Infof("Service has been deleted %v", key)
err, retryDelay = s.processServiceDeletion(key)
case err != nil:
glog.Errorf("Unable to retrieve service %v from store: %v", key, err)
s.queue.Add(key)
return err
default:
// Create a copy before modifying the obj to prevent race condition with
// other readers of obj from store.
copy, err := conversion.NewCloner().DeepCopy(service)
if err != nil {
glog.Errorf("Error in deep copying service %v retrieved from store: %v", key, err)
s.queue.Add(key)
return err
}
service := copy.(*v1.Service)
cachedService = s.serviceCache.getOrCreate(key)
err, retryDelay = s.processServiceUpdate(cachedService, service, key)
}
if retryDelay != 0 {
s.enqueueService(service)
} else if err != nil {
runtime.HandleError(fmt.Errorf("Failed to process service %q. Not retrying: %v", key, err))
}
return nil
}
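// exampleWorker is an illustrative sketch (the name is made up; the actual
// worker wiring lives elsewhere in this controller): keys are drained from
// the queue and handed to syncService one at a time.
func (s *ServiceController) exampleWorker() {
    for {
        key, quit := s.queue.Get()
        if quit {
            return
        }
        if err := s.syncService(key.(string)); err != nil {
            glog.Errorf("Error syncing service %v: %v", key, err)
        }
        s.queue.Done(key)
    }
}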
// processServiceUpdate returns an error if processing the service update failed, along with a time.Duration
// indicating whether processing should be retried; zero means no-retry; otherwise
// we should retry in that Duration.
func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *v1.Service, key string) (error, time.Duration) {
// Ensure that no other goroutine will interfere with our processing of the
// service.
cachedService.rwlock.Lock()
defer cachedService.rwlock.Unlock()
if service.DeletionTimestamp != nil {
if err := s.delete(service); err != nil {
glog.Errorf("Failed to delete %s: %v", service, err)
s.eventRecorder.Eventf(service, api.EventTypeWarning, "DeleteFailed",
"Service delete failed: %v", err)
return err, cachedService.nextRetryDelay()
}
return nil, doNotRetry
}
glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for service: %s",
service.Name)
// Add the required finalizers before creating a service in underlying clusters.
updatedServiceObj, err := s.deletionHelper.EnsureFinalizers(service)
if err != nil {
glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in service %s: %v",
service.Name, err)
return err, cachedService.nextRetryDelay()
}
service = updatedServiceObj.(*v1.Service)
glog.V(3).Infof("Syncing service %s in underlying clusters", service.Name)
// Update the cached service (used above for populating synthetic deletes).
// Always trust the service retrieved from serviceStore, which holds the
// latest service state from the apiserver; if the same service changes
// before this goroutine finishes, another queue entry will handle it.
cachedService.lastState = service
err, retry := s.updateFederationService(key, cachedService)
if err != nil {
message := "Error occurs when updating service to all clusters"
if retry {
message += " (will retry): "
} else {
message += " (will not retry): "
}
message += err.Error()
s.eventRecorder.Event(service, v1.EventTypeWarning, "UpdateServiceFail", message)
return err, cachedService.nextRetryDelay()
}
// Always update the cache upon success.
// NOTE: Since we update the cached service if and only if we successfully
// processed it, a nil appliedState implies that the service hasn't yet
// been successfully processed.
cachedService.appliedState = service
s.serviceCache.set(key, cachedService)
glog.V(4).Infof("Successfully proceeded services %s", key)
cachedService.resetRetryDelay()
return nil, doNotRetry
}
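// Illustrative caller contract (a sketch of how the pieces above fit
// together): processServiceUpdate returns (nil, doNotRetry) on success; on
// failure it returns the error together with cachedService.nextRetryDelay(),
// and syncService re-enqueues the service whenever the returned delay is
// non-zero.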
// delete deletes the given service or returns an error if the deletion was incomplete.
func (s *ServiceController) delete(service *v1.Service) error {
glog.V(3).Infof("Handling deletion of service: %v", *service)
@ -1083,15 +427,6 @@ func (s *ServiceController) delete(service *v1.Service) error {
return nil
}
// processServiceDeletion returns an error if processing the service deletion failed, along with a time.Duration
// indicating whether processing should be retried; zero means no-retry; otherwise
// we should retry in that Duration.
func (s *ServiceController) processServiceDeletion(key string) (error, time.Duration) {
glog.V(2).Infof("Process service deletion for %v", key)
s.serviceCache.delete(key)
return nil, doNotRetry
}
func (s *ServiceController) deliverServicesOnClusterChange() {
if !s.isSynced() {
s.clusterDeliverer.DeliverAfter(allClustersKey, nil, s.clusterAvailableDelay)

View File

@ -20,7 +20,6 @@ import (
"fmt"
"reflect"
"strings"
"sync"
"testing"
"time"
@ -29,7 +28,6 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/federation/apis/federation/v1beta1"
@ -43,64 +41,6 @@ import (
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
)
func TestGetClusterConditionPredicate(t *testing.T) {
fakedns, _ := clouddns.NewFakeInterface() // No need to check for unsupported interfaces, as the fake interface supports everything that's required.
serviceController := ServiceController{
dns: fakedns,
serviceCache: &serviceCache{fedServiceMap: make(map[string]*cachedService)},
clusterCache: &clusterClientCache{
rwlock: sync.Mutex{},
clientMap: make(map[string]*clusterCache),
},
knownClusterSet: make(sets.String),
}
tests := []struct {
cluster v1beta1.Cluster
expectAccept bool
name string
serviceController *ServiceController
}{
{
cluster: v1beta1.Cluster{},
expectAccept: false,
name: "empty",
serviceController: &serviceController,
},
{
cluster: v1beta1.Cluster{
Status: v1beta1.ClusterStatus{
Conditions: []v1beta1.ClusterCondition{
{Type: v1beta1.ClusterReady, Status: v1.ConditionTrue},
},
},
},
expectAccept: true,
name: "basic",
serviceController: &serviceController,
},
{
cluster: v1beta1.Cluster{
Status: v1beta1.ClusterStatus{
Conditions: []v1beta1.ClusterCondition{
{Type: v1beta1.ClusterReady, Status: v1.ConditionFalse},
},
},
},
expectAccept: false,
name: "notready",
serviceController: &serviceController,
},
}
pred := getClusterConditionPredicate()
for _, test := range tests {
accept := pred(test.cluster)
if accept != test.expectAccept {
t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectAccept, accept)
}
}
}
const (
retryInterval = 100 * time.Millisecond