Merge pull request #26694 from mfanjie/extract_func_for_build_cluster_config

Automatic merge from submit-queue

change clientset of service controller to versioned

1. create a public function BuildClusterConfig() to build cluster config so that service controller can reuse
2. Current implement always requires POD_NAMESPACE to be set and always require federation control plane be placed in k8s cluster. Per my understanding if use did not provide secretRef and want to use unsecure connection, then we should not call KubeconfigGetterForCluster(), instead of we should create the clientset by clientcmd.BuildConfigFromFlags(). 

@nikhiljindal please correct me if I am wrong for bullet 2.
This commit is contained in:
k8s-merge-robot 2016-06-07 18:49:51 -07:00
commit e5da972c8a
14 changed files with 286 additions and 234 deletions

View File

@ -18,7 +18,7 @@ package cache
import ( import (
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/federation/apis/federation" "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
kubeCache "k8s.io/kubernetes/pkg/client/cache" kubeCache "k8s.io/kubernetes/pkg/client/cache"
) )
@ -28,16 +28,16 @@ type StoreToClusterLister struct {
kubeCache.Store kubeCache.Store
} }
func (s *StoreToClusterLister) List() (clusters federation.ClusterList, err error) { func (s *StoreToClusterLister) List() (clusters v1alpha1.ClusterList, err error) {
for _, m := range s.Store.List() { for _, m := range s.Store.List() {
clusters.Items = append(clusters.Items, *(m.(*federation.Cluster))) clusters.Items = append(clusters.Items, *(m.(*v1alpha1.Cluster)))
} }
return clusters, nil return clusters, nil
} }
// ClusterConditionPredicate is a function that indicates whether the given cluster's conditions meet // ClusterConditionPredicate is a function that indicates whether the given cluster's conditions meet
// some set of criteria defined by the function. // some set of criteria defined by the function.
type ClusterConditionPredicate func(cluster federation.Cluster) bool type ClusterConditionPredicate func(cluster v1alpha1.Cluster) bool
// storeToClusterConditionLister filters and returns nodes matching the given type and status from the store. // storeToClusterConditionLister filters and returns nodes matching the given type and status from the store.
type storeToClusterConditionLister struct { type storeToClusterConditionLister struct {
@ -51,9 +51,9 @@ func (s *StoreToClusterLister) ClusterCondition(predicate ClusterConditionPredic
} }
// List returns a list of clusters that match the conditions defined by the predicate functions in the storeToClusterConditionLister. // List returns a list of clusters that match the conditions defined by the predicate functions in the storeToClusterConditionLister.
func (s storeToClusterConditionLister) List() (clusters federation.ClusterList, err error) { func (s storeToClusterConditionLister) List() (clusters v1alpha1.ClusterList, err error) {
for _, m := range s.store.List() { for _, m := range s.store.List() {
cluster := *m.(*federation.Cluster) cluster := *m.(*v1alpha1.Cluster)
if s.predicate(cluster) { if s.predicate(cluster) {
clusters.Items = append(clusters.Items, cluster) clusters.Items = append(clusters.Items, cluster)
} else { } else {

View File

@ -25,14 +25,13 @@ import (
"net/http/pprof" "net/http/pprof"
"strconv" "strconv"
"k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options"
"k8s.io/kubernetes/pkg/client/restclient"
internalclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3"
"k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options"
"k8s.io/kubernetes/federation/pkg/dnsprovider" "k8s.io/kubernetes/federation/pkg/dnsprovider"
clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster" clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster"
servicecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service" servicecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/healthz" "k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/util/configz" "k8s.io/kubernetes/pkg/util/configz"
@ -77,7 +76,7 @@ func Run(s *options.CMServer) error {
glog.Errorf("unable to register configz: %s", err) glog.Errorf("unable to register configz: %s", err)
} }
// Create the config to talk to federation-apiserver. // Create the config to talk to federation-apiserver.
kubeconfigGetter := clustercontroller.KubeconfigGetterForSecret(FederationAPIServerSecretName) kubeconfigGetter := util.KubeconfigGetterForSecret(FederationAPIServerSecretName)
restClientCfg, err := clientcmd.BuildConfigFromKubeconfigGetter(s.Master, kubeconfigGetter) restClientCfg, err := clientcmd.BuildConfigFromKubeconfigGetter(s.Master, kubeconfigGetter)
if err != nil { if err != nil {
return err return err
@ -115,14 +114,14 @@ func Run(s *options.CMServer) error {
func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) error { func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) error {
federationClientSet := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller")) ccClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller"))
go clustercontroller.NewclusterController(federationClientSet, s.ClusterMonitorPeriod.Duration).Run() go clustercontroller.NewclusterController(ccClientset, s.ClusterMonitorPeriod.Duration).Run()
dns, err := dnsprovider.InitDnsProvider(s.DnsProvider, s.DnsConfigFile) dns, err := dnsprovider.InitDnsProvider(s.DnsProvider, s.DnsConfigFile)
if err != nil { if err != nil {
glog.Fatalf("Cloud provider could not be initialized: %v", err) glog.Fatalf("Cloud provider could not be initialized: %v", err)
} }
scclientset := internalclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, servicecontroller.UserAgentName)) scClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, servicecontroller.UserAgentName))
servicecontroller := servicecontroller.New(scclientset, dns) servicecontroller := servicecontroller.New(scClientset, dns)
if err := servicecontroller.Run(s.ConcurrentServiceSyncs, wait.NeverStop); err != nil { if err := servicecontroller.Run(s.ConcurrentServiceSyncs, wait.NeverStop); err != nil {
glog.Errorf("Failed to start service controller: %v", err) glog.Errorf("Failed to start service controller: %v", err)
} }

View File

@ -1,5 +1,5 @@
/* /*
Copyright 2014 The Kubernetes Authors All rights reserved. Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
@ -18,22 +18,17 @@ package cluster
import ( import (
"fmt" "fmt"
"net"
"os"
"strings" "strings"
"github.com/golang/glog" "github.com/golang/glog"
federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1" federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/typed/discovery" "k8s.io/kubernetes/pkg/client/typed/discovery"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
) )
@ -44,81 +39,18 @@ const (
KubeconfigSecretDataKey = "kubeconfig" KubeconfigSecretDataKey = "kubeconfig"
) )
// This is to inject a different kubeconfigGetter in tests.
// We dont use the standard one which calls NewInCluster in tests to avoid having to setup service accounts and mount files with secret tokens.
var KubeconfigGetterForCluster = func(c *federation_v1alpha1.Cluster) clientcmd.KubeconfigGetter {
return func() (*clientcmdapi.Config, error) {
secretRefName := ""
if c.Spec.SecretRef != nil {
secretRefName = c.Spec.SecretRef.Name
} else {
glog.Infof("didnt find secretRef for cluster %s. Trying insecure access", c.Name)
}
return KubeconfigGetterForSecret(secretRefName)()
}
}
// KubeconfigGettterForSecret is used to get the kubeconfig from the given secret.
var KubeconfigGetterForSecret = func(secretName string) clientcmd.KubeconfigGetter {
return func() (*clientcmdapi.Config, error) {
// Get the namespace this is running in from the env variable.
namespace := os.Getenv("POD_NAMESPACE")
if namespace == "" {
return nil, fmt.Errorf("unexpected: POD_NAMESPACE env var returned empty string")
}
// Get a client to talk to the k8s apiserver, to fetch secrets from it.
client, err := client.NewInCluster()
if err != nil {
return nil, fmt.Errorf("error in creating in-cluster client: %s", err)
}
data := []byte{}
if secretName != "" {
secret, err := client.Secrets(namespace).Get(secretName)
if err != nil {
return nil, fmt.Errorf("error in fetching secret: %s", err)
}
ok := false
data, ok = secret.Data[KubeconfigSecretDataKey]
if !ok {
return nil, fmt.Errorf("secret does not have data with key: %s", KubeconfigSecretDataKey)
}
}
return clientcmd.Load(data)
}
}
type ClusterClient struct { type ClusterClient struct {
discoveryClient *discovery.DiscoveryClient discoveryClient *discovery.DiscoveryClient
kubeClient *clientset.Clientset kubeClient *clientset.Clientset
} }
func NewClusterClientSet(c *federation_v1alpha1.Cluster) (*ClusterClient, error) { func NewClusterClientSet(c *federation_v1alpha1.Cluster) (*ClusterClient, error) {
var serverAddress string clusterConfig, err := util.BuildClusterConfig(c)
hostIP, err := utilnet.ChooseHostInterface()
if err != nil { if err != nil {
return nil, err return nil, err
} }
for _, item := range c.Spec.ServerAddressByClientCIDRs {
_, cidrnet, err := net.ParseCIDR(item.ClientCIDR)
if err != nil {
return nil, err
}
myaddr := net.ParseIP(hostIP.String())
if cidrnet.Contains(myaddr) == true {
serverAddress = item.ServerAddress
break
}
}
var clusterClientSet = ClusterClient{} var clusterClientSet = ClusterClient{}
if serverAddress != "" { if clusterConfig != nil {
kubeconfigGetter := KubeconfigGetterForCluster(c)
clusterConfig, err := clientcmd.BuildConfigFromKubeconfigGetter(serverAddress, kubeconfigGetter)
if err != nil {
return nil, err
}
clusterConfig.QPS = KubeAPIQPS
clusterConfig.Burst = KubeAPIBurst
clusterClientSet.discoveryClient = discovery.NewDiscoveryClientForConfigOrDie((restclient.AddUserAgent(clusterConfig, UserAgentName))) clusterClientSet.discoveryClient = discovery.NewDiscoveryClientForConfigOrDie((restclient.AddUserAgent(clusterConfig, UserAgentName)))
if clusterClientSet.discoveryClient == nil { if clusterClientSet.discoveryClient == nil {
return nil, nil return nil, nil

View File

@ -1,5 +1,5 @@
/* /*
Copyright 2014 The Kubernetes Authors All rights reserved. Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
@ -21,7 +21,6 @@ import (
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/federation/apis/federation"
federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1" federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
cluster_cache "k8s.io/kubernetes/federation/client/cache" cluster_cache "k8s.io/kubernetes/federation/client/cache"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3"
@ -73,7 +72,7 @@ func NewclusterController(federationClient federationclientset.Interface, cluste
return cc.federationClient.Federation().Clusters().Watch(options) return cc.federationClient.Federation().Clusters().Watch(options)
}, },
}, },
&federation.Cluster{}, &federation_v1alpha1.Cluster{},
controller.NoResyncPeriodFunc(), controller.NoResyncPeriodFunc(),
framework.ResourceEventHandlerFuncs{ framework.ResourceEventHandlerFuncs{
DeleteFunc: cc.delFromClusterSet, DeleteFunc: cc.delFromClusterSet,

View File

@ -25,6 +25,7 @@ import (
federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1" federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3"
controller_util "k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
@ -124,8 +125,8 @@ func TestUpdateClusterStatusOK(t *testing.T) {
federationClientSet := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller")) federationClientSet := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller"))
// Override KubeconfigGetterForCluster to avoid having to setup service accounts and mount files with secret tokens. // Override KubeconfigGetterForCluster to avoid having to setup service accounts and mount files with secret tokens.
originalGetter := KubeconfigGetterForCluster originalGetter := controller_util.KubeconfigGetterForCluster
KubeconfigGetterForCluster = func(c *federation_v1alpha1.Cluster) clientcmd.KubeconfigGetter { controller_util.KubeconfigGetterForCluster = func(c *federation_v1alpha1.Cluster) clientcmd.KubeconfigGetter {
return func() (*clientcmdapi.Config, error) { return func() (*clientcmdapi.Config, error) {
return &clientcmdapi.Config{}, nil return &clientcmdapi.Config{}, nil
} }
@ -146,5 +147,5 @@ func TestUpdateClusterStatusOK(t *testing.T) {
} }
// Reset KubeconfigGetterForCluster // Reset KubeconfigGetterForCluster
KubeconfigGetterForCluster = originalGetter controller_util.KubeconfigGetterForCluster = originalGetter
} }

View File

@ -19,12 +19,12 @@ package service
import ( import (
"sync" "sync"
"k8s.io/kubernetes/federation/apis/federation" v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
v1 "k8s.io/kubernetes/pkg/api/v1"
cache "k8s.io/kubernetes/pkg/client/cache" cache "k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" release_1_3 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3"
"k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/controller/framework"
pkg_runtime "k8s.io/kubernetes/pkg/runtime" pkg_runtime "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
@ -32,12 +32,13 @@ import (
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
"reflect" "reflect"
) )
type clusterCache struct { type clusterCache struct {
clientset *clientset.Clientset clientset *release_1_3.Clientset
cluster *federation.Cluster cluster *v1alpha1.Cluster
// A store of services, populated by the serviceController // A store of services, populated by the serviceController
serviceStore cache.StoreToServiceLister serviceStore cache.StoreToServiceLister
// Watches changes to all services // Watches changes to all services
@ -57,7 +58,7 @@ type clusterClientCache struct {
clientMap map[string]*clusterCache clientMap map[string]*clusterCache
} }
func (cc *clusterClientCache) startClusterLW(cluster *federation.Cluster, clusterName string) { func (cc *clusterClientCache) startClusterLW(cluster *v1alpha1.Cluster, clusterName string) {
cachedClusterClient, ok := cc.clientMap[clusterName] cachedClusterClient, ok := cc.clientMap[clusterName]
// only create when no existing cachedClusterClient // only create when no existing cachedClusterClient
if ok { if ok {
@ -92,13 +93,13 @@ func (cc *clusterClientCache) startClusterLW(cluster *federation.Cluster, cluste
cachedClusterClient.endpointStore.Store, cachedClusterClient.endpointController = framework.NewInformer( cachedClusterClient.endpointStore.Store, cachedClusterClient.endpointController = framework.NewInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) { ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
return clientset.Core().Endpoints(api.NamespaceAll).List(options) return clientset.Core().Endpoints(v1.NamespaceAll).List(options)
}, },
WatchFunc: func(options api.ListOptions) (watch.Interface, error) { WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return clientset.Core().Endpoints(api.NamespaceAll).Watch(options) return clientset.Core().Endpoints(v1.NamespaceAll).Watch(options)
}, },
}, },
&api.Endpoints{}, &v1.Endpoints{},
serviceSyncPeriod, serviceSyncPeriod,
framework.ResourceEventHandlerFuncs{ framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { AddFunc: func(obj interface{}) {
@ -116,25 +117,25 @@ func (cc *clusterClientCache) startClusterLW(cluster *federation.Cluster, cluste
cachedClusterClient.serviceStore.Store, cachedClusterClient.serviceController = framework.NewInformer( cachedClusterClient.serviceStore.Store, cachedClusterClient.serviceController = framework.NewInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) { ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
return clientset.Core().Services(api.NamespaceAll).List(options) return clientset.Core().Services(v1.NamespaceAll).List(options)
}, },
WatchFunc: func(options api.ListOptions) (watch.Interface, error) { WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return clientset.Core().Services(api.NamespaceAll).Watch(options) return clientset.Core().Services(v1.NamespaceAll).Watch(options)
}, },
}, },
&api.Service{}, &v1.Service{},
serviceSyncPeriod, serviceSyncPeriod,
framework.ResourceEventHandlerFuncs{ framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { AddFunc: func(obj interface{}) {
cc.enqueueService(obj, clusterName) cc.enqueueService(obj, clusterName)
}, },
UpdateFunc: func(old, cur interface{}) { UpdateFunc: func(old, cur interface{}) {
oldService, ok := old.(*api.Service) oldService, ok := old.(*v1.Service)
if !ok { if !ok {
return return
} }
curService, ok := cur.(*api.Service) curService, ok := cur.(*v1.Service)
if !ok { if !ok {
return return
} }
@ -143,7 +144,7 @@ func (cc *clusterClientCache) startClusterLW(cluster *federation.Cluster, cluste
} }
}, },
DeleteFunc: func(obj interface{}) { DeleteFunc: func(obj interface{}) {
service, _ := obj.(*api.Service) service, _ := obj.(*v1.Service)
cc.enqueueService(obj, clusterName) cc.enqueueService(obj, clusterName)
glog.V(2).Infof("Service %s/%s deletion found and enque to service store %s", service.Namespace, service.Name, clusterName) glog.V(2).Infof("Service %s/%s deletion found and enque to service store %s", service.Namespace, service.Name, clusterName)
}, },
@ -161,7 +162,7 @@ func (cc *clusterClientCache) startClusterLW(cluster *federation.Cluster, cluste
// delFromClusterSet delete a cluster from clusterSet and // delFromClusterSet delete a cluster from clusterSet and
// delete the corresponding restclient from the map clusterKubeClientMap // delete the corresponding restclient from the map clusterKubeClientMap
func (cc *clusterClientCache) delFromClusterSet(obj interface{}) { func (cc *clusterClientCache) delFromClusterSet(obj interface{}) {
cluster, ok := obj.(*federation.Cluster) cluster, ok := obj.(*v1alpha1.Cluster)
cc.rwlock.Lock() cc.rwlock.Lock()
defer cc.rwlock.Unlock() defer cc.rwlock.Unlock()
if ok { if ok {
@ -180,10 +181,10 @@ func (cc *clusterClientCache) delFromClusterSet(obj interface{}) {
// addToClusterSet inserts the new cluster to clusterSet and creates a corresponding // addToClusterSet inserts the new cluster to clusterSet and creates a corresponding
// restclient to map clusterKubeClientMap // restclient to map clusterKubeClientMap
func (cc *clusterClientCache) addToClientMap(obj interface{}) { func (cc *clusterClientCache) addToClientMap(obj interface{}) {
cluster := obj.(*federation.Cluster) cluster := obj.(*v1alpha1.Cluster)
cc.rwlock.Lock() cc.rwlock.Lock()
defer cc.rwlock.Unlock() defer cc.rwlock.Unlock()
cluster, ok := obj.(*federation.Cluster) cluster, ok := obj.(*v1alpha1.Cluster)
if !ok { if !ok {
return return
} }
@ -195,13 +196,11 @@ func (cc *clusterClientCache) addToClientMap(obj interface{}) {
} }
} }
func newClusterClientset(c *federation.Cluster) (*clientset.Clientset, error) { func newClusterClientset(c *v1alpha1.Cluster) (*release_1_3.Clientset, error) {
clusterConfig, err := clientcmd.BuildConfigFromFlags(c.Spec.ServerAddressByClientCIDRs[0].ServerAddress, "") clusterConfig, err := util.BuildClusterConfig(c)
if err != nil { if clusterConfig != nil {
return nil, err clientset := release_1_3.NewForConfigOrDie(restclient.AddUserAgent(clusterConfig, UserAgentName))
return clientset, nil
} }
clusterConfig.QPS = KubeAPIQPS return nil, err
clusterConfig.Burst = KubeAPIBurst
clientset := clientset.NewForConfigOrDie(restclient.AddUserAgent(clusterConfig, UserAgentName))
return clientset, nil
} }

View File

@ -254,6 +254,9 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *
// the state of the service when we last successfully sync'd it's DNS records. // the state of the service when we last successfully sync'd it's DNS records.
// So this time around we only need to patch that (add new records, remove deleted records, and update changed records. // So this time around we only need to patch that (add new records, remove deleted records, and update changed records.
// //
if s.dns == nil {
return nil
}
if s == nil { if s == nil {
return fmt.Errorf("nil ServiceController passed to ServiceController.ensureDnsRecords(clusterName: %s, cachedService: %v)", clusterName, cachedService) return fmt.Errorf("nil ServiceController passed to ServiceController.ensureDnsRecords(clusterName: %s, cachedService: %v)", clusterName, cachedService)
} }
@ -263,6 +266,9 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *
serviceName := cachedService.lastState.Name serviceName := cachedService.lastState.Name
namespaceName := cachedService.lastState.Namespace namespaceName := cachedService.lastState.Namespace
zoneNames, regionName, err := s.getClusterZoneNames(clusterName) zoneNames, regionName, err := s.getClusterZoneNames(clusterName)
if zoneNames == nil {
return fmt.Errorf("fail to get cluster zone names")
}
if err != nil { if err != nil {
return err return err
} }

View File

@ -20,8 +20,8 @@ import (
"fmt" "fmt"
"time" "time"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset" federation_release_1_3 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3"
"k8s.io/kubernetes/pkg/api" v1 "k8s.io/kubernetes/pkg/api/v1"
cache "k8s.io/kubernetes/pkg/client/cache" cache "k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
@ -54,7 +54,7 @@ func (sc *ServiceController) clusterEndpointWorker() {
// Whenever there is change on endpoint, the federation service should be updated // Whenever there is change on endpoint, the federation service should be updated
// key is the namespaced name of endpoint // key is the namespaced name of endpoint
func (cc *clusterClientCache) syncEndpoint(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient federationclientset.Interface, serviceController *ServiceController) error { func (cc *clusterClientCache) syncEndpoint(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient federation_release_1_3.Interface, serviceController *ServiceController) error {
cachedService, ok := serviceCache.get(key) cachedService, ok := serviceCache.get(key)
if !ok { if !ok {
// here we filtered all non-federation services // here we filtered all non-federation services
@ -67,7 +67,7 @@ func (cc *clusterClientCache) syncEndpoint(key, clusterName string, clusterCache
return err return err
} }
if exists { if exists {
endpoint, ok := endpointInterface.(*api.Endpoints) endpoint, ok := endpointInterface.(*v1.Endpoints)
if ok { if ok {
glog.V(4).Infof("Found endpoint for federation service %s/%s from cluster %s", endpoint.Namespace, endpoint.Name, clusterName) glog.V(4).Infof("Found endpoint for federation service %s/%s from cluster %s", endpoint.Namespace, endpoint.Name, clusterName)
err = cc.processEndpointUpdate(cachedService, endpoint, clusterName, serviceController) err = cc.processEndpointUpdate(cachedService, endpoint, clusterName, serviceController)
@ -117,7 +117,7 @@ func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedServi
// Update dns info when endpoint update event received // Update dns info when endpoint update event received
// We do not care about the endpoint info, what we need to make sure here is len(endpoints.subsets)>0 // We do not care about the endpoint info, what we need to make sure here is len(endpoints.subsets)>0
func (cc *clusterClientCache) processEndpointUpdate(cachedService *cachedService, endpoint *api.Endpoints, clusterName string, serviceController *ServiceController) error { func (cc *clusterClientCache) processEndpointUpdate(cachedService *cachedService, endpoint *v1.Endpoints, clusterName string, serviceController *ServiceController) error {
glog.V(4).Infof("Processing endpoint update for %s/%s, cluster %s", endpoint.Namespace, endpoint.Name, clusterName) glog.V(4).Infof("Processing endpoint update for %s/%s, cluster %s", endpoint.Namespace, endpoint.Name, clusterName)
cachedService.rwlock.Lock() cachedService.rwlock.Lock()
defer cachedService.rwlock.Unlock() defer cachedService.rwlock.Unlock()

View File

@ -19,9 +19,9 @@ package service
import ( import (
"testing" "testing"
"k8s.io/kubernetes/federation/apis/federation" "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
"k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes. "k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes.
"k8s.io/kubernetes/pkg/api" v1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
) )
@ -38,14 +38,14 @@ var fakeServiceController = ServiceController{
knownClusterSet: make(sets.String), knownClusterSet: make(sets.String),
} }
func buildEndpoint(subsets [][]string) *api.Endpoints { func buildEndpoint(subsets [][]string) *v1.Endpoints {
endpoint := &api.Endpoints{ endpoint := &v1.Endpoints{
Subsets: []api.EndpointSubset{ Subsets: []v1.EndpointSubset{
{Addresses: []api.EndpointAddress{}}, {Addresses: []v1.EndpointAddress{}},
}, },
} }
for _, element := range subsets { for _, element := range subsets {
address := api.EndpointAddress{IP: element[0], Hostname: element[1], TargetRef: nil} address := v1.EndpointAddress{IP: element[0], Hostname: element[1], TargetRef: nil}
endpoint.Subsets[0].Addresses = append(endpoint.Subsets[0].Addresses, address) endpoint.Subsets[0].Addresses = append(endpoint.Subsets[0].Addresses, address)
} }
return endpoint return endpoint
@ -58,14 +58,14 @@ func TestProcessEndpointUpdate(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
cachedService *cachedService cachedService *cachedService
endpoint *api.Endpoints endpoint *v1.Endpoints
clusterName string clusterName string
expectResult int expectResult int
}{ }{
{ {
"no-cache", "no-cache",
&cachedService{ &cachedService{
lastState: &api.Service{}, lastState: &v1.Service{},
endpointMap: make(map[string]int), endpointMap: make(map[string]int),
}, },
buildEndpoint([][]string{{"ip1", ""}}), buildEndpoint([][]string{{"ip1", ""}}),
@ -75,7 +75,7 @@ func TestProcessEndpointUpdate(t *testing.T) {
{ {
"has-cache", "has-cache",
&cachedService{ &cachedService{
lastState: &api.Service{}, lastState: &v1.Service{},
endpointMap: map[string]int{ endpointMap: map[string]int{
"foo": 1, "foo": 1,
}, },
@ -99,8 +99,8 @@ func TestProcessEndpointDeletion(t *testing.T) {
cc := clusterClientCache{ cc := clusterClientCache{
clientMap: map[string]*clusterCache{ clientMap: map[string]*clusterCache{
clusterName: { clusterName: {
cluster: &federation.Cluster{ cluster: &v1alpha1.Cluster{
Status: federation.ClusterStatus{ Status: v1alpha1.ClusterStatus{
Zones: []string{"foozone"}, Zones: []string{"foozone"},
Region: "fooregion", Region: "fooregion",
}, },
@ -111,14 +111,14 @@ func TestProcessEndpointDeletion(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
cachedService *cachedService cachedService *cachedService
endpoint *api.Endpoints endpoint *v1.Endpoints
clusterName string clusterName string
expectResult int expectResult int
}{ }{
{ {
"no-cache", "no-cache",
&cachedService{ &cachedService{
lastState: &api.Service{}, lastState: &v1.Service{},
endpointMap: make(map[string]int), endpointMap: make(map[string]int),
}, },
buildEndpoint([][]string{{"ip1", ""}}), buildEndpoint([][]string{{"ip1", ""}}),
@ -128,7 +128,7 @@ func TestProcessEndpointDeletion(t *testing.T) {
{ {
"has-cache", "has-cache",
&cachedService{ &cachedService{
lastState: &api.Service{}, lastState: &v1.Service{},
endpointMap: map[string]int{ endpointMap: map[string]int{
clusterName: 1, clusterName: 1,
}, },

View File

@ -20,9 +20,9 @@ import (
"fmt" "fmt"
"time" "time"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset" federation_release_1_3 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/errors"
v1 "k8s.io/kubernetes/pkg/api/v1"
cache "k8s.io/kubernetes/pkg/client/cache" cache "k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
@ -55,7 +55,7 @@ func (sc *ServiceController) clusterServiceWorker() {
} }
// Whenever there is change on service, the federation service should be updated // Whenever there is change on service, the federation service should be updated
func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient federationclientset.Interface) error { func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient federation_release_1_3.Interface) error {
// obj holds the latest service info from apiserver, return if there is no federation cache for the service // obj holds the latest service info from apiserver, return if there is no federation cache for the service
cachedService, ok := serviceCache.get(key) cachedService, ok := serviceCache.get(key)
if !ok { if !ok {
@ -70,7 +70,7 @@ func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache
} }
var needUpdate bool var needUpdate bool
if exists { if exists {
service, ok := serviceInterface.(*api.Service) service, ok := serviceInterface.(*v1.Service)
if ok { if ok {
glog.V(4).Infof("Found service for federation service %s/%s from cluster %s", service.Namespace, service.Name, clusterName) glog.V(4).Infof("Found service for federation service %s/%s from cluster %s", service.Namespace, service.Name, clusterName)
needUpdate = cc.processServiceUpdate(cachedService, service, clusterName) needUpdate = cc.processServiceUpdate(cachedService, service, clusterName)
@ -140,7 +140,7 @@ func (cc *clusterClientCache) processServiceDeletion(cachedService *cachedServic
// processServiceUpdate Update ingress info when service updated // processServiceUpdate Update ingress info when service updated
// the function returns a bool to indicate if actual update happend on federation service cache // the function returns a bool to indicate if actual update happend on federation service cache
// and if the federation service cache is updated, the updated info should be post to federation apiserver // and if the federation service cache is updated, the updated info should be post to federation apiserver
func (cc *clusterClientCache) processServiceUpdate(cachedService *cachedService, service *api.Service, clusterName string) bool { func (cc *clusterClientCache) processServiceUpdate(cachedService *cachedService, service *v1.Service, clusterName string) bool {
glog.V(4).Infof("Processing service update for %s/%s, cluster %s", service.Namespace, service.Name, clusterName) glog.V(4).Infof("Processing service update for %s/%s, cluster %s", service.Namespace, service.Name, clusterName)
cachedService.rwlock.Lock() cachedService.rwlock.Lock()
defer cachedService.rwlock.Unlock() defer cachedService.rwlock.Unlock()
@ -214,7 +214,7 @@ func (cc *clusterClientCache) processServiceUpdate(cachedService *cachedService,
return needUpdate return needUpdate
} }
func (cc *clusterClientCache) persistFedServiceUpdate(cachedService *cachedService, fedClient federationclientset.Interface) error { func (cc *clusterClientCache) persistFedServiceUpdate(cachedService *cachedService, fedClient federation_release_1_3.Interface) error {
service := cachedService.lastState service := cachedService.lastState
glog.V(5).Infof("Persist federation service status %s/%s", service.Namespace, service.Name) glog.V(5).Infof("Persist federation service status %s/%s", service.Namespace, service.Name)
var err error var err error

View File

@ -20,15 +20,15 @@ import (
"reflect" "reflect"
"testing" "testing"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1"
) )
func buildServiceStatus(ingresses [][]string) api.LoadBalancerStatus { func buildServiceStatus(ingresses [][]string) v1.LoadBalancerStatus {
status := api.LoadBalancerStatus{ status := v1.LoadBalancerStatus{
Ingress: []api.LoadBalancerIngress{}, Ingress: []v1.LoadBalancerIngress{},
} }
for _, element := range ingresses { for _, element := range ingresses {
ingress := api.LoadBalancerIngress{IP: element[0], Hostname: element[1]} ingress := v1.LoadBalancerIngress{IP: element[0], Hostname: element[1]}
status.Ingress = append(status.Ingress, ingress) status.Ingress = append(status.Ingress, ingress)
} }
return status return status
@ -41,18 +41,18 @@ func TestProcessServiceUpdate(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
cachedService *cachedService cachedService *cachedService
service *api.Service service *v1.Service
clusterName string clusterName string
expectNeedUpdate bool expectNeedUpdate bool
expectStatus api.LoadBalancerStatus expectStatus v1.LoadBalancerStatus
}{ }{
{ {
"no-cache", "no-cache",
&cachedService{ &cachedService{
lastState: &api.Service{}, lastState: &v1.Service{},
serviceStatusMap: make(map[string]api.LoadBalancerStatus), serviceStatusMap: make(map[string]v1.LoadBalancerStatus),
}, },
&api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
"foo", "foo",
true, true,
buildServiceStatus([][]string{{"ip1", ""}}), buildServiceStatus([][]string{{"ip1", ""}}),
@ -60,12 +60,12 @@ func TestProcessServiceUpdate(t *testing.T) {
{ {
"same-ingress", "same-ingress",
&cachedService{ &cachedService{
lastState: &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, lastState: &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
serviceStatusMap: map[string]api.LoadBalancerStatus{ serviceStatusMap: map[string]v1.LoadBalancerStatus{
"foo1": {Ingress: []api.LoadBalancerIngress{{"ip1", ""}}}, "foo1": {Ingress: []v1.LoadBalancerIngress{{"ip1", ""}}},
}, },
}, },
&api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
"foo1", "foo1",
false, false,
buildServiceStatus([][]string{{"ip1", ""}}), buildServiceStatus([][]string{{"ip1", ""}}),
@ -73,14 +73,14 @@ func TestProcessServiceUpdate(t *testing.T) {
{ {
"diff-cluster", "diff-cluster",
&cachedService{ &cachedService{
lastState: &api.Service{ lastState: &v1.Service{
ObjectMeta: api.ObjectMeta{Name: "bar1"}, ObjectMeta: v1.ObjectMeta{Name: "bar1"},
}, },
serviceStatusMap: map[string]api.LoadBalancerStatus{ serviceStatusMap: map[string]v1.LoadBalancerStatus{
"foo2": {Ingress: []api.LoadBalancerIngress{{"ip1", ""}}}, "foo2": {Ingress: []v1.LoadBalancerIngress{{"ip1", ""}}},
}, },
}, },
&api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
"foo1", "foo1",
true, true,
buildServiceStatus([][]string{{"ip1", ""}}), buildServiceStatus([][]string{{"ip1", ""}}),
@ -88,12 +88,12 @@ func TestProcessServiceUpdate(t *testing.T) {
{ {
"diff-ingress", "diff-ingress",
&cachedService{ &cachedService{
lastState: &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}})}}, lastState: &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}})}},
serviceStatusMap: map[string]api.LoadBalancerStatus{ serviceStatusMap: map[string]v1.LoadBalancerStatus{
"foo1": buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}}), "foo1": buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}}),
}, },
}, },
&api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip2", ""}, {"ip3", ""}, {"ip5", ""}})}}, &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip2", ""}, {"ip3", ""}, {"ip5", ""}})}},
"foo1", "foo1",
true, true,
buildServiceStatus([][]string{{"ip2", ""}, {"ip3", ""}, {"ip5", ""}}), buildServiceStatus([][]string{{"ip2", ""}, {"ip3", ""}, {"ip5", ""}}),
@ -117,20 +117,20 @@ func TestProcessServiceDeletion(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
cachedService *cachedService cachedService *cachedService
service *api.Service service *v1.Service
clusterName string clusterName string
expectNeedUpdate bool expectNeedUpdate bool
expectStatus api.LoadBalancerStatus expectStatus v1.LoadBalancerStatus
}{ }{
{ {
"same-ingress", "same-ingress",
&cachedService{ &cachedService{
lastState: &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, lastState: &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
serviceStatusMap: map[string]api.LoadBalancerStatus{ serviceStatusMap: map[string]v1.LoadBalancerStatus{
"foo1": {Ingress: []api.LoadBalancerIngress{{"ip1", ""}}}, "foo1": {Ingress: []v1.LoadBalancerIngress{{"ip1", ""}}},
}, },
}, },
&api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
"foo1", "foo1",
true, true,
buildServiceStatus([][]string{}), buildServiceStatus([][]string{}),
@ -138,13 +138,13 @@ func TestProcessServiceDeletion(t *testing.T) {
{ {
"diff-ingress", "diff-ingress",
&cachedService{ &cachedService{
lastState: &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}, {"ip3", ""}, {"ip5", ""}, {"ip6", ""}, {"ip8", ""}})}}, lastState: &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}, {"ip3", ""}, {"ip5", ""}, {"ip6", ""}, {"ip8", ""}})}},
serviceStatusMap: map[string]api.LoadBalancerStatus{ serviceStatusMap: map[string]v1.LoadBalancerStatus{
"foo1": buildServiceStatus([][]string{{"ip1", ""}, {"ip2", ""}, {"ip3", ""}}), "foo1": buildServiceStatus([][]string{{"ip1", ""}, {"ip2", ""}, {"ip3", ""}}),
"foo2": buildServiceStatus([][]string{{"ip5", ""}, {"ip6", ""}, {"ip8", ""}}), "foo2": buildServiceStatus([][]string{{"ip5", ""}, {"ip6", ""}, {"ip8", ""}}),
}, },
}, },
&api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}, {"ip2", ""}, {"ip3", ""}})}}, &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}, {"ip2", ""}, {"ip3", ""}})}},
"foo1", "foo1",
true, true,
buildServiceStatus([][]string{{"ip4", ""}, {"ip5", ""}, {"ip6", ""}, {"ip8", ""}}), buildServiceStatus([][]string{{"ip4", ""}, {"ip5", ""}, {"ip6", ""}, {"ip8", ""}}),

View File

@ -24,14 +24,15 @@ import (
"reflect" "reflect"
"github.com/golang/glog" "github.com/golang/glog"
federation "k8s.io/kubernetes/federation/apis/federation" v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
federationcache "k8s.io/kubernetes/federation/client/cache" federationcache "k8s.io/kubernetes/federation/client/cache"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset" federation_release_1_3 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3"
"k8s.io/kubernetes/federation/pkg/dnsprovider" "k8s.io/kubernetes/federation/pkg/dnsprovider"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/errors"
v1 "k8s.io/kubernetes/pkg/api/v1"
cache "k8s.io/kubernetes/pkg/client/cache" cache "k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" release_1_3 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3"
"k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/controller/framework"
@ -46,9 +47,8 @@ import (
) )
const ( const (
// TODO update to 10 mins before merge serviceSyncPeriod = 10 * time.Minute
serviceSyncPeriod = 30 * time.Second clusterSyncPeriod = 10 * time.Minute
clusterSyncPeriod = 100 * time.Second
// How long to wait before retrying the processing of a service change. // How long to wait before retrying the processing of a service change.
// If this changes, the sleep in hack/jenkins/e2e.sh before downing a cluster // If this changes, the sleep in hack/jenkins/e2e.sh before downing a cluster
@ -71,15 +71,15 @@ const (
) )
type cachedService struct { type cachedService struct {
lastState *api.Service lastState *v1.Service
// The state as successfully applied to the DNS server // The state as successfully applied to the DNS server
appliedState *api.Service appliedState *v1.Service
// cluster endpoint map hold subset info from kubernetes clusters // cluster endpoint map hold subset info from kubernetes clusters
// key clusterName // key clusterName
// value is a flag that if there is ready address, 1 means there is ready address, 0 means no ready address // value is a flag that if there is ready address, 1 means there is ready address, 0 means no ready address
endpointMap map[string]int endpointMap map[string]int
// serviceStatusMap map holds service status info from kubernetes clusters, keyed on clusterName // serviceStatusMap map holds service status info from kubernetes clusters, keyed on clusterName
serviceStatusMap map[string]api.LoadBalancerStatus serviceStatusMap map[string]v1.LoadBalancerStatus
// Ensures only one goroutine can operate on this service at any given time. // Ensures only one goroutine can operate on this service at any given time.
rwlock sync.Mutex rwlock sync.Mutex
// Controls error back-off for procceeding federation service to k8s clusters // Controls error back-off for procceeding federation service to k8s clusters
@ -99,7 +99,7 @@ type serviceCache struct {
type ServiceController struct { type ServiceController struct {
dns dnsprovider.Interface dns dnsprovider.Interface
federationClient federationclientset.Interface federationClient federation_release_1_3.Interface
federationName string federationName string
// each federation should be configured with a single zone (e.g. "mycompany.com") // each federation should be configured with a single zone (e.g. "mycompany.com")
dnsZones dnsprovider.Zones dnsZones dnsprovider.Zones
@ -123,7 +123,7 @@ type ServiceController struct {
// New returns a new service controller to keep DNS provider service resources // New returns a new service controller to keep DNS provider service resources
// (like Kubernetes Services and DNS server records for service discovery) in sync with the registry. // (like Kubernetes Services and DNS server records for service discovery) in sync with the registry.
func New(federationClient federationclientset.Interface, dns dnsprovider.Interface) *ServiceController { func New(federationClient federation_release_1_3.Interface, dns dnsprovider.Interface) *ServiceController {
broadcaster := record.NewBroadcaster() broadcaster := record.NewBroadcaster()
// federationClient event is not supported yet // federationClient event is not supported yet
// broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")}) // broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
@ -145,13 +145,13 @@ func New(federationClient federationclientset.Interface, dns dnsprovider.Interfa
s.serviceStore.Store, s.serviceController = framework.NewInformer( s.serviceStore.Store, s.serviceController = framework.NewInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) { ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
return s.federationClient.Core().Services(api.NamespaceAll).List(options) return s.federationClient.Core().Services(v1.NamespaceAll).List(options)
}, },
WatchFunc: func(options api.ListOptions) (watch.Interface, error) { WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return s.federationClient.Core().Services(api.NamespaceAll).Watch(options) return s.federationClient.Core().Services(v1.NamespaceAll).Watch(options)
}, },
}, },
&api.Service{}, &v1.Service{},
serviceSyncPeriod, serviceSyncPeriod,
framework.ResourceEventHandlerFuncs{ framework.ResourceEventHandlerFuncs{
AddFunc: s.enqueueService, AddFunc: s.enqueueService,
@ -173,17 +173,17 @@ func New(federationClient federationclientset.Interface, dns dnsprovider.Interfa
return s.federationClient.Federation().Clusters().Watch(options) return s.federationClient.Federation().Clusters().Watch(options)
}, },
}, },
&federation.Cluster{}, &v1alpha1.Cluster{},
clusterSyncPeriod, clusterSyncPeriod,
framework.ResourceEventHandlerFuncs{ framework.ResourceEventHandlerFuncs{
DeleteFunc: s.clusterCache.delFromClusterSet, DeleteFunc: s.clusterCache.delFromClusterSet,
AddFunc: s.clusterCache.addToClientMap, AddFunc: s.clusterCache.addToClientMap,
UpdateFunc: func(old, cur interface{}) { UpdateFunc: func(old, cur interface{}) {
oldCluster, ok := old.(*federation.Cluster) oldCluster, ok := old.(*v1alpha1.Cluster)
if !ok { if !ok {
return return
} }
curCluster, ok := cur.(*federation.Cluster) curCluster, ok := cur.(*v1alpha1.Cluster)
if !ok { if !ok {
return return
} }
@ -261,14 +261,14 @@ func (s *ServiceController) fedServiceWorker() {
} }
} }
func wantsDNSRecords(service *api.Service) bool { func wantsDNSRecords(service *v1.Service) bool {
return service.Spec.Type == api.ServiceTypeLoadBalancer return service.Spec.Type == v1.ServiceTypeLoadBalancer
} }
// processServiceForCluster creates or updates service to all registered running clusters, // processServiceForCluster creates or updates service to all registered running clusters,
// update DNS records and update the service info with DNS entries to federation apiserver. // update DNS records and update the service info with DNS entries to federation apiserver.
// the function returns any error caught // the function returns any error caught
func (s *ServiceController) processServiceForCluster(cachedService *cachedService, clusterName string, service *api.Service, client *clientset.Clientset) error { func (s *ServiceController) processServiceForCluster(cachedService *cachedService, clusterName string, service *v1.Service, client *release_1_3.Clientset) error {
glog.V(4).Infof("Process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName) glog.V(4).Infof("Process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName)
// Create or Update k8s Service // Create or Update k8s Service
err := s.ensureClusterService(cachedService, clusterName, service, client) err := s.ensureClusterService(cachedService, clusterName, service, client)
@ -288,7 +288,7 @@ func (s *ServiceController) updateFederationService(key string, cachedService *c
if err != nil { if err != nil {
return err, !retryable return err, !retryable
} }
service, ok := clone.(*api.Service) service, ok := clone.(*v1.Service)
if !ok { if !ok {
return fmt.Errorf("Unexpected service cast error : %v\n", service), !retryable return fmt.Errorf("Unexpected service cast error : %v\n", service), !retryable
} }
@ -326,7 +326,7 @@ func (s *ServiceController) deleteFederationService(cachedService *cachedService
return nil, !retryable return nil, !retryable
} }
func (s *ServiceController) deleteClusterService(clusterName string, cachedService *cachedService, clientset *clientset.Clientset) error { func (s *ServiceController) deleteClusterService(clusterName string, cachedService *cachedService, clientset *release_1_3.Clientset) error {
service := cachedService.lastState service := cachedService.lastState
glog.V(4).Infof("Deleting service %s/%s from cluster %s", service.Namespace, service.Name, clusterName) glog.V(4).Infof("Deleting service %s/%s from cluster %s", service.Namespace, service.Name, clusterName)
var err error var err error
@ -342,7 +342,7 @@ func (s *ServiceController) deleteClusterService(clusterName string, cachedServi
return err return err
} }
func (s *ServiceController) ensureClusterService(cachedService *cachedService, clusterName string, service *api.Service, client *clientset.Clientset) error { func (s *ServiceController) ensureClusterService(cachedService *cachedService, clusterName string, service *v1.Service, client *release_1_3.Clientset) error {
var err error var err error
var needUpdate bool var needUpdate bool
for i := 0; i < clientRetryCount; i++ { for i := 0; i < clientRetryCount; i++ {
@ -434,7 +434,7 @@ func (s *serviceCache) getOrCreate(serviceName string) *cachedService {
if !ok { if !ok {
service = &cachedService{ service = &cachedService{
endpointMap: make(map[string]int), endpointMap: make(map[string]int),
serviceStatusMap: make(map[string]api.LoadBalancerStatus), serviceStatusMap: make(map[string]v1.LoadBalancerStatus),
} }
s.fedServiceMap[serviceName] = service s.fedServiceMap[serviceName] = service
} }
@ -454,12 +454,12 @@ func (s *serviceCache) delete(serviceName string) {
} }
// needsUpdateDNS check if the dns records of the given service should be updated // needsUpdateDNS check if the dns records of the given service should be updated
func (s *ServiceController) needsUpdateDNS(oldService *api.Service, newService *api.Service) bool { func (s *ServiceController) needsUpdateDNS(oldService *v1.Service, newService *v1.Service) bool {
if !wantsDNSRecords(oldService) && !wantsDNSRecords(newService) { if !wantsDNSRecords(oldService) && !wantsDNSRecords(newService) {
return false return false
} }
if wantsDNSRecords(oldService) != wantsDNSRecords(newService) { if wantsDNSRecords(oldService) != wantsDNSRecords(newService) {
s.eventRecorder.Eventf(newService, api.EventTypeNormal, "Type", "%v -> %v", s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "Type", "%v -> %v",
oldService.Spec.Type, newService.Spec.Type) oldService.Spec.Type, newService.Spec.Type)
return true return true
} }
@ -467,18 +467,18 @@ func (s *ServiceController) needsUpdateDNS(oldService *api.Service, newService *
return true return true
} }
if !LoadBalancerIPsAreEqual(oldService, newService) { if !LoadBalancerIPsAreEqual(oldService, newService) {
s.eventRecorder.Eventf(newService, api.EventTypeNormal, "LoadbalancerIP", "%v -> %v", s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "LoadbalancerIP", "%v -> %v",
oldService.Spec.LoadBalancerIP, newService.Spec.LoadBalancerIP) oldService.Spec.LoadBalancerIP, newService.Spec.LoadBalancerIP)
return true return true
} }
if len(oldService.Spec.ExternalIPs) != len(newService.Spec.ExternalIPs) { if len(oldService.Spec.ExternalIPs) != len(newService.Spec.ExternalIPs) {
s.eventRecorder.Eventf(newService, api.EventTypeNormal, "ExternalIP", "Count: %v -> %v", s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Count: %v -> %v",
len(oldService.Spec.ExternalIPs), len(newService.Spec.ExternalIPs)) len(oldService.Spec.ExternalIPs), len(newService.Spec.ExternalIPs))
return true return true
} }
for i := range oldService.Spec.ExternalIPs { for i := range oldService.Spec.ExternalIPs {
if oldService.Spec.ExternalIPs[i] != newService.Spec.ExternalIPs[i] { if oldService.Spec.ExternalIPs[i] != newService.Spec.ExternalIPs[i] {
s.eventRecorder.Eventf(newService, api.EventTypeNormal, "ExternalIP", "Added: %v", s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Added: %v",
newService.Spec.ExternalIPs[i]) newService.Spec.ExternalIPs[i])
return true return true
} }
@ -487,7 +487,7 @@ func (s *ServiceController) needsUpdateDNS(oldService *api.Service, newService *
return true return true
} }
if oldService.UID != newService.UID { if oldService.UID != newService.UID {
s.eventRecorder.Eventf(newService, api.EventTypeNormal, "UID", "%v -> %v", s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "UID", "%v -> %v",
oldService.UID, newService.UID) oldService.UID, newService.UID)
return true return true
} }
@ -495,11 +495,11 @@ func (s *ServiceController) needsUpdateDNS(oldService *api.Service, newService *
return false return false
} }
func getPortsForLB(service *api.Service) ([]*api.ServicePort, error) { func getPortsForLB(service *v1.Service) ([]*v1.ServicePort, error) {
// TODO: quinton: Probably applies for DNS SVC records. Come back to this. // TODO: quinton: Probably applies for DNS SVC records. Come back to this.
//var protocol api.Protocol //var protocol api.Protocol
ports := []*api.ServicePort{} ports := []*v1.ServicePort{}
for i := range service.Spec.Ports { for i := range service.Spec.Ports {
sp := &service.Spec.Ports[i] sp := &service.Spec.Ports[i]
// The check on protocol was removed here. The DNS provider itself is now responsible for all protocol validation // The check on protocol was removed here. The DNS provider itself is now responsible for all protocol validation
@ -508,7 +508,7 @@ func getPortsForLB(service *api.Service) ([]*api.ServicePort, error) {
return ports, nil return ports, nil
} }
func portsEqualForLB(x, y *api.Service) bool { func portsEqualForLB(x, y *v1.Service) bool {
xPorts, err := getPortsForLB(x) xPorts, err := getPortsForLB(x)
if err != nil { if err != nil {
return false return false
@ -520,7 +520,7 @@ func portsEqualForLB(x, y *api.Service) bool {
return portSlicesEqualForLB(xPorts, yPorts) return portSlicesEqualForLB(xPorts, yPorts)
} }
func portSlicesEqualForLB(x, y []*api.ServicePort) bool { func portSlicesEqualForLB(x, y []*v1.ServicePort) bool {
if len(x) != len(y) { if len(x) != len(y) {
return false return false
} }
@ -533,7 +533,7 @@ func portSlicesEqualForLB(x, y []*api.ServicePort) bool {
return true return true
} }
func portEqualForLB(x, y *api.ServicePort) bool { func portEqualForLB(x, y *v1.ServicePort) bool {
// TODO: Should we check name? (In theory, an LB could expose it) // TODO: Should we check name? (In theory, an LB could expose it)
if x.Name != y.Name { if x.Name != y.Name {
return false return false
@ -552,7 +552,7 @@ func portEqualForLB(x, y *api.ServicePort) bool {
return true return true
} }
func portEqualExcludeNodePort(x, y *api.ServicePort) bool { func portEqualExcludeNodePort(x, y *v1.ServicePort) bool {
// TODO: Should we check name? (In theory, an LB could expose it) // TODO: Should we check name? (In theory, an LB could expose it)
if x.Name != y.Name { if x.Name != y.Name {
return false return false
@ -568,7 +568,7 @@ func portEqualExcludeNodePort(x, y *api.ServicePort) bool {
return true return true
} }
func clustersFromList(list *federation.ClusterList) []string { func clustersFromList(list *v1alpha1.ClusterList) []string {
result := []string{} result := []string{}
for ix := range list.Items { for ix := range list.Items {
result = append(result, list.Items[ix].Name) result = append(result, list.Items[ix].Name)
@ -579,7 +579,7 @@ func clustersFromList(list *federation.ClusterList) []string {
// getClusterConditionPredicate filter all clusters meet condition of // getClusterConditionPredicate filter all clusters meet condition of
// condition.type=Ready and condition.status=true // condition.type=Ready and condition.status=true
func getClusterConditionPredicate() federationcache.ClusterConditionPredicate { func getClusterConditionPredicate() federationcache.ClusterConditionPredicate {
return func(cluster federation.Cluster) bool { return func(cluster v1alpha1.Cluster) bool {
// If we have no info, don't accept // If we have no info, don't accept
if len(cluster.Status.Conditions) == 0 { if len(cluster.Status.Conditions) == 0 {
return false return false
@ -587,7 +587,7 @@ func getClusterConditionPredicate() federationcache.ClusterConditionPredicate {
for _, cond := range cluster.Status.Conditions { for _, cond := range cluster.Status.Conditions {
//We consider the cluster for load balancing only when its ClusterReady condition status //We consider the cluster for load balancing only when its ClusterReady condition status
//is ConditionTrue //is ConditionTrue
if cond.Type == federation.ClusterReady && cond.Status != api.ConditionTrue { if cond.Type == v1alpha1.ClusterReady && cond.Status != v1.ConditionTrue {
glog.V(4).Infof("Ignoring cluser %v with %v condition status %v", cluster.Name, cond.Type, cond.Status) glog.V(4).Infof("Ignoring cluser %v with %v condition status %v", cluster.Name, cond.Type, cond.Status)
return false return false
} }
@ -695,7 +695,7 @@ func (s *ServiceController) lockedUpdateDNSRecords(service *cachedService, clust
return nil return nil
} }
func LoadBalancerIPsAreEqual(oldService, newService *api.Service) bool { func LoadBalancerIPsAreEqual(oldService, newService *v1.Service) bool {
return oldService.Spec.LoadBalancerIP == newService.Spec.LoadBalancerIP return oldService.Spec.LoadBalancerIP == newService.Spec.LoadBalancerIP
} }
@ -778,7 +778,7 @@ func (s *ServiceController) syncService(key string) error {
} }
if exists { if exists {
service, ok := obj.(*api.Service) service, ok := obj.(*v1.Service)
if ok { if ok {
cachedService = s.serviceCache.getOrCreate(key) cachedService = s.serviceCache.getOrCreate(key)
err, retryDelay = s.processServiceUpdate(cachedService, service, key) err, retryDelay = s.processServiceUpdate(cachedService, service, key)
@ -803,7 +803,7 @@ func (s *ServiceController) syncService(key string) error {
// processServiceUpdate returns an error if processing the service update failed, along with a time.Duration // processServiceUpdate returns an error if processing the service update failed, along with a time.Duration
// indicating whether processing should be retried; zero means no-retry; otherwise // indicating whether processing should be retried; zero means no-retry; otherwise
// we should retry in that Duration. // we should retry in that Duration.
func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *api.Service, key string) (error, time.Duration) { func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *v1.Service, key string) (error, time.Duration) {
// Ensure that no other goroutine will interfere with our processing of the // Ensure that no other goroutine will interfere with our processing of the
// service. // service.
cachedService.rwlock.Lock() cachedService.rwlock.Lock()
@ -822,7 +822,7 @@ func (s *ServiceController) processServiceUpdate(cachedService *cachedService, s
message += " (will not retry): " message += " (will not retry): "
} }
message += err.Error() message += err.Error()
s.eventRecorder.Event(service, api.EventTypeWarning, "UpdateServiceFail", message) s.eventRecorder.Event(service, v1.EventTypeWarning, "UpdateServiceFail", message)
return err, cachedService.nextRetryDelay() return err, cachedService.nextRetryDelay()
} }
// Always update the cache upon success. // Always update the cache upon success.
@ -849,7 +849,7 @@ func (s *ServiceController) processServiceDeletion(key string) (error, time.Dura
service := cachedService.lastState service := cachedService.lastState
cachedService.rwlock.Lock() cachedService.rwlock.Lock()
defer cachedService.rwlock.Unlock() defer cachedService.rwlock.Unlock()
s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingDNSRecord", "Deleting DNS Records") s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingDNSRecord", "Deleting DNS Records")
// TODO should we delete dns info here or wait for endpoint changes? prefer here // TODO should we delete dns info here or wait for endpoint changes? prefer here
// or we do nothing for service deletion // or we do nothing for service deletion
//err := s.dns.balancer.EnsureLoadBalancerDeleted(service) //err := s.dns.balancer.EnsureLoadBalancerDeleted(service)
@ -861,10 +861,10 @@ func (s *ServiceController) processServiceDeletion(key string) (error, time.Dura
} else { } else {
message += " (will not retry): " message += " (will not retry): "
} }
s.eventRecorder.Event(service, api.EventTypeWarning, "DeletingDNSRecordFailed", message) s.eventRecorder.Event(service, v1.EventTypeWarning, "DeletingDNSRecordFailed", message)
return err, cachedService.nextRetryDelay() return err, cachedService.nextRetryDelay()
} }
s.eventRecorder.Event(service, api.EventTypeNormal, "DeletedDNSRecord", "Deleted DNS Records") s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedDNSRecord", "Deleted DNS Records")
s.serviceCache.delete(key) s.serviceCache.delete(key)
cachedService.resetRetryDelay() cachedService.resetRetryDelay()

View File

@ -20,9 +20,9 @@ import (
"sync" "sync"
"testing" "testing"
"k8s.io/kubernetes/federation/apis/federation" "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
"k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes. "k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes.
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
) )
@ -39,22 +39,22 @@ func TestGetClusterConditionPredicate(t *testing.T) {
} }
tests := []struct { tests := []struct {
cluster federation.Cluster cluster v1alpha1.Cluster
expectAccept bool expectAccept bool
name string name string
serviceController *ServiceController serviceController *ServiceController
}{ }{
{ {
cluster: federation.Cluster{}, cluster: v1alpha1.Cluster{},
expectAccept: false, expectAccept: false,
name: "empty", name: "empty",
serviceController: &serviceController, serviceController: &serviceController,
}, },
{ {
cluster: federation.Cluster{ cluster: v1alpha1.Cluster{
Status: federation.ClusterStatus{ Status: v1alpha1.ClusterStatus{
Conditions: []federation.ClusterCondition{ Conditions: []v1alpha1.ClusterCondition{
{Type: federation.ClusterReady, Status: api.ConditionTrue}, {Type: v1alpha1.ClusterReady, Status: v1.ConditionTrue},
}, },
}, },
}, },
@ -63,10 +63,10 @@ func TestGetClusterConditionPredicate(t *testing.T) {
serviceController: &serviceController, serviceController: &serviceController,
}, },
{ {
cluster: federation.Cluster{ cluster: v1alpha1.Cluster{
Status: federation.ClusterStatus{ Status: v1alpha1.ClusterStatus{
Conditions: []federation.ClusterCondition{ Conditions: []v1alpha1.ClusterCondition{
{Type: federation.ClusterReady, Status: api.ConditionFalse}, {Type: v1alpha1.ClusterReady, Status: v1.ConditionFalse},
}, },
}, },
}, },

View File

@ -0,0 +1,116 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"github.com/golang/glog"
federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
"k8s.io/kubernetes/pkg/client/restclient"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
utilnet "k8s.io/kubernetes/pkg/util/net"
"net"
"os"
)
const (
KubeAPIQPS = 20.0
KubeAPIBurst = 30
KubeconfigSecretDataKey = "kubeconfig"
)
func BuildClusterConfig(c *federation_v1alpha1.Cluster) (*restclient.Config, error) {
var serverAddress string
var clusterConfig *restclient.Config
hostIP, err := utilnet.ChooseHostInterface()
if err != nil {
return nil, err
}
for _, item := range c.Spec.ServerAddressByClientCIDRs {
_, cidrnet, err := net.ParseCIDR(item.ClientCIDR)
if err != nil {
return nil, err
}
myaddr := net.ParseIP(hostIP.String())
if cidrnet.Contains(myaddr) == true {
serverAddress = item.ServerAddress
break
}
}
if serverAddress != "" {
if c.Spec.SecretRef == nil {
glog.Infof("didnt find secretRef for cluster %s. Trying insecure access", c.Name)
clusterConfig, err = clientcmd.BuildConfigFromFlags(serverAddress, "")
} else {
kubeconfigGetter := KubeconfigGetterForCluster(c)
clusterConfig, err = clientcmd.BuildConfigFromKubeconfigGetter(serverAddress, kubeconfigGetter)
}
if err != nil {
return nil, err
}
clusterConfig.QPS = KubeAPIQPS
clusterConfig.Burst = KubeAPIBurst
}
return clusterConfig, nil
}
// This is to inject a different kubeconfigGetter in tests.
// We dont use the standard one which calls NewInCluster in tests to avoid having to setup service accounts and mount files with secret tokens.
var KubeconfigGetterForCluster = func(c *federation_v1alpha1.Cluster) clientcmd.KubeconfigGetter {
return func() (*clientcmdapi.Config, error) {
secretRefName := ""
if c.Spec.SecretRef != nil {
secretRefName = c.Spec.SecretRef.Name
} else {
glog.Infof("didnt find secretRef for cluster %s. Trying insecure access", c.Name)
}
return KubeconfigGetterForSecret(secretRefName)()
}
}
// KubeconfigGettterForSecret is used to get the kubeconfig from the given secret.
var KubeconfigGetterForSecret = func(secretName string) clientcmd.KubeconfigGetter {
return func() (*clientcmdapi.Config, error) {
var data []byte
if secretName != "" {
// Get the namespace this is running in from the env variable.
namespace := os.Getenv("POD_NAMESPACE")
if namespace == "" {
return nil, fmt.Errorf("unexpected: POD_NAMESPACE env var returned empty string")
}
// Get a client to talk to the k8s apiserver, to fetch secrets from it.
client, err := client.NewInCluster()
if err != nil {
return nil, fmt.Errorf("error in creating in-cluster client: %s", err)
}
data = []byte{}
secret, err := client.Secrets(namespace).Get(secretName)
if err != nil {
return nil, fmt.Errorf("error in fetching secret: %s", err)
}
ok := false
data, ok = secret.Data[KubeconfigSecretDataKey]
if !ok {
return nil, fmt.Errorf("secret does not have data with key: %s", KubeconfigSecretDataKey)
}
}
return clientcmd.Load(data)
}
}