diff --git a/federation/client/cache/cluster_cache.go b/federation/client/cache/cluster_cache.go index ba22cc56247..4147254173d 100644 --- a/federation/client/cache/cluster_cache.go +++ b/federation/client/cache/cluster_cache.go @@ -18,7 +18,7 @@ package cache import ( "github.com/golang/glog" - "k8s.io/kubernetes/federation/apis/federation" + "k8s.io/kubernetes/federation/apis/federation/v1alpha1" kubeCache "k8s.io/kubernetes/pkg/client/cache" ) @@ -28,16 +28,16 @@ type StoreToClusterLister struct { kubeCache.Store } -func (s *StoreToClusterLister) List() (clusters federation.ClusterList, err error) { +func (s *StoreToClusterLister) List() (clusters v1alpha1.ClusterList, err error) { for _, m := range s.Store.List() { - clusters.Items = append(clusters.Items, *(m.(*federation.Cluster))) + clusters.Items = append(clusters.Items, *(m.(*v1alpha1.Cluster))) } return clusters, nil } // ClusterConditionPredicate is a function that indicates whether the given cluster's conditions meet // some set of criteria defined by the function. -type ClusterConditionPredicate func(cluster federation.Cluster) bool +type ClusterConditionPredicate func(cluster v1alpha1.Cluster) bool // storeToClusterConditionLister filters and returns nodes matching the given type and status from the store. type storeToClusterConditionLister struct { @@ -51,9 +51,9 @@ func (s *StoreToClusterLister) ClusterCondition(predicate ClusterConditionPredic } // List returns a list of clusters that match the conditions defined by the predicate functions in the storeToClusterConditionLister. -func (s storeToClusterConditionLister) List() (clusters federation.ClusterList, err error) { +func (s storeToClusterConditionLister) List() (clusters v1alpha1.ClusterList, err error) { for _, m := range s.store.List() { - cluster := *m.(*federation.Cluster) + cluster := *m.(*v1alpha1.Cluster) if s.predicate(cluster) { clusters.Items = append(clusters.Items, cluster) } else { diff --git a/federation/cmd/federation-controller-manager/app/controllermanager.go b/federation/cmd/federation-controller-manager/app/controllermanager.go index b1227bd55f8..ee0b169949b 100644 --- a/federation/cmd/federation-controller-manager/app/controllermanager.go +++ b/federation/cmd/federation-controller-manager/app/controllermanager.go @@ -25,14 +25,13 @@ import ( "net/http/pprof" "strconv" - "k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options" - "k8s.io/kubernetes/pkg/client/restclient" - - internalclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3" + "k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options" "k8s.io/kubernetes/federation/pkg/dnsprovider" clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster" servicecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service" + "k8s.io/kubernetes/federation/pkg/federation-controller/util" + "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" "k8s.io/kubernetes/pkg/healthz" "k8s.io/kubernetes/pkg/util/configz" @@ -77,7 +76,7 @@ func Run(s *options.CMServer) error { glog.Errorf("unable to register configz: %s", err) } // Create the config to talk to federation-apiserver. - kubeconfigGetter := clustercontroller.KubeconfigGetterForSecret(FederationAPIServerSecretName) + kubeconfigGetter := util.KubeconfigGetterForSecret(FederationAPIServerSecretName) restClientCfg, err := clientcmd.BuildConfigFromKubeconfigGetter(s.Master, kubeconfigGetter) if err != nil { return err @@ -115,14 +114,14 @@ func Run(s *options.CMServer) error { func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) error { - federationClientSet := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller")) - go clustercontroller.NewclusterController(federationClientSet, s.ClusterMonitorPeriod.Duration).Run() + ccClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller")) + go clustercontroller.NewclusterController(ccClientset, s.ClusterMonitorPeriod.Duration).Run() dns, err := dnsprovider.InitDnsProvider(s.DnsProvider, s.DnsConfigFile) if err != nil { glog.Fatalf("Cloud provider could not be initialized: %v", err) } - scclientset := internalclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, servicecontroller.UserAgentName)) - servicecontroller := servicecontroller.New(scclientset, dns) + scClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, servicecontroller.UserAgentName)) + servicecontroller := servicecontroller.New(scClientset, dns) if err := servicecontroller.Run(s.ConcurrentServiceSyncs, wait.NeverStop); err != nil { glog.Errorf("Failed to start service controller: %v", err) } diff --git a/federation/pkg/federation-controller/cluster/cluster_client.go b/federation/pkg/federation-controller/cluster/cluster_client.go index 8315c3de0fa..401d5e784a9 100644 --- a/federation/pkg/federation-controller/cluster/cluster_client.go +++ b/federation/pkg/federation-controller/cluster/cluster_client.go @@ -1,5 +1,5 @@ /* -Copyright 2014 The Kubernetes Authors All rights reserved. +Copyright 2016 The Kubernetes Authors All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -18,22 +18,17 @@ package cluster import ( "fmt" - "net" - "os" "strings" "github.com/golang/glog" federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1" + "k8s.io/kubernetes/federation/pkg/federation-controller/util" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/v1" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/typed/discovery" - client "k8s.io/kubernetes/pkg/client/unversioned" - "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" - clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api" - utilnet "k8s.io/kubernetes/pkg/util/net" "k8s.io/kubernetes/pkg/util/sets" ) @@ -44,40 +39,13 @@ const ( KubeconfigSecretDataKey = "kubeconfig" ) -// This is to inject a different kubeconfigGetter in tests. -// We dont use the standard one which calls NewInCluster in tests to avoid having to setup service accounts and mount files with secret tokens. -var KubeconfigGetterForCluster = func(c *federation_v1alpha1.Cluster) clientcmd.KubeconfigGetter { - return func() (*clientcmdapi.Config, error) { - // Get the namespace this is running in from the env variable. - namespace := os.Getenv("POD_NAMESPACE") - if namespace == "" { - return nil, fmt.Errorf("unexpected: POD_NAMESPACE env var returned empty string") - } - // Get a client to talk to the k8s apiserver, to fetch secrets from it. - client, err := client.NewInCluster() - if err != nil { - return nil, fmt.Errorf("error in creating in-cluster client: %s", err) - } - secret, err := client.Secrets(namespace).Get(c.Spec.SecretRef.Name) - if err != nil { - return nil, fmt.Errorf("error in fetching secret: %s", err) - } - ok := false - data, ok := secret.Data[KubeconfigSecretDataKey] - if !ok { - return nil, fmt.Errorf("secret does not have data with key: %s", KubeconfigSecretDataKey) - } - return clientcmd.Load(data) - } -} - type ClusterClient struct { discoveryClient *discovery.DiscoveryClient kubeClient *clientset.Clientset } func NewClusterClientSet(c *federation_v1alpha1.Cluster) (*ClusterClient, error) { - clusterConfig, err := BuildClusterConfig(c) + clusterConfig, err := util.BuildClusterConfig(c) if err != nil { return nil, err } @@ -95,42 +63,6 @@ func NewClusterClientSet(c *federation_v1alpha1.Cluster) (*ClusterClient, error) return &clusterClientSet, err } -func BuildClusterConfig(c *federation_v1alpha1.Cluster) (*restclient.Config, error) { - var serverAddress string - var clusterConfig *restclient.Config - hostIP, err := utilnet.ChooseHostInterface() - if err != nil { - return nil, err - } - - for _, item := range c.Spec.ServerAddressByClientCIDRs { - _, cidrnet, err := net.ParseCIDR(item.ClientCIDR) - if err != nil { - return nil, err - } - myaddr := net.ParseIP(hostIP.String()) - if cidrnet.Contains(myaddr) == true { - serverAddress = item.ServerAddress - break - } - } - if serverAddress != "" { - if c.Spec.SecretRef == nil { - glog.Infof("didnt find secretRef for cluster %s. Trying insecure access", c.Name) - clusterConfig, err = clientcmd.BuildConfigFromFlags(serverAddress, "") - } else { - kubeconfigGetter := KubeconfigGetterForCluster(c) - clusterConfig, err = clientcmd.BuildConfigFromKubeconfigGetter(serverAddress, kubeconfigGetter) - } - if err != nil { - return nil, err - } - clusterConfig.QPS = KubeAPIQPS - clusterConfig.Burst = KubeAPIBurst - } - return clusterConfig, nil -} - // GetClusterHealthStatus gets the kubernetes cluster health status by requesting "/healthz" func (self *ClusterClient) GetClusterHealthStatus() *federation_v1alpha1.ClusterStatus { clusterStatus := federation_v1alpha1.ClusterStatus{} diff --git a/federation/pkg/federation-controller/cluster/clustercontroller.go b/federation/pkg/federation-controller/cluster/clustercontroller.go index 1d3d2d5b88b..e72c166815d 100644 --- a/federation/pkg/federation-controller/cluster/clustercontroller.go +++ b/federation/pkg/federation-controller/cluster/clustercontroller.go @@ -1,5 +1,5 @@ /* -Copyright 2014 The Kubernetes Authors All rights reserved. +Copyright 2016 The Kubernetes Authors All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -21,7 +21,6 @@ import ( "time" "github.com/golang/glog" - "k8s.io/kubernetes/federation/apis/federation" federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1" cluster_cache "k8s.io/kubernetes/federation/client/cache" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3" @@ -73,7 +72,7 @@ func NewclusterController(federationClient federationclientset.Interface, cluste return cc.federationClient.Federation().Clusters().Watch(options) }, }, - &federation.Cluster{}, + &federation_v1alpha1.Cluster{}, controller.NoResyncPeriodFunc(), framework.ResourceEventHandlerFuncs{ DeleteFunc: cc.delFromClusterSet, diff --git a/federation/pkg/federation-controller/cluster/clustercontroller_test.go b/federation/pkg/federation-controller/cluster/clustercontroller_test.go index ca7c918a01a..29d96cd69dd 100644 --- a/federation/pkg/federation-controller/cluster/clustercontroller_test.go +++ b/federation/pkg/federation-controller/cluster/clustercontroller_test.go @@ -25,6 +25,7 @@ import ( federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3" + controller_util "k8s.io/kubernetes/federation/pkg/federation-controller/util" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/v1" @@ -124,8 +125,8 @@ func TestUpdateClusterStatusOK(t *testing.T) { federationClientSet := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller")) // Override KubeconfigGetterForCluster to avoid having to setup service accounts and mount files with secret tokens. - originalGetter := KubeconfigGetterForCluster - KubeconfigGetterForCluster = func(c *federation_v1alpha1.Cluster) clientcmd.KubeconfigGetter { + originalGetter := controller_util.KubeconfigGetterForCluster + controller_util.KubeconfigGetterForCluster = func(c *federation_v1alpha1.Cluster) clientcmd.KubeconfigGetter { return func() (*clientcmdapi.Config, error) { return &clientcmdapi.Config{}, nil } @@ -146,5 +147,5 @@ func TestUpdateClusterStatusOK(t *testing.T) { } // Reset KubeconfigGetterForCluster - KubeconfigGetterForCluster = originalGetter + controller_util.KubeconfigGetterForCluster = originalGetter } diff --git a/federation/pkg/federation-controller/service/cluster_helper.go b/federation/pkg/federation-controller/service/cluster_helper.go index ac8093d651e..08c689f42fc 100644 --- a/federation/pkg/federation-controller/service/cluster_helper.go +++ b/federation/pkg/federation-controller/service/cluster_helper.go @@ -19,12 +19,12 @@ package service import ( "sync" - "k8s.io/kubernetes/federation/apis/federation" + v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1" "k8s.io/kubernetes/pkg/api" + v1 "k8s.io/kubernetes/pkg/api/v1" cache "k8s.io/kubernetes/pkg/client/cache" - clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + release_1_3 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3" "k8s.io/kubernetes/pkg/client/restclient" - "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" "k8s.io/kubernetes/pkg/controller/framework" pkg_runtime "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/wait" @@ -32,12 +32,13 @@ import ( "k8s.io/kubernetes/pkg/watch" "github.com/golang/glog" + "k8s.io/kubernetes/federation/pkg/federation-controller/util" "reflect" ) type clusterCache struct { - clientset *clientset.Clientset - cluster *federation.Cluster + clientset *release_1_3.Clientset + cluster *v1alpha1.Cluster // A store of services, populated by the serviceController serviceStore cache.StoreToServiceLister // Watches changes to all services @@ -57,7 +58,7 @@ type clusterClientCache struct { clientMap map[string]*clusterCache } -func (cc *clusterClientCache) startClusterLW(cluster *federation.Cluster, clusterName string) { +func (cc *clusterClientCache) startClusterLW(cluster *v1alpha1.Cluster, clusterName string) { cachedClusterClient, ok := cc.clientMap[clusterName] // only create when no existing cachedClusterClient if ok { @@ -92,13 +93,13 @@ func (cc *clusterClientCache) startClusterLW(cluster *federation.Cluster, cluste cachedClusterClient.endpointStore.Store, cachedClusterClient.endpointController = framework.NewInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) { - return clientset.Core().Endpoints(api.NamespaceAll).List(options) + return clientset.Core().Endpoints(v1.NamespaceAll).List(options) }, WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return clientset.Core().Endpoints(api.NamespaceAll).Watch(options) + return clientset.Core().Endpoints(v1.NamespaceAll).Watch(options) }, }, - &api.Endpoints{}, + &v1.Endpoints{}, serviceSyncPeriod, framework.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -116,25 +117,25 @@ func (cc *clusterClientCache) startClusterLW(cluster *federation.Cluster, cluste cachedClusterClient.serviceStore.Store, cachedClusterClient.serviceController = framework.NewInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) { - return clientset.Core().Services(api.NamespaceAll).List(options) + return clientset.Core().Services(v1.NamespaceAll).List(options) }, WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return clientset.Core().Services(api.NamespaceAll).Watch(options) + return clientset.Core().Services(v1.NamespaceAll).Watch(options) }, }, - &api.Service{}, + &v1.Service{}, serviceSyncPeriod, framework.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { cc.enqueueService(obj, clusterName) }, UpdateFunc: func(old, cur interface{}) { - oldService, ok := old.(*api.Service) + oldService, ok := old.(*v1.Service) if !ok { return } - curService, ok := cur.(*api.Service) + curService, ok := cur.(*v1.Service) if !ok { return } @@ -143,7 +144,7 @@ func (cc *clusterClientCache) startClusterLW(cluster *federation.Cluster, cluste } }, DeleteFunc: func(obj interface{}) { - service, _ := obj.(*api.Service) + service, _ := obj.(*v1.Service) cc.enqueueService(obj, clusterName) glog.V(2).Infof("Service %s/%s deletion found and enque to service store %s", service.Namespace, service.Name, clusterName) }, @@ -161,7 +162,7 @@ func (cc *clusterClientCache) startClusterLW(cluster *federation.Cluster, cluste // delFromClusterSet delete a cluster from clusterSet and // delete the corresponding restclient from the map clusterKubeClientMap func (cc *clusterClientCache) delFromClusterSet(obj interface{}) { - cluster, ok := obj.(*federation.Cluster) + cluster, ok := obj.(*v1alpha1.Cluster) cc.rwlock.Lock() defer cc.rwlock.Unlock() if ok { @@ -180,10 +181,10 @@ func (cc *clusterClientCache) delFromClusterSet(obj interface{}) { // addToClusterSet inserts the new cluster to clusterSet and creates a corresponding // restclient to map clusterKubeClientMap func (cc *clusterClientCache) addToClientMap(obj interface{}) { - cluster := obj.(*federation.Cluster) + cluster := obj.(*v1alpha1.Cluster) cc.rwlock.Lock() defer cc.rwlock.Unlock() - cluster, ok := obj.(*federation.Cluster) + cluster, ok := obj.(*v1alpha1.Cluster) if !ok { return } @@ -195,13 +196,11 @@ func (cc *clusterClientCache) addToClientMap(obj interface{}) { } } -func newClusterClientset(c *federation.Cluster) (*clientset.Clientset, error) { - clusterConfig, err := clientcmd.BuildConfigFromFlags(c.Spec.ServerAddressByClientCIDRs[0].ServerAddress, "") - if err != nil { - return nil, err +func newClusterClientset(c *v1alpha1.Cluster) (*release_1_3.Clientset, error) { + clusterConfig, err := util.BuildClusterConfig(c) + if clusterConfig != nil { + clientset := release_1_3.NewForConfigOrDie(restclient.AddUserAgent(clusterConfig, UserAgentName)) + return clientset, nil } - clusterConfig.QPS = KubeAPIQPS - clusterConfig.Burst = KubeAPIBurst - clientset := clientset.NewForConfigOrDie(restclient.AddUserAgent(clusterConfig, UserAgentName)) - return clientset, nil + return nil, err } diff --git a/federation/pkg/federation-controller/service/dns.go b/federation/pkg/federation-controller/service/dns.go index c01bf4741d9..9f1574e9f2e 100644 --- a/federation/pkg/federation-controller/service/dns.go +++ b/federation/pkg/federation-controller/service/dns.go @@ -254,6 +254,9 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService * // the state of the service when we last successfully sync'd it's DNS records. // So this time around we only need to patch that (add new records, remove deleted records, and update changed records. // + if s.dns == nil { + return nil + } if s == nil { return fmt.Errorf("nil ServiceController passed to ServiceController.ensureDnsRecords(clusterName: %s, cachedService: %v)", clusterName, cachedService) } @@ -263,6 +266,9 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService * serviceName := cachedService.lastState.Name namespaceName := cachedService.lastState.Namespace zoneNames, regionName, err := s.getClusterZoneNames(clusterName) + if zoneNames == nil { + return fmt.Errorf("fail to get cluster zone names") + } if err != nil { return err } diff --git a/federation/pkg/federation-controller/service/endpoint_helper.go b/federation/pkg/federation-controller/service/endpoint_helper.go index 6324e09620b..044f2d27623 100644 --- a/federation/pkg/federation-controller/service/endpoint_helper.go +++ b/federation/pkg/federation-controller/service/endpoint_helper.go @@ -20,8 +20,8 @@ import ( "fmt" "time" - federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset" - "k8s.io/kubernetes/pkg/api" + federation_release_1_3 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3" + v1 "k8s.io/kubernetes/pkg/api/v1" cache "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/controller" @@ -54,7 +54,7 @@ func (sc *ServiceController) clusterEndpointWorker() { // Whenever there is change on endpoint, the federation service should be updated // key is the namespaced name of endpoint -func (cc *clusterClientCache) syncEndpoint(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient federationclientset.Interface, serviceController *ServiceController) error { +func (cc *clusterClientCache) syncEndpoint(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient federation_release_1_3.Interface, serviceController *ServiceController) error { cachedService, ok := serviceCache.get(key) if !ok { // here we filtered all non-federation services @@ -67,7 +67,7 @@ func (cc *clusterClientCache) syncEndpoint(key, clusterName string, clusterCache return err } if exists { - endpoint, ok := endpointInterface.(*api.Endpoints) + endpoint, ok := endpointInterface.(*v1.Endpoints) if ok { glog.V(4).Infof("Found endpoint for federation service %s/%s from cluster %s", endpoint.Namespace, endpoint.Name, clusterName) err = cc.processEndpointUpdate(cachedService, endpoint, clusterName, serviceController) @@ -117,7 +117,7 @@ func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedServi // Update dns info when endpoint update event received // We do not care about the endpoint info, what we need to make sure here is len(endpoints.subsets)>0 -func (cc *clusterClientCache) processEndpointUpdate(cachedService *cachedService, endpoint *api.Endpoints, clusterName string, serviceController *ServiceController) error { +func (cc *clusterClientCache) processEndpointUpdate(cachedService *cachedService, endpoint *v1.Endpoints, clusterName string, serviceController *ServiceController) error { glog.V(4).Infof("Processing endpoint update for %s/%s, cluster %s", endpoint.Namespace, endpoint.Name, clusterName) cachedService.rwlock.Lock() defer cachedService.rwlock.Unlock() diff --git a/federation/pkg/federation-controller/service/endpoint_helper_test.go b/federation/pkg/federation-controller/service/endpoint_helper_test.go index b2fe9c1d1dc..9815f19b5d3 100644 --- a/federation/pkg/federation-controller/service/endpoint_helper_test.go +++ b/federation/pkg/federation-controller/service/endpoint_helper_test.go @@ -19,9 +19,9 @@ package service import ( "testing" - "k8s.io/kubernetes/federation/apis/federation" + "k8s.io/kubernetes/federation/apis/federation/v1alpha1" "k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes. - "k8s.io/kubernetes/pkg/api" + v1 "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/util/sets" ) @@ -38,14 +38,14 @@ var fakeServiceController = ServiceController{ knownClusterSet: make(sets.String), } -func buildEndpoint(subsets [][]string) *api.Endpoints { - endpoint := &api.Endpoints{ - Subsets: []api.EndpointSubset{ - {Addresses: []api.EndpointAddress{}}, +func buildEndpoint(subsets [][]string) *v1.Endpoints { + endpoint := &v1.Endpoints{ + Subsets: []v1.EndpointSubset{ + {Addresses: []v1.EndpointAddress{}}, }, } for _, element := range subsets { - address := api.EndpointAddress{IP: element[0], Hostname: element[1], TargetRef: nil} + address := v1.EndpointAddress{IP: element[0], Hostname: element[1], TargetRef: nil} endpoint.Subsets[0].Addresses = append(endpoint.Subsets[0].Addresses, address) } return endpoint @@ -58,14 +58,14 @@ func TestProcessEndpointUpdate(t *testing.T) { tests := []struct { name string cachedService *cachedService - endpoint *api.Endpoints + endpoint *v1.Endpoints clusterName string expectResult int }{ { "no-cache", &cachedService{ - lastState: &api.Service{}, + lastState: &v1.Service{}, endpointMap: make(map[string]int), }, buildEndpoint([][]string{{"ip1", ""}}), @@ -75,7 +75,7 @@ func TestProcessEndpointUpdate(t *testing.T) { { "has-cache", &cachedService{ - lastState: &api.Service{}, + lastState: &v1.Service{}, endpointMap: map[string]int{ "foo": 1, }, @@ -99,8 +99,8 @@ func TestProcessEndpointDeletion(t *testing.T) { cc := clusterClientCache{ clientMap: map[string]*clusterCache{ clusterName: { - cluster: &federation.Cluster{ - Status: federation.ClusterStatus{ + cluster: &v1alpha1.Cluster{ + Status: v1alpha1.ClusterStatus{ Zones: []string{"foozone"}, Region: "fooregion", }, @@ -111,14 +111,14 @@ func TestProcessEndpointDeletion(t *testing.T) { tests := []struct { name string cachedService *cachedService - endpoint *api.Endpoints + endpoint *v1.Endpoints clusterName string expectResult int }{ { "no-cache", &cachedService{ - lastState: &api.Service{}, + lastState: &v1.Service{}, endpointMap: make(map[string]int), }, buildEndpoint([][]string{{"ip1", ""}}), @@ -128,7 +128,7 @@ func TestProcessEndpointDeletion(t *testing.T) { { "has-cache", &cachedService{ - lastState: &api.Service{}, + lastState: &v1.Service{}, endpointMap: map[string]int{ clusterName: 1, }, diff --git a/federation/pkg/federation-controller/service/service_helper.go b/federation/pkg/federation-controller/service/service_helper.go index 5a2c4fca363..40ee628f6f5 100644 --- a/federation/pkg/federation-controller/service/service_helper.go +++ b/federation/pkg/federation-controller/service/service_helper.go @@ -20,9 +20,9 @@ import ( "fmt" "time" - federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset" - "k8s.io/kubernetes/pkg/api" + federation_release_1_3 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3" "k8s.io/kubernetes/pkg/api/errors" + v1 "k8s.io/kubernetes/pkg/api/v1" cache "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/controller" @@ -55,7 +55,7 @@ func (sc *ServiceController) clusterServiceWorker() { } // Whenever there is change on service, the federation service should be updated -func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient federationclientset.Interface) error { +func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient federation_release_1_3.Interface) error { // obj holds the latest service info from apiserver, return if there is no federation cache for the service cachedService, ok := serviceCache.get(key) if !ok { @@ -70,7 +70,7 @@ func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache } var needUpdate bool if exists { - service, ok := serviceInterface.(*api.Service) + service, ok := serviceInterface.(*v1.Service) if ok { glog.V(4).Infof("Found service for federation service %s/%s from cluster %s", service.Namespace, service.Name, clusterName) needUpdate = cc.processServiceUpdate(cachedService, service, clusterName) @@ -140,7 +140,7 @@ func (cc *clusterClientCache) processServiceDeletion(cachedService *cachedServic // processServiceUpdate Update ingress info when service updated // the function returns a bool to indicate if actual update happend on federation service cache // and if the federation service cache is updated, the updated info should be post to federation apiserver -func (cc *clusterClientCache) processServiceUpdate(cachedService *cachedService, service *api.Service, clusterName string) bool { +func (cc *clusterClientCache) processServiceUpdate(cachedService *cachedService, service *v1.Service, clusterName string) bool { glog.V(4).Infof("Processing service update for %s/%s, cluster %s", service.Namespace, service.Name, clusterName) cachedService.rwlock.Lock() defer cachedService.rwlock.Unlock() @@ -214,7 +214,7 @@ func (cc *clusterClientCache) processServiceUpdate(cachedService *cachedService, return needUpdate } -func (cc *clusterClientCache) persistFedServiceUpdate(cachedService *cachedService, fedClient federationclientset.Interface) error { +func (cc *clusterClientCache) persistFedServiceUpdate(cachedService *cachedService, fedClient federation_release_1_3.Interface) error { service := cachedService.lastState glog.V(5).Infof("Persist federation service status %s/%s", service.Namespace, service.Name) var err error diff --git a/federation/pkg/federation-controller/service/service_helper_test.go b/federation/pkg/federation-controller/service/service_helper_test.go index 09767d3f475..50d5b742ad1 100644 --- a/federation/pkg/federation-controller/service/service_helper_test.go +++ b/federation/pkg/federation-controller/service/service_helper_test.go @@ -20,15 +20,15 @@ import ( "reflect" "testing" - "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/v1" ) -func buildServiceStatus(ingresses [][]string) api.LoadBalancerStatus { - status := api.LoadBalancerStatus{ - Ingress: []api.LoadBalancerIngress{}, +func buildServiceStatus(ingresses [][]string) v1.LoadBalancerStatus { + status := v1.LoadBalancerStatus{ + Ingress: []v1.LoadBalancerIngress{}, } for _, element := range ingresses { - ingress := api.LoadBalancerIngress{IP: element[0], Hostname: element[1]} + ingress := v1.LoadBalancerIngress{IP: element[0], Hostname: element[1]} status.Ingress = append(status.Ingress, ingress) } return status @@ -41,18 +41,18 @@ func TestProcessServiceUpdate(t *testing.T) { tests := []struct { name string cachedService *cachedService - service *api.Service + service *v1.Service clusterName string expectNeedUpdate bool - expectStatus api.LoadBalancerStatus + expectStatus v1.LoadBalancerStatus }{ { "no-cache", &cachedService{ - lastState: &api.Service{}, - serviceStatusMap: make(map[string]api.LoadBalancerStatus), + lastState: &v1.Service{}, + serviceStatusMap: make(map[string]v1.LoadBalancerStatus), }, - &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, + &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, "foo", true, buildServiceStatus([][]string{{"ip1", ""}}), @@ -60,12 +60,12 @@ func TestProcessServiceUpdate(t *testing.T) { { "same-ingress", &cachedService{ - lastState: &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, - serviceStatusMap: map[string]api.LoadBalancerStatus{ - "foo1": {Ingress: []api.LoadBalancerIngress{{"ip1", ""}}}, + lastState: &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, + serviceStatusMap: map[string]v1.LoadBalancerStatus{ + "foo1": {Ingress: []v1.LoadBalancerIngress{{"ip1", ""}}}, }, }, - &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, + &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, "foo1", false, buildServiceStatus([][]string{{"ip1", ""}}), @@ -73,14 +73,14 @@ func TestProcessServiceUpdate(t *testing.T) { { "diff-cluster", &cachedService{ - lastState: &api.Service{ - ObjectMeta: api.ObjectMeta{Name: "bar1"}, + lastState: &v1.Service{ + ObjectMeta: v1.ObjectMeta{Name: "bar1"}, }, - serviceStatusMap: map[string]api.LoadBalancerStatus{ - "foo2": {Ingress: []api.LoadBalancerIngress{{"ip1", ""}}}, + serviceStatusMap: map[string]v1.LoadBalancerStatus{ + "foo2": {Ingress: []v1.LoadBalancerIngress{{"ip1", ""}}}, }, }, - &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, + &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, "foo1", true, buildServiceStatus([][]string{{"ip1", ""}}), @@ -88,12 +88,12 @@ func TestProcessServiceUpdate(t *testing.T) { { "diff-ingress", &cachedService{ - lastState: &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}})}}, - serviceStatusMap: map[string]api.LoadBalancerStatus{ + lastState: &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}})}}, + serviceStatusMap: map[string]v1.LoadBalancerStatus{ "foo1": buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}}), }, }, - &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip2", ""}, {"ip3", ""}, {"ip5", ""}})}}, + &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip2", ""}, {"ip3", ""}, {"ip5", ""}})}}, "foo1", true, buildServiceStatus([][]string{{"ip2", ""}, {"ip3", ""}, {"ip5", ""}}), @@ -117,20 +117,20 @@ func TestProcessServiceDeletion(t *testing.T) { tests := []struct { name string cachedService *cachedService - service *api.Service + service *v1.Service clusterName string expectNeedUpdate bool - expectStatus api.LoadBalancerStatus + expectStatus v1.LoadBalancerStatus }{ { "same-ingress", &cachedService{ - lastState: &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, - serviceStatusMap: map[string]api.LoadBalancerStatus{ - "foo1": {Ingress: []api.LoadBalancerIngress{{"ip1", ""}}}, + lastState: &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, + serviceStatusMap: map[string]v1.LoadBalancerStatus{ + "foo1": {Ingress: []v1.LoadBalancerIngress{{"ip1", ""}}}, }, }, - &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, + &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, "foo1", true, buildServiceStatus([][]string{}), @@ -138,13 +138,13 @@ func TestProcessServiceDeletion(t *testing.T) { { "diff-ingress", &cachedService{ - lastState: &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}, {"ip3", ""}, {"ip5", ""}, {"ip6", ""}, {"ip8", ""}})}}, - serviceStatusMap: map[string]api.LoadBalancerStatus{ + lastState: &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}, {"ip3", ""}, {"ip5", ""}, {"ip6", ""}, {"ip8", ""}})}}, + serviceStatusMap: map[string]v1.LoadBalancerStatus{ "foo1": buildServiceStatus([][]string{{"ip1", ""}, {"ip2", ""}, {"ip3", ""}}), "foo2": buildServiceStatus([][]string{{"ip5", ""}, {"ip6", ""}, {"ip8", ""}}), }, }, - &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}, {"ip2", ""}, {"ip3", ""}})}}, + &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}, {"ip2", ""}, {"ip3", ""}})}}, "foo1", true, buildServiceStatus([][]string{{"ip4", ""}, {"ip5", ""}, {"ip6", ""}, {"ip8", ""}}), diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go index 79f0981dbb6..1b44e2a7e47 100644 --- a/federation/pkg/federation-controller/service/servicecontroller.go +++ b/federation/pkg/federation-controller/service/servicecontroller.go @@ -24,14 +24,15 @@ import ( "reflect" "github.com/golang/glog" - federation "k8s.io/kubernetes/federation/apis/federation" + v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1" federationcache "k8s.io/kubernetes/federation/client/cache" - federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset" + federation_release_1_3 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3" "k8s.io/kubernetes/federation/pkg/dnsprovider" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" + v1 "k8s.io/kubernetes/pkg/api/v1" cache "k8s.io/kubernetes/pkg/client/cache" - clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + release_1_3 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/framework" @@ -46,9 +47,8 @@ import ( ) const ( - // TODO update to 10 mins before merge - serviceSyncPeriod = 30 * time.Second - clusterSyncPeriod = 100 * time.Second + serviceSyncPeriod = 10 * time.Minute + clusterSyncPeriod = 10 * time.Minute // How long to wait before retrying the processing of a service change. // If this changes, the sleep in hack/jenkins/e2e.sh before downing a cluster @@ -71,15 +71,15 @@ const ( ) type cachedService struct { - lastState *api.Service + lastState *v1.Service // The state as successfully applied to the DNS server - appliedState *api.Service + appliedState *v1.Service // cluster endpoint map hold subset info from kubernetes clusters // key clusterName // value is a flag that if there is ready address, 1 means there is ready address, 0 means no ready address endpointMap map[string]int // serviceStatusMap map holds service status info from kubernetes clusters, keyed on clusterName - serviceStatusMap map[string]api.LoadBalancerStatus + serviceStatusMap map[string]v1.LoadBalancerStatus // Ensures only one goroutine can operate on this service at any given time. rwlock sync.Mutex // Controls error back-off for procceeding federation service to k8s clusters @@ -99,7 +99,7 @@ type serviceCache struct { type ServiceController struct { dns dnsprovider.Interface - federationClient federationclientset.Interface + federationClient federation_release_1_3.Interface federationName string // each federation should be configured with a single zone (e.g. "mycompany.com") dnsZones dnsprovider.Zones @@ -123,7 +123,7 @@ type ServiceController struct { // New returns a new service controller to keep DNS provider service resources // (like Kubernetes Services and DNS server records for service discovery) in sync with the registry. -func New(federationClient federationclientset.Interface, dns dnsprovider.Interface) *ServiceController { +func New(federationClient federation_release_1_3.Interface, dns dnsprovider.Interface) *ServiceController { broadcaster := record.NewBroadcaster() // federationClient event is not supported yet // broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")}) @@ -145,13 +145,13 @@ func New(federationClient federationclientset.Interface, dns dnsprovider.Interfa s.serviceStore.Store, s.serviceController = framework.NewInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) { - return s.federationClient.Core().Services(api.NamespaceAll).List(options) + return s.federationClient.Core().Services(v1.NamespaceAll).List(options) }, WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return s.federationClient.Core().Services(api.NamespaceAll).Watch(options) + return s.federationClient.Core().Services(v1.NamespaceAll).Watch(options) }, }, - &api.Service{}, + &v1.Service{}, serviceSyncPeriod, framework.ResourceEventHandlerFuncs{ AddFunc: s.enqueueService, @@ -173,17 +173,17 @@ func New(federationClient federationclientset.Interface, dns dnsprovider.Interfa return s.federationClient.Federation().Clusters().Watch(options) }, }, - &federation.Cluster{}, + &v1alpha1.Cluster{}, clusterSyncPeriod, framework.ResourceEventHandlerFuncs{ DeleteFunc: s.clusterCache.delFromClusterSet, AddFunc: s.clusterCache.addToClientMap, UpdateFunc: func(old, cur interface{}) { - oldCluster, ok := old.(*federation.Cluster) + oldCluster, ok := old.(*v1alpha1.Cluster) if !ok { return } - curCluster, ok := cur.(*federation.Cluster) + curCluster, ok := cur.(*v1alpha1.Cluster) if !ok { return } @@ -261,14 +261,14 @@ func (s *ServiceController) fedServiceWorker() { } } -func wantsDNSRecords(service *api.Service) bool { - return service.Spec.Type == api.ServiceTypeLoadBalancer +func wantsDNSRecords(service *v1.Service) bool { + return service.Spec.Type == v1.ServiceTypeLoadBalancer } // processServiceForCluster creates or updates service to all registered running clusters, // update DNS records and update the service info with DNS entries to federation apiserver. // the function returns any error caught -func (s *ServiceController) processServiceForCluster(cachedService *cachedService, clusterName string, service *api.Service, client *clientset.Clientset) error { +func (s *ServiceController) processServiceForCluster(cachedService *cachedService, clusterName string, service *v1.Service, client *release_1_3.Clientset) error { glog.V(4).Infof("Process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName) // Create or Update k8s Service err := s.ensureClusterService(cachedService, clusterName, service, client) @@ -288,7 +288,7 @@ func (s *ServiceController) updateFederationService(key string, cachedService *c if err != nil { return err, !retryable } - service, ok := clone.(*api.Service) + service, ok := clone.(*v1.Service) if !ok { return fmt.Errorf("Unexpected service cast error : %v\n", service), !retryable } @@ -326,7 +326,7 @@ func (s *ServiceController) deleteFederationService(cachedService *cachedService return nil, !retryable } -func (s *ServiceController) deleteClusterService(clusterName string, cachedService *cachedService, clientset *clientset.Clientset) error { +func (s *ServiceController) deleteClusterService(clusterName string, cachedService *cachedService, clientset *release_1_3.Clientset) error { service := cachedService.lastState glog.V(4).Infof("Deleting service %s/%s from cluster %s", service.Namespace, service.Name, clusterName) var err error @@ -342,7 +342,7 @@ func (s *ServiceController) deleteClusterService(clusterName string, cachedServi return err } -func (s *ServiceController) ensureClusterService(cachedService *cachedService, clusterName string, service *api.Service, client *clientset.Clientset) error { +func (s *ServiceController) ensureClusterService(cachedService *cachedService, clusterName string, service *v1.Service, client *release_1_3.Clientset) error { var err error var needUpdate bool for i := 0; i < clientRetryCount; i++ { @@ -434,7 +434,7 @@ func (s *serviceCache) getOrCreate(serviceName string) *cachedService { if !ok { service = &cachedService{ endpointMap: make(map[string]int), - serviceStatusMap: make(map[string]api.LoadBalancerStatus), + serviceStatusMap: make(map[string]v1.LoadBalancerStatus), } s.fedServiceMap[serviceName] = service } @@ -454,12 +454,12 @@ func (s *serviceCache) delete(serviceName string) { } // needsUpdateDNS check if the dns records of the given service should be updated -func (s *ServiceController) needsUpdateDNS(oldService *api.Service, newService *api.Service) bool { +func (s *ServiceController) needsUpdateDNS(oldService *v1.Service, newService *v1.Service) bool { if !wantsDNSRecords(oldService) && !wantsDNSRecords(newService) { return false } if wantsDNSRecords(oldService) != wantsDNSRecords(newService) { - s.eventRecorder.Eventf(newService, api.EventTypeNormal, "Type", "%v -> %v", + s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "Type", "%v -> %v", oldService.Spec.Type, newService.Spec.Type) return true } @@ -467,18 +467,18 @@ func (s *ServiceController) needsUpdateDNS(oldService *api.Service, newService * return true } if !LoadBalancerIPsAreEqual(oldService, newService) { - s.eventRecorder.Eventf(newService, api.EventTypeNormal, "LoadbalancerIP", "%v -> %v", + s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "LoadbalancerIP", "%v -> %v", oldService.Spec.LoadBalancerIP, newService.Spec.LoadBalancerIP) return true } if len(oldService.Spec.ExternalIPs) != len(newService.Spec.ExternalIPs) { - s.eventRecorder.Eventf(newService, api.EventTypeNormal, "ExternalIP", "Count: %v -> %v", + s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Count: %v -> %v", len(oldService.Spec.ExternalIPs), len(newService.Spec.ExternalIPs)) return true } for i := range oldService.Spec.ExternalIPs { if oldService.Spec.ExternalIPs[i] != newService.Spec.ExternalIPs[i] { - s.eventRecorder.Eventf(newService, api.EventTypeNormal, "ExternalIP", "Added: %v", + s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Added: %v", newService.Spec.ExternalIPs[i]) return true } @@ -487,7 +487,7 @@ func (s *ServiceController) needsUpdateDNS(oldService *api.Service, newService * return true } if oldService.UID != newService.UID { - s.eventRecorder.Eventf(newService, api.EventTypeNormal, "UID", "%v -> %v", + s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "UID", "%v -> %v", oldService.UID, newService.UID) return true } @@ -495,11 +495,11 @@ func (s *ServiceController) needsUpdateDNS(oldService *api.Service, newService * return false } -func getPortsForLB(service *api.Service) ([]*api.ServicePort, error) { +func getPortsForLB(service *v1.Service) ([]*v1.ServicePort, error) { // TODO: quinton: Probably applies for DNS SVC records. Come back to this. //var protocol api.Protocol - ports := []*api.ServicePort{} + ports := []*v1.ServicePort{} for i := range service.Spec.Ports { sp := &service.Spec.Ports[i] // The check on protocol was removed here. The DNS provider itself is now responsible for all protocol validation @@ -508,7 +508,7 @@ func getPortsForLB(service *api.Service) ([]*api.ServicePort, error) { return ports, nil } -func portsEqualForLB(x, y *api.Service) bool { +func portsEqualForLB(x, y *v1.Service) bool { xPorts, err := getPortsForLB(x) if err != nil { return false @@ -520,7 +520,7 @@ func portsEqualForLB(x, y *api.Service) bool { return portSlicesEqualForLB(xPorts, yPorts) } -func portSlicesEqualForLB(x, y []*api.ServicePort) bool { +func portSlicesEqualForLB(x, y []*v1.ServicePort) bool { if len(x) != len(y) { return false } @@ -533,7 +533,7 @@ func portSlicesEqualForLB(x, y []*api.ServicePort) bool { return true } -func portEqualForLB(x, y *api.ServicePort) bool { +func portEqualForLB(x, y *v1.ServicePort) bool { // TODO: Should we check name? (In theory, an LB could expose it) if x.Name != y.Name { return false @@ -552,7 +552,7 @@ func portEqualForLB(x, y *api.ServicePort) bool { return true } -func portEqualExcludeNodePort(x, y *api.ServicePort) bool { +func portEqualExcludeNodePort(x, y *v1.ServicePort) bool { // TODO: Should we check name? (In theory, an LB could expose it) if x.Name != y.Name { return false @@ -568,7 +568,7 @@ func portEqualExcludeNodePort(x, y *api.ServicePort) bool { return true } -func clustersFromList(list *federation.ClusterList) []string { +func clustersFromList(list *v1alpha1.ClusterList) []string { result := []string{} for ix := range list.Items { result = append(result, list.Items[ix].Name) @@ -579,7 +579,7 @@ func clustersFromList(list *federation.ClusterList) []string { // getClusterConditionPredicate filter all clusters meet condition of // condition.type=Ready and condition.status=true func getClusterConditionPredicate() federationcache.ClusterConditionPredicate { - return func(cluster federation.Cluster) bool { + return func(cluster v1alpha1.Cluster) bool { // If we have no info, don't accept if len(cluster.Status.Conditions) == 0 { return false @@ -587,7 +587,7 @@ func getClusterConditionPredicate() federationcache.ClusterConditionPredicate { for _, cond := range cluster.Status.Conditions { //We consider the cluster for load balancing only when its ClusterReady condition status //is ConditionTrue - if cond.Type == federation.ClusterReady && cond.Status != api.ConditionTrue { + if cond.Type == v1alpha1.ClusterReady && cond.Status != v1.ConditionTrue { glog.V(4).Infof("Ignoring cluser %v with %v condition status %v", cluster.Name, cond.Type, cond.Status) return false } @@ -695,7 +695,7 @@ func (s *ServiceController) lockedUpdateDNSRecords(service *cachedService, clust return nil } -func LoadBalancerIPsAreEqual(oldService, newService *api.Service) bool { +func LoadBalancerIPsAreEqual(oldService, newService *v1.Service) bool { return oldService.Spec.LoadBalancerIP == newService.Spec.LoadBalancerIP } @@ -778,7 +778,7 @@ func (s *ServiceController) syncService(key string) error { } if exists { - service, ok := obj.(*api.Service) + service, ok := obj.(*v1.Service) if ok { cachedService = s.serviceCache.getOrCreate(key) err, retryDelay = s.processServiceUpdate(cachedService, service, key) @@ -803,7 +803,7 @@ func (s *ServiceController) syncService(key string) error { // processServiceUpdate returns an error if processing the service update failed, along with a time.Duration // indicating whether processing should be retried; zero means no-retry; otherwise // we should retry in that Duration. -func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *api.Service, key string) (error, time.Duration) { +func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *v1.Service, key string) (error, time.Duration) { // Ensure that no other goroutine will interfere with our processing of the // service. cachedService.rwlock.Lock() @@ -822,7 +822,7 @@ func (s *ServiceController) processServiceUpdate(cachedService *cachedService, s message += " (will not retry): " } message += err.Error() - s.eventRecorder.Event(service, api.EventTypeWarning, "UpdateServiceFail", message) + s.eventRecorder.Event(service, v1.EventTypeWarning, "UpdateServiceFail", message) return err, cachedService.nextRetryDelay() } // Always update the cache upon success. @@ -849,7 +849,7 @@ func (s *ServiceController) processServiceDeletion(key string) (error, time.Dura service := cachedService.lastState cachedService.rwlock.Lock() defer cachedService.rwlock.Unlock() - s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingDNSRecord", "Deleting DNS Records") + s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingDNSRecord", "Deleting DNS Records") // TODO should we delete dns info here or wait for endpoint changes? prefer here // or we do nothing for service deletion //err := s.dns.balancer.EnsureLoadBalancerDeleted(service) @@ -861,10 +861,10 @@ func (s *ServiceController) processServiceDeletion(key string) (error, time.Dura } else { message += " (will not retry): " } - s.eventRecorder.Event(service, api.EventTypeWarning, "DeletingDNSRecordFailed", message) + s.eventRecorder.Event(service, v1.EventTypeWarning, "DeletingDNSRecordFailed", message) return err, cachedService.nextRetryDelay() } - s.eventRecorder.Event(service, api.EventTypeNormal, "DeletedDNSRecord", "Deleted DNS Records") + s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedDNSRecord", "Deleted DNS Records") s.serviceCache.delete(key) cachedService.resetRetryDelay() diff --git a/federation/pkg/federation-controller/service/servicecontroller_test.go b/federation/pkg/federation-controller/service/servicecontroller_test.go index 0eec3c9565c..31f9a0235a3 100644 --- a/federation/pkg/federation-controller/service/servicecontroller_test.go +++ b/federation/pkg/federation-controller/service/servicecontroller_test.go @@ -20,9 +20,9 @@ import ( "sync" "testing" - "k8s.io/kubernetes/federation/apis/federation" + "k8s.io/kubernetes/federation/apis/federation/v1alpha1" "k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes. - "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/util/sets" ) @@ -39,22 +39,22 @@ func TestGetClusterConditionPredicate(t *testing.T) { } tests := []struct { - cluster federation.Cluster + cluster v1alpha1.Cluster expectAccept bool name string serviceController *ServiceController }{ { - cluster: federation.Cluster{}, + cluster: v1alpha1.Cluster{}, expectAccept: false, name: "empty", serviceController: &serviceController, }, { - cluster: federation.Cluster{ - Status: federation.ClusterStatus{ - Conditions: []federation.ClusterCondition{ - {Type: federation.ClusterReady, Status: api.ConditionTrue}, + cluster: v1alpha1.Cluster{ + Status: v1alpha1.ClusterStatus{ + Conditions: []v1alpha1.ClusterCondition{ + {Type: v1alpha1.ClusterReady, Status: v1.ConditionTrue}, }, }, }, @@ -63,10 +63,10 @@ func TestGetClusterConditionPredicate(t *testing.T) { serviceController: &serviceController, }, { - cluster: federation.Cluster{ - Status: federation.ClusterStatus{ - Conditions: []federation.ClusterCondition{ - {Type: federation.ClusterReady, Status: api.ConditionFalse}, + cluster: v1alpha1.Cluster{ + Status: v1alpha1.ClusterStatus{ + Conditions: []v1alpha1.ClusterCondition{ + {Type: v1alpha1.ClusterReady, Status: v1.ConditionFalse}, }, }, }, diff --git a/federation/pkg/federation-controller/util/cluster_util.go b/federation/pkg/federation-controller/util/cluster_util.go new file mode 100644 index 00000000000..a92c3a829be --- /dev/null +++ b/federation/pkg/federation-controller/util/cluster_util.go @@ -0,0 +1,116 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "fmt" + "github.com/golang/glog" + federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1" + "k8s.io/kubernetes/pkg/client/restclient" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" + clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api" + utilnet "k8s.io/kubernetes/pkg/util/net" + "net" + "os" +) + +const ( + KubeAPIQPS = 20.0 + KubeAPIBurst = 30 + KubeconfigSecretDataKey = "kubeconfig" +) + +func BuildClusterConfig(c *federation_v1alpha1.Cluster) (*restclient.Config, error) { + var serverAddress string + var clusterConfig *restclient.Config + hostIP, err := utilnet.ChooseHostInterface() + if err != nil { + return nil, err + } + + for _, item := range c.Spec.ServerAddressByClientCIDRs { + _, cidrnet, err := net.ParseCIDR(item.ClientCIDR) + if err != nil { + return nil, err + } + myaddr := net.ParseIP(hostIP.String()) + if cidrnet.Contains(myaddr) == true { + serverAddress = item.ServerAddress + break + } + } + if serverAddress != "" { + if c.Spec.SecretRef == nil { + glog.Infof("didnt find secretRef for cluster %s. Trying insecure access", c.Name) + clusterConfig, err = clientcmd.BuildConfigFromFlags(serverAddress, "") + } else { + kubeconfigGetter := KubeconfigGetterForCluster(c) + clusterConfig, err = clientcmd.BuildConfigFromKubeconfigGetter(serverAddress, kubeconfigGetter) + } + if err != nil { + return nil, err + } + clusterConfig.QPS = KubeAPIQPS + clusterConfig.Burst = KubeAPIBurst + } + return clusterConfig, nil +} + +// This is to inject a different kubeconfigGetter in tests. +// We dont use the standard one which calls NewInCluster in tests to avoid having to setup service accounts and mount files with secret tokens. +var KubeconfigGetterForCluster = func(c *federation_v1alpha1.Cluster) clientcmd.KubeconfigGetter { + return func() (*clientcmdapi.Config, error) { + secretRefName := "" + if c.Spec.SecretRef != nil { + secretRefName = c.Spec.SecretRef.Name + } else { + glog.Infof("didnt find secretRef for cluster %s. Trying insecure access", c.Name) + } + return KubeconfigGetterForSecret(secretRefName)() + } +} + +// KubeconfigGettterForSecret is used to get the kubeconfig from the given secret. +var KubeconfigGetterForSecret = func(secretName string) clientcmd.KubeconfigGetter { + return func() (*clientcmdapi.Config, error) { + var data []byte + if secretName != "" { + // Get the namespace this is running in from the env variable. + namespace := os.Getenv("POD_NAMESPACE") + if namespace == "" { + return nil, fmt.Errorf("unexpected: POD_NAMESPACE env var returned empty string") + } + // Get a client to talk to the k8s apiserver, to fetch secrets from it. + client, err := client.NewInCluster() + if err != nil { + return nil, fmt.Errorf("error in creating in-cluster client: %s", err) + } + data = []byte{} + secret, err := client.Secrets(namespace).Get(secretName) + if err != nil { + return nil, fmt.Errorf("error in fetching secret: %s", err) + } + ok := false + data, ok = secret.Data[KubeconfigSecretDataKey] + if !ok { + return nil, fmt.Errorf("secret does not have data with key: %s", KubeconfigSecretDataKey) + } + } + return clientcmd.Load(data) + } +}