From 6133db345f010e121e9b369be3d45cceee5a9679 Mon Sep 17 00:00:00 2001 From: mfanjie Date: Sun, 22 May 2016 10:05:53 +0800 Subject: [PATCH] add federation service controller --- docs/admin/federation-controller-manager.md | 3 +- federation/client/cache/cluster_cache.go | 35 +- .../app/controllermanager.go | 14 + .../app/options/options.go | 22 +- .../service/cluster_helper.go | 207 +++++ .../pkg/federation-controller/service/dns.go | 40 + .../pkg/federation-controller/service/doc.go | 19 + .../service/endpoint_helper.go | 162 ++++ .../service/endpoint_helper_test.go | 120 +++ .../service/service_helper.go | 253 +++++ .../service/service_helper_test.go | 162 ++++ .../service/servicecontroller.go | 874 ++++++++++++++++++ .../service/servicecontroller_test.go | 67 ++ hack/verify-flags/known-flags.txt | 1 + 14 files changed, 1969 insertions(+), 10 deletions(-) create mode 100644 federation/pkg/federation-controller/service/cluster_helper.go create mode 100644 federation/pkg/federation-controller/service/dns.go create mode 100644 federation/pkg/federation-controller/service/doc.go create mode 100644 federation/pkg/federation-controller/service/endpoint_helper.go create mode 100644 federation/pkg/federation-controller/service/endpoint_helper_test.go create mode 100644 federation/pkg/federation-controller/service/service_helper.go create mode 100644 federation/pkg/federation-controller/service/service_helper_test.go create mode 100644 federation/pkg/federation-controller/service/servicecontroller.go create mode 100644 federation/pkg/federation-controller/service/servicecontroller_test.go diff --git a/docs/admin/federation-controller-manager.md b/docs/admin/federation-controller-manager.md index 4852da51fdb..21e57342509 100644 --- a/docs/admin/federation-controller-manager.md +++ b/docs/admin/federation-controller-manager.md @@ -51,6 +51,7 @@ federation-controller-manager ``` --address=0.0.0.0: The IP address to serve on (set to 0.0.0.0 for all interfaces) --cluster-monitor-period=40s: The period for syncing ClusterStatus in ClusterController. + --concurrent-service-syncs=10: The number of service syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load --federated-api-burst=30: Burst to use while talking with federation apiserver --federated-api-qps=20: QPS to use while talking with federation apiserver --kube-api-content-type="": ContentType of requests sent to apiserver. Passing application/vnd.kubernetes.protobuf is an experimental feature now. @@ -65,7 +66,7 @@ federation-controller-manager --profiling[=true]: Enable profiling via web interface host:port/debug/pprof/ ``` -###### Auto generated by spf13/cobra on 25-May-2016 +###### Auto generated by spf13/cobra on 29-May-2016 diff --git a/federation/client/cache/cluster_cache.go b/federation/client/cache/cluster_cache.go index 10ef095589e..ba22cc56247 100644 --- a/federation/client/cache/cluster_cache.go +++ b/federation/client/cache/cluster_cache.go @@ -17,7 +17,8 @@ limitations under the License. 
package cache import ( - federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1" + "github.com/golang/glog" + "k8s.io/kubernetes/federation/apis/federation" kubeCache "k8s.io/kubernetes/pkg/client/cache" ) @@ -27,9 +28,37 @@ type StoreToClusterLister struct { kubeCache.Store } -func (s *StoreToClusterLister) List() (clusters federation_v1alpha1.ClusterList, err error) { +func (s *StoreToClusterLister) List() (clusters federation.ClusterList, err error) { for _, m := range s.Store.List() { - clusters.Items = append(clusters.Items, *(m.(*federation_v1alpha1.Cluster))) + clusters.Items = append(clusters.Items, *(m.(*federation.Cluster))) } return clusters, nil } + +// ClusterConditionPredicate is a function that indicates whether the given cluster's conditions meet +// some set of criteria defined by the function. +type ClusterConditionPredicate func(cluster federation.Cluster) bool + +// storeToClusterConditionLister filters and returns nodes matching the given type and status from the store. +type storeToClusterConditionLister struct { + store kubeCache.Store + predicate ClusterConditionPredicate +} + +// ClusterCondition returns a storeToClusterConditionLister +func (s *StoreToClusterLister) ClusterCondition(predicate ClusterConditionPredicate) storeToClusterConditionLister { + return storeToClusterConditionLister{s.Store, predicate} +} + +// List returns a list of clusters that match the conditions defined by the predicate functions in the storeToClusterConditionLister. +func (s storeToClusterConditionLister) List() (clusters federation.ClusterList, err error) { + for _, m := range s.store.List() { + cluster := *m.(*federation.Cluster) + if s.predicate(cluster) { + clusters.Items = append(clusters.Items, cluster) + } else { + glog.V(5).Infof("Cluster %s matches none of the conditions", cluster.Name) + } + } + return +} diff --git a/federation/cmd/federation-controller-manager/app/controllermanager.go b/federation/cmd/federation-controller-manager/app/controllermanager.go index 582c938ce80..abc1db454da 100644 --- a/federation/cmd/federation-controller-manager/app/controllermanager.go +++ b/federation/cmd/federation-controller-manager/app/controllermanager.go @@ -28,11 +28,15 @@ import ( "k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options" "k8s.io/kubernetes/pkg/client/restclient" + internalclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3" + "k8s.io/kubernetes/federation/pkg/dnsprovider" clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster" + servicecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" "k8s.io/kubernetes/pkg/healthz" "k8s.io/kubernetes/pkg/util/configz" + "k8s.io/kubernetes/pkg/util/wait" "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus" @@ -103,7 +107,17 @@ func Run(s *options.CMServer) error { } func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) error { + federationClientSet := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller")) go clustercontroller.NewclusterController(federationClientSet, s.ClusterMonitorPeriod.Duration).Run() + dns, err := dnsprovider.InitDnsProvider(s.DnsProvider, s.DnsConfigFile) + if err != nil { + glog.Fatalf("Cloud provider could not be initialized: %v", err) + 
}
+	scclientset := internalclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, servicecontroller.UserAgentName))
+	servicecontroller := servicecontroller.New(scclientset, dns)
+	if err := servicecontroller.Run(s.ConcurrentServiceSyncs, wait.NeverStop); err != nil {
+		glog.Errorf("Failed to start service controller: %v", err)
+	}
 	select {}
 }
diff --git a/federation/cmd/federation-controller-manager/app/options/options.go b/federation/cmd/federation-controller-manager/app/options/options.go
index 9a6e8735f9f..f25b16daaa7 100644
--- a/federation/cmd/federation-controller-manager/app/options/options.go
+++ b/federation/cmd/federation-controller-manager/app/options/options.go
@@ -32,6 +32,14 @@ type ControllerManagerConfiguration struct {
 	Port int `json:"port"`
 	// address is the IP address to serve on (set to 0.0.0.0 for all interfaces).
 	Address string `json:"address"`
+	// dnsProvider is the provider for dns services.
+	DnsProvider string `json:"dnsProvider"`
+	// dnsConfigFile is the path to the dns provider configuration file.
+	DnsConfigFile string `json:"dnsConfigFile"`
+	// concurrentServiceSyncs is the number of services that are
+	// allowed to sync concurrently. Larger number = more responsive service
+	// management, but more CPU (and network) load.
+	ConcurrentServiceSyncs int `json:"concurrentServiceSyncs"`
 	// clusterMonitorPeriod is the period for syncing ClusterStatus in cluster controller.
 	ClusterMonitorPeriod unversioned.Duration `json:"clusterMonitorPeriod"`
 	// APIServerQPS is the QPS to use while talking with federation apiserver.
@@ -63,12 +71,13 @@ const (
 func NewCMServer() *CMServer {
 	s := CMServer{
 		ControllerManagerConfiguration: ControllerManagerConfiguration{
-			Port:                 FederatedControllerManagerPort,
-			Address:              "0.0.0.0",
-			ClusterMonitorPeriod: unversioned.Duration{Duration: 40 * time.Second},
-			APIServerQPS:         20.0,
-			APIServerBurst:       30,
-			LeaderElection:       leaderelection.DefaultLeaderElectionConfiguration(),
+			Port:                   FederatedControllerManagerPort,
+			Address:                "0.0.0.0",
+			ConcurrentServiceSyncs: 10,
+			ClusterMonitorPeriod:   unversioned.Duration{Duration: 40 * time.Second},
+			APIServerQPS:           20.0,
+			APIServerBurst:         30,
+			LeaderElection:         leaderelection.DefaultLeaderElectionConfiguration(),
 		},
 	}
 	return &s
@@ -78,6 +87,7 @@ func NewCMServer() *CMServer {
 func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
 	fs.IntVar(&s.Port, "port", s.Port, "The port that the controller-manager's http service runs on")
 	fs.Var(componentconfig.IPVar{Val: &s.Address}, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)")
+	fs.IntVar(&s.ConcurrentServiceSyncs, "concurrent-service-syncs", s.ConcurrentServiceSyncs, "The number of service syncing operations that will be done concurrently.
Larger number = faster endpoint updating, but more CPU (and network) load") fs.DurationVar(&s.ClusterMonitorPeriod.Duration, "cluster-monitor-period", s.ClusterMonitorPeriod.Duration, "The period for syncing ClusterStatus in ClusterController.") fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/") fs.StringVar(&s.Master, "master", s.Master, "The address of the federation API server (overrides any value in kubeconfig)") diff --git a/federation/pkg/federation-controller/service/cluster_helper.go b/federation/pkg/federation-controller/service/cluster_helper.go new file mode 100644 index 00000000000..ac8093d651e --- /dev/null +++ b/federation/pkg/federation-controller/service/cluster_helper.go @@ -0,0 +1,207 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package service + +import ( + "sync" + + "k8s.io/kubernetes/federation/apis/federation" + "k8s.io/kubernetes/pkg/api" + cache "k8s.io/kubernetes/pkg/client/cache" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "k8s.io/kubernetes/pkg/client/restclient" + "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" + "k8s.io/kubernetes/pkg/controller/framework" + pkg_runtime "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/util/workqueue" + "k8s.io/kubernetes/pkg/watch" + + "github.com/golang/glog" + "reflect" +) + +type clusterCache struct { + clientset *clientset.Clientset + cluster *federation.Cluster + // A store of services, populated by the serviceController + serviceStore cache.StoreToServiceLister + // Watches changes to all services + serviceController *framework.Controller + // A store of endpoint, populated by the serviceController + endpointStore cache.StoreToEndpointsLister + // Watches changes to all endpoints + endpointController *framework.Controller + // services that need to be synced + serviceQueue *workqueue.Type + // endpoints that need to be synced + endpointQueue *workqueue.Type +} + +type clusterClientCache struct { + rwlock sync.Mutex // protects serviceMap + clientMap map[string]*clusterCache +} + +func (cc *clusterClientCache) startClusterLW(cluster *federation.Cluster, clusterName string) { + cachedClusterClient, ok := cc.clientMap[clusterName] + // only create when no existing cachedClusterClient + if ok { + if !reflect.DeepEqual(cachedClusterClient.cluster.Spec, cluster.Spec) { + //rebuild clientset when cluster spec is changed + clientset, err := newClusterClientset(cluster) + if err != nil || clientset == nil { + glog.Errorf("Failed to create corresponding restclient of kubernetes cluster: %v", err) + } + glog.V(4).Infof("Cluster spec changed, rebuild clientset for cluster %s", clusterName) + cachedClusterClient.clientset = clientset + go cachedClusterClient.serviceController.Run(wait.NeverStop) + go cachedClusterClient.endpointController.Run(wait.NeverStop) + glog.V(2).Infof("Start watching services and endpoints on cluster %s", 
clusterName) + } else { + // do nothing when there is no spec change + glog.V(4).Infof("Keep clientset for cluster %s", clusterName) + return + } + } else { + glog.V(4).Infof("No client cache for cluster %s, building new", clusterName) + clientset, err := newClusterClientset(cluster) + if err != nil || clientset == nil { + glog.Errorf("Failed to create corresponding restclient of kubernetes cluster: %v", err) + } + cachedClusterClient = &clusterCache{ + cluster: cluster, + clientset: clientset, + serviceQueue: workqueue.New(), + endpointQueue: workqueue.New(), + } + cachedClusterClient.endpointStore.Store, cachedClusterClient.endpointController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) { + return clientset.Core().Endpoints(api.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return clientset.Core().Endpoints(api.NamespaceAll).Watch(options) + }, + }, + &api.Endpoints{}, + serviceSyncPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + cc.enqueueEndpoint(obj, clusterName) + }, + UpdateFunc: func(old, cur interface{}) { + cc.enqueueEndpoint(cur, clusterName) + }, + DeleteFunc: func(obj interface{}) { + cc.enqueueEndpoint(obj, clusterName) + }, + }, + ) + + cachedClusterClient.serviceStore.Store, cachedClusterClient.serviceController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) { + return clientset.Core().Services(api.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return clientset.Core().Services(api.NamespaceAll).Watch(options) + }, + }, + &api.Service{}, + serviceSyncPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + cc.enqueueService(obj, clusterName) + }, + UpdateFunc: func(old, cur interface{}) { + oldService, ok := old.(*api.Service) + + if !ok { + return + } + curService, ok := cur.(*api.Service) + if !ok { + return + } + if !reflect.DeepEqual(oldService.Status.LoadBalancer, curService.Status.LoadBalancer) { + cc.enqueueService(cur, clusterName) + } + }, + DeleteFunc: func(obj interface{}) { + service, _ := obj.(*api.Service) + cc.enqueueService(obj, clusterName) + glog.V(2).Infof("Service %s/%s deletion found and enque to service store %s", service.Namespace, service.Name, clusterName) + }, + }, + ) + cc.clientMap[clusterName] = cachedClusterClient + go cachedClusterClient.serviceController.Run(wait.NeverStop) + go cachedClusterClient.endpointController.Run(wait.NeverStop) + glog.V(2).Infof("Start watching services and endpoints on cluster %s", clusterName) + } + +} + +//TODO: copied from cluster controller, to make this as common function in pass 2 +// delFromClusterSet delete a cluster from clusterSet and +// delete the corresponding restclient from the map clusterKubeClientMap +func (cc *clusterClientCache) delFromClusterSet(obj interface{}) { + cluster, ok := obj.(*federation.Cluster) + cc.rwlock.Lock() + defer cc.rwlock.Unlock() + if ok { + delete(cc.clientMap, cluster.Name) + } else { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + glog.Infof("Object contained wasn't a cluster or a deleted key: %+v", obj) + return + } + glog.Infof("Found tombstone for %v", obj) + delete(cc.clientMap, tombstone.Key) + } +} + +// addToClusterSet inserts the new cluster to clusterSet and creates a corresponding +// restclient to map clusterKubeClientMap +func (cc 
*clusterClientCache) addToClientMap(obj interface{}) {
+	// make sure the object really is a cluster before doing anything with it
+	cluster, ok := obj.(*federation.Cluster)
+	if !ok {
+		return
+	}
+	cc.rwlock.Lock()
+	defer cc.rwlock.Unlock()
+	pred := getClusterConditionPredicate()
+	// check status
+	// skip if not ready
+	if pred(*cluster) {
+		cc.startClusterLW(cluster, cluster.Name)
+	}
+}
+
+func newClusterClientset(c *federation.Cluster) (*clientset.Clientset, error) {
+	clusterConfig, err := clientcmd.BuildConfigFromFlags(c.Spec.ServerAddressByClientCIDRs[0].ServerAddress, "")
+	if err != nil {
+		return nil, err
+	}
+	clusterConfig.QPS = KubeAPIQPS
+	clusterConfig.Burst = KubeAPIBurst
+	clientset := clientset.NewForConfigOrDie(restclient.AddUserAgent(clusterConfig, UserAgentName))
+	return clientset, nil
+}
diff --git a/federation/pkg/federation-controller/service/dns.go b/federation/pkg/federation-controller/service/dns.go
new file mode 100644
index 00000000000..d393fec9565
--- /dev/null
+++ b/federation/pkg/federation-controller/service/dns.go
@@ -0,0 +1,40 @@
+/*
+Copyright 2016 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package service
+
+// getClusterZoneName returns the name of the zone where the specified cluster exists (e.g. "us-east1-c" on GCE, or "us-east-1b" on AWS)
+func getClusterZoneName(clusterName string) string {
+	// TODO: quinton: Get this from the cluster API object - from the annotation on a node in the cluster - it doesn't contain this yet.
+	return "zone-of-cluster-" + clusterName
+}
+
+// getClusterRegionName returns the name of the region where the specified cluster exists (e.g. us-east1 on GCE, or "us-east-1" on AWS)
+func getClusterRegionName(clusterName string) string {
+	// TODO: quinton: Get this from the cluster API object - from the annotation on a node in the cluster - it doesn't contain this yet.
+	return "region-of-cluster-" + clusterName
+}
+
+// getFederationDNSZoneName returns the name of the managed DNS Zone configured for this federation
+func getFederationDNSZoneName() string {
+	return "mydomain.com" // TODO: quinton: Get this from the federation configuration.
+}
+
+func ensureDNSRecords(clusterName string, cachedService *cachedService) error {
+	// Quinton: Pseudocode....
+
+	return nil
+}
diff --git a/federation/pkg/federation-controller/service/doc.go b/federation/pkg/federation-controller/service/doc.go
new file mode 100644
index 00000000000..aebc7837f00
--- /dev/null
+++ b/federation/pkg/federation-controller/service/doc.go
@@ -0,0 +1,19 @@
+/*
+Copyright 2016 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package service contains code for syncing Kubernetes services, +// and cloud DNS servers with the federated service registry. +package service diff --git a/federation/pkg/federation-controller/service/endpoint_helper.go b/federation/pkg/federation-controller/service/endpoint_helper.go new file mode 100644 index 00000000000..0358860e516 --- /dev/null +++ b/federation/pkg/federation-controller/service/endpoint_helper.go @@ -0,0 +1,162 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package service + +import ( + "fmt" + "time" + + federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset" + "k8s.io/kubernetes/pkg/api" + cache "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/controller" + + "github.com/golang/glog" +) + +// worker runs a worker thread that just dequeues items, processes them, and marks them done. +// It enforces that the syncHandler is never invoked concurrently with the same key. +func (sc *ServiceController) clusterEndpointWorker() { + fedClient := sc.federationClient + for clusterName, cache := range sc.clusterCache.clientMap { + go func(cache *clusterCache, clusterName string) { + for { + func() { + key, quit := cache.endpointQueue.Get() + // update endpoint cache + if quit { + return + } + defer cache.endpointQueue.Done(key) + err := sc.clusterCache.syncEndpoint(key.(string), clusterName, cache, sc.serviceCache, fedClient) + if err != nil { + glog.V(2).Infof("Failed to sync endpoint: %+v", err) + } + }() + } + }(cache, clusterName) + } +} + +// Whenever there is change on endpoint, the federation service should be updated +// key is the namespaced name of endpoint +func (cc *clusterClientCache) syncEndpoint(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient federationclientset.Interface) error { + cachedService, ok := serviceCache.get(key) + if !ok { + // here we filtered all non-federation services + return nil + } + endpointInterface, exists, err := clusterCache.endpointStore.GetByKey(key) + if err != nil { + glog.Infof("Did not successfully get %v from store: %v, will retry later", key, err) + clusterCache.endpointQueue.Add(key) + return err + } + if exists { + endpoint, ok := endpointInterface.(*api.Endpoints) + if ok { + glog.V(4).Infof("Found endpoint for federation service %s/%s from cluster %s", endpoint.Namespace, endpoint.Name, clusterName) + err = cc.processEndpointUpdate(cachedService, endpoint, clusterName) + } else { + _, ok := endpointInterface.(cache.DeletedFinalStateUnknown) + if !ok { + return fmt.Errorf("Object contained wasn't a service or a deleted key: %+v", endpointInterface) + } + glog.Infof("Found tombstone for %v", key) + err = cc.processEndpointDeletion(cachedService, clusterName) + } + } else { + // service absence in store means watcher caught the deletion, ensure LB info is cleaned + 
glog.Infof("Can not get endpoint %v for cluster %s from endpointStore", key, clusterName) + err = cc.processEndpointDeletion(cachedService, clusterName) + } + if err != nil { + glog.Errorf("Failed to sync service: %+v, put back to service queue", err) + clusterCache.endpointQueue.Add(key) + } + cachedService.resetDNSUpdateDelay() + return nil +} + +func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedService, clusterName string) error { + glog.V(4).Infof("Processing endpoint update for %s/%s, cluster %s", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName) + var err error + cachedService.rwlock.Lock() + defer cachedService.rwlock.Unlock() + _, ok := cachedService.endpointMap[clusterName] + // TODO remove ok checking? if service controller is restarted, then endpointMap for the cluster does not exist + // need to query dns info from dnsprovider and make sure of if deletion is needed + if ok { + // endpoints lost, clean dns record + glog.V(4).Infof("Cached endpoint was not found for %s/%s, cluster %s, building one", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName) + // TODO: need to integrate with dns.go:ensureDNSRecords + for i := 0; i < clientRetryCount; i++ { + err := ensureDNSRecords(clusterName, cachedService) + if err == nil { + delete(cachedService.endpointMap, clusterName) + return nil + } + time.Sleep(cachedService.nextDNSUpdateDelay()) + } + } + return err +} + +// Update dns info when endpoint update event received +// We do not care about the endpoint info, what we need to make sure here is len(endpoints.subsets)>0 +func (cc *clusterClientCache) processEndpointUpdate(cachedService *cachedService, endpoint *api.Endpoints, clusterName string) error { + glog.V(4).Infof("Processing endpoint update for %s/%s, cluster %s", endpoint.Namespace, endpoint.Name, clusterName) + cachedService.rwlock.Lock() + defer cachedService.rwlock.Unlock() + for _, subset := range endpoint.Subsets { + if len(subset.Addresses) > 0 { + cachedService.endpointMap[clusterName] = 1 + } + } + _, ok := cachedService.endpointMap[clusterName] + if !ok { + // first time get endpoints, update dns record + glog.V(4).Infof("Cached endpoint was not found for %s/%s, cluster %s, building one", endpoint.Namespace, endpoint.Name, clusterName) + cachedService.endpointMap[clusterName] = 1 + err := ensureDNSRecords(clusterName, cachedService) + if err != nil { + // TODO: need to integrate with dns.go:ensureDNSRecords + for i := 0; i < clientRetryCount; i++ { + time.Sleep(cachedService.nextDNSUpdateDelay()) + err := ensureDNSRecords(clusterName, cachedService) + if err == nil { + return nil + } + } + return err + } + } + return nil +} + +// obj could be an *api.Endpoints, or a DeletionFinalStateUnknown marker item. +func (cc *clusterClientCache) enqueueEndpoint(obj interface{}, clusterName string) { + key, err := controller.KeyFunc(obj) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", obj, err) + return + } + _, ok := cc.clientMap[clusterName] + if ok { + cc.clientMap[clusterName].endpointQueue.Add(key) + } +} diff --git a/federation/pkg/federation-controller/service/endpoint_helper_test.go b/federation/pkg/federation-controller/service/endpoint_helper_test.go new file mode 100644 index 00000000000..944030ce64f --- /dev/null +++ b/federation/pkg/federation-controller/service/endpoint_helper_test.go @@ -0,0 +1,120 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package service + +import ( + "testing" + + "k8s.io/kubernetes/pkg/api" +) + +func buildEndpoint(subsets [][]string) *api.Endpoints { + endpoint := &api.Endpoints{ + Subsets: []api.EndpointSubset{ + {Addresses: []api.EndpointAddress{}}, + }, + } + for _, element := range subsets { + address := api.EndpointAddress{IP: element[0], Hostname: element[1], TargetRef: nil} + endpoint.Subsets[0].Addresses = append(endpoint.Subsets[0].Addresses, address) + } + return endpoint +} + +func TestProcessEndpointUpdate(t *testing.T) { + cc := clusterClientCache{ + clientMap: make(map[string]*clusterCache), + } + tests := []struct { + name string + cachedService *cachedService + endpoint *api.Endpoints + clusterName string + expectResult int + }{ + { + "no-cache", + &cachedService{ + lastState: &api.Service{}, + endpointMap: make(map[string]int), + }, + buildEndpoint([][]string{{"ip1", ""}}), + "foo", + 1, + }, + { + "has-cache", + &cachedService{ + lastState: &api.Service{}, + endpointMap: map[string]int{ + "foo": 1, + }, + }, + buildEndpoint([][]string{{"ip1", ""}}), + "foo", + 1, + }, + } + for _, test := range tests { + cc.processEndpointUpdate(test.cachedService, test.endpoint, test.clusterName) + if test.expectResult != test.cachedService.endpointMap[test.clusterName] { + t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectResult, test.cachedService.endpointMap[test.clusterName]) + } + } +} + +func TestProcessEndpointDeletion(t *testing.T) { + cc := clusterClientCache{ + clientMap: make(map[string]*clusterCache), + } + tests := []struct { + name string + cachedService *cachedService + endpoint *api.Endpoints + clusterName string + expectResult int + }{ + { + "no-cache", + &cachedService{ + lastState: &api.Service{}, + endpointMap: make(map[string]int), + }, + buildEndpoint([][]string{{"ip1", ""}}), + "foo", + 0, + }, + { + "has-cache", + &cachedService{ + lastState: &api.Service{}, + endpointMap: map[string]int{ + "foo": 1, + }, + }, + buildEndpoint([][]string{{"ip1", ""}}), + "foo", + 0, + }, + } + for _, test := range tests { + cc.processEndpointDeletion(test.cachedService, test.clusterName) + if test.expectResult != test.cachedService.endpointMap[test.clusterName] { + t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectResult, test.cachedService.endpointMap[test.clusterName]) + } + } +} diff --git a/federation/pkg/federation-controller/service/service_helper.go b/federation/pkg/federation-controller/service/service_helper.go new file mode 100644 index 00000000000..5a2c4fca363 --- /dev/null +++ b/federation/pkg/federation-controller/service/service_helper.go @@ -0,0 +1,253 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package service + +import ( + "fmt" + "time" + + federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/errors" + cache "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/controller" + + "github.com/golang/glog" + "reflect" + "sort" +) + +// worker runs a worker thread that just dequeues items, processes them, and marks them done. +// It enforces that the syncHandler is never invoked concurrently with the same key. +func (sc *ServiceController) clusterServiceWorker() { + fedClient := sc.federationClient + for clusterName, cache := range sc.clusterCache.clientMap { + go func(cache *clusterCache, clusterName string) { + for { + func() { + key, quit := cache.serviceQueue.Get() + defer cache.serviceQueue.Done(key) + if quit { + return + } + err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient) + if err != nil { + glog.Errorf("Failed to sync service: %+v", err) + } + }() + } + }(cache, clusterName) + } +} + +// Whenever there is change on service, the federation service should be updated +func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient federationclientset.Interface) error { + // obj holds the latest service info from apiserver, return if there is no federation cache for the service + cachedService, ok := serviceCache.get(key) + if !ok { + // if serviceCache does not exists, that means the service is not created by federation, we should skip it + return nil + } + serviceInterface, exists, err := clusterCache.serviceStore.GetByKey(key) + if err != nil { + glog.Infof("Did not successfully get %v from store: %v, will retry later", key, err) + clusterCache.serviceQueue.Add(key) + return err + } + var needUpdate bool + if exists { + service, ok := serviceInterface.(*api.Service) + if ok { + glog.V(4).Infof("Found service for federation service %s/%s from cluster %s", service.Namespace, service.Name, clusterName) + needUpdate = cc.processServiceUpdate(cachedService, service, clusterName) + } else { + _, ok := serviceInterface.(cache.DeletedFinalStateUnknown) + if !ok { + return fmt.Errorf("Object contained wasn't a service or a deleted key: %+v", serviceInterface) + } + glog.Infof("Found tombstone for %v", key) + needUpdate = cc.processServiceDeletion(cachedService, clusterName) + } + } else { + glog.Infof("Can not get service %v for cluster %s from serviceStore", key, clusterName) + needUpdate = cc.processServiceDeletion(cachedService, clusterName) + } + + if needUpdate { + err := cc.persistFedServiceUpdate(cachedService, fedClient) + if err == nil { + cachedService.appliedState = cachedService.lastState + cachedService.resetFedUpdateDelay() + } else { + if err != nil { + glog.Errorf("Failed to sync service: %+v, put back to service queue", err) + clusterCache.serviceQueue.Add(key) + } + } + } + return nil +} + +// processServiceDeletion is triggered when a service is delete from underlying k8s cluster +// the deletion function will wip out the cached 
ingress info of the service from federation service ingress +// the function returns a bool to indicate if actual update happend on federation service cache +// and if the federation service cache is updated, the updated info should be post to federation apiserver +func (cc *clusterClientCache) processServiceDeletion(cachedService *cachedService, clusterName string) bool { + cachedService.rwlock.Lock() + defer cachedService.rwlock.Unlock() + cachedStatus, ok := cachedService.serviceStatusMap[clusterName] + // cached status found, remove ingress info from federation service cache + if ok { + cachedFedServiceStatus := cachedService.lastState.Status.LoadBalancer + removeIndexes := []int{} + for i, fed := range cachedFedServiceStatus.Ingress { + for _, new := range cachedStatus.Ingress { + // remove if same ingress record found + if new.IP == fed.IP && new.Hostname == fed.Hostname { + removeIndexes = append(removeIndexes, i) + } + } + } + sort.Ints(removeIndexes) + for i := len(removeIndexes) - 1; i >= 0; i-- { + cachedFedServiceStatus.Ingress = append(cachedFedServiceStatus.Ingress[:removeIndexes[i]], cachedFedServiceStatus.Ingress[removeIndexes[i]+1:]...) + glog.V(4).Infof("Remove old ingress %d for service %s/%s", removeIndexes[i], cachedService.lastState.Namespace, cachedService.lastState.Name) + } + delete(cachedService.serviceStatusMap, clusterName) + delete(cachedService.endpointMap, clusterName) + cachedService.lastState.Status.LoadBalancer = cachedFedServiceStatus + return true + } else { + glog.V(4).Infof("Service removal %s/%s from cluster %s observed.", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName) + } + return false +} + +// processServiceUpdate Update ingress info when service updated +// the function returns a bool to indicate if actual update happend on federation service cache +// and if the federation service cache is updated, the updated info should be post to federation apiserver +func (cc *clusterClientCache) processServiceUpdate(cachedService *cachedService, service *api.Service, clusterName string) bool { + glog.V(4).Infof("Processing service update for %s/%s, cluster %s", service.Namespace, service.Name, clusterName) + cachedService.rwlock.Lock() + defer cachedService.rwlock.Unlock() + var needUpdate bool + newServiceLB := service.Status.LoadBalancer + cachedFedServiceStatus := cachedService.lastState.Status.LoadBalancer + if len(newServiceLB.Ingress) == 0 { + // not yet get LB IP + return false + } + + cachedStatus, ok := cachedService.serviceStatusMap[clusterName] + if ok { + if reflect.DeepEqual(cachedStatus, newServiceLB) { + glog.V(4).Infof("Same ingress info observed for service %s/%s: %+v ", service.Namespace, service.Name, cachedStatus.Ingress) + } else { + glog.V(4).Infof("Ingress info was changed for service %s/%s: cache: %+v, new: %+v ", + service.Namespace, service.Name, cachedStatus.Ingress, newServiceLB) + needUpdate = true + } + } else { + glog.V(4).Infof("Cached service status was not found for %s/%s, cluster %s, building one", service.Namespace, service.Name, clusterName) + + // cache is not always reliable(cache will be cleaned when service controller restart) + // two cases will run into this branch: + // 1. new service loadbalancer info received -> no info in cache, and no in federation service + // 2. 
service controller being restarted -> no info in cache, but it is in federation service + + // check if the lb info is already in federation service + + cachedService.serviceStatusMap[clusterName] = newServiceLB + needUpdate = false + // iterate service ingress info + for _, new := range newServiceLB.Ingress { + var found bool + // if it is known by federation service + for _, fed := range cachedFedServiceStatus.Ingress { + if new.IP == fed.IP && new.Hostname == fed.Hostname { + found = true + } + } + if !found { + needUpdate = true + break + } + } + } + + if needUpdate { + // new status = cached federation status - cached status + new status from k8s cluster + + removeIndexes := []int{} + for i, fed := range cachedFedServiceStatus.Ingress { + for _, new := range cachedStatus.Ingress { + // remove if same ingress record found + if new.IP == fed.IP && new.Hostname == fed.Hostname { + removeIndexes = append(removeIndexes, i) + } + } + } + sort.Ints(removeIndexes) + for i := len(removeIndexes) - 1; i >= 0; i-- { + cachedFedServiceStatus.Ingress = append(cachedFedServiceStatus.Ingress[:removeIndexes[i]], cachedFedServiceStatus.Ingress[removeIndexes[i]+1:]...) + } + cachedFedServiceStatus.Ingress = append(cachedFedServiceStatus.Ingress, service.Status.LoadBalancer.Ingress...) + cachedService.lastState.Status.LoadBalancer = cachedFedServiceStatus + glog.V(4).Infof("Add new ingress info %+v for service %s/%s", service.Status.LoadBalancer, service.Namespace, service.Name) + } else { + glog.V(4).Infof("Same ingress info found for %s/%s, cluster %s", service.Namespace, service.Name, clusterName) + } + return needUpdate +} + +func (cc *clusterClientCache) persistFedServiceUpdate(cachedService *cachedService, fedClient federationclientset.Interface) error { + service := cachedService.lastState + glog.V(5).Infof("Persist federation service status %s/%s", service.Namespace, service.Name) + var err error + for i := 0; i < clientRetryCount; i++ { + _, err := fedClient.Core().Services(service.Namespace).UpdateStatus(service) + if err == nil { + glog.V(2).Infof("Successfully update service %s/%s to federation apiserver", service.Namespace, service.Name) + return nil + } + if errors.IsNotFound(err) { + glog.Infof("Not persisting update to service '%s/%s' that no longer exists: %v", + service.Namespace, service.Name, err) + return nil + } + if errors.IsConflict(err) { + glog.V(4).Infof("Not persisting update to service '%s/%s' that has been changed since we received it: %v", + service.Namespace, service.Name, err) + return err + } + time.Sleep(cachedService.nextFedUpdateDelay()) + } + return err +} + +// obj could be an *api.Service, or a DeletionFinalStateUnknown marker item. +func (cc *clusterClientCache) enqueueService(obj interface{}, clusterName string) { + key, err := controller.KeyFunc(obj) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", obj, err) + return + } + _, ok := cc.clientMap[clusterName] + if ok { + cc.clientMap[clusterName].serviceQueue.Add(key) + } +} diff --git a/federation/pkg/federation-controller/service/service_helper_test.go b/federation/pkg/federation-controller/service/service_helper_test.go new file mode 100644 index 00000000000..09767d3f475 --- /dev/null +++ b/federation/pkg/federation-controller/service/service_helper_test.go @@ -0,0 +1,162 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package service + +import ( + "reflect" + "testing" + + "k8s.io/kubernetes/pkg/api" +) + +func buildServiceStatus(ingresses [][]string) api.LoadBalancerStatus { + status := api.LoadBalancerStatus{ + Ingress: []api.LoadBalancerIngress{}, + } + for _, element := range ingresses { + ingress := api.LoadBalancerIngress{IP: element[0], Hostname: element[1]} + status.Ingress = append(status.Ingress, ingress) + } + return status +} + +func TestProcessServiceUpdate(t *testing.T) { + cc := clusterClientCache{ + clientMap: make(map[string]*clusterCache), + } + tests := []struct { + name string + cachedService *cachedService + service *api.Service + clusterName string + expectNeedUpdate bool + expectStatus api.LoadBalancerStatus + }{ + { + "no-cache", + &cachedService{ + lastState: &api.Service{}, + serviceStatusMap: make(map[string]api.LoadBalancerStatus), + }, + &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, + "foo", + true, + buildServiceStatus([][]string{{"ip1", ""}}), + }, + { + "same-ingress", + &cachedService{ + lastState: &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, + serviceStatusMap: map[string]api.LoadBalancerStatus{ + "foo1": {Ingress: []api.LoadBalancerIngress{{"ip1", ""}}}, + }, + }, + &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, + "foo1", + false, + buildServiceStatus([][]string{{"ip1", ""}}), + }, + { + "diff-cluster", + &cachedService{ + lastState: &api.Service{ + ObjectMeta: api.ObjectMeta{Name: "bar1"}, + }, + serviceStatusMap: map[string]api.LoadBalancerStatus{ + "foo2": {Ingress: []api.LoadBalancerIngress{{"ip1", ""}}}, + }, + }, + &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, + "foo1", + true, + buildServiceStatus([][]string{{"ip1", ""}}), + }, + { + "diff-ingress", + &cachedService{ + lastState: &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}})}}, + serviceStatusMap: map[string]api.LoadBalancerStatus{ + "foo1": buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}}), + }, + }, + &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip2", ""}, {"ip3", ""}, {"ip5", ""}})}}, + "foo1", + true, + buildServiceStatus([][]string{{"ip2", ""}, {"ip3", ""}, {"ip5", ""}}), + }, + } + for _, test := range tests { + result := cc.processServiceUpdate(test.cachedService, test.service, test.clusterName) + if test.expectNeedUpdate != result { + t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectNeedUpdate, result) + } + if !reflect.DeepEqual(test.expectStatus, test.cachedService.lastState.Status.LoadBalancer) { + t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectStatus, test.cachedService.lastState.Status.LoadBalancer) + } + } +} + +func TestProcessServiceDeletion(t *testing.T) { + cc := clusterClientCache{ + clientMap: make(map[string]*clusterCache), + } + tests := []struct { + name string + cachedService *cachedService 
+ service *api.Service + clusterName string + expectNeedUpdate bool + expectStatus api.LoadBalancerStatus + }{ + { + "same-ingress", + &cachedService{ + lastState: &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, + serviceStatusMap: map[string]api.LoadBalancerStatus{ + "foo1": {Ingress: []api.LoadBalancerIngress{{"ip1", ""}}}, + }, + }, + &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, + "foo1", + true, + buildServiceStatus([][]string{}), + }, + { + "diff-ingress", + &cachedService{ + lastState: &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}, {"ip3", ""}, {"ip5", ""}, {"ip6", ""}, {"ip8", ""}})}}, + serviceStatusMap: map[string]api.LoadBalancerStatus{ + "foo1": buildServiceStatus([][]string{{"ip1", ""}, {"ip2", ""}, {"ip3", ""}}), + "foo2": buildServiceStatus([][]string{{"ip5", ""}, {"ip6", ""}, {"ip8", ""}}), + }, + }, + &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}, {"ip2", ""}, {"ip3", ""}})}}, + "foo1", + true, + buildServiceStatus([][]string{{"ip4", ""}, {"ip5", ""}, {"ip6", ""}, {"ip8", ""}}), + }, + } + for _, test := range tests { + result := cc.processServiceDeletion(test.cachedService, test.clusterName) + if test.expectNeedUpdate != result { + t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectNeedUpdate, result) + } + if !reflect.DeepEqual(test.expectStatus, test.cachedService.lastState.Status.LoadBalancer) { + t.Errorf("Test failed for %s, expected %+v, saw %+v", test.name, test.expectStatus, test.cachedService.lastState.Status.LoadBalancer) + } + } +} diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go new file mode 100644 index 00000000000..3bd2562ff75 --- /dev/null +++ b/federation/pkg/federation-controller/service/servicecontroller.go @@ -0,0 +1,874 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package service + +import ( + "fmt" + "sync" + "time" + + "reflect" + + "github.com/golang/glog" + federation "k8s.io/kubernetes/federation/apis/federation" + federationcache "k8s.io/kubernetes/federation/client/cache" + federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset" + "k8s.io/kubernetes/federation/pkg/dnsprovider" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/errors" + cache "k8s.io/kubernetes/pkg/client/cache" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "k8s.io/kubernetes/pkg/client/record" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/framework" + pkg_runtime "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/util/workqueue" + "k8s.io/kubernetes/pkg/watch" + + "k8s.io/kubernetes/pkg/conversion" +) + +const ( + // TODO update to 10 mins before merge + serviceSyncPeriod = 30 * time.Second + clusterSyncPeriod = 100 * time.Second + + // How long to wait before retrying the processing of a service change. + // If this changes, the sleep in hack/jenkins/e2e.sh before downing a cluster + // should be changed appropriately. + minRetryDelay = 5 * time.Second + maxRetryDelay = 300 * time.Second + + // client retry count and interval is when accessing a remote kube-apiserver or federation apiserver + // how many times should be attempted and how long it should sleep when failure occurs + // the retry should be in short time so no exponential backoff + clientRetryCount = 5 + + retryable = true + + doNotRetry = time.Duration(0) + + UserAgentName = "federation-service-controller" + KubeAPIQPS = 20.0 + KubeAPIBurst = 30 +) + +type cachedService struct { + lastState *api.Service + // The state as successfully applied to the DNS server + appliedState *api.Service + // cluster endpoint map hold subset info from kubernetes clusters + // key clusterName + // value is a flag that if there is ready address, 1 means there is ready address, 0 means no ready address + endpointMap map[string]int + // cluster service map hold serivice status info from kubernetes clusters + // key clusterName + + serviceStatusMap map[string]api.LoadBalancerStatus + + // Ensures only one goroutine can operate on this service at any given time. 
+ rwlock sync.Mutex + + // Controls error back-off for procceeding federation service to k8s clusters + lastRetryDelay time.Duration + // Controls error back-off for updating federation service back to federation apiserver + lastFedUpdateDelay time.Duration + // Controls error back-off for dns record update + lastDNSUpdateDelay time.Duration +} + +type serviceCache struct { + rwlock sync.Mutex // protects serviceMap + // federation service map contains all service received from federation apiserver + // key serviceName + fedServiceMap map[string]*cachedService +} + +type ServiceController struct { + dns dnsprovider.Interface + federationClient federationclientset.Interface + zones []dnsprovider.Zone + serviceCache *serviceCache + clusterCache *clusterClientCache + // A store of services, populated by the serviceController + serviceStore cache.StoreToServiceLister + // Watches changes to all services + serviceController *framework.Controller + // A store of services, populated by the serviceController + clusterStore federationcache.StoreToClusterLister + // Watches changes to all services + clusterController *framework.Controller + eventBroadcaster record.EventBroadcaster + eventRecorder record.EventRecorder + // services that need to be synced + queue *workqueue.Type + knownClusterSet sets.String +} + +// New returns a new service controller to keep DNS provider service resources +// (like Kubernetes Services and DNS server records for service discovery) in sync with the registry. + +func New(federationClient federationclientset.Interface, dns dnsprovider.Interface) *ServiceController { + broadcaster := record.NewBroadcaster() + // federationClient event is not supported yet + // broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")}) + recorder := broadcaster.NewRecorder(api.EventSource{Component: UserAgentName}) + + s := &ServiceController{ + dns: dns, + federationClient: federationClient, + serviceCache: &serviceCache{fedServiceMap: make(map[string]*cachedService)}, + clusterCache: &clusterClientCache{ + rwlock: sync.Mutex{}, + clientMap: make(map[string]*clusterCache), + }, + eventBroadcaster: broadcaster, + eventRecorder: recorder, + queue: workqueue.New(), + knownClusterSet: make(sets.String), + } + s.serviceStore.Store, s.serviceController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) { + return s.federationClient.Core().Services(api.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return s.federationClient.Core().Services(api.NamespaceAll).Watch(options) + }, + }, + &api.Service{}, + serviceSyncPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: s.enqueueService, + UpdateFunc: func(old, cur interface{}) { + // there is case that old and new are equals but we still catch the event now. 
+ if !reflect.DeepEqual(old, cur) { + s.enqueueService(cur) + } + }, + DeleteFunc: s.enqueueService, + }, + ) + s.clusterStore.Store, s.clusterController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) { + return s.federationClient.Federation().Clusters().List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return s.federationClient.Federation().Clusters().Watch(options) + }, + }, + &federation.Cluster{}, + clusterSyncPeriod, + framework.ResourceEventHandlerFuncs{ + DeleteFunc: s.clusterCache.delFromClusterSet, + AddFunc: s.clusterCache.addToClientMap, + UpdateFunc: func(old, cur interface{}) { + oldCluster, ok := old.(*federation.Cluster) + if !ok { + return + } + curCluster, ok := cur.(*federation.Cluster) + if !ok { + return + } + if !reflect.DeepEqual(oldCluster.Spec, curCluster.Spec) { + // update when spec is changed + s.clusterCache.addToClientMap(cur) + } + + pred := getClusterConditionPredicate() + // only update when condition changed to ready from not-ready + if !pred(*oldCluster) && pred(*curCluster) { + s.clusterCache.addToClientMap(cur) + } + // did not handle ready -> not-ready + // how could we stop a controller? + }, + }, + ) + return s +} + +// obj could be an *api.Service, or a DeletionFinalStateUnknown marker item. +func (s *ServiceController) enqueueService(obj interface{}) { + key, err := controller.KeyFunc(obj) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", obj, err) + return + } + s.queue.Add(key) +} + +// Run starts a background goroutine that watches for changes to federation services +// and ensures that they have Kubernetes services created, updated or deleted appropriately. +// federationSyncPeriod controls how often we check the federation's services to +// ensure that the correct Kubernetes services (and associated DNS entries) exist. +// This is only necessary to fudge over failed watches. +// clusterSyncPeriod controls how often we check the federation's underlying clusters and +// their Kubernetes services to ensure that matching services created independently of the Federation +// (e.g. directly via the underlying cluster's API) are correctly accounted for. + +// It's an error to call Run() more than once for a given ServiceController +// object. +func (s *ServiceController) Run(workers int, stopCh <-chan struct{}) error { + defer runtime.HandleCrash() + go s.serviceController.Run(stopCh) + go s.clusterController.Run(stopCh) + for i := 0; i < workers; i++ { + go wait.Until(s.fedServiceWorker, time.Second, stopCh) + } + go wait.Until(s.clusterEndpointWorker, time.Second, stopCh) + go wait.Until(s.clusterServiceWorker, time.Second, stopCh) + go wait.Until(s.clusterSyncLoop, clusterSyncPeriod, stopCh) + <-stopCh + glog.Infof("Shutting down Federation Service Controller") + s.queue.ShutDown() + return nil +} + +// fedServiceWorker runs a worker thread that just dequeues items, processes them, and marks them done. +// It enforces that the syncService is never invoked concurrently with the same key. 
+func (s *ServiceController) fedServiceWorker() { + for { + func() { + key, quit := s.queue.Get() + if quit { + return + } + + defer s.queue.Done(key) + err := s.syncService(key.(string)) + if err != nil { + glog.Errorf("Error syncing service: %v", err) + } + }() + } +} + +func wantsDNSRecords(service *api.Service) bool { + return service.Spec.Type == api.ServiceTypeLoadBalancer +} + +// processServiceForCluster creates or updates service to all registered running clusters, +// update DNS records and update the service info with DNS entries to federation apiserver. +// the function returns any error caught +func (s *ServiceController) processServiceForCluster(cachedService *cachedService, clusterName string, service *api.Service, client *clientset.Clientset) error { + glog.V(4).Infof("Process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName) + // Create or Update k8s Service + err := s.ensureClusterService(cachedService, clusterName, service, client) + if err != nil { + glog.V(4).Infof("Failed to process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName) + return err + } + glog.V(4).Infof("Successfully process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName) + return nil +} + +// updateFederationService Returns whatever error occurred along with a boolean indicator of whether it +// should be retried. +func (s *ServiceController) updateFederationService(key string, cachedService *cachedService) (error, bool) { + // Clone federation service, and create them in underlying k8s cluster + clone, err := conversion.NewCloner().DeepCopy(cachedService.lastState) + if err != nil { + return err, !retryable + } + service, ok := clone.(*api.Service) + if !ok { + return fmt.Errorf("Unexpected service cast error : %v\n", service), !retryable + } + + // handle available clusters one by one + var hasErr bool + for clusterName, cache := range s.clusterCache.clientMap { + go func(cache *clusterCache, clusterName string) { + err = s.processServiceForCluster(cachedService, clusterName, service, cache.clientset) + if err != nil { + hasErr = true + } + }(cache, clusterName) + } + if hasErr { + // detail error has been dumpped inside the loop + return fmt.Errorf("Service %s/%s was not successfully updated to all clusters", service.Namespace, service.Name), retryable + } + return nil, !retryable +} + +func (s *ServiceController) deleteFederationService(cachedService *cachedService) (error, bool) { + // handle available clusters one by one + var hasErr bool + for clusterName, cluster := range s.clusterCache.clientMap { + err := s.deleteClusterService(clusterName, cachedService, cluster.clientset) + if err != nil { + hasErr = true + } + } + if hasErr { + // detail error has been dumpped inside the loop + return fmt.Errorf("Service %s/%s was not successfully updated to all clusters", cachedService.lastState.Namespace, cachedService.lastState.Name), retryable + } + return nil, !retryable +} + +func (s *ServiceController) deleteClusterService(clusterName string, cachedService *cachedService, clientset *clientset.Clientset) error { + service := cachedService.lastState + glog.V(4).Infof("Deleting service %s/%s from cluster %s", service.Namespace, service.Name, clusterName) + var err error + for i := 0; i < clientRetryCount; i++ { + err = clientset.Core().Services(service.Namespace).Delete(service.Name, &api.DeleteOptions{}) + if err == nil || errors.IsNotFound(err) { + glog.V(4).Infof("Service %s/%s deleted from cluster %s", 
+				service.Namespace, service.Name, clusterName)
+			return nil
+		}
+		time.Sleep(cachedService.nextRetryDelay())
+	}
+	glog.V(4).Infof("Failed to delete service %s/%s from cluster %s: %+v", service.Namespace, service.Name, clusterName, err)
+	return err
+}
+
+func (s *ServiceController) ensureClusterService(cachedService *cachedService, clusterName string, service *api.Service, client *clientset.Clientset) error {
+	var err error
+	var needUpdate bool
+	for i := 0; i < clientRetryCount; i++ {
+		var svc *api.Service
+		svc, err = client.Core().Services(service.Namespace).Get(service.Name)
+		if err == nil {
+			// service exists
+			glog.V(5).Infof("Found service %s/%s in cluster %s", service.Namespace, service.Name, clusterName)
+			// preserve immutable fields
+			service.Spec.ClusterIP = svc.Spec.ClusterIP
+
+			// preserve auto-assigned fields
+			for i, oldPort := range svc.Spec.Ports {
+				for _, port := range service.Spec.Ports {
+					if port.NodePort == 0 {
+						if !portEqualExcludeNodePort(&oldPort, &port) {
+							svc.Spec.Ports[i] = port
+							needUpdate = true
+						}
+					} else {
+						if !portEqualForLB(&oldPort, &port) {
+							svc.Spec.Ports[i] = port
+							needUpdate = true
+						}
+					}
+				}
+			}
+
+			if needUpdate {
+				// only the spec is updated
+				svc.Spec = service.Spec
+				_, err = client.Core().Services(svc.Namespace).Update(svc)
+				if err == nil {
+					glog.V(5).Infof("Service %s/%s successfully updated in cluster %s", svc.Namespace, svc.Name, clusterName)
+					return nil
+				} else {
+					glog.V(4).Infof("Failed to update service: %+v", err)
+				}
+			} else {
+				glog.V(5).Infof("Service %s/%s not updated in cluster %s as the specs are identical", svc.Namespace, svc.Name, clusterName)
+				return nil
+			}
+		} else if errors.IsNotFound(err) {
+			// Create the service if it is not found
+			glog.Infof("Service '%s/%s' not found in cluster %s, trying to create it",
+				service.Namespace, service.Name, clusterName)
+			service.ResourceVersion = ""
+			_, err = client.Core().Services(service.Namespace).Create(service)
+			if err == nil {
+				glog.V(5).Infof("Service %s/%s successfully created in cluster %s", service.Namespace, service.Name, clusterName)
+				return nil
+			}
+			glog.V(4).Infof("Failed to create service: %+v", err)
+			if errors.IsAlreadyExists(err) {
+				glog.V(5).Infof("Service %s/%s already exists in cluster %s", service.Namespace, service.Name, clusterName)
+				return nil
+			}
+		}
+		if errors.IsConflict(err) {
+			glog.V(4).Infof("Not persisting update to service '%s/%s' that has been changed since we received it: %v",
+				service.Namespace, service.Name, err)
+		}
+		// TODO: should we reuse the same retry delay for all clusters?
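+		// The backoff is tracked per federation service, so every cluster handled for this
+		// service shares the same, steadily growing delay.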
+ time.Sleep(cachedService.nextRetryDelay()) + } + return err +} + +func (s *serviceCache) allServices() []*cachedService { + s.rwlock.Lock() + defer s.rwlock.Unlock() + services := make([]*cachedService, 0, len(s.fedServiceMap)) + for _, v := range s.fedServiceMap { + services = append(services, v) + } + return services +} + +func (s *serviceCache) get(serviceName string) (*cachedService, bool) { + s.rwlock.Lock() + defer s.rwlock.Unlock() + service, ok := s.fedServiceMap[serviceName] + return service, ok +} + +func (s *serviceCache) getOrCreate(serviceName string) *cachedService { + s.rwlock.Lock() + defer s.rwlock.Unlock() + service, ok := s.fedServiceMap[serviceName] + if !ok { + service = &cachedService{ + endpointMap: make(map[string]int), + serviceStatusMap: make(map[string]api.LoadBalancerStatus), + } + s.fedServiceMap[serviceName] = service + } + return service +} + +func (s *serviceCache) set(serviceName string, service *cachedService) { + s.rwlock.Lock() + defer s.rwlock.Unlock() + s.fedServiceMap[serviceName] = service +} + +func (s *serviceCache) delete(serviceName string) { + s.rwlock.Lock() + defer s.rwlock.Unlock() + delete(s.fedServiceMap, serviceName) +} + +// needsUpdateDNS check if the dns records of the given service should be updated +func (s *ServiceController) needsUpdateDNS(oldService *api.Service, newService *api.Service) bool { + if !wantsDNSRecords(oldService) && !wantsDNSRecords(newService) { + return false + } + if wantsDNSRecords(oldService) != wantsDNSRecords(newService) { + s.eventRecorder.Eventf(newService, api.EventTypeNormal, "Type", "%v -> %v", + oldService.Spec.Type, newService.Spec.Type) + return true + } + if !portsEqualForLB(oldService, newService) || oldService.Spec.SessionAffinity != newService.Spec.SessionAffinity { + return true + } + if !LoadBalancerIPsAreEqual(oldService, newService) { + s.eventRecorder.Eventf(newService, api.EventTypeNormal, "LoadbalancerIP", "%v -> %v", + oldService.Spec.LoadBalancerIP, newService.Spec.LoadBalancerIP) + return true + } + if len(oldService.Spec.ExternalIPs) != len(newService.Spec.ExternalIPs) { + s.eventRecorder.Eventf(newService, api.EventTypeNormal, "ExternalIP", "Count: %v -> %v", + len(oldService.Spec.ExternalIPs), len(newService.Spec.ExternalIPs)) + return true + } + for i := range oldService.Spec.ExternalIPs { + if oldService.Spec.ExternalIPs[i] != newService.Spec.ExternalIPs[i] { + s.eventRecorder.Eventf(newService, api.EventTypeNormal, "ExternalIP", "Added: %v", + newService.Spec.ExternalIPs[i]) + return true + } + } + if !reflect.DeepEqual(oldService.Annotations, newService.Annotations) { + return true + } + if oldService.UID != newService.UID { + s.eventRecorder.Eventf(newService, api.EventTypeNormal, "UID", "%v -> %v", + oldService.UID, newService.UID) + return true + } + + return false +} + +func getPortsForLB(service *api.Service) ([]*api.ServicePort, error) { + // TODO: quinton: Probably applies for DNS SVC records. Come back to this. + //var protocol api.Protocol + + ports := []*api.ServicePort{} + for i := range service.Spec.Ports { + sp := &service.Spec.Ports[i] + // The check on protocol was removed here. 
+		// The DNS provider itself is now responsible for all protocol validation.
+		ports = append(ports, sp)
+	}
+	return ports, nil
+}
+
+func portsEqualForLB(x, y *api.Service) bool {
+	xPorts, err := getPortsForLB(x)
+	if err != nil {
+		return false
+	}
+	yPorts, err := getPortsForLB(y)
+	if err != nil {
+		return false
+	}
+	return portSlicesEqualForLB(xPorts, yPorts)
+}
+
+func portSlicesEqualForLB(x, y []*api.ServicePort) bool {
+	if len(x) != len(y) {
+		return false
+	}
+
+	for i := range x {
+		if !portEqualForLB(x[i], y[i]) {
+			return false
+		}
+	}
+	return true
+}
+
+func portEqualForLB(x, y *api.ServicePort) bool {
+	// TODO: Should we check name? (In theory, an LB could expose it)
+	if x.Name != y.Name {
+		return false
+	}
+
+	if x.Protocol != y.Protocol {
+		return false
+	}
+
+	if x.Port != y.Port {
+		return false
+	}
+	if x.NodePort != y.NodePort {
+		return false
+	}
+	return true
+}
+
+func portEqualExcludeNodePort(x, y *api.ServicePort) bool {
+	// TODO: Should we check name? (In theory, an LB could expose it)
+	if x.Name != y.Name {
+		return false
+	}
+
+	if x.Protocol != y.Protocol {
+		return false
+	}
+
+	if x.Port != y.Port {
+		return false
+	}
+	return true
+}
+
+func clustersFromList(list *federation.ClusterList) []string {
+	result := []string{}
+	for ix := range list.Items {
+		result = append(result, list.Items[ix].Name)
+	}
+	return result
+}
+
+// getClusterConditionPredicate returns a predicate that accepts only clusters whose
+// ClusterReady condition has status ConditionTrue.
+func getClusterConditionPredicate() federationcache.ClusterConditionPredicate {
+	return func(cluster federation.Cluster) bool {
+		// If we have no info, don't accept
+		if len(cluster.Status.Conditions) == 0 {
+			return false
+		}
+		for _, cond := range cluster.Status.Conditions {
+			// We consider the cluster for load balancing only when its ClusterReady condition status
+			// is ConditionTrue
+			if cond.Type == federation.ClusterReady && cond.Status != api.ConditionTrue {
+				glog.V(4).Infof("Ignoring cluster %v with %v condition status %v", cluster.Name, cond.Type, cond.Status)
+				return false
+			}
+		}
+		return true
+	}
+}
+
+// clusterSyncLoop observes changes to the set of ready clusters, applies all federation
+// services to newly added clusters, and updates DNS records for the changes.
+func (s *ServiceController) clusterSyncLoop() {
+	var servicesToUpdate []*cachedService
+	// TODO: should we drop the cached client for a cluster that goes from ready to not-ready?
+	// If not, the condition predicate can be removed.
+	clusters, err := s.clusterStore.ClusterCondition(getClusterConditionPredicate()).List()
+	if err != nil {
+		glog.Infof("Failed to get cluster list: %v", err)
+		return
+	}
+	newClusters := clustersFromList(&clusters)
+	var newSet, increase sets.String
+	newSet = sets.NewString(newClusters...)
+	if newSet.Equal(s.knownClusterSet) {
+		// The set of ready cluster names hasn't changed, but we can retry
+		// updating any services that we failed to update last time around.
+		servicesToUpdate = s.updateDNSRecords(servicesToUpdate, newClusters)
+		return
+	}
+	glog.Infof("Detected change in list of cluster names. New set: %v, Old set: %v", newSet, s.knownClusterSet)
+	increase = newSet.Difference(s.knownClusterSet)
+	// Do nothing when a cluster is removed.
+	if increase != nil {
+		for newCluster := range increase {
+			glog.Infof("New cluster observed %s", newCluster)
+			s.updateAllServicesToCluster(servicesToUpdate, newCluster)
+		}
+		// Try updating all services, and save the ones that fail to try again next
+		// round.
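+		// updateDNSRecords skips services whose appliedState is still nil, since those have
+		// not yet been successfully propagated.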
+		servicesToUpdate = s.serviceCache.allServices()
+		numServices := len(servicesToUpdate)
+		servicesToUpdate = s.updateDNSRecords(servicesToUpdate, newClusters)
+		glog.Infof("Successfully updated %d out of %d DNS records to direct traffic to the updated cluster",
+			numServices-len(servicesToUpdate), numServices)
+	}
+	s.knownClusterSet = newSet
+}
+
+// updateAllServicesToCluster applies the applied state of every cached service to the
+// newly ready cluster.
+func (s *ServiceController) updateAllServicesToCluster(services []*cachedService, clusterName string) {
+	cluster, ok := s.clusterCache.clientMap[clusterName]
+	if ok {
+		for _, cachedService := range services {
+			appliedState := cachedService.appliedState
+			s.processServiceForCluster(cachedService, clusterName, appliedState, cluster.clientset)
+		}
+	}
+}
+
+// removeAllServicesFromCluster deletes every cached service from the given cluster.
+func (s *ServiceController) removeAllServicesFromCluster(services []*cachedService, clusterName string) {
+	client, ok := s.clusterCache.clientMap[clusterName]
+	if ok {
+		for _, cachedService := range services {
+			s.deleteClusterService(clusterName, cachedService, client.clientset)
+		}
+		glog.Infof("Removed all services from cluster %s", clusterName)
+	}
+}
+
+// updateDNSRecords updates all existing federation service DNS Records so that
+// they will match the list of cluster names provided.
+// Returns the list of services that couldn't be updated.
+func (s *ServiceController) updateDNSRecords(services []*cachedService, clusters []string) (servicesToRetry []*cachedService) {
+	for _, service := range services {
+		func() {
+			service.rwlock.Lock()
+			defer service.rwlock.Unlock()
+			// If the applied state is nil, that means it hasn't yet been successfully dealt
+			// with by the DNS Record reconciler. We can trust the DNS Record
+			// reconciler to ensure the federation service's DNS records are created to target
+			// the correct backend service IPs.
+			if service.appliedState == nil {
+				return
+			}
+			if err := s.lockedUpdateDNSRecords(service, clusters); err != nil {
+				glog.Errorf("External error while updating DNS Records: %v.", err)
+				servicesToRetry = append(servicesToRetry, service)
+			}
+		}()
+	}
+	return servicesToRetry
+}
+
+// lockedUpdateDNSRecords updates the DNS records of a service, assuming we hold the mutex
+// associated with the service.
+// TODO: quinton: Still screwed up in the same way as above. Fix.
+func (s *ServiceController) lockedUpdateDNSRecords(service *cachedService, clusterNames []string) error {
+	if !wantsDNSRecords(service.appliedState) {
+		return nil
+	}
+	for key := range s.clusterCache.clientMap {
+		for _, clusterName := range clusterNames {
+			if key == clusterName {
+				ensureDNSRecords(clusterName, service)
+			}
+		}
+	}
+	return nil
+}
+
+func LoadBalancerIPsAreEqual(oldService, newService *api.Service) bool {
+	return oldService.Spec.LoadBalancerIP == newService.Spec.LoadBalancerIP
+}
+
+// nextRetryDelay computes the next retry delay, using exponential backoff.
+// The mutex must be held.
+func (s *cachedService) nextRetryDelay() time.Duration {
+	s.lastRetryDelay = s.lastRetryDelay * 2
+	if s.lastRetryDelay < minRetryDelay {
+		s.lastRetryDelay = minRetryDelay
+	}
+	if s.lastRetryDelay > maxRetryDelay {
+		s.lastRetryDelay = maxRetryDelay
+	}
+	return s.lastRetryDelay
+}
+
+// resetRetryDelay resets the retry exponential backoff. The mutex must be held.
+func (s *cachedService) resetRetryDelay() {
+	s.lastRetryDelay = time.Duration(0)
+}
+
+// nextFedUpdateDelay computes the next retry delay for federation service updates,
+// using exponential backoff. The mutex must be held.
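+// The delay doubles on each call and is clamped to the [minRetryDelay, maxRetryDelay] range.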
+func (s *cachedService) nextFedUpdateDelay() time.Duration {
+	s.lastFedUpdateDelay = s.lastFedUpdateDelay * 2
+	if s.lastFedUpdateDelay < minRetryDelay {
+		s.lastFedUpdateDelay = minRetryDelay
+	}
+	if s.lastFedUpdateDelay > maxRetryDelay {
+		s.lastFedUpdateDelay = maxRetryDelay
+	}
+	return s.lastFedUpdateDelay
+}
+
+// resetFedUpdateDelay resets the federation update exponential backoff. The mutex must be held.
+func (s *cachedService) resetFedUpdateDelay() {
+	s.lastFedUpdateDelay = time.Duration(0)
+}
+
+// nextDNSUpdateDelay computes the next retry delay for DNS record updates, using
+// exponential backoff. The mutex must be held.
+func (s *cachedService) nextDNSUpdateDelay() time.Duration {
+	s.lastDNSUpdateDelay = s.lastDNSUpdateDelay * 2
+	if s.lastDNSUpdateDelay < minRetryDelay {
+		s.lastDNSUpdateDelay = minRetryDelay
+	}
+	if s.lastDNSUpdateDelay > maxRetryDelay {
+		s.lastDNSUpdateDelay = maxRetryDelay
+	}
+	return s.lastDNSUpdateDelay
+}
+
+// resetDNSUpdateDelay resets the DNS update exponential backoff. The mutex must be held.
+func (s *cachedService) resetDNSUpdateDelay() {
+	s.lastDNSUpdateDelay = time.Duration(0)
+}
+
+// syncService will sync the Service with the given key, creating, updating or deleting the
+// corresponding services in the underlying clusters as needed. This function is not meant to be
+// invoked concurrently with the same key.
+func (s *ServiceController) syncService(key string) error {
+	startTime := time.Now()
+	var cachedService *cachedService
+	var retryDelay time.Duration
+	defer func() {
+		glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Since(startTime))
+	}()
+	// obj holds the latest service info from the apiserver
+	obj, exists, err := s.serviceStore.Store.GetByKey(key)
+	if err != nil {
+		glog.Infof("Unable to retrieve service %v from store: %v", key, err)
+		s.queue.Add(key)
+		return err
+	}
+
+	if !exists {
+		// Absence from the store means the watcher caught the deletion; ensure the per-cluster services are cleaned up
+		glog.Infof("Service %v has been deleted", key)
+		err, retryDelay = s.processServiceDeletion(key)
+	}
+
+	if exists {
+		service, ok := obj.(*api.Service)
+		if ok {
+			cachedService = s.serviceCache.getOrCreate(key)
+			err, retryDelay = s.processServiceUpdate(cachedService, service, key)
+		} else {
+			tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+			if !ok {
+				return fmt.Errorf("Object contained wasn't a service or a deleted key: %+v", obj)
+			}
+			glog.Infof("Found tombstone for %v", key)
+			err, retryDelay = s.processServiceDeletion(tombstone.Key)
+		}
+	}
+
+	if retryDelay != 0 {
+		s.enqueueService(obj)
+	} else if err != nil {
+		runtime.HandleError(fmt.Errorf("Failed to process service. Not retrying: %v", err))
+	}
+	return nil
+}
+
+// processServiceUpdate returns an error if processing the service update failed, along with a time.Duration
+// indicating whether processing should be retried; zero means no-retry; otherwise
+// we should retry in that Duration.
+func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *api.Service, key string) (error, time.Duration) {
+	// Ensure that no other goroutine will interfere with our processing of the
+	// service.
+	cachedService.rwlock.Lock()
+	defer cachedService.rwlock.Unlock()
+
+	// Update the cached service (used above for populating synthetic deletes).
+	// Always trust service, which is retrieved from the serviceStore and holds the latest
+	// service state from the apiserver; if the same service changes again before this
+	// goroutine finishes, another queue entry will handle that.
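+	// lastState always tracks the latest observed service, while appliedState (set below on
+	// success) tracks what has actually been propagated to the clusters.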
+	cachedService.lastState = service
+	err, retry := s.updateFederationService(key, cachedService)
+	if err != nil {
+		message := "Error occurred when updating service to all clusters"
+		if retry {
+			message += " (will retry): "
+		} else {
+			message += " (will not retry): "
+		}
+		message += err.Error()
+		s.eventRecorder.Event(service, api.EventTypeWarning, "UpdateServiceFail", message)
+		return err, cachedService.nextRetryDelay()
+	}
+	// Always update the cache upon success.
+	// NOTE: Since we update the cached service if and only if we successfully
+	// processed it, the applied state being nil implies that it hasn't yet
+	// been successfully processed.
+
+	cachedService.appliedState = service
+	s.serviceCache.set(key, cachedService)
+	glog.V(4).Infof("Successfully processed service %s", key)
+	cachedService.resetRetryDelay()
+	return nil, doNotRetry
+}
+
+// processServiceDeletion returns an error if processing the service deletion failed, along with a time.Duration
+// indicating whether processing should be retried; zero means no-retry; otherwise
+// we should retry in that Duration.
+func (s *ServiceController) processServiceDeletion(key string) (error, time.Duration) {
+	glog.V(2).Infof("Process service deletion for %v", key)
+	cachedService, ok := s.serviceCache.get(key)
+	if !ok {
+		return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), doNotRetry
+	}
+	service := cachedService.lastState
+	cachedService.rwlock.Lock()
+	defer cachedService.rwlock.Unlock()
+	s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingDNSRecord", "Deleting DNS Records")
+	// TODO: should we delete the DNS records here, or wait for endpoint changes? Deleting here is preferred;
+	// the alternative is to do nothing on service deletion.
+	//err := s.dns.balancer.EnsureLoadBalancerDeleted(service)
+	err, retry := s.deleteFederationService(cachedService)
+	if err != nil {
+		message := "Error occurred when deleting federation service"
+		if retry {
+			message += " (will retry): "
+		} else {
+			message += " (will not retry): "
+		}
+		message += err.Error()
+		s.eventRecorder.Event(service, api.EventTypeWarning, "DeletingDNSRecordFailed", message)
+		return err, cachedService.nextRetryDelay()
+	}
+	s.eventRecorder.Event(service, api.EventTypeNormal, "DeletedDNSRecord", "Deleted DNS Records")
+	s.serviceCache.delete(key)
+
+	cachedService.resetRetryDelay()
+	return nil, doNotRetry
+}
diff --git a/federation/pkg/federation-controller/service/servicecontroller_test.go b/federation/pkg/federation-controller/service/servicecontroller_test.go
new file mode 100644
index 00000000000..e5db2fb9b6e
--- /dev/null
+++ b/federation/pkg/federation-controller/service/servicecontroller_test.go
@@ -0,0 +1,67 @@
+/*
+Copyright 2016 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package service + +import ( + "testing" + + "k8s.io/kubernetes/federation/apis/federation" + "k8s.io/kubernetes/pkg/api" +) + +func TestGetClusterConditionPredicate(t *testing.T) { + tests := []struct { + cluster federation.Cluster + expectAccept bool + name string + }{ + { + cluster: federation.Cluster{}, + expectAccept: false, + name: "empty", + }, + { + cluster: federation.Cluster{ + Status: federation.ClusterStatus{ + Conditions: []federation.ClusterCondition{ + {Type: federation.ClusterReady, Status: api.ConditionTrue}, + }, + }, + }, + expectAccept: true, + name: "basic", + }, + { + cluster: federation.Cluster{ + Status: federation.ClusterStatus{ + Conditions: []federation.ClusterCondition{ + {Type: federation.ClusterReady, Status: api.ConditionFalse}, + }, + }, + }, + expectAccept: false, + name: "notready", + }, + } + pred := getClusterConditionPredicate() + for _, test := range tests { + accept := pred(test.cluster) + if accept != test.expectAccept { + t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectAccept, accept) + } + } +} diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index cf330ad8aa0..a52fcd9bdc4 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -63,6 +63,7 @@ concurrent-deployment-syncs concurrent-endpoint-syncs concurrent-namespace-syncs concurrent-replicaset-syncs +concurrent-service-syncs concurrent-resource-quota-syncs config-sync-period configure-cbr0