change clientset of service controller to versioned one

mfanjie 2016-06-03 13:21:35 +08:00
parent c2dbce68f1
commit 6dde087f69
14 changed files with 285 additions and 233 deletions

View File

@@ -18,7 +18,7 @@ package cache
import (
"github.com/golang/glog"
"k8s.io/kubernetes/federation/apis/federation"
"k8s.io/kubernetes/federation/apis/federation/v1alpha1"
kubeCache "k8s.io/kubernetes/pkg/client/cache"
)
@@ -28,16 +28,16 @@ type StoreToClusterLister struct {
kubeCache.Store
}
func (s *StoreToClusterLister) List() (clusters federation.ClusterList, err error) {
func (s *StoreToClusterLister) List() (clusters v1alpha1.ClusterList, err error) {
for _, m := range s.Store.List() {
clusters.Items = append(clusters.Items, *(m.(*federation.Cluster)))
clusters.Items = append(clusters.Items, *(m.(*v1alpha1.Cluster)))
}
return clusters, nil
}
// ClusterConditionPredicate is a function that indicates whether the given cluster's conditions meet
// some set of criteria defined by the function.
type ClusterConditionPredicate func(cluster federation.Cluster) bool
type ClusterConditionPredicate func(cluster v1alpha1.Cluster) bool
// storeToClusterConditionLister filters and returns clusters matching the given type and status from the store.
type storeToClusterConditionLister struct {
@@ -51,9 +51,9 @@ func (s *StoreToClusterLister) ClusterCondition(predicate ClusterConditionPredic
}
// List returns a list of clusters that match the conditions defined by the predicate functions in the storeToClusterConditionLister.
func (s storeToClusterConditionLister) List() (clusters federation.ClusterList, err error) {
func (s storeToClusterConditionLister) List() (clusters v1alpha1.ClusterList, err error) {
for _, m := range s.store.List() {
cluster := *m.(*federation.Cluster)
cluster := *m.(*v1alpha1.Cluster)
if s.predicate(cluster) {
clusters.Items = append(clusters.Items, cluster)
} else {

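Aside: a minimal usage sketch of the versioned lister above. The store setup, the readyOnly predicate, and the federationcache/v1alpha1/v1 import aliases are illustrative assumptions, not part of this commit:

// Hypothetical: list only Ready clusters through the versioned lister.
clusterLister := &federationcache.StoreToClusterLister{Store: clusterStore} // assumes a populated kubeCache.Store
readyOnly := func(c v1alpha1.Cluster) bool {
	for _, cond := range c.Status.Conditions {
		if cond.Type == v1alpha1.ClusterReady && cond.Status == v1.ConditionTrue {
			return true
		}
	}
	return false
}
readyClusters, err := clusterLister.ClusterCondition(readyOnly).List()
if err == nil {
	for _, cluster := range readyClusters.Items { // Items now hold v1alpha1.Cluster values
		glog.V(4).Infof("ready cluster: %s", cluster.Name)
	}
}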
View File

@@ -25,14 +25,13 @@ import (
"net/http/pprof"
"strconv"
"k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options"
"k8s.io/kubernetes/pkg/client/restclient"
internalclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3"
"k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options"
"k8s.io/kubernetes/federation/pkg/dnsprovider"
clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster"
servicecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/util/configz"
@@ -77,7 +76,7 @@ func Run(s *options.CMServer) error {
glog.Errorf("unable to register configz: %s", err)
}
// Create the config to talk to federation-apiserver.
kubeconfigGetter := clustercontroller.KubeconfigGetterForSecret(FederationAPIServerSecretName)
kubeconfigGetter := util.KubeconfigGetterForSecret(FederationAPIServerSecretName)
restClientCfg, err := clientcmd.BuildConfigFromKubeconfigGetter(s.Master, kubeconfigGetter)
if err != nil {
return err
@@ -115,14 +114,14 @@ func Run(s *options.CMServer) error {
func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) error {
federationClientSet := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller"))
go clustercontroller.NewclusterController(federationClientSet, s.ClusterMonitorPeriod.Duration).Run()
ccClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller"))
go clustercontroller.NewclusterController(ccClientset, s.ClusterMonitorPeriod.Duration).Run()
dns, err := dnsprovider.InitDnsProvider(s.DnsProvider, s.DnsConfigFile)
if err != nil {
glog.Fatalf("Cloud provider could not be initialized: %v", err)
}
scclientset := internalclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, servicecontroller.UserAgentName))
servicecontroller := servicecontroller.New(scclientset, dns)
scClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, servicecontroller.UserAgentName))
servicecontroller := servicecontroller.New(scClientset, dns)
if err := servicecontroller.Run(s.ConcurrentServiceSyncs, wait.NeverStop); err != nil {
glog.Errorf("Failed to start service controller: %v", err)
}

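Aside: the pattern above derives one versioned clientset per controller from the shared rest config, each tagged with its own user agent so requests are attributable on the apiserver side. A hedged sketch for a hypothetical additional controller ("my-controller" and the mycontroller package are placeholders):

myClientset := federationclientset.NewForConfigOrDie(
	restclient.AddUserAgent(restClientCfg, "my-controller")) // panics if the config is unusable
go mycontroller.New(myClientset).Run()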
View File

@@ -1,5 +1,5 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -18,22 +18,17 @@ package cluster
import (
"fmt"
"net"
"os"
"strings"
"github.com/golang/glog"
federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/typed/discovery"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/sets"
)
@@ -44,40 +39,13 @@ const (
KubeconfigSecretDataKey = "kubeconfig"
)
// This is to inject a different kubeconfigGetter in tests.
// We don't use the standard one (which calls NewInCluster) in tests, to avoid having to set up service accounts and mount files with secret tokens.
var KubeconfigGetterForCluster = func(c *federation_v1alpha1.Cluster) clientcmd.KubeconfigGetter {
return func() (*clientcmdapi.Config, error) {
// Get the namespace this is running in from the env variable.
namespace := os.Getenv("POD_NAMESPACE")
if namespace == "" {
return nil, fmt.Errorf("unexpected: POD_NAMESPACE env var returned empty string")
}
// Get a client to talk to the k8s apiserver, to fetch secrets from it.
client, err := client.NewInCluster()
if err != nil {
return nil, fmt.Errorf("error in creating in-cluster client: %s", err)
}
secret, err := client.Secrets(namespace).Get(c.Spec.SecretRef.Name)
if err != nil {
return nil, fmt.Errorf("error in fetching secret: %s", err)
}
ok := false
data, ok := secret.Data[KubeconfigSecretDataKey]
if !ok {
return nil, fmt.Errorf("secret does not have data with key: %s", KubeconfigSecretDataKey)
}
return clientcmd.Load(data)
}
}
type ClusterClient struct {
discoveryClient *discovery.DiscoveryClient
kubeClient *clientset.Clientset
}
func NewClusterClientSet(c *federation_v1alpha1.Cluster) (*ClusterClient, error) {
clusterConfig, err := BuildClusterConfig(c)
clusterConfig, err := util.BuildClusterConfig(c)
if err != nil {
return nil, err
}
@@ -95,42 +63,6 @@ func NewClusterClientSet(c *federation_v1alpha1.Cluster) (*ClusterClient, error)
return &clusterClientSet, err
}
func BuildClusterConfig(c *federation_v1alpha1.Cluster) (*restclient.Config, error) {
var serverAddress string
var clusterConfig *restclient.Config
hostIP, err := utilnet.ChooseHostInterface()
if err != nil {
return nil, err
}
for _, item := range c.Spec.ServerAddressByClientCIDRs {
_, cidrnet, err := net.ParseCIDR(item.ClientCIDR)
if err != nil {
return nil, err
}
myaddr := net.ParseIP(hostIP.String())
if cidrnet.Contains(myaddr) == true {
serverAddress = item.ServerAddress
break
}
}
if serverAddress != "" {
if c.Spec.SecretRef == nil {
glog.Infof("didnt find secretRef for cluster %s. Trying insecure access", c.Name)
clusterConfig, err = clientcmd.BuildConfigFromFlags(serverAddress, "")
} else {
kubeconfigGetter := KubeconfigGetterForCluster(c)
clusterConfig, err = clientcmd.BuildConfigFromKubeconfigGetter(serverAddress, kubeconfigGetter)
}
if err != nil {
return nil, err
}
clusterConfig.QPS = KubeAPIQPS
clusterConfig.Burst = KubeAPIBurst
}
return clusterConfig, nil
}
// GetClusterHealthStatus gets the kubernetes cluster health status by requesting "/healthz"
func (self *ClusterClient) GetClusterHealthStatus() *federation_v1alpha1.ClusterStatus {
clusterStatus := federation_v1alpha1.ClusterStatus{}

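Aside: with BuildClusterConfig moved to the shared util package, the cluster client here reduces to config resolution plus clientset construction. A minimal sketch, assuming cluster is a *federation_v1alpha1.Cluster fetched from the federation apiserver:

clusterClient, err := NewClusterClientSet(cluster)
if err != nil {
	glog.Errorf("failed to create client for cluster %s: %v", cluster.Name, err)
	return
}
status := clusterClient.GetClusterHealthStatus() // GETs /healthz, as described above
glog.V(4).Infof("cluster %s conditions: %+v", cluster.Name, status.Conditions)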
View File

@@ -1,5 +1,5 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -21,7 +21,6 @@ import (
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/federation/apis/federation"
federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
cluster_cache "k8s.io/kubernetes/federation/client/cache"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3"
@@ -73,7 +72,7 @@ func NewclusterController(federationClient federationclientset.Interface, cluste
return cc.federationClient.Federation().Clusters().Watch(options)
},
},
&federation.Cluster{},
&federation_v1alpha1.Cluster{},
controller.NoResyncPeriodFunc(),
framework.ResourceEventHandlerFuncs{
DeleteFunc: cc.delFromClusterSet,

View File

@@ -25,6 +25,7 @@ import (
federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3"
controller_util "k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
@@ -124,8 +125,8 @@ func TestUpdateClusterStatusOK(t *testing.T) {
federationClientSet := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller"))
// Override KubeconfigGetterForCluster to avoid having to set up service accounts and mount files with secret tokens.
originalGetter := KubeconfigGetterForCluster
KubeconfigGetterForCluster = func(c *federation_v1alpha1.Cluster) clientcmd.KubeconfigGetter {
originalGetter := controller_util.KubeconfigGetterForCluster
controller_util.KubeconfigGetterForCluster = func(c *federation_v1alpha1.Cluster) clientcmd.KubeconfigGetter {
return func() (*clientcmdapi.Config, error) {
return &clientcmdapi.Config{}, nil
}
@@ -146,5 +147,5 @@ func TestUpdateClusterStatusOK(t *testing.T) {
}
// Reset KubeconfigGetterForCluster
KubeconfigGetterForCluster = originalGetter
controller_util.KubeconfigGetterForCluster = originalGetter
}

View File

@@ -19,12 +19,12 @@ package service
import (
"sync"
"k8s.io/kubernetes/federation/apis/federation"
v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
"k8s.io/kubernetes/pkg/api"
v1 "k8s.io/kubernetes/pkg/api/v1"
cache "k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
release_1_3 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/controller/framework"
pkg_runtime "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/wait"
@@ -32,12 +32,13 @@ import (
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
"reflect"
)
type clusterCache struct {
clientset *clientset.Clientset
cluster *federation.Cluster
clientset *release_1_3.Clientset
cluster *v1alpha1.Cluster
// A store of services, populated by the serviceController
serviceStore cache.StoreToServiceLister
// Watches changes to all services
@@ -57,7 +58,7 @@ type clusterClientCache struct {
clientMap map[string]*clusterCache
}
func (cc *clusterClientCache) startClusterLW(cluster *federation.Cluster, clusterName string) {
func (cc *clusterClientCache) startClusterLW(cluster *v1alpha1.Cluster, clusterName string) {
cachedClusterClient, ok := cc.clientMap[clusterName]
// only create a new client when there is no existing cachedClusterClient
if ok {
@@ -92,13 +93,13 @@ func (cc *clusterClientCache) startClusterLW(cluster *federation.Cluster, cluste
cachedClusterClient.endpointStore.Store, cachedClusterClient.endpointController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
return clientset.Core().Endpoints(api.NamespaceAll).List(options)
return clientset.Core().Endpoints(v1.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return clientset.Core().Endpoints(api.NamespaceAll).Watch(options)
return clientset.Core().Endpoints(v1.NamespaceAll).Watch(options)
},
},
&api.Endpoints{},
&v1.Endpoints{},
serviceSyncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
@@ -116,25 +117,25 @@ func (cc *clusterClientCache) startClusterLW(cluster *federation.Cluster, cluste
cachedClusterClient.serviceStore.Store, cachedClusterClient.serviceController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
return clientset.Core().Services(api.NamespaceAll).List(options)
return clientset.Core().Services(v1.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return clientset.Core().Services(api.NamespaceAll).Watch(options)
return clientset.Core().Services(v1.NamespaceAll).Watch(options)
},
},
&api.Service{},
&v1.Service{},
serviceSyncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cc.enqueueService(obj, clusterName)
},
UpdateFunc: func(old, cur interface{}) {
oldService, ok := old.(*api.Service)
oldService, ok := old.(*v1.Service)
if !ok {
return
}
curService, ok := cur.(*api.Service)
curService, ok := cur.(*v1.Service)
if !ok {
return
}
@@ -143,7 +144,7 @@ func (cc *clusterClientCache) startClusterLW(cluster *federation.Cluster, cluste
}
},
DeleteFunc: func(obj interface{}) {
service, _ := obj.(*api.Service)
service, _ := obj.(*v1.Service)
cc.enqueueService(obj, clusterName)
glog.V(2).Infof("Service %s/%s deletion found and enque to service store %s", service.Namespace, service.Name, clusterName)
},
@@ -161,7 +162,7 @@ func (cc *clusterClientCache) startClusterLW(cluster *federation.Cluster, cluste
// delFromClusterSet deletes a cluster from clusterSet and
// deletes the corresponding restclient from the map clusterKubeClientMap
func (cc *clusterClientCache) delFromClusterSet(obj interface{}) {
cluster, ok := obj.(*federation.Cluster)
cluster, ok := obj.(*v1alpha1.Cluster)
cc.rwlock.Lock()
defer cc.rwlock.Unlock()
if ok {
@@ -180,10 +181,10 @@ func (cc *clusterClientCache) delFromClusterSet(obj interface{}) {
// addToClientMap inserts the new cluster into clusterSet and creates a corresponding
// restclient in the map clusterKubeClientMap
func (cc *clusterClientCache) addToClientMap(obj interface{}) {
cluster := obj.(*federation.Cluster)
cluster := obj.(*v1alpha1.Cluster)
cc.rwlock.Lock()
defer cc.rwlock.Unlock()
cluster, ok := obj.(*federation.Cluster)
cluster, ok := obj.(*v1alpha1.Cluster)
if !ok {
return
}
@@ -195,13 +196,11 @@ func (cc *clusterClientCache) addToClientMap(obj interface{}) {
}
}
func newClusterClientset(c *federation.Cluster) (*clientset.Clientset, error) {
clusterConfig, err := clientcmd.BuildConfigFromFlags(c.Spec.ServerAddressByClientCIDRs[0].ServerAddress, "")
if err != nil {
return nil, err
func newClusterClientset(c *v1alpha1.Cluster) (*release_1_3.Clientset, error) {
clusterConfig, err := util.BuildClusterConfig(c)
if clusterConfig != nil {
clientset := release_1_3.NewForConfigOrDie(restclient.AddUserAgent(clusterConfig, UserAgentName))
return clientset, nil
}
clusterConfig.QPS = KubeAPIQPS
clusterConfig.Burst = KubeAPIBurst
clientset := clientset.NewForConfigOrDie(restclient.AddUserAgent(clusterConfig, UserAgentName))
return clientset, nil
return nil, err
}

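Aside: util.BuildClusterConfig (shown in full at the end of this commit) can return a nil config with a nil error when no ServerAddressByClientCIDRs entry matches the host, so callers of newClusterClientset should check both return values. A hedged caller-side sketch:

clientset, err := newClusterClientset(cluster) // cluster: *v1alpha1.Cluster
if err != nil {
	glog.Errorf("failed to build clientset for cluster %s: %v", cluster.Name, err)
	return
}
if clientset == nil {
	glog.V(2).Infof("no server address matched this host for cluster %s", cluster.Name)
	return
}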
View File

@@ -254,6 +254,9 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *
// the state of the service when we last successfully synced its DNS records.
// So this time around we only need to patch that (add new records, remove deleted records, and update changed records).
//
if s.dns == nil {
return nil
}
if s == nil {
return fmt.Errorf("nil ServiceController passed to ServiceController.ensureDnsRecords(clusterName: %s, cachedService: %v)", clusterName, cachedService)
}
@@ -263,6 +266,9 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *
serviceName := cachedService.lastState.Name
namespaceName := cachedService.lastState.Namespace
zoneNames, regionName, err := s.getClusterZoneNames(clusterName)
if zoneNames == nil {
return fmt.Errorf("fail to get cluster zone names")
}
if err != nil {
return err
}

View File

@@ -20,8 +20,8 @@ import (
"fmt"
"time"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset"
"k8s.io/kubernetes/pkg/api"
federation_release_1_3 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3"
v1 "k8s.io/kubernetes/pkg/api/v1"
cache "k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/controller"
@@ -54,7 +54,7 @@ func (sc *ServiceController) clusterEndpointWorker() {
// Whenever there is a change to an endpoint, the federation service should be updated
// key is the namespaced name of the endpoint
func (cc *clusterClientCache) syncEndpoint(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient federationclientset.Interface, serviceController *ServiceController) error {
func (cc *clusterClientCache) syncEndpoint(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient federation_release_1_3.Interface, serviceController *ServiceController) error {
cachedService, ok := serviceCache.get(key)
if !ok {
// here we filter out all non-federation services
@@ -67,7 +67,7 @@ func (cc *clusterClientCache) syncEndpoint(key, clusterName string, clusterCache
return err
}
if exists {
endpoint, ok := endpointInterface.(*api.Endpoints)
endpoint, ok := endpointInterface.(*v1.Endpoints)
if ok {
glog.V(4).Infof("Found endpoint for federation service %s/%s from cluster %s", endpoint.Namespace, endpoint.Name, clusterName)
err = cc.processEndpointUpdate(cachedService, endpoint, clusterName, serviceController)
@@ -117,7 +117,7 @@ func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedServi
// Update DNS info when an endpoint update event is received.
// We do not care about the endpoint details; what we need to make sure here is len(endpoints.Subsets) > 0
func (cc *clusterClientCache) processEndpointUpdate(cachedService *cachedService, endpoint *api.Endpoints, clusterName string, serviceController *ServiceController) error {
func (cc *clusterClientCache) processEndpointUpdate(cachedService *cachedService, endpoint *v1.Endpoints, clusterName string, serviceController *ServiceController) error {
glog.V(4).Infof("Processing endpoint update for %s/%s, cluster %s", endpoint.Namespace, endpoint.Name, clusterName)
cachedService.rwlock.Lock()
defer cachedService.rwlock.Unlock()

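Aside: this path only tracks whether a cluster currently has any ready address for the service, per the endpointMap documentation in servicecontroller.go below. A hypothetical sketch of that reduction (the loop itself is illustrative; field names are as in this commit):

// 1 = the cluster has at least one ready address, 0 = none.
ready := 0
for _, subset := range endpoint.Subsets {
	if len(subset.Addresses) > 0 {
		ready = 1
		break
	}
}
cachedService.endpointMap[clusterName] = ready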
View File

@@ -19,9 +19,9 @@ package service
import (
"testing"
"k8s.io/kubernetes/federation/apis/federation"
"k8s.io/kubernetes/federation/apis/federation/v1alpha1"
"k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes.
"k8s.io/kubernetes/pkg/api"
v1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/util/sets"
)
@@ -38,14 +38,14 @@ var fakeServiceController = ServiceController{
knownClusterSet: make(sets.String),
}
func buildEndpoint(subsets [][]string) *api.Endpoints {
endpoint := &api.Endpoints{
Subsets: []api.EndpointSubset{
{Addresses: []api.EndpointAddress{}},
func buildEndpoint(subsets [][]string) *v1.Endpoints {
endpoint := &v1.Endpoints{
Subsets: []v1.EndpointSubset{
{Addresses: []v1.EndpointAddress{}},
},
}
for _, element := range subsets {
address := api.EndpointAddress{IP: element[0], Hostname: element[1], TargetRef: nil}
address := v1.EndpointAddress{IP: element[0], Hostname: element[1], TargetRef: nil}
endpoint.Subsets[0].Addresses = append(endpoint.Subsets[0].Addresses, address)
}
return endpoint
@@ -58,14 +58,14 @@ func TestProcessEndpointUpdate(t *testing.T) {
tests := []struct {
name string
cachedService *cachedService
endpoint *api.Endpoints
endpoint *v1.Endpoints
clusterName string
expectResult int
}{
{
"no-cache",
&cachedService{
lastState: &api.Service{},
lastState: &v1.Service{},
endpointMap: make(map[string]int),
},
buildEndpoint([][]string{{"ip1", ""}}),
@@ -75,7 +75,7 @@ func TestProcessEndpointUpdate(t *testing.T) {
{
"has-cache",
&cachedService{
lastState: &api.Service{},
lastState: &v1.Service{},
endpointMap: map[string]int{
"foo": 1,
},
@@ -99,8 +99,8 @@ func TestProcessEndpointDeletion(t *testing.T) {
cc := clusterClientCache{
clientMap: map[string]*clusterCache{
clusterName: {
cluster: &federation.Cluster{
Status: federation.ClusterStatus{
cluster: &v1alpha1.Cluster{
Status: v1alpha1.ClusterStatus{
Zones: []string{"foozone"},
Region: "fooregion",
},
@@ -111,14 +111,14 @@ func TestProcessEndpointDeletion(t *testing.T) {
tests := []struct {
name string
cachedService *cachedService
endpoint *api.Endpoints
endpoint *v1.Endpoints
clusterName string
expectResult int
}{
{
"no-cache",
&cachedService{
lastState: &api.Service{},
lastState: &v1.Service{},
endpointMap: make(map[string]int),
},
buildEndpoint([][]string{{"ip1", ""}}),
@@ -128,7 +128,7 @@ func TestProcessEndpointDeletion(t *testing.T) {
{
"has-cache",
&cachedService{
lastState: &api.Service{},
lastState: &v1.Service{},
endpointMap: map[string]int{
clusterName: 1,
},

View File

@@ -20,9 +20,9 @@ import (
"fmt"
"time"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset"
"k8s.io/kubernetes/pkg/api"
federation_release_1_3 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3"
"k8s.io/kubernetes/pkg/api/errors"
v1 "k8s.io/kubernetes/pkg/api/v1"
cache "k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/controller"
@@ -55,7 +55,7 @@ func (sc *ServiceController) clusterServiceWorker() {
}
// Whenever there is a change to a service, the federation service should be updated
func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient federationclientset.Interface) error {
func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient federation_release_1_3.Interface) error {
// obj holds the latest service info from the apiserver; return if there is no federation cache for the service
cachedService, ok := serviceCache.get(key)
if !ok {
@@ -70,7 +70,7 @@ func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache
}
var needUpdate bool
if exists {
service, ok := serviceInterface.(*api.Service)
service, ok := serviceInterface.(*v1.Service)
if ok {
glog.V(4).Infof("Found service for federation service %s/%s from cluster %s", service.Namespace, service.Name, clusterName)
needUpdate = cc.processServiceUpdate(cachedService, service, clusterName)
@@ -140,7 +140,7 @@ func (cc *clusterClientCache) processServiceDeletion(cachedService *cachedServic
// processServiceUpdate updates ingress info when a service is updated.
// The function returns a bool to indicate whether an actual update happened on the federation service cache,
// and if the federation service cache is updated, the updated info should be posted to the federation apiserver.
func (cc *clusterClientCache) processServiceUpdate(cachedService *cachedService, service *api.Service, clusterName string) bool {
func (cc *clusterClientCache) processServiceUpdate(cachedService *cachedService, service *v1.Service, clusterName string) bool {
glog.V(4).Infof("Processing service update for %s/%s, cluster %s", service.Namespace, service.Name, clusterName)
cachedService.rwlock.Lock()
defer cachedService.rwlock.Unlock()
@@ -214,7 +214,7 @@ func (cc *clusterClientCache) processServiceUpdate(cachedService *cachedService,
return needUpdate
}
func (cc *clusterClientCache) persistFedServiceUpdate(cachedService *cachedService, fedClient federationclientset.Interface) error {
func (cc *clusterClientCache) persistFedServiceUpdate(cachedService *cachedService, fedClient federation_release_1_3.Interface) error {
service := cachedService.lastState
glog.V(5).Infof("Persist federation service status %s/%s", service.Namespace, service.Name)
var err error

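Aside: the body of persistFedServiceUpdate is not shown in this diff. One plausible shape for its retry loop, assuming the controller's clientRetryCount constant and a hypothetical retryInterval, would be:

for i := 0; i < clientRetryCount; i++ {
	_, err = fedClient.Core().Services(service.Namespace).UpdateStatus(service)
	if err == nil {
		return nil
	}
	if errors.IsNotFound(err) {
		return nil // the federation service is gone; nothing left to persist
	}
	time.Sleep(retryInterval) // retryInterval is assumed, not defined in this diff
}
return err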
View File

@@ -20,15 +20,15 @@ import (
"reflect"
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
)
func buildServiceStatus(ingresses [][]string) api.LoadBalancerStatus {
status := api.LoadBalancerStatus{
Ingress: []api.LoadBalancerIngress{},
func buildServiceStatus(ingresses [][]string) v1.LoadBalancerStatus {
status := v1.LoadBalancerStatus{
Ingress: []v1.LoadBalancerIngress{},
}
for _, element := range ingresses {
ingress := api.LoadBalancerIngress{IP: element[0], Hostname: element[1]}
ingress := v1.LoadBalancerIngress{IP: element[0], Hostname: element[1]}
status.Ingress = append(status.Ingress, ingress)
}
return status
@@ -41,18 +41,18 @@ func TestProcessServiceUpdate(t *testing.T) {
tests := []struct {
name string
cachedService *cachedService
service *api.Service
service *v1.Service
clusterName string
expectNeedUpdate bool
expectStatus api.LoadBalancerStatus
expectStatus v1.LoadBalancerStatus
}{
{
"no-cache",
&cachedService{
lastState: &api.Service{},
serviceStatusMap: make(map[string]api.LoadBalancerStatus),
lastState: &v1.Service{},
serviceStatusMap: make(map[string]v1.LoadBalancerStatus),
},
&api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
&v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
"foo",
true,
buildServiceStatus([][]string{{"ip1", ""}}),
@@ -60,12 +60,12 @@ func TestProcessServiceUpdate(t *testing.T) {
{
"same-ingress",
&cachedService{
lastState: &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
serviceStatusMap: map[string]api.LoadBalancerStatus{
"foo1": {Ingress: []api.LoadBalancerIngress{{"ip1", ""}}},
lastState: &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
serviceStatusMap: map[string]v1.LoadBalancerStatus{
"foo1": {Ingress: []v1.LoadBalancerIngress{{"ip1", ""}}},
},
},
&api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
&v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
"foo1",
false,
buildServiceStatus([][]string{{"ip1", ""}}),
@@ -73,14 +73,14 @@ func TestProcessServiceUpdate(t *testing.T) {
{
"diff-cluster",
&cachedService{
lastState: &api.Service{
ObjectMeta: api.ObjectMeta{Name: "bar1"},
lastState: &v1.Service{
ObjectMeta: v1.ObjectMeta{Name: "bar1"},
},
serviceStatusMap: map[string]api.LoadBalancerStatus{
"foo2": {Ingress: []api.LoadBalancerIngress{{"ip1", ""}}},
serviceStatusMap: map[string]v1.LoadBalancerStatus{
"foo2": {Ingress: []v1.LoadBalancerIngress{{"ip1", ""}}},
},
},
&api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
&v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
"foo1",
true,
buildServiceStatus([][]string{{"ip1", ""}}),
@@ -88,12 +88,12 @@ func TestProcessServiceUpdate(t *testing.T) {
{
"diff-ingress",
&cachedService{
lastState: &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}})}},
serviceStatusMap: map[string]api.LoadBalancerStatus{
lastState: &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}})}},
serviceStatusMap: map[string]v1.LoadBalancerStatus{
"foo1": buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}}),
},
},
&api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip2", ""}, {"ip3", ""}, {"ip5", ""}})}},
&v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip2", ""}, {"ip3", ""}, {"ip5", ""}})}},
"foo1",
true,
buildServiceStatus([][]string{{"ip2", ""}, {"ip3", ""}, {"ip5", ""}}),
@@ -117,20 +117,20 @@ func TestProcessServiceDeletion(t *testing.T) {
tests := []struct {
name string
cachedService *cachedService
service *api.Service
service *v1.Service
clusterName string
expectNeedUpdate bool
expectStatus api.LoadBalancerStatus
expectStatus v1.LoadBalancerStatus
}{
{
"same-ingress",
&cachedService{
lastState: &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
serviceStatusMap: map[string]api.LoadBalancerStatus{
"foo1": {Ingress: []api.LoadBalancerIngress{{"ip1", ""}}},
lastState: &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
serviceStatusMap: map[string]v1.LoadBalancerStatus{
"foo1": {Ingress: []v1.LoadBalancerIngress{{"ip1", ""}}},
},
},
&api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
&v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}},
"foo1",
true,
buildServiceStatus([][]string{}),
@@ -138,13 +138,13 @@ func TestProcessServiceDeletion(t *testing.T) {
{
"diff-ingress",
&cachedService{
lastState: &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}, {"ip3", ""}, {"ip5", ""}, {"ip6", ""}, {"ip8", ""}})}},
serviceStatusMap: map[string]api.LoadBalancerStatus{
lastState: &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}, {"ip3", ""}, {"ip5", ""}, {"ip6", ""}, {"ip8", ""}})}},
serviceStatusMap: map[string]v1.LoadBalancerStatus{
"foo1": buildServiceStatus([][]string{{"ip1", ""}, {"ip2", ""}, {"ip3", ""}}),
"foo2": buildServiceStatus([][]string{{"ip5", ""}, {"ip6", ""}, {"ip8", ""}}),
},
},
&api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}, {"ip2", ""}, {"ip3", ""}})}},
&v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}, {"ip2", ""}, {"ip3", ""}})}},
"foo1",
true,
buildServiceStatus([][]string{{"ip4", ""}, {"ip5", ""}, {"ip6", ""}, {"ip8", ""}}),

View File

@@ -24,14 +24,15 @@ import (
"reflect"
"github.com/golang/glog"
federation "k8s.io/kubernetes/federation/apis/federation"
v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
federationcache "k8s.io/kubernetes/federation/client/cache"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset"
federation_release_1_3 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3"
"k8s.io/kubernetes/federation/pkg/dnsprovider"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
v1 "k8s.io/kubernetes/pkg/api/v1"
cache "k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
release_1_3 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
@@ -46,9 +47,8 @@
)
const (
// TODO update to 10 mins before merge
serviceSyncPeriod = 30 * time.Second
clusterSyncPeriod = 100 * time.Second
serviceSyncPeriod = 10 * time.Minute
clusterSyncPeriod = 10 * time.Minute
// How long to wait before retrying the processing of a service change.
// If this changes, the sleep in hack/jenkins/e2e.sh before downing a cluster
@@ -71,15 +71,15 @@
)
type cachedService struct {
lastState *api.Service
lastState *v1.Service
// The state as successfully applied to the DNS server
appliedState *api.Service
appliedState *v1.Service
// cluster endpoint map holds subset info from kubernetes clusters
// key is clusterName
// value is a flag: 1 means the cluster has a ready address, 0 means it has no ready address
endpointMap map[string]int
// serviceStatusMap map holds service status info from kubernetes clusters, keyed on clusterName
serviceStatusMap map[string]api.LoadBalancerStatus
serviceStatusMap map[string]v1.LoadBalancerStatus
// Ensures only one goroutine can operate on this service at any given time.
rwlock sync.Mutex
// Controls error back-off for propagating the federation service to k8s clusters
@@ -99,7 +99,7 @@ type serviceCache struct {
type ServiceController struct {
dns dnsprovider.Interface
federationClient federationclientset.Interface
federationClient federation_release_1_3.Interface
federationName string
// each federation should be configured with a single zone (e.g. "mycompany.com")
dnsZones dnsprovider.Zones
@@ -123,7 +123,7 @@ type ServiceController struct {
// New returns a new service controller to keep DNS provider service resources
// (like Kubernetes Services and DNS server records for service discovery) in sync with the registry.
func New(federationClient federationclientset.Interface, dns dnsprovider.Interface) *ServiceController {
func New(federationClient federation_release_1_3.Interface, dns dnsprovider.Interface) *ServiceController {
broadcaster := record.NewBroadcaster()
// federationClient event is not supported yet
// broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
@@ -145,13 +145,13 @@ func New(federationClient federationclientset.Interfa
s.serviceStore.Store, s.serviceController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
return s.federationClient.Core().Services(api.NamespaceAll).List(options)
return s.federationClient.Core().Services(v1.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return s.federationClient.Core().Services(api.NamespaceAll).Watch(options)
return s.federationClient.Core().Services(v1.NamespaceAll).Watch(options)
},
},
&api.Service{},
&v1.Service{},
serviceSyncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: s.enqueueService,
@@ -173,17 +173,17 @@ func New(federationClient federationclientset.Interfa
return s.federationClient.Federation().Clusters().Watch(options)
},
},
&federation.Cluster{},
&v1alpha1.Cluster{},
clusterSyncPeriod,
framework.ResourceEventHandlerFuncs{
DeleteFunc: s.clusterCache.delFromClusterSet,
AddFunc: s.clusterCache.addToClientMap,
UpdateFunc: func(old, cur interface{}) {
oldCluster, ok := old.(*federation.Cluster)
oldCluster, ok := old.(*v1alpha1.Cluster)
if !ok {
return
}
curCluster, ok := cur.(*federation.Cluster)
curCluster, ok := cur.(*v1alpha1.Cluster)
if !ok {
return
}
@@ -261,14 +261,14 @@ func (s *ServiceController) fedServiceWorker() {
}
}
func wantsDNSRecords(service *api.Service) bool {
return service.Spec.Type == api.ServiceTypeLoadBalancer
func wantsDNSRecords(service *v1.Service) bool {
return service.Spec.Type == v1.ServiceTypeLoadBalancer
}
// processServiceForCluster creates or updates the service in all registered running clusters,
// updates DNS records, and posts the service info with DNS entries to the federation apiserver.
// The function returns any error caught.
func (s *ServiceController) processServiceForCluster(cachedService *cachedService, clusterName string, service *api.Service, client *clientset.Clientset) error {
func (s *ServiceController) processServiceForCluster(cachedService *cachedService, clusterName string, service *v1.Service, client *release_1_3.Clientset) error {
glog.V(4).Infof("Process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName)
// Create or Update k8s Service
err := s.ensureClusterService(cachedService, clusterName, service, client)
@@ -288,7 +288,7 @@ func (s *ServiceController) updateFederationService(key string, cachedService *c
if err != nil {
return err, !retryable
}
service, ok := clone.(*api.Service)
service, ok := clone.(*v1.Service)
if !ok {
return fmt.Errorf("Unexpected service cast error : %v\n", service), !retryable
}
@@ -326,7 +326,7 @@ func (s *ServiceController) deleteFederationService(cachedService *cachedService
return nil, !retryable
}
func (s *ServiceController) deleteClusterService(clusterName string, cachedService *cachedService, clientset *clientset.Clientset) error {
func (s *ServiceController) deleteClusterService(clusterName string, cachedService *cachedService, clientset *release_1_3.Clientset) error {
service := cachedService.lastState
glog.V(4).Infof("Deleting service %s/%s from cluster %s", service.Namespace, service.Name, clusterName)
var err error
@@ -342,7 +342,7 @@ func (s *ServiceController) deleteClusterService(clusterName string, cachedServi
return err
}
func (s *ServiceController) ensureClusterService(cachedService *cachedService, clusterName string, service *api.Service, client *clientset.Clientset) error {
func (s *ServiceController) ensureClusterService(cachedService *cachedService, clusterName string, service *v1.Service, client *release_1_3.Clientset) error {
var err error
var needUpdate bool
for i := 0; i < clientRetryCount; i++ {
@@ -434,7 +434,7 @@ func (s *serviceCache) getOrCreate(serviceName string) *cachedService {
if !ok {
service = &cachedService{
endpointMap: make(map[string]int),
serviceStatusMap: make(map[string]api.LoadBalancerStatus),
serviceStatusMap: make(map[string]v1.LoadBalancerStatus),
}
s.fedServiceMap[serviceName] = service
}
@@ -454,12 +454,12 @@ func (s *serviceCache) delete(serviceName string) {
}
// needsUpdateDNS checks whether the DNS records of the given service should be updated
func (s *ServiceController) needsUpdateDNS(oldService *api.Service, newService *api.Service) bool {
func (s *ServiceController) needsUpdateDNS(oldService *v1.Service, newService *v1.Service) bool {
if !wantsDNSRecords(oldService) && !wantsDNSRecords(newService) {
return false
}
if wantsDNSRecords(oldService) != wantsDNSRecords(newService) {
s.eventRecorder.Eventf(newService, api.EventTypeNormal, "Type", "%v -> %v",
s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "Type", "%v -> %v",
oldService.Spec.Type, newService.Spec.Type)
return true
}
@@ -467,18 +467,18 @@ func (s *ServiceController) needsUpdateDNS(oldService *api.Service, newService *
return true
}
if !LoadBalancerIPsAreEqual(oldService, newService) {
s.eventRecorder.Eventf(newService, api.EventTypeNormal, "LoadbalancerIP", "%v -> %v",
s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "LoadbalancerIP", "%v -> %v",
oldService.Spec.LoadBalancerIP, newService.Spec.LoadBalancerIP)
return true
}
if len(oldService.Spec.ExternalIPs) != len(newService.Spec.ExternalIPs) {
s.eventRecorder.Eventf(newService, api.EventTypeNormal, "ExternalIP", "Count: %v -> %v",
s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Count: %v -> %v",
len(oldService.Spec.ExternalIPs), len(newService.Spec.ExternalIPs))
return true
}
for i := range oldService.Spec.ExternalIPs {
if oldService.Spec.ExternalIPs[i] != newService.Spec.ExternalIPs[i] {
s.eventRecorder.Eventf(newService, api.EventTypeNormal, "ExternalIP", "Added: %v",
s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Added: %v",
newService.Spec.ExternalIPs[i])
return true
}
@@ -487,7 +487,7 @@ func (s *ServiceController) needsUpdateDNS(oldService *api.Service, newService *
return true
}
if oldService.UID != newService.UID {
s.eventRecorder.Eventf(newService, api.EventTypeNormal, "UID", "%v -> %v",
s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "UID", "%v -> %v",
oldService.UID, newService.UID)
return true
}
@@ -495,11 +495,11 @@ func (s *ServiceController) needsUpdateDNS(oldService *api.Service, newService *
return false
}
func getPortsForLB(service *api.Service) ([]*api.ServicePort, error) {
func getPortsForLB(service *v1.Service) ([]*v1.ServicePort, error) {
// TODO: quinton: Probably applies for DNS SVC records. Come back to this.
//var protocol api.Protocol
ports := []*api.ServicePort{}
ports := []*v1.ServicePort{}
for i := range service.Spec.Ports {
sp := &service.Spec.Ports[i]
// The check on protocol was removed here. The DNS provider itself is now responsible for all protocol validation
@@ -508,7 +508,7 @@ func getPortsForLB(service *api.Service) ([]*api.ServicePort, error) {
return ports, nil
}
func portsEqualForLB(x, y *api.Service) bool {
func portsEqualForLB(x, y *v1.Service) bool {
xPorts, err := getPortsForLB(x)
if err != nil {
return false
@@ -520,7 +520,7 @@ func portsEqualForLB(x, y *api.Service) bool {
return portSlicesEqualForLB(xPorts, yPorts)
}
func portSlicesEqualForLB(x, y []*api.ServicePort) bool {
func portSlicesEqualForLB(x, y []*v1.ServicePort) bool {
if len(x) != len(y) {
return false
}
@@ -533,7 +533,7 @@ func portSlicesEqualForLB(x, y []*api.ServicePort) bool {
return true
}
func portEqualForLB(x, y *api.ServicePort) bool {
func portEqualForLB(x, y *v1.ServicePort) bool {
// TODO: Should we check name? (In theory, an LB could expose it)
if x.Name != y.Name {
return false
@@ -552,7 +552,7 @@ func portEqualForLB(x, y *api.ServicePort) bool {
return true
}
func portEqualExcludeNodePort(x, y *api.ServicePort) bool {
func portEqualExcludeNodePort(x, y *v1.ServicePort) bool {
// TODO: Should we check name? (In theory, an LB could expose it)
if x.Name != y.Name {
return false
@@ -568,7 +568,7 @@ func portEqualExcludeNodePort(x, y *api.ServicePort) bool {
return true
}
func clustersFromList(list *federation.ClusterList) []string {
func clustersFromList(list *v1alpha1.ClusterList) []string {
result := []string{}
for ix := range list.Items {
result = append(result, list.Items[ix].Name)
@@ -579,7 +579,7 @@ func clustersFromList(list *federation.ClusterList) []string {
// getClusterConditionPredicate filters for clusters that meet the condition of
// condition.type=Ready and condition.status=true
func getClusterConditionPredicate() federationcache.ClusterConditionPredicate {
return func(cluster federation.Cluster) bool {
return func(cluster v1alpha1.Cluster) bool {
// If we have no info, don't accept
if len(cluster.Status.Conditions) == 0 {
return false
@@ -587,7 +587,7 @@ func getClusterConditionPredicate() federationcache.ClusterConditionPredicate {
for _, cond := range cluster.Status.Conditions {
// We consider the cluster for load balancing only when its ClusterReady condition status
// is ConditionTrue
if cond.Type == federation.ClusterReady && cond.Status != api.ConditionTrue {
if cond.Type == v1alpha1.ClusterReady && cond.Status != v1.ConditionTrue {
glog.V(4).Infof("Ignoring cluser %v with %v condition status %v", cluster.Name, cond.Type, cond.Status)
return false
}
@@ -695,7 +695,7 @@ func (s *ServiceController) lockedUpdateDNSRecords(service *cachedService, clust
return nil
}
func LoadBalancerIPsAreEqual(oldService, newService *api.Service) bool {
func LoadBalancerIPsAreEqual(oldService, newService *v1.Service) bool {
return oldService.Spec.LoadBalancerIP == newService.Spec.LoadBalancerIP
}
@@ -778,7 +778,7 @@ func (s *ServiceController) syncService(key string) error {
}
if exists {
service, ok := obj.(*api.Service)
service, ok := obj.(*v1.Service)
if ok {
cachedService = s.serviceCache.getOrCreate(key)
err, retryDelay = s.processServiceUpdate(cachedService, service, key)
@@ -803,7 +803,7 @@ func (s *ServiceController) syncService(key string) error {
// processServiceUpdate returns an error if processing the service update failed, along with a time.Duration
// indicating whether processing should be retried; zero means no-retry; otherwise
// we should retry in that Duration.
func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *api.Service, key string) (error, time.Duration) {
func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *v1.Service, key string) (error, time.Duration) {
// Ensure that no other goroutine will interfere with our processing of the
// service.
cachedService.rwlock.Lock()
@@ -822,7 +822,7 @@ func (s *ServiceController) processServiceUpdate(cachedService *cachedService, s
message += " (will not retry): "
}
message += err.Error()
s.eventRecorder.Event(service, api.EventTypeWarning, "UpdateServiceFail", message)
s.eventRecorder.Event(service, v1.EventTypeWarning, "UpdateServiceFail", message)
return err, cachedService.nextRetryDelay()
}
// Always update the cache upon success.
@@ -849,7 +849,7 @@ func (s *ServiceController) processServiceDeletion(key string) (error, time.Dura
service := cachedService.lastState
cachedService.rwlock.Lock()
defer cachedService.rwlock.Unlock()
s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingDNSRecord", "Deleting DNS Records")
s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingDNSRecord", "Deleting DNS Records")
// TODO should we delete dns info here or wait for endpoint changes? prefer here
// or we do nothing for service deletion
//err := s.dns.balancer.EnsureLoadBalancerDeleted(service)
@@ -861,10 +861,10 @@ func (s *ServiceController) processServiceDeletion(key string) (error, time.Dura
} else {
message += " (will not retry): "
}
s.eventRecorder.Event(service, api.EventTypeWarning, "DeletingDNSRecordFailed", message)
s.eventRecorder.Event(service, v1.EventTypeWarning, "DeletingDNSRecordFailed", message)
return err, cachedService.nextRetryDelay()
}
s.eventRecorder.Event(service, api.EventTypeNormal, "DeletedDNSRecord", "Deleted DNS Records")
s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedDNSRecord", "Deleted DNS Records")
s.serviceCache.delete(key)
cachedService.resetRetryDelay()

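Aside: wantsDNSRecords gates all DNS work on the service type; a quick illustrative check (values hypothetical):

lb := &v1.Service{Spec: v1.ServiceSpec{Type: v1.ServiceTypeLoadBalancer}}
clusterIP := &v1.Service{Spec: v1.ServiceSpec{Type: v1.ServiceTypeClusterIP}}
glog.V(4).Infof("lb=%v clusterIP=%v", wantsDNSRecords(lb), wantsDNSRecords(clusterIP)) // lb=true clusterIP=false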
View File

@@ -20,9 +20,9 @@ import (
"sync"
"testing"
"k8s.io/kubernetes/federation/apis/federation"
"k8s.io/kubernetes/federation/apis/federation/v1alpha1"
"k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes.
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/util/sets"
)
@@ -39,22 +39,22 @@ func TestGetClusterConditionPredicate(t *testing.T) {
}
tests := []struct {
cluster federation.Cluster
cluster v1alpha1.Cluster
expectAccept bool
name string
serviceController *ServiceController
}{
{
cluster: federation.Cluster{},
cluster: v1alpha1.Cluster{},
expectAccept: false,
name: "empty",
serviceController: &serviceController,
},
{
cluster: federation.Cluster{
Status: federation.ClusterStatus{
Conditions: []federation.ClusterCondition{
{Type: federation.ClusterReady, Status: api.ConditionTrue},
cluster: v1alpha1.Cluster{
Status: v1alpha1.ClusterStatus{
Conditions: []v1alpha1.ClusterCondition{
{Type: v1alpha1.ClusterReady, Status: v1.ConditionTrue},
},
},
},
@@ -63,10 +63,10 @@
serviceController: &serviceController,
},
{
cluster: federation.Cluster{
Status: federation.ClusterStatus{
Conditions: []federation.ClusterCondition{
{Type: federation.ClusterReady, Status: api.ConditionFalse},
cluster: v1alpha1.Cluster{
Status: v1alpha1.ClusterStatus{
Conditions: []v1alpha1.ClusterCondition{
{Type: v1alpha1.ClusterReady, Status: v1.ConditionFalse},
},
},
},

View File

@@ -0,0 +1,116 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"github.com/golang/glog"
federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
"k8s.io/kubernetes/pkg/client/restclient"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
utilnet "k8s.io/kubernetes/pkg/util/net"
"net"
"os"
)
const (
KubeAPIQPS = 20.0
KubeAPIBurst = 30
KubeconfigSecretDataKey = "kubeconfig"
)
func BuildClusterConfig(c *federation_v1alpha1.Cluster) (*restclient.Config, error) {
var serverAddress string
var clusterConfig *restclient.Config
hostIP, err := utilnet.ChooseHostInterface()
if err != nil {
return nil, err
}
for _, item := range c.Spec.ServerAddressByClientCIDRs {
_, cidrnet, err := net.ParseCIDR(item.ClientCIDR)
if err != nil {
return nil, err
}
myaddr := net.ParseIP(hostIP.String())
if cidrnet.Contains(myaddr) == true {
serverAddress = item.ServerAddress
break
}
}
if serverAddress != "" {
if c.Spec.SecretRef == nil {
glog.Infof("didnt find secretRef for cluster %s. Trying insecure access", c.Name)
clusterConfig, err = clientcmd.BuildConfigFromFlags(serverAddress, "")
} else {
kubeconfigGetter := KubeconfigGetterForCluster(c)
clusterConfig, err = clientcmd.BuildConfigFromKubeconfigGetter(serverAddress, kubeconfigGetter)
}
if err != nil {
return nil, err
}
clusterConfig.QPS = KubeAPIQPS
clusterConfig.Burst = KubeAPIBurst
}
return clusterConfig, nil
}
// This is to inject a different kubeconfigGetter in tests.
// We don't use the standard one (which calls NewInCluster) in tests, to avoid having to set up service accounts and mount files with secret tokens.
var KubeconfigGetterForCluster = func(c *federation_v1alpha1.Cluster) clientcmd.KubeconfigGetter {
return func() (*clientcmdapi.Config, error) {
secretRefName := ""
if c.Spec.SecretRef != nil {
secretRefName = c.Spec.SecretRef.Name
} else {
glog.Infof("didnt find secretRef for cluster %s. Trying insecure access", c.Name)
}
return KubeconfigGetterForSecret(secretRefName)()
}
}
// KubeconfigGetterForSecret is used to get the kubeconfig from the given secret.
var KubeconfigGetterForSecret = func(secretName string) clientcmd.KubeconfigGetter {
return func() (*clientcmdapi.Config, error) {
var data []byte
if secretName != "" {
// Get the namespace this is running in from the env variable.
namespace := os.Getenv("POD_NAMESPACE")
if namespace == "" {
return nil, fmt.Errorf("unexpected: POD_NAMESPACE env var returned empty string")
}
// Get a client to talk to the k8s apiserver, to fetch secrets from it.
client, err := client.NewInCluster()
if err != nil {
return nil, fmt.Errorf("error in creating in-cluster client: %s", err)
}
data = []byte{}
secret, err := client.Secrets(namespace).Get(secretName)
if err != nil {
return nil, fmt.Errorf("error in fetching secret: %s", err)
}
ok := false
data, ok = secret.Data[KubeconfigSecretDataKey]
if !ok {
return nil, fmt.Errorf("secret does not have data with key: %s", KubeconfigSecretDataKey)
}
}
return clientcmd.Load(data)
}
}
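Aside: a hedged end-to-end sketch of the helpers above: resolve a rest config for a registered cluster, then derive a versioned clientset from it. The release_1_3 import and the "federation-probe" user agent are illustrative assumptions:

clusterConfig, err := BuildClusterConfig(cluster) // cluster: *federation_v1alpha1.Cluster
if err != nil {
	return err
}
if clusterConfig == nil {
	// No ServerAddressByClientCIDRs entry matched this host; see the loop above.
	return fmt.Errorf("no reachable server address for cluster %s", cluster.Name)
}
clientset := release_1_3.NewForConfigOrDie(restclient.AddUserAgent(clusterConfig, "federation-probe"))
_ = clientset // e.g. clientset.Core().Services(...) against the member cluster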