mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 04:33:26 +00:00
Merge pull request #26034 from mfanjie/federation-service-controller
Federation service controller
This commit is contained in:
commit
eff728a46d
@ -51,6 +51,7 @@ federation-controller-manager
|
||||
```
|
||||
--address=0.0.0.0: The IP address to serve on (set to 0.0.0.0 for all interfaces)
|
||||
--cluster-monitor-period=40s: The period for syncing ClusterStatus in ClusterController.
|
||||
--concurrent-service-syncs=10: The number of service syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load
|
||||
--federated-api-burst=30: Burst to use while talking with federation apiserver
|
||||
--federated-api-qps=20: QPS to use while talking with federation apiserver
|
||||
--kube-api-content-type="": ContentType of requests sent to apiserver. Passing application/vnd.kubernetes.protobuf is an experimental feature now.
|
||||
@ -65,7 +66,7 @@ federation-controller-manager
|
||||
--profiling[=true]: Enable profiling via web interface host:port/debug/pprof/
|
||||
```
|
||||
|
||||
###### Auto generated by spf13/cobra on 25-May-2016
|
||||
###### Auto generated by spf13/cobra on 29-May-2016
|
||||
|
||||
|
||||
<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->
|
||||
|
35
federation/client/cache/cluster_cache.go
vendored
35
federation/client/cache/cluster_cache.go
vendored
@ -17,7 +17,8 @@ limitations under the License.
|
||||
package cache
|
||||
|
||||
import (
|
||||
federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/federation/apis/federation"
|
||||
kubeCache "k8s.io/kubernetes/pkg/client/cache"
|
||||
)
|
||||
|
||||
@ -27,9 +28,37 @@ type StoreToClusterLister struct {
|
||||
kubeCache.Store
|
||||
}
|
||||
|
||||
func (s *StoreToClusterLister) List() (clusters federation_v1alpha1.ClusterList, err error) {
|
||||
func (s *StoreToClusterLister) List() (clusters federation.ClusterList, err error) {
|
||||
for _, m := range s.Store.List() {
|
||||
clusters.Items = append(clusters.Items, *(m.(*federation_v1alpha1.Cluster)))
|
||||
clusters.Items = append(clusters.Items, *(m.(*federation.Cluster)))
|
||||
}
|
||||
return clusters, nil
|
||||
}
|
||||
|
||||
// ClusterConditionPredicate is a function that indicates whether the given cluster's conditions meet
|
||||
// some set of criteria defined by the function.
|
||||
type ClusterConditionPredicate func(cluster federation.Cluster) bool
|
||||
|
||||
// storeToClusterConditionLister filters and returns nodes matching the given type and status from the store.
|
||||
type storeToClusterConditionLister struct {
|
||||
store kubeCache.Store
|
||||
predicate ClusterConditionPredicate
|
||||
}
|
||||
|
||||
// ClusterCondition returns a storeToClusterConditionLister
|
||||
func (s *StoreToClusterLister) ClusterCondition(predicate ClusterConditionPredicate) storeToClusterConditionLister {
|
||||
return storeToClusterConditionLister{s.Store, predicate}
|
||||
}
|
||||
|
||||
// List returns a list of clusters that match the conditions defined by the predicate functions in the storeToClusterConditionLister.
|
||||
func (s storeToClusterConditionLister) List() (clusters federation.ClusterList, err error) {
|
||||
for _, m := range s.store.List() {
|
||||
cluster := *m.(*federation.Cluster)
|
||||
if s.predicate(cluster) {
|
||||
clusters.Items = append(clusters.Items, cluster)
|
||||
} else {
|
||||
glog.V(5).Infof("Cluster %s matches none of the conditions", cluster.Name)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
@ -28,11 +28,15 @@ import (
|
||||
"k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options"
|
||||
"k8s.io/kubernetes/pkg/client/restclient"
|
||||
|
||||
internalclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset"
|
||||
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3"
|
||||
"k8s.io/kubernetes/federation/pkg/dnsprovider"
|
||||
clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster"
|
||||
servicecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service"
|
||||
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
|
||||
"k8s.io/kubernetes/pkg/healthz"
|
||||
"k8s.io/kubernetes/pkg/util/configz"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
@ -103,7 +107,17 @@ func Run(s *options.CMServer) error {
|
||||
}
|
||||
|
||||
func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) error {
|
||||
|
||||
federationClientSet := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller"))
|
||||
go clustercontroller.NewclusterController(federationClientSet, s.ClusterMonitorPeriod.Duration).Run()
|
||||
dns, err := dnsprovider.InitDnsProvider(s.DnsProvider, s.DnsConfigFile)
|
||||
if err != nil {
|
||||
glog.Fatalf("Cloud provider could not be initialized: %v", err)
|
||||
}
|
||||
scclientset := internalclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, servicecontroller.UserAgentName))
|
||||
servicecontroller := servicecontroller.New(scclientset, dns)
|
||||
if err := servicecontroller.Run(s.ConcurrentServiceSyncs, wait.NeverStop); err != nil {
|
||||
glog.Errorf("Failed to start service controller: %v", err)
|
||||
}
|
||||
select {}
|
||||
}
|
||||
|
@ -32,6 +32,14 @@ type ControllerManagerConfiguration struct {
|
||||
Port int `json:"port"`
|
||||
// address is the IP address to serve on (set to 0.0.0.0 for all interfaces).
|
||||
Address string `json:"address"`
|
||||
// dnsProvider is the provider for dns services.
|
||||
DnsProvider string `json:"dnsProvider"`
|
||||
// dnsConfigFile is the path to the dns provider configuration file.
|
||||
DnsConfigFile string `json:"ndsConfigFile"`
|
||||
// concurrentServiceSyncs is the number of services that are
|
||||
// allowed to sync concurrently. Larger number = more responsive service
|
||||
// management, but more CPU (and network) load.
|
||||
ConcurrentServiceSyncs int `json:"concurrentServiceSyncs"`
|
||||
// clusterMonitorPeriod is the period for syncing ClusterStatus in cluster controller.
|
||||
ClusterMonitorPeriod unversioned.Duration `json:"clusterMonitorPeriod"`
|
||||
// APIServerQPS is the QPS to use while talking with federation apiserver.
|
||||
@ -63,12 +71,13 @@ const (
|
||||
func NewCMServer() *CMServer {
|
||||
s := CMServer{
|
||||
ControllerManagerConfiguration: ControllerManagerConfiguration{
|
||||
Port: FederatedControllerManagerPort,
|
||||
Address: "0.0.0.0",
|
||||
ClusterMonitorPeriod: unversioned.Duration{Duration: 40 * time.Second},
|
||||
APIServerQPS: 20.0,
|
||||
APIServerBurst: 30,
|
||||
LeaderElection: leaderelection.DefaultLeaderElectionConfiguration(),
|
||||
Port: FederatedControllerManagerPort,
|
||||
Address: "0.0.0.0",
|
||||
ConcurrentServiceSyncs: 10,
|
||||
ClusterMonitorPeriod: unversioned.Duration{Duration: 40 * time.Second},
|
||||
APIServerQPS: 20.0,
|
||||
APIServerBurst: 30,
|
||||
LeaderElection: leaderelection.DefaultLeaderElectionConfiguration(),
|
||||
},
|
||||
}
|
||||
return &s
|
||||
@ -78,6 +87,7 @@ func NewCMServer() *CMServer {
|
||||
func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
|
||||
fs.IntVar(&s.Port, "port", s.Port, "The port that the controller-manager's http service runs on")
|
||||
fs.Var(componentconfig.IPVar{Val: &s.Address}, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)")
|
||||
fs.IntVar(&s.ConcurrentServiceSyncs, "concurrent-service-syncs", s.ConcurrentServiceSyncs, "The number of service syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load")
|
||||
fs.DurationVar(&s.ClusterMonitorPeriod.Duration, "cluster-monitor-period", s.ClusterMonitorPeriod.Duration, "The period for syncing ClusterStatus in ClusterController.")
|
||||
fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")
|
||||
fs.StringVar(&s.Master, "master", s.Master, "The address of the federation API server (overrides any value in kubeconfig)")
|
||||
|
207
federation/pkg/federation-controller/service/cluster_helper.go
Normal file
207
federation/pkg/federation-controller/service/cluster_helper.go
Normal file
@ -0,0 +1,207 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package service
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"k8s.io/kubernetes/federation/apis/federation"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
cache "k8s.io/kubernetes/pkg/client/cache"
|
||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||
"k8s.io/kubernetes/pkg/client/restclient"
|
||||
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
|
||||
"k8s.io/kubernetes/pkg/controller/framework"
|
||||
pkg_runtime "k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
"k8s.io/kubernetes/pkg/util/workqueue"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"reflect"
|
||||
)
|
||||
|
||||
type clusterCache struct {
|
||||
clientset *clientset.Clientset
|
||||
cluster *federation.Cluster
|
||||
// A store of services, populated by the serviceController
|
||||
serviceStore cache.StoreToServiceLister
|
||||
// Watches changes to all services
|
||||
serviceController *framework.Controller
|
||||
// A store of endpoint, populated by the serviceController
|
||||
endpointStore cache.StoreToEndpointsLister
|
||||
// Watches changes to all endpoints
|
||||
endpointController *framework.Controller
|
||||
// services that need to be synced
|
||||
serviceQueue *workqueue.Type
|
||||
// endpoints that need to be synced
|
||||
endpointQueue *workqueue.Type
|
||||
}
|
||||
|
||||
type clusterClientCache struct {
|
||||
rwlock sync.Mutex // protects serviceMap
|
||||
clientMap map[string]*clusterCache
|
||||
}
|
||||
|
||||
func (cc *clusterClientCache) startClusterLW(cluster *federation.Cluster, clusterName string) {
|
||||
cachedClusterClient, ok := cc.clientMap[clusterName]
|
||||
// only create when no existing cachedClusterClient
|
||||
if ok {
|
||||
if !reflect.DeepEqual(cachedClusterClient.cluster.Spec, cluster.Spec) {
|
||||
//rebuild clientset when cluster spec is changed
|
||||
clientset, err := newClusterClientset(cluster)
|
||||
if err != nil || clientset == nil {
|
||||
glog.Errorf("Failed to create corresponding restclient of kubernetes cluster: %v", err)
|
||||
}
|
||||
glog.V(4).Infof("Cluster spec changed, rebuild clientset for cluster %s", clusterName)
|
||||
cachedClusterClient.clientset = clientset
|
||||
go cachedClusterClient.serviceController.Run(wait.NeverStop)
|
||||
go cachedClusterClient.endpointController.Run(wait.NeverStop)
|
||||
glog.V(2).Infof("Start watching services and endpoints on cluster %s", clusterName)
|
||||
} else {
|
||||
// do nothing when there is no spec change
|
||||
glog.V(4).Infof("Keep clientset for cluster %s", clusterName)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
glog.V(4).Infof("No client cache for cluster %s, building new", clusterName)
|
||||
clientset, err := newClusterClientset(cluster)
|
||||
if err != nil || clientset == nil {
|
||||
glog.Errorf("Failed to create corresponding restclient of kubernetes cluster: %v", err)
|
||||
}
|
||||
cachedClusterClient = &clusterCache{
|
||||
cluster: cluster,
|
||||
clientset: clientset,
|
||||
serviceQueue: workqueue.New(),
|
||||
endpointQueue: workqueue.New(),
|
||||
}
|
||||
cachedClusterClient.endpointStore.Store, cachedClusterClient.endpointController = framework.NewInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
|
||||
return clientset.Core().Endpoints(api.NamespaceAll).List(options)
|
||||
},
|
||||
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||
return clientset.Core().Endpoints(api.NamespaceAll).Watch(options)
|
||||
},
|
||||
},
|
||||
&api.Endpoints{},
|
||||
serviceSyncPeriod,
|
||||
framework.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
cc.enqueueEndpoint(obj, clusterName)
|
||||
},
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
cc.enqueueEndpoint(cur, clusterName)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
cc.enqueueEndpoint(obj, clusterName)
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
cachedClusterClient.serviceStore.Store, cachedClusterClient.serviceController = framework.NewInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
|
||||
return clientset.Core().Services(api.NamespaceAll).List(options)
|
||||
},
|
||||
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||
return clientset.Core().Services(api.NamespaceAll).Watch(options)
|
||||
},
|
||||
},
|
||||
&api.Service{},
|
||||
serviceSyncPeriod,
|
||||
framework.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
cc.enqueueService(obj, clusterName)
|
||||
},
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
oldService, ok := old.(*api.Service)
|
||||
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
curService, ok := cur.(*api.Service)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(oldService.Status.LoadBalancer, curService.Status.LoadBalancer) {
|
||||
cc.enqueueService(cur, clusterName)
|
||||
}
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
service, _ := obj.(*api.Service)
|
||||
cc.enqueueService(obj, clusterName)
|
||||
glog.V(2).Infof("Service %s/%s deletion found and enque to service store %s", service.Namespace, service.Name, clusterName)
|
||||
},
|
||||
},
|
||||
)
|
||||
cc.clientMap[clusterName] = cachedClusterClient
|
||||
go cachedClusterClient.serviceController.Run(wait.NeverStop)
|
||||
go cachedClusterClient.endpointController.Run(wait.NeverStop)
|
||||
glog.V(2).Infof("Start watching services and endpoints on cluster %s", clusterName)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//TODO: copied from cluster controller, to make this as common function in pass 2
|
||||
// delFromClusterSet delete a cluster from clusterSet and
|
||||
// delete the corresponding restclient from the map clusterKubeClientMap
|
||||
func (cc *clusterClientCache) delFromClusterSet(obj interface{}) {
|
||||
cluster, ok := obj.(*federation.Cluster)
|
||||
cc.rwlock.Lock()
|
||||
defer cc.rwlock.Unlock()
|
||||
if ok {
|
||||
delete(cc.clientMap, cluster.Name)
|
||||
} else {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
glog.Infof("Object contained wasn't a cluster or a deleted key: %+v", obj)
|
||||
return
|
||||
}
|
||||
glog.Infof("Found tombstone for %v", obj)
|
||||
delete(cc.clientMap, tombstone.Key)
|
||||
}
|
||||
}
|
||||
|
||||
// addToClusterSet inserts the new cluster to clusterSet and creates a corresponding
|
||||
// restclient to map clusterKubeClientMap
|
||||
func (cc *clusterClientCache) addToClientMap(obj interface{}) {
|
||||
cluster := obj.(*federation.Cluster)
|
||||
cc.rwlock.Lock()
|
||||
defer cc.rwlock.Unlock()
|
||||
cluster, ok := obj.(*federation.Cluster)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
pred := getClusterConditionPredicate()
|
||||
// check status
|
||||
// skip if not ready
|
||||
if pred(*cluster) {
|
||||
cc.startClusterLW(cluster, cluster.Name)
|
||||
}
|
||||
}
|
||||
|
||||
func newClusterClientset(c *federation.Cluster) (*clientset.Clientset, error) {
|
||||
clusterConfig, err := clientcmd.BuildConfigFromFlags(c.Spec.ServerAddressByClientCIDRs[0].ServerAddress, "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
clusterConfig.QPS = KubeAPIQPS
|
||||
clusterConfig.Burst = KubeAPIBurst
|
||||
clientset := clientset.NewForConfigOrDie(restclient.AddUserAgent(clusterConfig, UserAgentName))
|
||||
return clientset, nil
|
||||
}
|
40
federation/pkg/federation-controller/service/dns.go
Normal file
40
federation/pkg/federation-controller/service/dns.go
Normal file
@ -0,0 +1,40 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package service
|
||||
|
||||
// getClusterZoneName returns the name of the zone where the specified cluster exists (e.g. "us-east1-c" on GCE, or "us-east-1b" on AWS)
|
||||
func getClusterZoneName(clusterName string) string {
|
||||
// TODO: quinton: Get this from the cluster API object - from the annotation on a node in the cluster - it doesn't contain this yet.
|
||||
return "zone-of-cluster-" + clusterName
|
||||
}
|
||||
|
||||
// getClusterRegionName returns the name of the region where the specified cluster exists (e.g. us-east1 on GCE, or "us-east-1" on AWS)
|
||||
func getClusterRegionName(clusterName string) string {
|
||||
// TODO: quinton: Get this from the cluster API object - from the annotation on a node in the cluster - it doesn't contain this yet.
|
||||
return "region-of-cluster-" + clusterName
|
||||
}
|
||||
|
||||
// getFederationDNSZoneName returns the name of the managed DNS Zone configured for this federation
|
||||
func getFederationDNSZoneName() string {
|
||||
return "mydomain.com" // TODO: quinton: Get this from the federation configuration.
|
||||
}
|
||||
|
||||
func ensureDNSRecords(clusterName string, cachedService *cachedService) error {
|
||||
// Quinton: Pseudocode....
|
||||
|
||||
return nil
|
||||
}
|
19
federation/pkg/federation-controller/service/doc.go
Normal file
19
federation/pkg/federation-controller/service/doc.go
Normal file
@ -0,0 +1,19 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package service contains code for syncing Kubernetes services,
|
||||
// and cloud DNS servers with the federated service registry.
|
||||
package service
|
162
federation/pkg/federation-controller/service/endpoint_helper.go
Normal file
162
federation/pkg/federation-controller/service/endpoint_helper.go
Normal file
@ -0,0 +1,162 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package service
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
cache "k8s.io/kubernetes/pkg/client/cache"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
|
||||
// It enforces that the syncHandler is never invoked concurrently with the same key.
|
||||
func (sc *ServiceController) clusterEndpointWorker() {
|
||||
fedClient := sc.federationClient
|
||||
for clusterName, cache := range sc.clusterCache.clientMap {
|
||||
go func(cache *clusterCache, clusterName string) {
|
||||
for {
|
||||
func() {
|
||||
key, quit := cache.endpointQueue.Get()
|
||||
// update endpoint cache
|
||||
if quit {
|
||||
return
|
||||
}
|
||||
defer cache.endpointQueue.Done(key)
|
||||
err := sc.clusterCache.syncEndpoint(key.(string), clusterName, cache, sc.serviceCache, fedClient)
|
||||
if err != nil {
|
||||
glog.V(2).Infof("Failed to sync endpoint: %+v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}(cache, clusterName)
|
||||
}
|
||||
}
|
||||
|
||||
// Whenever there is change on endpoint, the federation service should be updated
|
||||
// key is the namespaced name of endpoint
|
||||
func (cc *clusterClientCache) syncEndpoint(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient federationclientset.Interface) error {
|
||||
cachedService, ok := serviceCache.get(key)
|
||||
if !ok {
|
||||
// here we filtered all non-federation services
|
||||
return nil
|
||||
}
|
||||
endpointInterface, exists, err := clusterCache.endpointStore.GetByKey(key)
|
||||
if err != nil {
|
||||
glog.Infof("Did not successfully get %v from store: %v, will retry later", key, err)
|
||||
clusterCache.endpointQueue.Add(key)
|
||||
return err
|
||||
}
|
||||
if exists {
|
||||
endpoint, ok := endpointInterface.(*api.Endpoints)
|
||||
if ok {
|
||||
glog.V(4).Infof("Found endpoint for federation service %s/%s from cluster %s", endpoint.Namespace, endpoint.Name, clusterName)
|
||||
err = cc.processEndpointUpdate(cachedService, endpoint, clusterName)
|
||||
} else {
|
||||
_, ok := endpointInterface.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
return fmt.Errorf("Object contained wasn't a service or a deleted key: %+v", endpointInterface)
|
||||
}
|
||||
glog.Infof("Found tombstone for %v", key)
|
||||
err = cc.processEndpointDeletion(cachedService, clusterName)
|
||||
}
|
||||
} else {
|
||||
// service absence in store means watcher caught the deletion, ensure LB info is cleaned
|
||||
glog.Infof("Can not get endpoint %v for cluster %s from endpointStore", key, clusterName)
|
||||
err = cc.processEndpointDeletion(cachedService, clusterName)
|
||||
}
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to sync service: %+v, put back to service queue", err)
|
||||
clusterCache.endpointQueue.Add(key)
|
||||
}
|
||||
cachedService.resetDNSUpdateDelay()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedService, clusterName string) error {
|
||||
glog.V(4).Infof("Processing endpoint update for %s/%s, cluster %s", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
|
||||
var err error
|
||||
cachedService.rwlock.Lock()
|
||||
defer cachedService.rwlock.Unlock()
|
||||
_, ok := cachedService.endpointMap[clusterName]
|
||||
// TODO remove ok checking? if service controller is restarted, then endpointMap for the cluster does not exist
|
||||
// need to query dns info from dnsprovider and make sure of if deletion is needed
|
||||
if ok {
|
||||
// endpoints lost, clean dns record
|
||||
glog.V(4).Infof("Cached endpoint was not found for %s/%s, cluster %s, building one", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
|
||||
// TODO: need to integrate with dns.go:ensureDNSRecords
|
||||
for i := 0; i < clientRetryCount; i++ {
|
||||
err := ensureDNSRecords(clusterName, cachedService)
|
||||
if err == nil {
|
||||
delete(cachedService.endpointMap, clusterName)
|
||||
return nil
|
||||
}
|
||||
time.Sleep(cachedService.nextDNSUpdateDelay())
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Update dns info when endpoint update event received
|
||||
// We do not care about the endpoint info, what we need to make sure here is len(endpoints.subsets)>0
|
||||
func (cc *clusterClientCache) processEndpointUpdate(cachedService *cachedService, endpoint *api.Endpoints, clusterName string) error {
|
||||
glog.V(4).Infof("Processing endpoint update for %s/%s, cluster %s", endpoint.Namespace, endpoint.Name, clusterName)
|
||||
cachedService.rwlock.Lock()
|
||||
defer cachedService.rwlock.Unlock()
|
||||
for _, subset := range endpoint.Subsets {
|
||||
if len(subset.Addresses) > 0 {
|
||||
cachedService.endpointMap[clusterName] = 1
|
||||
}
|
||||
}
|
||||
_, ok := cachedService.endpointMap[clusterName]
|
||||
if !ok {
|
||||
// first time get endpoints, update dns record
|
||||
glog.V(4).Infof("Cached endpoint was not found for %s/%s, cluster %s, building one", endpoint.Namespace, endpoint.Name, clusterName)
|
||||
cachedService.endpointMap[clusterName] = 1
|
||||
err := ensureDNSRecords(clusterName, cachedService)
|
||||
if err != nil {
|
||||
// TODO: need to integrate with dns.go:ensureDNSRecords
|
||||
for i := 0; i < clientRetryCount; i++ {
|
||||
time.Sleep(cachedService.nextDNSUpdateDelay())
|
||||
err := ensureDNSRecords(clusterName, cachedService)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// obj could be an *api.Endpoints, or a DeletionFinalStateUnknown marker item.
|
||||
func (cc *clusterClientCache) enqueueEndpoint(obj interface{}, clusterName string) {
|
||||
key, err := controller.KeyFunc(obj)
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
|
||||
return
|
||||
}
|
||||
_, ok := cc.clientMap[clusterName]
|
||||
if ok {
|
||||
cc.clientMap[clusterName].endpointQueue.Add(key)
|
||||
}
|
||||
}
|
@ -0,0 +1,120 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package service
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
)
|
||||
|
||||
func buildEndpoint(subsets [][]string) *api.Endpoints {
|
||||
endpoint := &api.Endpoints{
|
||||
Subsets: []api.EndpointSubset{
|
||||
{Addresses: []api.EndpointAddress{}},
|
||||
},
|
||||
}
|
||||
for _, element := range subsets {
|
||||
address := api.EndpointAddress{IP: element[0], Hostname: element[1], TargetRef: nil}
|
||||
endpoint.Subsets[0].Addresses = append(endpoint.Subsets[0].Addresses, address)
|
||||
}
|
||||
return endpoint
|
||||
}
|
||||
|
||||
func TestProcessEndpointUpdate(t *testing.T) {
|
||||
cc := clusterClientCache{
|
||||
clientMap: make(map[string]*clusterCache),
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
cachedService *cachedService
|
||||
endpoint *api.Endpoints
|
||||
clusterName string
|
||||
expectResult int
|
||||
}{
|
||||
{
|
||||
"no-cache",
|
||||
&cachedService{
|
||||
lastState: &api.Service{},
|
||||
endpointMap: make(map[string]int),
|
||||
},
|
||||
buildEndpoint([][]string{{"ip1", ""}}),
|
||||
"foo",
|
||||
1,
|
||||
},
|
||||
{
|
||||
"has-cache",
|
||||
&cachedService{
|
||||
lastState: &api.Service{},
|
||||
endpointMap: map[string]int{
|
||||
"foo": 1,
|
||||
},
|
||||
},
|
||||
buildEndpoint([][]string{{"ip1", ""}}),
|
||||
"foo",
|
||||
1,
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
cc.processEndpointUpdate(test.cachedService, test.endpoint, test.clusterName)
|
||||
if test.expectResult != test.cachedService.endpointMap[test.clusterName] {
|
||||
t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectResult, test.cachedService.endpointMap[test.clusterName])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessEndpointDeletion(t *testing.T) {
|
||||
cc := clusterClientCache{
|
||||
clientMap: make(map[string]*clusterCache),
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
cachedService *cachedService
|
||||
endpoint *api.Endpoints
|
||||
clusterName string
|
||||
expectResult int
|
||||
}{
|
||||
{
|
||||
"no-cache",
|
||||
&cachedService{
|
||||
lastState: &api.Service{},
|
||||
endpointMap: make(map[string]int),
|
||||
},
|
||||
buildEndpoint([][]string{{"ip1", ""}}),
|
||||
"foo",
|
||||
0,
|
||||
},
|
||||
{
|
||||
"has-cache",
|
||||
&cachedService{
|
||||
lastState: &api.Service{},
|
||||
endpointMap: map[string]int{
|
||||
"foo": 1,
|
||||
},
|
||||
},
|
||||
buildEndpoint([][]string{{"ip1", ""}}),
|
||||
"foo",
|
||||
0,
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
cc.processEndpointDeletion(test.cachedService, test.clusterName)
|
||||
if test.expectResult != test.cachedService.endpointMap[test.clusterName] {
|
||||
t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectResult, test.cachedService.endpointMap[test.clusterName])
|
||||
}
|
||||
}
|
||||
}
|
253
federation/pkg/federation-controller/service/service_helper.go
Normal file
253
federation/pkg/federation-controller/service/service_helper.go
Normal file
@ -0,0 +1,253 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package service
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
cache "k8s.io/kubernetes/pkg/client/cache"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"reflect"
|
||||
"sort"
|
||||
)
|
||||
|
||||
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
|
||||
// It enforces that the syncHandler is never invoked concurrently with the same key.
|
||||
func (sc *ServiceController) clusterServiceWorker() {
|
||||
fedClient := sc.federationClient
|
||||
for clusterName, cache := range sc.clusterCache.clientMap {
|
||||
go func(cache *clusterCache, clusterName string) {
|
||||
for {
|
||||
func() {
|
||||
key, quit := cache.serviceQueue.Get()
|
||||
defer cache.serviceQueue.Done(key)
|
||||
if quit {
|
||||
return
|
||||
}
|
||||
err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to sync service: %+v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}(cache, clusterName)
|
||||
}
|
||||
}
|
||||
|
||||
// Whenever there is change on service, the federation service should be updated
|
||||
func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient federationclientset.Interface) error {
|
||||
// obj holds the latest service info from apiserver, return if there is no federation cache for the service
|
||||
cachedService, ok := serviceCache.get(key)
|
||||
if !ok {
|
||||
// if serviceCache does not exists, that means the service is not created by federation, we should skip it
|
||||
return nil
|
||||
}
|
||||
serviceInterface, exists, err := clusterCache.serviceStore.GetByKey(key)
|
||||
if err != nil {
|
||||
glog.Infof("Did not successfully get %v from store: %v, will retry later", key, err)
|
||||
clusterCache.serviceQueue.Add(key)
|
||||
return err
|
||||
}
|
||||
var needUpdate bool
|
||||
if exists {
|
||||
service, ok := serviceInterface.(*api.Service)
|
||||
if ok {
|
||||
glog.V(4).Infof("Found service for federation service %s/%s from cluster %s", service.Namespace, service.Name, clusterName)
|
||||
needUpdate = cc.processServiceUpdate(cachedService, service, clusterName)
|
||||
} else {
|
||||
_, ok := serviceInterface.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
return fmt.Errorf("Object contained wasn't a service or a deleted key: %+v", serviceInterface)
|
||||
}
|
||||
glog.Infof("Found tombstone for %v", key)
|
||||
needUpdate = cc.processServiceDeletion(cachedService, clusterName)
|
||||
}
|
||||
} else {
|
||||
glog.Infof("Can not get service %v for cluster %s from serviceStore", key, clusterName)
|
||||
needUpdate = cc.processServiceDeletion(cachedService, clusterName)
|
||||
}
|
||||
|
||||
if needUpdate {
|
||||
err := cc.persistFedServiceUpdate(cachedService, fedClient)
|
||||
if err == nil {
|
||||
cachedService.appliedState = cachedService.lastState
|
||||
cachedService.resetFedUpdateDelay()
|
||||
} else {
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to sync service: %+v, put back to service queue", err)
|
||||
clusterCache.serviceQueue.Add(key)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// processServiceDeletion is triggered when a service is delete from underlying k8s cluster
|
||||
// the deletion function will wip out the cached ingress info of the service from federation service ingress
|
||||
// the function returns a bool to indicate if actual update happend on federation service cache
|
||||
// and if the federation service cache is updated, the updated info should be post to federation apiserver
|
||||
func (cc *clusterClientCache) processServiceDeletion(cachedService *cachedService, clusterName string) bool {
|
||||
cachedService.rwlock.Lock()
|
||||
defer cachedService.rwlock.Unlock()
|
||||
cachedStatus, ok := cachedService.serviceStatusMap[clusterName]
|
||||
// cached status found, remove ingress info from federation service cache
|
||||
if ok {
|
||||
cachedFedServiceStatus := cachedService.lastState.Status.LoadBalancer
|
||||
removeIndexes := []int{}
|
||||
for i, fed := range cachedFedServiceStatus.Ingress {
|
||||
for _, new := range cachedStatus.Ingress {
|
||||
// remove if same ingress record found
|
||||
if new.IP == fed.IP && new.Hostname == fed.Hostname {
|
||||
removeIndexes = append(removeIndexes, i)
|
||||
}
|
||||
}
|
||||
}
|
||||
sort.Ints(removeIndexes)
|
||||
for i := len(removeIndexes) - 1; i >= 0; i-- {
|
||||
cachedFedServiceStatus.Ingress = append(cachedFedServiceStatus.Ingress[:removeIndexes[i]], cachedFedServiceStatus.Ingress[removeIndexes[i]+1:]...)
|
||||
glog.V(4).Infof("Remove old ingress %d for service %s/%s", removeIndexes[i], cachedService.lastState.Namespace, cachedService.lastState.Name)
|
||||
}
|
||||
delete(cachedService.serviceStatusMap, clusterName)
|
||||
delete(cachedService.endpointMap, clusterName)
|
||||
cachedService.lastState.Status.LoadBalancer = cachedFedServiceStatus
|
||||
return true
|
||||
} else {
|
||||
glog.V(4).Infof("Service removal %s/%s from cluster %s observed.", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// processServiceUpdate Update ingress info when service updated
|
||||
// the function returns a bool to indicate if actual update happend on federation service cache
|
||||
// and if the federation service cache is updated, the updated info should be post to federation apiserver
|
||||
func (cc *clusterClientCache) processServiceUpdate(cachedService *cachedService, service *api.Service, clusterName string) bool {
|
||||
glog.V(4).Infof("Processing service update for %s/%s, cluster %s", service.Namespace, service.Name, clusterName)
|
||||
cachedService.rwlock.Lock()
|
||||
defer cachedService.rwlock.Unlock()
|
||||
var needUpdate bool
|
||||
newServiceLB := service.Status.LoadBalancer
|
||||
cachedFedServiceStatus := cachedService.lastState.Status.LoadBalancer
|
||||
if len(newServiceLB.Ingress) == 0 {
|
||||
// not yet get LB IP
|
||||
return false
|
||||
}
|
||||
|
||||
cachedStatus, ok := cachedService.serviceStatusMap[clusterName]
|
||||
if ok {
|
||||
if reflect.DeepEqual(cachedStatus, newServiceLB) {
|
||||
glog.V(4).Infof("Same ingress info observed for service %s/%s: %+v ", service.Namespace, service.Name, cachedStatus.Ingress)
|
||||
} else {
|
||||
glog.V(4).Infof("Ingress info was changed for service %s/%s: cache: %+v, new: %+v ",
|
||||
service.Namespace, service.Name, cachedStatus.Ingress, newServiceLB)
|
||||
needUpdate = true
|
||||
}
|
||||
} else {
|
||||
glog.V(4).Infof("Cached service status was not found for %s/%s, cluster %s, building one", service.Namespace, service.Name, clusterName)
|
||||
|
||||
// cache is not always reliable(cache will be cleaned when service controller restart)
|
||||
// two cases will run into this branch:
|
||||
// 1. new service loadbalancer info received -> no info in cache, and no in federation service
|
||||
// 2. service controller being restarted -> no info in cache, but it is in federation service
|
||||
|
||||
// check if the lb info is already in federation service
|
||||
|
||||
cachedService.serviceStatusMap[clusterName] = newServiceLB
|
||||
needUpdate = false
|
||||
// iterate service ingress info
|
||||
for _, new := range newServiceLB.Ingress {
|
||||
var found bool
|
||||
// if it is known by federation service
|
||||
for _, fed := range cachedFedServiceStatus.Ingress {
|
||||
if new.IP == fed.IP && new.Hostname == fed.Hostname {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
needUpdate = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if needUpdate {
|
||||
// new status = cached federation status - cached status + new status from k8s cluster
|
||||
|
||||
removeIndexes := []int{}
|
||||
for i, fed := range cachedFedServiceStatus.Ingress {
|
||||
for _, new := range cachedStatus.Ingress {
|
||||
// remove if same ingress record found
|
||||
if new.IP == fed.IP && new.Hostname == fed.Hostname {
|
||||
removeIndexes = append(removeIndexes, i)
|
||||
}
|
||||
}
|
||||
}
|
||||
sort.Ints(removeIndexes)
|
||||
for i := len(removeIndexes) - 1; i >= 0; i-- {
|
||||
cachedFedServiceStatus.Ingress = append(cachedFedServiceStatus.Ingress[:removeIndexes[i]], cachedFedServiceStatus.Ingress[removeIndexes[i]+1:]...)
|
||||
}
|
||||
cachedFedServiceStatus.Ingress = append(cachedFedServiceStatus.Ingress, service.Status.LoadBalancer.Ingress...)
|
||||
cachedService.lastState.Status.LoadBalancer = cachedFedServiceStatus
|
||||
glog.V(4).Infof("Add new ingress info %+v for service %s/%s", service.Status.LoadBalancer, service.Namespace, service.Name)
|
||||
} else {
|
||||
glog.V(4).Infof("Same ingress info found for %s/%s, cluster %s", service.Namespace, service.Name, clusterName)
|
||||
}
|
||||
return needUpdate
|
||||
}
|
||||
|
||||
func (cc *clusterClientCache) persistFedServiceUpdate(cachedService *cachedService, fedClient federationclientset.Interface) error {
|
||||
service := cachedService.lastState
|
||||
glog.V(5).Infof("Persist federation service status %s/%s", service.Namespace, service.Name)
|
||||
var err error
|
||||
for i := 0; i < clientRetryCount; i++ {
|
||||
_, err := fedClient.Core().Services(service.Namespace).UpdateStatus(service)
|
||||
if err == nil {
|
||||
glog.V(2).Infof("Successfully update service %s/%s to federation apiserver", service.Namespace, service.Name)
|
||||
return nil
|
||||
}
|
||||
if errors.IsNotFound(err) {
|
||||
glog.Infof("Not persisting update to service '%s/%s' that no longer exists: %v",
|
||||
service.Namespace, service.Name, err)
|
||||
return nil
|
||||
}
|
||||
if errors.IsConflict(err) {
|
||||
glog.V(4).Infof("Not persisting update to service '%s/%s' that has been changed since we received it: %v",
|
||||
service.Namespace, service.Name, err)
|
||||
return err
|
||||
}
|
||||
time.Sleep(cachedService.nextFedUpdateDelay())
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// obj could be an *api.Service, or a DeletionFinalStateUnknown marker item.
|
||||
func (cc *clusterClientCache) enqueueService(obj interface{}, clusterName string) {
|
||||
key, err := controller.KeyFunc(obj)
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
|
||||
return
|
||||
}
|
||||
_, ok := cc.clientMap[clusterName]
|
||||
if ok {
|
||||
cc.clientMap[clusterName].serviceQueue.Add(key)
|
||||
}
|
||||
}
|
@ -0,0 +1,162 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package service
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
)
|
||||
|
||||
func buildServiceStatus(ingresses [][]string) api.LoadBalancerStatus {
|
||||
status := api.LoadBalancerStatus{
|
||||
Ingress: []api.LoadBalancerIngress{},
|
||||
}
|
||||
for _, element := range ingresses {
|
||||
ingress := api.LoadBalancerIngress{IP: element[0], Hostname: element[1]}
|
||||
status.Ingress = append(status.Ingress, ingress)
|
||||
}
|
||||
return status
|
||||
}
|
||||
|
||||
func TestProcessServiceUpdate(t *testing.T) {
|
||||
cc := clusterClientCache{
|
||||
clientMap: make(map[string]*clusterCache),
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
cachedService *cachedService
|
||||
service *api.Service
|
||||
clusterName string
|
||||
expectNeedUpdate bool
|
||||
expectStatus api.LoadBalancerStatus
|
||||
}{
|
||||
{
|
||||
"no-cache",
|
||||
&cachedService{
|
||||
lastState: &api.Service{},
|
||||
serviceStatusMap: make(map[string]api.LoadBalancerStatus),
|
||||
},
|
||||
&api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
|
||||
"foo",
|
||||
true,
|
||||
buildServiceStatus([][]string{{"ip1", ""}}),
|
||||
},
|
||||
{
|
||||
"same-ingress",
|
||||
&cachedService{
|
||||
lastState: &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
|
||||
serviceStatusMap: map[string]api.LoadBalancerStatus{
|
||||
"foo1": {Ingress: []api.LoadBalancerIngress{{"ip1", ""}}},
|
||||
},
|
||||
},
|
||||
&api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
|
||||
"foo1",
|
||||
false,
|
||||
buildServiceStatus([][]string{{"ip1", ""}}),
|
||||
},
|
||||
{
|
||||
"diff-cluster",
|
||||
&cachedService{
|
||||
lastState: &api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "bar1"},
|
||||
},
|
||||
serviceStatusMap: map[string]api.LoadBalancerStatus{
|
||||
"foo2": {Ingress: []api.LoadBalancerIngress{{"ip1", ""}}},
|
||||
},
|
||||
},
|
||||
&api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
|
||||
"foo1",
|
||||
true,
|
||||
buildServiceStatus([][]string{{"ip1", ""}}),
|
||||
},
|
||||
{
|
||||
"diff-ingress",
|
||||
&cachedService{
|
||||
lastState: &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}})}},
|
||||
serviceStatusMap: map[string]api.LoadBalancerStatus{
|
||||
"foo1": buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}}),
|
||||
},
|
||||
},
|
||||
&api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip2", ""}, {"ip3", ""}, {"ip5", ""}})}},
|
||||
"foo1",
|
||||
true,
|
||||
buildServiceStatus([][]string{{"ip2", ""}, {"ip3", ""}, {"ip5", ""}}),
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
result := cc.processServiceUpdate(test.cachedService, test.service, test.clusterName)
|
||||
if test.expectNeedUpdate != result {
|
||||
t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectNeedUpdate, result)
|
||||
}
|
||||
if !reflect.DeepEqual(test.expectStatus, test.cachedService.lastState.Status.LoadBalancer) {
|
||||
t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectStatus, test.cachedService.lastState.Status.LoadBalancer)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessServiceDeletion(t *testing.T) {
|
||||
cc := clusterClientCache{
|
||||
clientMap: make(map[string]*clusterCache),
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
cachedService *cachedService
|
||||
service *api.Service
|
||||
clusterName string
|
||||
expectNeedUpdate bool
|
||||
expectStatus api.LoadBalancerStatus
|
||||
}{
|
||||
{
|
||||
"same-ingress",
|
||||
&cachedService{
|
||||
lastState: &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
|
||||
serviceStatusMap: map[string]api.LoadBalancerStatus{
|
||||
"foo1": {Ingress: []api.LoadBalancerIngress{{"ip1", ""}}},
|
||||
},
|
||||
},
|
||||
&api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
|
||||
"foo1",
|
||||
true,
|
||||
buildServiceStatus([][]string{}),
|
||||
},
|
||||
{
|
||||
"diff-ingress",
|
||||
&cachedService{
|
||||
lastState: &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}, {"ip3", ""}, {"ip5", ""}, {"ip6", ""}, {"ip8", ""}})}},
|
||||
serviceStatusMap: map[string]api.LoadBalancerStatus{
|
||||
"foo1": buildServiceStatus([][]string{{"ip1", ""}, {"ip2", ""}, {"ip3", ""}}),
|
||||
"foo2": buildServiceStatus([][]string{{"ip5", ""}, {"ip6", ""}, {"ip8", ""}}),
|
||||
},
|
||||
},
|
||||
&api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}, {"ip2", ""}, {"ip3", ""}})}},
|
||||
"foo1",
|
||||
true,
|
||||
buildServiceStatus([][]string{{"ip4", ""}, {"ip5", ""}, {"ip6", ""}, {"ip8", ""}}),
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
result := cc.processServiceDeletion(test.cachedService, test.clusterName)
|
||||
if test.expectNeedUpdate != result {
|
||||
t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectNeedUpdate, result)
|
||||
}
|
||||
if !reflect.DeepEqual(test.expectStatus, test.cachedService.lastState.Status.LoadBalancer) {
|
||||
t.Errorf("Test failed for %s, expected %+v, saw %+v", test.name, test.expectStatus, test.cachedService.lastState.Status.LoadBalancer)
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,874 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package service
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"reflect"
|
||||
|
||||
"github.com/golang/glog"
|
||||
federation "k8s.io/kubernetes/federation/apis/federation"
|
||||
federationcache "k8s.io/kubernetes/federation/client/cache"
|
||||
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset"
|
||||
"k8s.io/kubernetes/federation/pkg/dnsprovider"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
cache "k8s.io/kubernetes/pkg/client/cache"
|
||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||
"k8s.io/kubernetes/pkg/client/record"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/controller/framework"
|
||||
pkg_runtime "k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/util/runtime"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
"k8s.io/kubernetes/pkg/util/workqueue"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
|
||||
"k8s.io/kubernetes/pkg/conversion"
|
||||
)
|
||||
|
||||
const (
|
||||
// TODO update to 10 mins before merge
|
||||
serviceSyncPeriod = 30 * time.Second
|
||||
clusterSyncPeriod = 100 * time.Second
|
||||
|
||||
// How long to wait before retrying the processing of a service change.
|
||||
// If this changes, the sleep in hack/jenkins/e2e.sh before downing a cluster
|
||||
// should be changed appropriately.
|
||||
minRetryDelay = 5 * time.Second
|
||||
maxRetryDelay = 300 * time.Second
|
||||
|
||||
// client retry count and interval is when accessing a remote kube-apiserver or federation apiserver
|
||||
// how many times should be attempted and how long it should sleep when failure occurs
|
||||
// the retry should be in short time so no exponential backoff
|
||||
clientRetryCount = 5
|
||||
|
||||
retryable = true
|
||||
|
||||
doNotRetry = time.Duration(0)
|
||||
|
||||
UserAgentName = "federation-service-controller"
|
||||
KubeAPIQPS = 20.0
|
||||
KubeAPIBurst = 30
|
||||
)
|
||||
|
||||
type cachedService struct {
|
||||
lastState *api.Service
|
||||
// The state as successfully applied to the DNS server
|
||||
appliedState *api.Service
|
||||
// cluster endpoint map hold subset info from kubernetes clusters
|
||||
// key clusterName
|
||||
// value is a flag that if there is ready address, 1 means there is ready address, 0 means no ready address
|
||||
endpointMap map[string]int
|
||||
// cluster service map hold serivice status info from kubernetes clusters
|
||||
// key clusterName
|
||||
|
||||
serviceStatusMap map[string]api.LoadBalancerStatus
|
||||
|
||||
// Ensures only one goroutine can operate on this service at any given time.
|
||||
rwlock sync.Mutex
|
||||
|
||||
// Controls error back-off for procceeding federation service to k8s clusters
|
||||
lastRetryDelay time.Duration
|
||||
// Controls error back-off for updating federation service back to federation apiserver
|
||||
lastFedUpdateDelay time.Duration
|
||||
// Controls error back-off for dns record update
|
||||
lastDNSUpdateDelay time.Duration
|
||||
}
|
||||
|
||||
type serviceCache struct {
|
||||
rwlock sync.Mutex // protects serviceMap
|
||||
// federation service map contains all service received from federation apiserver
|
||||
// key serviceName
|
||||
fedServiceMap map[string]*cachedService
|
||||
}
|
||||
|
||||
type ServiceController struct {
|
||||
dns dnsprovider.Interface
|
||||
federationClient federationclientset.Interface
|
||||
zones []dnsprovider.Zone
|
||||
serviceCache *serviceCache
|
||||
clusterCache *clusterClientCache
|
||||
// A store of services, populated by the serviceController
|
||||
serviceStore cache.StoreToServiceLister
|
||||
// Watches changes to all services
|
||||
serviceController *framework.Controller
|
||||
// A store of services, populated by the serviceController
|
||||
clusterStore federationcache.StoreToClusterLister
|
||||
// Watches changes to all services
|
||||
clusterController *framework.Controller
|
||||
eventBroadcaster record.EventBroadcaster
|
||||
eventRecorder record.EventRecorder
|
||||
// services that need to be synced
|
||||
queue *workqueue.Type
|
||||
knownClusterSet sets.String
|
||||
}
|
||||
|
||||
// New returns a new service controller to keep DNS provider service resources
|
||||
// (like Kubernetes Services and DNS server records for service discovery) in sync with the registry.
|
||||
|
||||
func New(federationClient federationclientset.Interface, dns dnsprovider.Interface) *ServiceController {
|
||||
broadcaster := record.NewBroadcaster()
|
||||
// federationClient event is not supported yet
|
||||
// broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
|
||||
recorder := broadcaster.NewRecorder(api.EventSource{Component: UserAgentName})
|
||||
|
||||
s := &ServiceController{
|
||||
dns: dns,
|
||||
federationClient: federationClient,
|
||||
serviceCache: &serviceCache{fedServiceMap: make(map[string]*cachedService)},
|
||||
clusterCache: &clusterClientCache{
|
||||
rwlock: sync.Mutex{},
|
||||
clientMap: make(map[string]*clusterCache),
|
||||
},
|
||||
eventBroadcaster: broadcaster,
|
||||
eventRecorder: recorder,
|
||||
queue: workqueue.New(),
|
||||
knownClusterSet: make(sets.String),
|
||||
}
|
||||
s.serviceStore.Store, s.serviceController = framework.NewInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
|
||||
return s.federationClient.Core().Services(api.NamespaceAll).List(options)
|
||||
},
|
||||
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||
return s.federationClient.Core().Services(api.NamespaceAll).Watch(options)
|
||||
},
|
||||
},
|
||||
&api.Service{},
|
||||
serviceSyncPeriod,
|
||||
framework.ResourceEventHandlerFuncs{
|
||||
AddFunc: s.enqueueService,
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
// there is case that old and new are equals but we still catch the event now.
|
||||
if !reflect.DeepEqual(old, cur) {
|
||||
s.enqueueService(cur)
|
||||
}
|
||||
},
|
||||
DeleteFunc: s.enqueueService,
|
||||
},
|
||||
)
|
||||
s.clusterStore.Store, s.clusterController = framework.NewInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
|
||||
return s.federationClient.Federation().Clusters().List(options)
|
||||
},
|
||||
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||
return s.federationClient.Federation().Clusters().Watch(options)
|
||||
},
|
||||
},
|
||||
&federation.Cluster{},
|
||||
clusterSyncPeriod,
|
||||
framework.ResourceEventHandlerFuncs{
|
||||
DeleteFunc: s.clusterCache.delFromClusterSet,
|
||||
AddFunc: s.clusterCache.addToClientMap,
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
oldCluster, ok := old.(*federation.Cluster)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
curCluster, ok := cur.(*federation.Cluster)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(oldCluster.Spec, curCluster.Spec) {
|
||||
// update when spec is changed
|
||||
s.clusterCache.addToClientMap(cur)
|
||||
}
|
||||
|
||||
pred := getClusterConditionPredicate()
|
||||
// only update when condition changed to ready from not-ready
|
||||
if !pred(*oldCluster) && pred(*curCluster) {
|
||||
s.clusterCache.addToClientMap(cur)
|
||||
}
|
||||
// did not handle ready -> not-ready
|
||||
// how could we stop a controller?
|
||||
},
|
||||
},
|
||||
)
|
||||
return s
|
||||
}
|
||||
|
||||
// obj could be an *api.Service, or a DeletionFinalStateUnknown marker item.
|
||||
func (s *ServiceController) enqueueService(obj interface{}) {
|
||||
key, err := controller.KeyFunc(obj)
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
|
||||
return
|
||||
}
|
||||
s.queue.Add(key)
|
||||
}
|
||||
|
||||
// Run starts a background goroutine that watches for changes to federation services
|
||||
// and ensures that they have Kubernetes services created, updated or deleted appropriately.
|
||||
// federationSyncPeriod controls how often we check the federation's services to
|
||||
// ensure that the correct Kubernetes services (and associated DNS entries) exist.
|
||||
// This is only necessary to fudge over failed watches.
|
||||
// clusterSyncPeriod controls how often we check the federation's underlying clusters and
|
||||
// their Kubernetes services to ensure that matching services created independently of the Federation
|
||||
// (e.g. directly via the underlying cluster's API) are correctly accounted for.
|
||||
|
||||
// It's an error to call Run() more than once for a given ServiceController
|
||||
// object.
|
||||
func (s *ServiceController) Run(workers int, stopCh <-chan struct{}) error {
|
||||
defer runtime.HandleCrash()
|
||||
go s.serviceController.Run(stopCh)
|
||||
go s.clusterController.Run(stopCh)
|
||||
for i := 0; i < workers; i++ {
|
||||
go wait.Until(s.fedServiceWorker, time.Second, stopCh)
|
||||
}
|
||||
go wait.Until(s.clusterEndpointWorker, time.Second, stopCh)
|
||||
go wait.Until(s.clusterServiceWorker, time.Second, stopCh)
|
||||
go wait.Until(s.clusterSyncLoop, clusterSyncPeriod, stopCh)
|
||||
<-stopCh
|
||||
glog.Infof("Shutting down Federation Service Controller")
|
||||
s.queue.ShutDown()
|
||||
return nil
|
||||
}
|
||||
|
||||
// fedServiceWorker runs a worker thread that just dequeues items, processes them, and marks them done.
|
||||
// It enforces that the syncService is never invoked concurrently with the same key.
|
||||
func (s *ServiceController) fedServiceWorker() {
|
||||
for {
|
||||
func() {
|
||||
key, quit := s.queue.Get()
|
||||
if quit {
|
||||
return
|
||||
}
|
||||
|
||||
defer s.queue.Done(key)
|
||||
err := s.syncService(key.(string))
|
||||
if err != nil {
|
||||
glog.Errorf("Error syncing service: %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func wantsDNSRecords(service *api.Service) bool {
|
||||
return service.Spec.Type == api.ServiceTypeLoadBalancer
|
||||
}
|
||||
|
||||
// processServiceForCluster creates or updates service to all registered running clusters,
|
||||
// update DNS records and update the service info with DNS entries to federation apiserver.
|
||||
// the function returns any error caught
|
||||
func (s *ServiceController) processServiceForCluster(cachedService *cachedService, clusterName string, service *api.Service, client *clientset.Clientset) error {
|
||||
glog.V(4).Infof("Process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName)
|
||||
// Create or Update k8s Service
|
||||
err := s.ensureClusterService(cachedService, clusterName, service, client)
|
||||
if err != nil {
|
||||
glog.V(4).Infof("Failed to process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName)
|
||||
return err
|
||||
}
|
||||
glog.V(4).Infof("Successfully process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName)
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateFederationService Returns whatever error occurred along with a boolean indicator of whether it
|
||||
// should be retried.
|
||||
func (s *ServiceController) updateFederationService(key string, cachedService *cachedService) (error, bool) {
|
||||
// Clone federation service, and create them in underlying k8s cluster
|
||||
clone, err := conversion.NewCloner().DeepCopy(cachedService.lastState)
|
||||
if err != nil {
|
||||
return err, !retryable
|
||||
}
|
||||
service, ok := clone.(*api.Service)
|
||||
if !ok {
|
||||
return fmt.Errorf("Unexpected service cast error : %v\n", service), !retryable
|
||||
}
|
||||
|
||||
// handle available clusters one by one
|
||||
var hasErr bool
|
||||
for clusterName, cache := range s.clusterCache.clientMap {
|
||||
go func(cache *clusterCache, clusterName string) {
|
||||
err = s.processServiceForCluster(cachedService, clusterName, service, cache.clientset)
|
||||
if err != nil {
|
||||
hasErr = true
|
||||
}
|
||||
}(cache, clusterName)
|
||||
}
|
||||
if hasErr {
|
||||
// detail error has been dumpped inside the loop
|
||||
return fmt.Errorf("Service %s/%s was not successfully updated to all clusters", service.Namespace, service.Name), retryable
|
||||
}
|
||||
return nil, !retryable
|
||||
}
|
||||
|
||||
func (s *ServiceController) deleteFederationService(cachedService *cachedService) (error, bool) {
|
||||
// handle available clusters one by one
|
||||
var hasErr bool
|
||||
for clusterName, cluster := range s.clusterCache.clientMap {
|
||||
err := s.deleteClusterService(clusterName, cachedService, cluster.clientset)
|
||||
if err != nil {
|
||||
hasErr = true
|
||||
}
|
||||
}
|
||||
if hasErr {
|
||||
// detail error has been dumpped inside the loop
|
||||
return fmt.Errorf("Service %s/%s was not successfully updated to all clusters", cachedService.lastState.Namespace, cachedService.lastState.Name), retryable
|
||||
}
|
||||
return nil, !retryable
|
||||
}
|
||||
|
||||
func (s *ServiceController) deleteClusterService(clusterName string, cachedService *cachedService, clientset *clientset.Clientset) error {
|
||||
service := cachedService.lastState
|
||||
glog.V(4).Infof("Deleting service %s/%s from cluster %s", service.Namespace, service.Name, clusterName)
|
||||
var err error
|
||||
for i := 0; i < clientRetryCount; i++ {
|
||||
err = clientset.Core().Services(service.Namespace).Delete(service.Name, &api.DeleteOptions{})
|
||||
if err == nil || errors.IsNotFound(err) {
|
||||
glog.V(4).Infof("Service %s/%s deleted from cluster %s", service.Namespace, service.Name, clusterName)
|
||||
return nil
|
||||
}
|
||||
time.Sleep(cachedService.nextRetryDelay())
|
||||
}
|
||||
glog.V(4).Infof("Failed to delete service %s/%s from cluster %s, %+v", service.Namespace, service.Name, clusterName, err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *ServiceController) ensureClusterService(cachedService *cachedService, clusterName string, service *api.Service, client *clientset.Clientset) error {
|
||||
var err error
|
||||
var needUpdate bool
|
||||
for i := 0; i < clientRetryCount; i++ {
|
||||
svc, err := client.Core().Services(service.Namespace).Get(service.Name)
|
||||
if err == nil {
|
||||
// service exists
|
||||
glog.V(5).Infof("Found service %s/%s from cluster %s", service.Namespace, service.Name, clusterName)
|
||||
//reserve immutable fields
|
||||
service.Spec.ClusterIP = svc.Spec.ClusterIP
|
||||
|
||||
//reserve auto assigned field
|
||||
for i, oldPort := range svc.Spec.Ports {
|
||||
for _, port := range service.Spec.Ports {
|
||||
if port.NodePort == 0 {
|
||||
if !portEqualExcludeNodePort(&oldPort, &port) {
|
||||
svc.Spec.Ports[i] = port
|
||||
needUpdate = true
|
||||
}
|
||||
} else {
|
||||
if !portEqualForLB(&oldPort, &port) {
|
||||
svc.Spec.Ports[i] = port
|
||||
needUpdate = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if needUpdate {
|
||||
// we only apply spec update
|
||||
svc.Spec = service.Spec
|
||||
_, err = client.Core().Services(svc.Namespace).Update(svc)
|
||||
if err == nil {
|
||||
glog.V(5).Infof("Service %s/%s successfully updated to cluster %s", svc.Namespace, svc.Name, clusterName)
|
||||
return nil
|
||||
} else {
|
||||
glog.V(4).Infof("Failed to update %+v", err)
|
||||
}
|
||||
} else {
|
||||
glog.V(5).Infof("Service %s/%s is not updated to cluster %s as the spec are identical", svc.Namespace, svc.Name, clusterName)
|
||||
return nil
|
||||
}
|
||||
} else if errors.IsNotFound(err) {
|
||||
// Create service if it is not found
|
||||
glog.Infof("Service '%s/%s' is not found in cluster %s, trying to create new",
|
||||
service.Namespace, service.Name, clusterName)
|
||||
service.ResourceVersion = ""
|
||||
_, err = client.Core().Services(service.Namespace).Create(service)
|
||||
if err == nil {
|
||||
glog.V(5).Infof("Service %s/%s successfully created to cluster %s", service.Namespace, service.Name, clusterName)
|
||||
return nil
|
||||
}
|
||||
glog.V(4).Infof("Failed to create %+v", err)
|
||||
if errors.IsAlreadyExists(err) {
|
||||
glog.V(5).Infof("service %s/%s already exists in cluster %s", service.Namespace, service.Name, clusterName)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
if errors.IsConflict(err) {
|
||||
glog.V(4).Infof("Not persisting update to service '%s/%s' that has been changed since we received it: %v",
|
||||
service.Namespace, service.Name, err)
|
||||
}
|
||||
// should we reuse same retry delay for all clusters?
|
||||
time.Sleep(cachedService.nextRetryDelay())
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *serviceCache) allServices() []*cachedService {
|
||||
s.rwlock.Lock()
|
||||
defer s.rwlock.Unlock()
|
||||
services := make([]*cachedService, 0, len(s.fedServiceMap))
|
||||
for _, v := range s.fedServiceMap {
|
||||
services = append(services, v)
|
||||
}
|
||||
return services
|
||||
}
|
||||
|
||||
func (s *serviceCache) get(serviceName string) (*cachedService, bool) {
|
||||
s.rwlock.Lock()
|
||||
defer s.rwlock.Unlock()
|
||||
service, ok := s.fedServiceMap[serviceName]
|
||||
return service, ok
|
||||
}
|
||||
|
||||
func (s *serviceCache) getOrCreate(serviceName string) *cachedService {
|
||||
s.rwlock.Lock()
|
||||
defer s.rwlock.Unlock()
|
||||
service, ok := s.fedServiceMap[serviceName]
|
||||
if !ok {
|
||||
service = &cachedService{
|
||||
endpointMap: make(map[string]int),
|
||||
serviceStatusMap: make(map[string]api.LoadBalancerStatus),
|
||||
}
|
||||
s.fedServiceMap[serviceName] = service
|
||||
}
|
||||
return service
|
||||
}
|
||||
|
||||
func (s *serviceCache) set(serviceName string, service *cachedService) {
|
||||
s.rwlock.Lock()
|
||||
defer s.rwlock.Unlock()
|
||||
s.fedServiceMap[serviceName] = service
|
||||
}
|
||||
|
||||
func (s *serviceCache) delete(serviceName string) {
|
||||
s.rwlock.Lock()
|
||||
defer s.rwlock.Unlock()
|
||||
delete(s.fedServiceMap, serviceName)
|
||||
}
|
||||
|
||||
// needsUpdateDNS check if the dns records of the given service should be updated
|
||||
func (s *ServiceController) needsUpdateDNS(oldService *api.Service, newService *api.Service) bool {
|
||||
if !wantsDNSRecords(oldService) && !wantsDNSRecords(newService) {
|
||||
return false
|
||||
}
|
||||
if wantsDNSRecords(oldService) != wantsDNSRecords(newService) {
|
||||
s.eventRecorder.Eventf(newService, api.EventTypeNormal, "Type", "%v -> %v",
|
||||
oldService.Spec.Type, newService.Spec.Type)
|
||||
return true
|
||||
}
|
||||
if !portsEqualForLB(oldService, newService) || oldService.Spec.SessionAffinity != newService.Spec.SessionAffinity {
|
||||
return true
|
||||
}
|
||||
if !LoadBalancerIPsAreEqual(oldService, newService) {
|
||||
s.eventRecorder.Eventf(newService, api.EventTypeNormal, "LoadbalancerIP", "%v -> %v",
|
||||
oldService.Spec.LoadBalancerIP, newService.Spec.LoadBalancerIP)
|
||||
return true
|
||||
}
|
||||
if len(oldService.Spec.ExternalIPs) != len(newService.Spec.ExternalIPs) {
|
||||
s.eventRecorder.Eventf(newService, api.EventTypeNormal, "ExternalIP", "Count: %v -> %v",
|
||||
len(oldService.Spec.ExternalIPs), len(newService.Spec.ExternalIPs))
|
||||
return true
|
||||
}
|
||||
for i := range oldService.Spec.ExternalIPs {
|
||||
if oldService.Spec.ExternalIPs[i] != newService.Spec.ExternalIPs[i] {
|
||||
s.eventRecorder.Eventf(newService, api.EventTypeNormal, "ExternalIP", "Added: %v",
|
||||
newService.Spec.ExternalIPs[i])
|
||||
return true
|
||||
}
|
||||
}
|
||||
if !reflect.DeepEqual(oldService.Annotations, newService.Annotations) {
|
||||
return true
|
||||
}
|
||||
if oldService.UID != newService.UID {
|
||||
s.eventRecorder.Eventf(newService, api.EventTypeNormal, "UID", "%v -> %v",
|
||||
oldService.UID, newService.UID)
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func getPortsForLB(service *api.Service) ([]*api.ServicePort, error) {
|
||||
// TODO: quinton: Probably applies for DNS SVC records. Come back to this.
|
||||
//var protocol api.Protocol
|
||||
|
||||
ports := []*api.ServicePort{}
|
||||
for i := range service.Spec.Ports {
|
||||
sp := &service.Spec.Ports[i]
|
||||
// The check on protocol was removed here. The DNS provider itself is now responsible for all protocol validation
|
||||
ports = append(ports, sp)
|
||||
}
|
||||
return ports, nil
|
||||
}
|
||||
|
||||
func portsEqualForLB(x, y *api.Service) bool {
|
||||
xPorts, err := getPortsForLB(x)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
yPorts, err := getPortsForLB(y)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
return portSlicesEqualForLB(xPorts, yPorts)
|
||||
}
|
||||
|
||||
func portSlicesEqualForLB(x, y []*api.ServicePort) bool {
|
||||
if len(x) != len(y) {
|
||||
return false
|
||||
}
|
||||
|
||||
for i := range x {
|
||||
if !portEqualForLB(x[i], y[i]) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func portEqualForLB(x, y *api.ServicePort) bool {
|
||||
// TODO: Should we check name? (In theory, an LB could expose it)
|
||||
if x.Name != y.Name {
|
||||
return false
|
||||
}
|
||||
|
||||
if x.Protocol != y.Protocol {
|
||||
return false
|
||||
}
|
||||
|
||||
if x.Port != y.Port {
|
||||
return false
|
||||
}
|
||||
if x.NodePort != y.NodePort {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func portEqualExcludeNodePort(x, y *api.ServicePort) bool {
|
||||
// TODO: Should we check name? (In theory, an LB could expose it)
|
||||
if x.Name != y.Name {
|
||||
return false
|
||||
}
|
||||
|
||||
if x.Protocol != y.Protocol {
|
||||
return false
|
||||
}
|
||||
|
||||
if x.Port != y.Port {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func clustersFromList(list *federation.ClusterList) []string {
|
||||
result := []string{}
|
||||
for ix := range list.Items {
|
||||
result = append(result, list.Items[ix].Name)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// getClusterConditionPredicate filter all clusters meet condition of
|
||||
// condition.type=Ready and condition.status=true
|
||||
func getClusterConditionPredicate() federationcache.ClusterConditionPredicate {
|
||||
return func(cluster federation.Cluster) bool {
|
||||
// If we have no info, don't accept
|
||||
if len(cluster.Status.Conditions) == 0 {
|
||||
return false
|
||||
}
|
||||
for _, cond := range cluster.Status.Conditions {
|
||||
//We consider the cluster for load balancing only when its ClusterReady condition status
|
||||
//is ConditionTrue
|
||||
if cond.Type == federation.ClusterReady && cond.Status != api.ConditionTrue {
|
||||
glog.V(4).Infof("Ignoring cluser %v with %v condition status %v", cluster.Name, cond.Type, cond.Status)
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// clusterSyncLoop observes running clusters changes, and apply all services to new added cluster
|
||||
// and add dns records for the changes
|
||||
func (s *ServiceController) clusterSyncLoop() {
|
||||
var servicesToUpdate []*cachedService
|
||||
// should we remove cache for cluster from ready to not ready? should remove the condition predicate if no
|
||||
clusters, err := s.clusterStore.ClusterCondition(getClusterConditionPredicate()).List()
|
||||
if err != nil {
|
||||
glog.Infof("Fail to get cluster list")
|
||||
return
|
||||
}
|
||||
newClusters := clustersFromList(&clusters)
|
||||
var newSet, increase sets.String
|
||||
newSet = sets.NewString(newClusters...)
|
||||
if newSet.Equal(s.knownClusterSet) {
|
||||
// The set of cluster names in the services in the federation hasn't changed, but we can retry
|
||||
// updating any services that we failed to update last time around.
|
||||
servicesToUpdate = s.updateDNSRecords(servicesToUpdate, newClusters)
|
||||
return
|
||||
}
|
||||
glog.Infof("Detected change in list of cluster names. New set: %v, Old set: %v", newSet, s.knownClusterSet)
|
||||
increase = newSet.Difference(s.knownClusterSet)
|
||||
// do nothing when cluster is removed.
|
||||
if increase != nil {
|
||||
for newCluster := range increase {
|
||||
glog.Infof("New cluster observed %s", newCluster)
|
||||
s.updateAllServicesToCluster(servicesToUpdate, newCluster)
|
||||
}
|
||||
// Try updating all services, and save the ones that fail to try again next
|
||||
// round.
|
||||
servicesToUpdate = s.serviceCache.allServices()
|
||||
numServices := len(servicesToUpdate)
|
||||
servicesToUpdate = s.updateDNSRecords(servicesToUpdate, newClusters)
|
||||
glog.Infof("Successfully updated %d out of %d DNS records to direct traffic to the updated cluster",
|
||||
numServices-len(servicesToUpdate), numServices)
|
||||
}
|
||||
s.knownClusterSet = newSet
|
||||
}
|
||||
|
||||
func (s *ServiceController) updateAllServicesToCluster(services []*cachedService, clusterName string) {
|
||||
cluster, ok := s.clusterCache.clientMap[clusterName]
|
||||
if ok {
|
||||
for _, cachedService := range services {
|
||||
appliedState := cachedService.appliedState
|
||||
s.processServiceForCluster(cachedService, clusterName, appliedState, cluster.clientset)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *ServiceController) removeAllServicesFromCluster(services []*cachedService, clusterName string) {
|
||||
client, ok := s.clusterCache.clientMap[clusterName]
|
||||
if ok {
|
||||
for _, cachedService := range services {
|
||||
s.deleteClusterService(clusterName, cachedService, client.clientset)
|
||||
}
|
||||
glog.Infof("Synced all services to cluster %s", clusterName)
|
||||
}
|
||||
}
|
||||
|
||||
// updateDNSRecords updates all existing federation service DNS Records so that
|
||||
// they will match the list of cluster names provided.
|
||||
// Returns the list of services that couldn't be updated.
|
||||
func (s *ServiceController) updateDNSRecords(services []*cachedService, clusters []string) (servicesToRetry []*cachedService) {
|
||||
for _, service := range services {
|
||||
func() {
|
||||
service.rwlock.Lock()
|
||||
defer service.rwlock.Unlock()
|
||||
// If the applied state is nil, that means it hasn't yet been successfully dealt
|
||||
// with by the DNS Record reconciler. We can trust the DNS Record
|
||||
// reconciler to ensure the federation service's DNS records are created to target
|
||||
// the correct backend service IP's
|
||||
if service.appliedState == nil {
|
||||
return
|
||||
}
|
||||
if err := s.lockedUpdateDNSRecords(service, clusters); err != nil {
|
||||
glog.Errorf("External error while updating DNS Records: %v.", err)
|
||||
servicesToRetry = append(servicesToRetry, service)
|
||||
}
|
||||
}()
|
||||
}
|
||||
return servicesToRetry
|
||||
}
|
||||
|
||||
// lockedUpdateDNSRecords Updates the DNS records of a service, assuming we hold the mutex
|
||||
// associated with the service.
|
||||
// TODO: quinton: Still screwed up in the same way as above. Fix.
|
||||
func (s *ServiceController) lockedUpdateDNSRecords(service *cachedService, clusterNames []string) error {
|
||||
if !wantsDNSRecords(service.appliedState) {
|
||||
return nil
|
||||
}
|
||||
for key := range s.clusterCache.clientMap {
|
||||
for _, clusterName := range clusterNames {
|
||||
if key == clusterName {
|
||||
ensureDNSRecords(clusterName, service)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func LoadBalancerIPsAreEqual(oldService, newService *api.Service) bool {
|
||||
return oldService.Spec.LoadBalancerIP == newService.Spec.LoadBalancerIP
|
||||
}
|
||||
|
||||
// Computes the next retry, using exponential backoff
|
||||
// mutex must be held.
|
||||
func (s *cachedService) nextRetryDelay() time.Duration {
|
||||
s.lastRetryDelay = s.lastRetryDelay * 2
|
||||
if s.lastRetryDelay < minRetryDelay {
|
||||
s.lastRetryDelay = minRetryDelay
|
||||
}
|
||||
if s.lastRetryDelay > maxRetryDelay {
|
||||
s.lastRetryDelay = maxRetryDelay
|
||||
}
|
||||
return s.lastRetryDelay
|
||||
}
|
||||
|
||||
// resetRetryDelay Resets the retry exponential backoff. mutex must be held.
|
||||
func (s *cachedService) resetRetryDelay() {
|
||||
s.lastRetryDelay = time.Duration(0)
|
||||
}
|
||||
|
||||
// Computes the next retry, using exponential backoff
|
||||
// mutex must be held.
|
||||
func (s *cachedService) nextFedUpdateDelay() time.Duration {
|
||||
s.lastFedUpdateDelay = s.lastFedUpdateDelay * 2
|
||||
if s.lastFedUpdateDelay < minRetryDelay {
|
||||
s.lastFedUpdateDelay = minRetryDelay
|
||||
}
|
||||
if s.lastFedUpdateDelay > maxRetryDelay {
|
||||
s.lastFedUpdateDelay = maxRetryDelay
|
||||
}
|
||||
return s.lastFedUpdateDelay
|
||||
}
|
||||
|
||||
// resetRetryDelay Resets the retry exponential backoff. mutex must be held.
|
||||
func (s *cachedService) resetFedUpdateDelay() {
|
||||
s.lastFedUpdateDelay = time.Duration(0)
|
||||
}
|
||||
|
||||
// Computes the next retry, using exponential backoff
|
||||
// mutex must be held.
|
||||
func (s *cachedService) nextDNSUpdateDelay() time.Duration {
|
||||
s.lastDNSUpdateDelay = s.lastDNSUpdateDelay * 2
|
||||
if s.lastDNSUpdateDelay < minRetryDelay {
|
||||
s.lastDNSUpdateDelay = minRetryDelay
|
||||
}
|
||||
if s.lastDNSUpdateDelay > maxRetryDelay {
|
||||
s.lastDNSUpdateDelay = maxRetryDelay
|
||||
}
|
||||
return s.lastDNSUpdateDelay
|
||||
}
|
||||
|
||||
// resetRetryDelay Resets the retry exponential backoff. mutex must be held.
|
||||
func (s *cachedService) resetDNSUpdateDelay() {
|
||||
s.lastDNSUpdateDelay = time.Duration(0)
|
||||
}
|
||||
|
||||
// syncService will sync the Service with the given key if it has had its expectations fulfilled,
|
||||
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
|
||||
// invoked concurrently with the same key.
|
||||
func (s *ServiceController) syncService(key string) error {
|
||||
startTime := time.Now()
|
||||
var cachedService *cachedService
|
||||
var retryDelay time.Duration
|
||||
defer func() {
|
||||
glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime))
|
||||
}()
|
||||
// obj holds the latest service info from apiserver
|
||||
obj, exists, err := s.serviceStore.Store.GetByKey(key)
|
||||
if err != nil {
|
||||
glog.Infof("Unable to retrieve service %v from store: %v", key, err)
|
||||
s.queue.Add(key)
|
||||
return err
|
||||
}
|
||||
|
||||
if !exists {
|
||||
// service absence in store means watcher caught the deletion, ensure LB info is cleaned
|
||||
glog.Infof("Service has been deleted %v", key)
|
||||
err, retryDelay = s.processServiceDeletion(key)
|
||||
}
|
||||
|
||||
if exists {
|
||||
service, ok := obj.(*api.Service)
|
||||
if ok {
|
||||
cachedService = s.serviceCache.getOrCreate(key)
|
||||
err, retryDelay = s.processServiceUpdate(cachedService, service, key)
|
||||
} else {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
return fmt.Errorf("Object contained wasn't a service or a deleted key: %+v", obj)
|
||||
}
|
||||
glog.Infof("Found tombstone for %v", key)
|
||||
err, retryDelay = s.processServiceDeletion(tombstone.Key)
|
||||
}
|
||||
}
|
||||
|
||||
if retryDelay != 0 {
|
||||
s.enqueueService(obj)
|
||||
} else if err != nil {
|
||||
runtime.HandleError(fmt.Errorf("Failed to process service. Not retrying: %v", err))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// processServiceUpdate returns an error if processing the service update failed, along with a time.Duration
|
||||
// indicating whether processing should be retried; zero means no-retry; otherwise
|
||||
// we should retry in that Duration.
|
||||
func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *api.Service, key string) (error, time.Duration) {
|
||||
// Ensure that no other goroutine will interfere with our processing of the
|
||||
// service.
|
||||
cachedService.rwlock.Lock()
|
||||
defer cachedService.rwlock.Unlock()
|
||||
|
||||
// Update the cached service (used above for populating synthetic deletes)
|
||||
// alway trust service, which is retrieve from serviceStore, which keeps the latest service info getting from apiserver
|
||||
// if the same service is changed before this go routine finished, there will be another queue entry to handle that.
|
||||
cachedService.lastState = service
|
||||
err, retry := s.updateFederationService(key, cachedService)
|
||||
if err != nil {
|
||||
message := "Error occurs when updating service to all clusters"
|
||||
if retry {
|
||||
message += " (will retry): "
|
||||
} else {
|
||||
message += " (will not retry): "
|
||||
}
|
||||
message += err.Error()
|
||||
s.eventRecorder.Event(service, api.EventTypeWarning, "UpdateServiceFail", message)
|
||||
return err, cachedService.nextRetryDelay()
|
||||
}
|
||||
// Always update the cache upon success.
|
||||
// NOTE: Since we update the cached service if and only if we successfully
|
||||
// processed it, a cached service being nil implies that it hasn't yet
|
||||
// been successfully processed.
|
||||
|
||||
cachedService.appliedState = service
|
||||
s.serviceCache.set(key, cachedService)
|
||||
glog.V(4).Infof("Successfully procceeded services %s", key)
|
||||
cachedService.resetRetryDelay()
|
||||
return nil, doNotRetry
|
||||
}
|
||||
|
||||
// processServiceDeletion returns an error if processing the service deletion failed, along with a time.Duration
|
||||
// indicating whether processing should be retried; zero means no-retry; otherwise
|
||||
// we should retry in that Duration.
|
||||
func (s *ServiceController) processServiceDeletion(key string) (error, time.Duration) {
|
||||
glog.V(2).Infof("Process service deletion for %v", key)
|
||||
cachedService, ok := s.serviceCache.get(key)
|
||||
if !ok {
|
||||
return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), doNotRetry
|
||||
}
|
||||
service := cachedService.lastState
|
||||
cachedService.rwlock.Lock()
|
||||
defer cachedService.rwlock.Unlock()
|
||||
s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingDNSRecord", "Deleting DNS Records")
|
||||
// TODO should we delete dns info here or wait for endpoint changes? prefer here
|
||||
// or we do nothing for service deletion
|
||||
//err := s.dns.balancer.EnsureLoadBalancerDeleted(service)
|
||||
err, retry := s.deleteFederationService(cachedService)
|
||||
if err != nil {
|
||||
message := "Error occurs when deleting federation service"
|
||||
if retry {
|
||||
message += " (will retry): "
|
||||
} else {
|
||||
message += " (will not retry): "
|
||||
}
|
||||
s.eventRecorder.Event(service, api.EventTypeWarning, "DeletingDNSRecordFailed", message)
|
||||
return err, cachedService.nextRetryDelay()
|
||||
}
|
||||
s.eventRecorder.Event(service, api.EventTypeNormal, "DeletedDNSRecord", "Deleted DNS Records")
|
||||
s.serviceCache.delete(key)
|
||||
|
||||
cachedService.resetRetryDelay()
|
||||
return nil, doNotRetry
|
||||
}
|
@ -0,0 +1,67 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package service
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"k8s.io/kubernetes/federation/apis/federation"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
)
|
||||
|
||||
func TestGetClusterConditionPredicate(t *testing.T) {
|
||||
tests := []struct {
|
||||
cluster federation.Cluster
|
||||
expectAccept bool
|
||||
name string
|
||||
}{
|
||||
{
|
||||
cluster: federation.Cluster{},
|
||||
expectAccept: false,
|
||||
name: "empty",
|
||||
},
|
||||
{
|
||||
cluster: federation.Cluster{
|
||||
Status: federation.ClusterStatus{
|
||||
Conditions: []federation.ClusterCondition{
|
||||
{Type: federation.ClusterReady, Status: api.ConditionTrue},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectAccept: true,
|
||||
name: "basic",
|
||||
},
|
||||
{
|
||||
cluster: federation.Cluster{
|
||||
Status: federation.ClusterStatus{
|
||||
Conditions: []federation.ClusterCondition{
|
||||
{Type: federation.ClusterReady, Status: api.ConditionFalse},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectAccept: false,
|
||||
name: "notready",
|
||||
},
|
||||
}
|
||||
pred := getClusterConditionPredicate()
|
||||
for _, test := range tests {
|
||||
accept := pred(test.cluster)
|
||||
if accept != test.expectAccept {
|
||||
t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectAccept, accept)
|
||||
}
|
||||
}
|
||||
}
|
@ -63,6 +63,7 @@ concurrent-deployment-syncs
|
||||
concurrent-endpoint-syncs
|
||||
concurrent-namespace-syncs
|
||||
concurrent-replicaset-syncs
|
||||
concurrent-service-syncs
|
||||
concurrent-resource-quota-syncs
|
||||
config-sync-period
|
||||
configure-cbr0
|
||||
|
Loading…
Reference in New Issue
Block a user