diff --git a/federation/pkg/federated-controller/OWNERS b/federation/pkg/federated-controller/OWNERS
new file mode 100644
index 00000000000..35859cd8c02
--- /dev/null
+++ b/federation/pkg/federated-controller/OWNERS
@@ -0,0 +1,5 @@
+assignees:
+  - bprashanth
+  - davidopp
+  - derekwaynecarr
+  - mikedanese
diff --git a/federation/pkg/federated-controller/cluster/cluster_client.go b/federation/pkg/federated-controller/cluster/cluster_client.go
new file mode 100644
index 00000000000..2d2ea959ef1
--- /dev/null
+++ b/federation/pkg/federated-controller/cluster/cluster_client.go
@@ -0,0 +1,87 @@
+/*
+Copyright 2014 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cluster
+
+import (
+	"strings"
+
+	"k8s.io/kubernetes/federation/apis/federation"
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/apis/extensions"
+	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
+	"k8s.io/kubernetes/pkg/client/restclient"
+	"k8s.io/kubernetes/pkg/client/typed/discovery"
+	"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
+)
+
+const (
+	UserAgentName = "Cluster-Controller"
+	KubeAPIQPS    = 20.0
+	KubeAPIBurst  = 30
+)
+
+type ClusterClient struct {
+	clientSet       clientset.Interface
+	discoveryClient *discovery.DiscoveryClient
+}
+
+func NewClusterClientSet(c *federation.Cluster) (*ClusterClient, error) {
+	// TODO(huangyuqi): how to get the cluster IP
+	var clusterClientSet = ClusterClient{}
+	clusterConfig, err := clientcmd.BuildConfigFromFlags(c.Spec.ServerAddressByClientCIDRs[0].ServerAddress, "")
+	if err != nil {
+		return nil, err
+	}
+	clusterConfig.QPS = KubeAPIQPS
+	clusterConfig.Burst = KubeAPIBurst
+	clusterClientSet.clientSet = clientset.NewForConfigOrDie(restclient.AddUserAgent(clusterConfig, UserAgentName))
+	clusterClientSet.discoveryClient = discovery.NewDiscoveryClientForConfigOrDie(restclient.AddUserAgent(clusterConfig, UserAgentName))
+	return &clusterClientSet, nil
+}
+
+// GetReplicaSetFromCluster gets the replicaset from the kubernetes cluster
+func (self *ClusterClient) GetReplicaSetFromCluster(subRsName string, subRsNameSpace string) (*extensions.ReplicaSet, error) {
+	return self.clientSet.Extensions().ReplicaSets(subRsNameSpace).Get(subRsName)
+}
+
+// CreateReplicaSetToCluster creates the replicaset in the kubernetes cluster
+func (self *ClusterClient) CreateReplicaSetToCluster(subRs *extensions.ReplicaSet) (*extensions.ReplicaSet, error) {
+	return self.clientSet.Extensions().ReplicaSets(subRs.Namespace).Create(subRs)
+}
+
+// UpdateReplicaSetToCluster updates the replicaset in the kubernetes cluster
+func (self *ClusterClient) UpdateReplicaSetToCluster(subRs *extensions.ReplicaSet) (*extensions.ReplicaSet, error) {
+	return self.clientSet.Extensions().ReplicaSets(subRs.Namespace).Update(subRs)
+}
+
+// DeleteReplicasetFromCluster deletes the replicaset from the kubernetes cluster
+func (self *ClusterClient) DeleteReplicasetFromCluster(subRs *extensions.ReplicaSet) error {
+	return self.clientSet.Extensions().ReplicaSets(subRs.Namespace).Delete(subRs.Name, &api.DeleteOptions{})
+}
+
+// GetClusterHealthStatus gets the health status of the kubernetes cluster from its /healthz endpoint
+func (self *ClusterClient) GetClusterHealthStatus() federation.ClusterPhase {
+	body, err := self.discoveryClient.Get().AbsPath("/healthz").Do().Raw()
+	if err != nil {
+		return federation.ClusterOffline
+	}
+	if !strings.EqualFold(string(body), "ok") {
+		return federation.ClusterPending
+	}
+	return federation.ClusterRunning
+}
diff --git a/federation/pkg/federated-controller/cluster/clustercontroller.go b/federation/pkg/federated-controller/cluster/clustercontroller.go
new file mode 100644
index 00000000000..b37487bb458
--- /dev/null
+++ b/federation/pkg/federated-controller/cluster/clustercontroller.go
@@ -0,0 +1,403 @@
+/*
+Copyright 2014 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cluster
+
+import (
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/golang/glog"
+
+	"k8s.io/kubernetes/federation/apis/federation"
+	federationcache "k8s.io/kubernetes/federation/client/cache"
+	federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/internalclientset"
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/meta"
+	"k8s.io/kubernetes/pkg/apis/extensions"
+	"k8s.io/kubernetes/pkg/client/cache"
+	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
+	"k8s.io/kubernetes/pkg/controller"
+	"k8s.io/kubernetes/pkg/controller/framework"
+	"k8s.io/kubernetes/pkg/runtime"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
+	"k8s.io/kubernetes/pkg/util/sets"
+	"k8s.io/kubernetes/pkg/util/wait"
+	"k8s.io/kubernetes/pkg/util/workqueue"
+	"k8s.io/kubernetes/pkg/watch"
+)
+
+const (
+	AnnotationKeyOfTargetCluster        = "kubernetes.io/target-cluster"
+	AnnotationKeyOfFederationReplicaSet = "kubernetes.io/created-by"
+)
+
+type ClusterController struct {
+	knownClusterSet sets.String
+
+	//federationClient is used to operate on clusters and subreplicasets
+	federationClient federationclientset.Interface
+
+	//client is used to operate on replicasets
+	client clientset.Interface
+
+	// syncHandler allows injection of syncSubReplicaSet for testing.
+	syncHandler func(subRsKey string) error
+
+	//clusterMonitorPeriod is the period for updating the status of the cluster
+	clusterMonitorPeriod time.Duration
+	//clusterKubeClientMap is a mapping of clusterName and restclient
+	clusterKubeClientMap map[string]ClusterClient
+
+	// subRS framework and store
+	subReplicaSetController *framework.Controller
+	subReplicaSetStore      federationcache.StoreToSubReplicaSetLister
+
+	// cluster framework and store
+	clusterController *framework.Controller
+	clusterStore      federationcache.StoreToClusterLister
+
+	// federation ReplicaSet framework and store
+	replicaSetStore      cache.StoreToReplicaSetLister
+	replicaSetController *framework.Controller
+
+	// subRS keys that have been queued up for processing by workers
+	queue *workqueue.Type
+}
+
+// NewclusterController returns a new cluster controller
+func NewclusterController(client clientset.Interface, federationClient federationclientset.Interface, clusterMonitorPeriod time.Duration) *ClusterController {
+	cc := &ClusterController{
+		knownClusterSet:      make(sets.String),
+		federationClient:     federationClient,
+		client:               client,
+		clusterMonitorPeriod: clusterMonitorPeriod,
+		clusterKubeClientMap: make(map[string]ClusterClient),
+		queue:                workqueue.New(),
+	}
+
+	cc.subReplicaSetStore.Store, cc.subReplicaSetController = framework.NewInformer(
+		&cache.ListWatch{
+			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
+				return cc.federationClient.Federation().SubReplicaSets(api.NamespaceAll).List(options)
+			},
+			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
+				return cc.federationClient.Federation().SubReplicaSets(api.NamespaceAll).Watch(options)
+			},
+		},
+		&federation.SubReplicaSet{},
+		controller.NoResyncPeriodFunc(),
+		framework.ResourceEventHandlerFuncs{
+			AddFunc: func(obj interface{}) {
+				subRc := obj.(*federation.SubReplicaSet)
+				cc.enqueueSubRc(subRc)
+			},
+			UpdateFunc: func(oldObj, newObj interface{}) {
+				subRc := newObj.(*federation.SubReplicaSet)
+				cc.enqueueSubRc(subRc)
+			},
+		},
+	)
+
+	cc.clusterStore.Store, cc.clusterController = framework.NewInformer(
+		&cache.ListWatch{
+			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
+				return cc.federationClient.Federation().Clusters().List(options)
+			},
+			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
+				return cc.federationClient.Federation().Clusters().Watch(options)
+			},
+		},
+		&federation.Cluster{},
+		controller.NoResyncPeriodFunc(),
+		framework.ResourceEventHandlerFuncs{
+			DeleteFunc: cc.delFromClusterSet,
+			AddFunc:    cc.addToClusterSet,
+		},
+	)
+
+	cc.replicaSetStore.Store, cc.replicaSetController = framework.NewInformer(
+		&cache.ListWatch{
+			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
+				return cc.client.Extensions().ReplicaSets(api.NamespaceAll).List(options)
+			},
+			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
+				return cc.client.Extensions().ReplicaSets(api.NamespaceAll).Watch(options)
+			},
+		},
+		&extensions.ReplicaSet{},
+		controller.NoResyncPeriodFunc(),
+		framework.ResourceEventHandlerFuncs{
+			DeleteFunc: cc.deleteSubRs,
+		},
+	)
+	cc.syncHandler = cc.syncSubReplicaSet
+	return cc
+}
+
+//delFromClusterSet deletes a cluster from the clusterSet and
+//deletes the corresponding restclient from the map clusterKubeClientMap
+func (cc *ClusterController) delFromClusterSet(obj interface{}) {
+	cluster := obj.(*federation.Cluster)
+	cc.knownClusterSet.Delete(cluster.Name)
+	delete(cc.clusterKubeClientMap, cluster.Name)
+}
+
+//addToClusterSet inserts the new cluster into the clusterSet and creates a corresponding
+//restclient for it in clusterKubeClientMap
+func (cc *ClusterController) addToClusterSet(obj interface{}) {
+	cluster := obj.(*federation.Cluster)
+	cc.knownClusterSet.Insert(cluster.Name)
+	//create the restclient of the cluster
+	restClient, err := NewClusterClientSet(cluster)
+	if err != nil {
+		glog.Errorf("Failed to create corresponding restclient of kubernetes cluster: %v", err)
+		return
+	}
+	cc.clusterKubeClientMap[cluster.Name] = *restClient
+}
+
+// Run begins watching and syncing.
+func (cc *ClusterController) Run(workers int, stopCh <-chan struct{}) {
+	defer utilruntime.HandleCrash()
+	go cc.clusterController.Run(wait.NeverStop)
+	go cc.replicaSetController.Run(wait.NeverStop)
+	go cc.subReplicaSetController.Run(wait.NeverStop)
+	// monitor cluster status periodically, in phase 1 we just get the health state from "/healthz"
+	go wait.Until(func() {
+		if err := cc.UpdateClusterStatus(); err != nil {
+			glog.Errorf("Error monitoring cluster status: %v", err)
+		}
+	}, cc.clusterMonitorPeriod, wait.NeverStop)
+	for i := 0; i < workers; i++ {
+		go wait.Until(cc.worker, time.Second, stopCh)
+	}
+	<-stopCh
+	glog.Infof("Shutting down ClusterController")
+	cc.queue.ShutDown()
+}
+
+// enqueueSubRc adds an object to the controller work queue
+// obj could be a *federation.SubReplicaSet, or a DeletionFinalStateUnknown item.
+func (cc *ClusterController) enqueueSubRc(obj interface{}) {
+	key, err := controller.KeyFunc(obj)
+	if err != nil {
+		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
+		return
+	}
+	cc.queue.Add(key)
+}
+
+func (cc *ClusterController) worker() {
+	for {
+		func() {
+			key, quit := cc.queue.Get()
+			if quit {
+				return
+			}
+			defer cc.queue.Done(key)
+			err := cc.syncHandler(key.(string))
+			if err != nil {
+				glog.Errorf("Error syncing cluster controller: %v", err)
+			}
+		}()
+	}
+}
+
+// syncSubReplicaSet will sync the sub replicaset with the given key
+func (cc *ClusterController) syncSubReplicaSet(key string) error {
+	startTime := time.Now()
+	defer func() {
+		glog.V(4).Infof("Finished syncing controller %q (%v)", key, time.Now().Sub(startTime))
+	}()
+	obj, exists, err := cc.subReplicaSetStore.Store.GetByKey(key)
+	if !exists {
+		glog.Infof("sub replicaset %v has been deleted", key)
+		return nil
+	}
+	if err != nil {
+		glog.Infof("Unable to retrieve sub replicaset %v from store: %v", key, err)
+		cc.queue.Add(key)
+		return err
+	}
+	subRs := obj.(*federation.SubReplicaSet)
+	err = cc.manageSubReplicaSet(subRs)
+	if err != nil {
+		glog.Infof("Unable to manage sub replicaset %v in kubernetes cluster: %v", key, err)
+		cc.queue.Add(key)
+		return err
+	}
+	return nil
+}
+
+//getBindingClusterOfSubRS gets the target cluster (scheduled by the federation scheduler) of the subRS
+//and returns the target cluster name
+func (cc *ClusterController) getBindingClusterOfSubRS(subRs *federation.SubReplicaSet) (string, error) {
+	accessor, err := meta.Accessor(subRs)
+	if err != nil {
+		return "", err
+	}
+	annotations := accessor.GetAnnotations()
+	if annotations == nil {
+		return "", fmt.Errorf("Failed to get target cluster from the annotation of subreplicaset")
+	}
+	targetCluster, found := annotations[AnnotationKeyOfTargetCluster]
+	if !found {
+		return "", fmt.Errorf("Failed to get target cluster from the annotation of subreplicaset")
+	}
+	return targetCluster, nil
+}
+
+//getFederateRsCreateBy gets the name of the federation ReplicaSet that created the subRS
+//and returns that replica set name
+func (cc *ClusterController) getFederateRsCreateBy(subRs *federation.SubReplicaSet) (string, error) {
+	accessor, err := meta.Accessor(subRs)
+	if err != nil {
+		return "", err
+	}
+	annotations := accessor.GetAnnotations()
+	if annotations == nil {
+		return "", fmt.Errorf("Failed to get the created-by federation replicaset from the annotation of subreplicaset")
+	}
+	rsCreateBy, found := annotations[AnnotationKeyOfFederationReplicaSet]
+	if !found {
+		return "", fmt.Errorf("Failed to get the created-by federation replicaset from the annotation of subreplicaset")
+	}
+	return rsCreateBy, nil
+}
+
+// manageSubReplicaSet syncs the given sub replicaset, creating or updating
+// the corresponding replicaset in the target kubernetes cluster
+func (cc *ClusterController) manageSubReplicaSet(subRs *federation.SubReplicaSet) error {
+	targetClusterName, err := cc.getBindingClusterOfSubRS(subRs)
+	if targetClusterName == "" || err != nil {
+		glog.Infof("Failed to get target cluster of SubRS: %v", err)
+		return err
+	}
+
+	clusterClient, found := cc.clusterKubeClientMap[targetClusterName]
+	if !found {
+		glog.Infof("Failed to get restclient of target cluster")
+		return fmt.Errorf("Failed to get restclient of target cluster")
+	}
+	// check whether the sub replicaset already exists in the kubernetes cluster
+	replicaSet, err := clusterClient.GetReplicaSetFromCluster(subRs.Name, subRs.Namespace)
+	if err != nil {
+		glog.Infof("Failed to get replicaset from kubernetes cluster: %v", err)
+		return err
+	}
+
+	rs := extensions.ReplicaSet(*subRs)
+	// if it does not exist, this sub replicaset needs to be created
+	if replicaSet == nil {
+		// create the sub replicaset in the kubernetes cluster
+		replicaSet, err := clusterClient.CreateReplicaSetToCluster(&rs)
+		if err != nil || replicaSet == nil {
+			glog.Infof("Failed to create sub replicaset in kubernetes cluster: %v", err)
+			return err
+		}
+		return nil
+	}
+	// if it exists, update it
+	replicaSet, err = clusterClient.UpdateReplicaSetToCluster(&rs)
+	if err != nil || replicaSet == nil {
+		glog.Infof("Failed to update sub replicaset in kubernetes cluster: %v", err)
+		return err
+	}
+	return nil
+}
+
+func (cc *ClusterController) GetClusterStatus(cluster *federation.Cluster) (*federation.ClusterStatus, error) {
+	// just get the health state of the cluster by probing the api-server's /healthz endpoint
+	var clusterStatus federation.ClusterStatus
+	clusterClient, found := cc.clusterKubeClientMap[cluster.Name]
+	if !found {
+		glog.Infof("Failed to get restclient of target cluster")
+		return nil, fmt.Errorf("Failed to get restclient of target cluster")
+	}
+	clusterStatus.Phase = clusterClient.GetClusterHealthStatus()
+	return &clusterStatus, nil
+}
+
+// UpdateClusterStatus checks the status of each known cluster and updates it through the federation api server
+func (cc *ClusterController) UpdateClusterStatus() error {
+	clusters, err := cc.federationClient.Federation().Clusters().List(api.ListOptions{})
+	if err != nil {
+		return err
+	}
+	for _, cluster := range clusters.Items {
+		if !cc.knownClusterSet.Has(cluster.Name) {
+			glog.V(1).Infof("ClusterController observed a new cluster: %#v", cluster)
+			cc.knownClusterSet.Insert(cluster.Name)
+		}
+	}
+
+	// If there's a difference between the lengths of the known and observed cluster sets,
+	// we must have removed some clusters and should evict the subRs that belong to them
+	if len(cc.knownClusterSet) != len(clusters.Items) {
+		observedSet := make(sets.String)
+		for _, cluster := range clusters.Items {
+			observedSet.Insert(cluster.Name)
+		}
+		deleted := cc.knownClusterSet.Difference(observedSet)
+		for clusterName := range deleted {
+			glog.V(1).Infof("ClusterController observed a Cluster deletion: %v", clusterName)
+			//TODO: evict the subRS
+			cc.knownClusterSet.Delete(clusterName)
+		}
+	}
+	for _, cluster := range clusters.Items {
+		clusterStatus, err := cc.GetClusterStatus(&cluster)
+		if err != nil {
+			continue
+		}
+		cluster.Status.Phase = clusterStatus.Phase
+		if _, err = cc.federationClient.Federation().Clusters().Update(&cluster); err != nil {
+			glog.Infof("Failed to update the status of cluster %v: %v", cluster.Name, err)
+		}
+	}
+	return nil
+}
+
+func (cc *ClusterController) deleteSubRs(cur interface{}) {
+	rs := cur.(*extensions.ReplicaSet)
+	// get the corresponding subRS from the federation api server
+	subRSList, err := cc.federationClient.Federation().SubReplicaSets(api.NamespaceAll).List(api.ListOptions{})
+	if err != nil || len(subRSList.Items) == 0 {
+		glog.Infof("Couldn't get subRS to delete: %+v", cur)
+		return
+	}
+
+	// get the related subRS created by the replicaset
+	for _, subRs := range subRSList.Items {
+		name, err := cc.getFederateRsCreateBy(&subRs)
+		if err != nil || !strings.EqualFold(rs.Name, name) {
+			continue
+		}
+		targetClusterName, err := cc.getBindingClusterOfSubRS(&subRs)
+		if targetClusterName == "" || err != nil {
+			continue
+		}
+
+		clusterClient, found := cc.clusterKubeClientMap[targetClusterName]
+		if !found {
+			continue
+		}
+		rs := extensions.ReplicaSet(subRs)
+		err = clusterClient.DeleteReplicasetFromCluster(&rs)
+		if err != nil {
+			return
+		}
+	}
+}
diff --git a/federation/pkg/federated-controller/cluster/clustercontroller_test.go b/federation/pkg/federated-controller/cluster/clustercontroller_test.go
new file mode 100644
index 00000000000..7f2b4aa273c
--- /dev/null
+++ b/federation/pkg/federated-controller/cluster/clustercontroller_test.go
@@ -0,0 +1,1180 @@
+/*
+Copyright 2016 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// If you make changes to this file, you should also make the corresponding change in ReplicationController.
+ +package cluster + +import ( + "testing" + + "net/http/httptest" + + "encoding/json" + "fmt" + "k8s.io/kubernetes/federation/apis/federation" + federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/internalclientset" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/extensions" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "k8s.io/kubernetes/pkg/client/restclient" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/sets" + // utiltesting "k8s.io/kubernetes/pkg/util/testing" + "net/http" +) + +var alwaysReady = func() bool { return true } + +func getKey(rs *federation.SubReplicaSet, t *testing.T) string { + if key, err := controller.KeyFunc(rs); err != nil { + t.Errorf("Unexpected error getting key for subreplicaset %v: %v", rs.Name, err) + return "" + } else { + return key + } +} + +func newSubReplicaSet(annotationMap map[string]string) *federation.SubReplicaSet { + rs := &federation.SubReplicaSet{ + TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Federation.GroupVersion().String()}, + ObjectMeta: api.ObjectMeta{ + UID: util.NewUUID(), + Name: "fooReplicaSet-123abc", + Namespace: api.NamespaceDefault, + ResourceVersion: "18", + Annotations: annotationMap, + }, + Spec: extensions.ReplicaSetSpec{ + Replicas: 1, + }, + } + return rs +} + +func newCluster(clusterName string, serverUrl string) *federation.Cluster { + cluster := federation.Cluster{ + TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Federation.GroupVersion().String()}, + ObjectMeta: api.ObjectMeta{ + UID: util.NewUUID(), + Name: clusterName, + }, + Spec: federation.ClusterSpec{ + ServerAddressByClientCIDRs: []unversioned.ServerAddressByClientCIDR{ + { + ClientCIDR: "0.0.0.0", + ServerAddress: serverUrl, + }, + }, + }, + } + return &cluster +} + +func newReplicaSet(rsName string) *extensions.ReplicaSet { + rs := &extensions.ReplicaSet{ + TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Extensions.GroupVersion().String()}, + ObjectMeta: api.ObjectMeta{ + UID: util.NewUUID(), + Name: rsName, + Namespace: api.NamespaceDefault, + ResourceVersion: "18", + }, + Spec: extensions.ReplicaSetSpec{ + Replicas: 1, + }, + } + return rs +} + +// init a fake http handler, simulate a cluster apiserver, response the "DELETE" "PUT" "GET" "UPDATE" +// when "canBeGotten" is false, means that user can not get the rs from apiserver +// and subRsNameSet simulate the resource store(rs) of apiserver +func createHttptestFakeHandlerForCluster(subRs *federation.SubReplicaSet, subRsNameSet sets.String, canBeGotten bool) *http.HandlerFunc { + subRsString, _ := json.Marshal(*subRs) + fakeHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "DELETE": + subRsNameSet.Delete(subRs.Name) + fmt.Fprintln(w, "") + case "PUT": + subRsNameSet.Insert(subRs.Name) + fmt.Fprintln(w, string(subRsString)) + case "GET": + if canBeGotten { + fmt.Fprintln(w, string(subRsString)) + } else { + fmt.Fprintln(w, "") + } + default: + fmt.Fprintln(w, string(subRsString)) + } + }) + return &fakeHandler +} + +func TestSyncReplicaSetCreate(t *testing.T) { + client := clientset.NewForConfigOrDie( + &restclient.Config{ + Host: "", + ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}, + }, + ) + federationClientSet := federationclientset.NewForConfigOrDie( + &restclient.Config{ + Host: "", + ContentConfig: 
restclient.ContentConfig{GroupVersion: testapi.Federation.GroupVersion()}, + }, + ) + + manager := NewclusterController(client, federationClientSet, 5) + clusterName := "foobarCluster" + + // subReplicaSet binding to cluster foo and created by replicaset foo + annotationMap := map[string]string{AnnotationKeyOfTargetCluster: clusterName, AnnotationKeyOfFederationReplicaSet: "fooReplicaSet"} + subRs := newSubReplicaSet(annotationMap) + manager.subReplicaSetStore.Store.Add(subRs) + + // create dummy httpserver + subRsNameSet := make(sets.String) + testServer := httptest.NewServer(createHttptestFakeHandlerForCluster(subRs, subRsNameSet, false)) + defer testServer.Close() + + // create the binding cluster for subreplicaset + cluster := newCluster(clusterName, testServer.URL) + manager.knownClusterSet.Insert(cluster.Name) + + // create the clusterclientset for cluster + clusterClientSet, err := NewClusterClientSet(cluster) + if err != nil { + t.Errorf("Failed to create cluster clientset: %v", err) + } + manager.clusterKubeClientMap[cluster.Name] = *clusterClientSet + + // sync subreplicaSet + err = manager.syncSubReplicaSet(getKey(subRs, t)) + if err != nil { + t.Errorf("Failed to syncSubReplicaSet: %v", err) + } + if !subRsNameSet.Has(subRs.Name) { + t.Errorf("Expected rs create in cluster, but not sawn") + } +} + +func TestSyncReplicaSetUpdate(t *testing.T) { + client := clientset.NewForConfigOrDie( + &restclient.Config{ + Host: "", + ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}, + }, + ) + federationClientSet := federationclientset.NewForConfigOrDie( + &restclient.Config{ + Host: "", + ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Federation.GroupVersion()}, + }, + ) + + manager := NewclusterController(client, federationClientSet, 5) + clusterName := "foobarCluster" + + // subReplicaSet binding to cluster foo and created by replicaset foo + annotationMap := map[string]string{AnnotationKeyOfTargetCluster: clusterName, AnnotationKeyOfFederationReplicaSet: "fooReplicaSet"} + subRs := newSubReplicaSet(annotationMap) + manager.subReplicaSetStore.Store.Add(subRs) + + // create dummy httpserver + subRsNameSet := make(sets.String) + testServer := httptest.NewServer(createHttptestFakeHandlerForCluster(subRs, subRsNameSet, true)) + defer testServer.Close() + + // create the binding cluster for subreplicaset + cluster := newCluster(clusterName, testServer.URL) + manager.knownClusterSet.Insert(cluster.Name) + + // create the clusterclientset for cluster + clusterClientSet, err := NewClusterClientSet(cluster) + if err != nil { + t.Errorf("Failed to create cluster clientset: %v", err) + } + manager.clusterKubeClientMap[cluster.Name] = *clusterClientSet + + subRsNameSet.Insert(subRs.Name) + // sync subreplicaSet + err = manager.syncSubReplicaSet(getKey(subRs, t)) + if err != nil { + t.Errorf("Failed to syncSubReplicaSet: %v", err) + } + if !subRsNameSet.Has(subRs.Name) { + t.Errorf("Expected rs create in cluster, but not sawn") + } +} + +/* + +// init a fake http handler, simulate a federation apiserver, response the "DELETE" "PUT" "GET" "UPDATE" +func createHttptestFakeHandlerForFederation(v interface{}) *http.HandlerFunc { + objString, _ := json.Marshal(v) + fakeHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, objString) + }) + return &fakeHandler +} + +func TestSyncReplicaSetDelete(t *testing.T) { + + clusterName := "foobarCluster" + rcName := "fooReplicaSet" + + // subReplicaSet binding to cluster foo 
and created by replicaset foo + annotationMap := map[string]string{AnnotationKeyOfTargetCluster: clusterName, AnnotationKeyOfFederationReplicaSet: rcName} + subRs := newSubReplicaSet(annotationMap) + + + + testFederationServer := httptest.NewServer(&createHttptestFakeHandlerForFederation(subRs)) + defer testFederationServer.Close() + + client := clientset.NewForConfigOrDie( + &restclient.Config{ + Host: "", + ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}, + }, + ) + federationClientSet := federationclientset.NewForConfigOrDie( + &restclient.Config{ + Host: testFederationServer.URL, + ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Federation.GroupVersion()}, + }, + ) + + manager := NewclusterController(client, federationClientSet, 5) + rs := newReplicaSet(rcName) + manager.replicaSetStore.Store.Delete(rs) + + // create dummy httpserver + subRsNameSet := make(sets.String) + testServer := httptest.NewServer(createHttptestFakeHandlerForCluster(subRs, subRsNameSet, true)) + defer testServer.Close() + + // create the binding cluster for subreplicaset + cluster := newCluster(clusterName, testServer.URL) + manager.knownClusterSet.Insert(cluster.Name) + + // create the clusterclientset for cluster + clusterClientSet, err := NewClusterClientSet(cluster) + if err != nil { + t.Errorf("Failed to create cluster clientset: %v", err) + } + manager.clusterKubeClientMap[cluster.Name] = *clusterClientSet + + subRsNameSet.Insert(subRs.Name) + // sync subreplicaSet + err = manager.syncSubReplicaSet(getKey(subRs, t)) + if err != nil { + t.Errorf("Failed to syncSubReplicaSet: %v", err) + } + if !subRsNameSet.Has(subRs.Name) { + t.Errorf("Expected rs create in cluster, but not sawn") + } +} +*/ +/* +func TestSyncReplicaSetDeletes(t *testing.T) { + client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) + fakePodControl := controller.FakePodControl{} + manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager.podStoreSynced = alwaysReady + manager.podControl = &fakePodControl + + // 2 running pods and a controller with 1 replica, one pod delete expected + labelMap := map[string]string{"foo": "bar"} + rsSpec := newReplicaSet(1, labelMap) + manager.rsStore.Store.Add(rsSpec) + newPodList(manager.podStore.Store, 2, api.PodRunning, labelMap, rsSpec, "pod") + + manager.syncReplicaSet(getKey(rsSpec, t)) + validateSyncReplicaSet(t, &fakePodControl, 0, 1) +} + +func TestDeleteFinalStateUnknown(t *testing.T) { + client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) + fakePodControl := controller.FakePodControl{} + manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager.podStoreSynced = alwaysReady + manager.podControl = &fakePodControl + + received := make(chan string) + manager.syncHandler = func(key string) error { + received <- key + return nil + } + + // The DeletedFinalStateUnknown object should cause the ReplicaSet manager to insert + // the controller matching the selectors of the deleted pod into the work queue. 
+ labelMap := map[string]string{"foo": "bar"} + rsSpec := newReplicaSet(1, labelMap) + manager.rsStore.Store.Add(rsSpec) + pods := newPodList(nil, 1, api.PodRunning, labelMap, rsSpec, "pod") + manager.deletePod(cache.DeletedFinalStateUnknown{Key: "foo", Obj: &pods.Items[0]}) + + go manager.worker() + + expected := getKey(rsSpec, t) + select { + case key := <-received: + if key != expected { + t.Errorf("Unexpected sync all for ReplicaSet %v, expected %v", key, expected) + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("Processing DeleteFinalStateUnknown took longer than expected") + } +} + +func TestSyncReplicaSetCreates(t *testing.T) { + client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) + manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager.podStoreSynced = alwaysReady + + // A controller with 2 replicas and no pods in the store, 2 creates expected + labelMap := map[string]string{"foo": "bar"} + rs := newReplicaSet(2, labelMap) + manager.rsStore.Store.Add(rs) + + fakePodControl := controller.FakePodControl{} + manager.podControl = &fakePodControl + manager.syncReplicaSet(getKey(rs, t)) + validateSyncReplicaSet(t, &fakePodControl, 2, 0) +} + +func TestStatusUpdatesWithoutReplicasChange(t *testing.T) { + // Setup a fake server to listen for requests, and run the ReplicaSet controller in steady state + fakeHandler := utiltesting.FakeHandler{ + StatusCode: 200, + ResponseBody: "", + } + testServer := httptest.NewServer(&fakeHandler) + defer testServer.Close() + client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) + manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager.podStoreSynced = alwaysReady + + // Steady state for the ReplicaSet, no Status.Replicas updates expected + activePods := 5 + labelMap := map[string]string{"foo": "bar"} + rs := newReplicaSet(activePods, labelMap) + manager.rsStore.Store.Add(rs) + rs.Status = extensions.ReplicaSetStatus{Replicas: activePods} + newPodList(manager.podStore.Store, activePods, api.PodRunning, labelMap, rs, "pod") + + fakePodControl := controller.FakePodControl{} + manager.podControl = &fakePodControl + manager.syncReplicaSet(getKey(rs, t)) + + validateSyncReplicaSet(t, &fakePodControl, 0, 0) + if fakeHandler.RequestReceived != nil { + t.Errorf("Unexpected update when pods and ReplicaSets are in a steady state") + } + + // This response body is just so we don't err out decoding the http response, all + // we care about is the request body sent below. 
+ response := runtime.EncodeOrDie(testapi.Extensions.Codec(), &extensions.ReplicaSet{}) + fakeHandler.ResponseBody = response + + rs.Generation = rs.Generation + 1 + manager.syncReplicaSet(getKey(rs, t)) + + rs.Status.ObservedGeneration = rs.Generation + updatedRc := runtime.EncodeOrDie(testapi.Extensions.Codec(), rs) + fakeHandler.ValidateRequest(t, testapi.Extensions.ResourcePath(replicaSetResourceName(), rs.Namespace, rs.Name)+"/status", "PUT", &updatedRc) +} + +func TestControllerUpdateReplicas(t *testing.T) { + // This is a happy server just to record the PUT request we expect for status.Replicas + fakeHandler := utiltesting.FakeHandler{ + StatusCode: 200, + ResponseBody: "", + } + testServer := httptest.NewServer(&fakeHandler) + defer testServer.Close() + + client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) + manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager.podStoreSynced = alwaysReady + + // Insufficient number of pods in the system, and Status.Replicas is wrong; + // Status.Replica should update to match number of pods in system, 1 new pod should be created. + labelMap := map[string]string{"foo": "bar"} + extraLabelMap := map[string]string{"foo": "bar", "extraKey": "extraValue"} + rs := newReplicaSet(5, labelMap) + rs.Spec.Template.Labels = extraLabelMap + manager.rsStore.Store.Add(rs) + rs.Status = extensions.ReplicaSetStatus{Replicas: 2, FullyLabeledReplicas: 6, ObservedGeneration: 0} + rs.Generation = 1 + newPodList(manager.podStore.Store, 2, api.PodRunning, labelMap, rs, "pod") + newPodList(manager.podStore.Store, 2, api.PodRunning, extraLabelMap, rs, "podWithExtraLabel") + + // This response body is just so we don't err out decoding the http response + response := runtime.EncodeOrDie(testapi.Extensions.Codec(), &extensions.ReplicaSet{}) + fakeHandler.ResponseBody = response + + fakePodControl := controller.FakePodControl{} + manager.podControl = &fakePodControl + + manager.syncReplicaSet(getKey(rs, t)) + + // 1. Status.Replicas should go up from 2->4 even though we created 5-4=1 pod. + // 2. Status.FullyLabeledReplicas should equal to the number of pods that + // has the extra labels, i.e., 2. + // 3. Every update to the status should include the Generation of the spec. 
+ rs.Status = extensions.ReplicaSetStatus{Replicas: 4, FullyLabeledReplicas: 2, ObservedGeneration: 1} + + decRc := runtime.EncodeOrDie(testapi.Extensions.Codec(), rs) + fakeHandler.ValidateRequest(t, testapi.Extensions.ResourcePath(replicaSetResourceName(), rs.Namespace, rs.Name)+"/status", "PUT", &decRc) + validateSyncReplicaSet(t, &fakePodControl, 1, 0) +} + +func TestSyncReplicaSetDormancy(t *testing.T) { + // Setup a test server so we can lie about the current state of pods + fakeHandler := utiltesting.FakeHandler{ + StatusCode: 200, + ResponseBody: "", + } + testServer := httptest.NewServer(&fakeHandler) + defer testServer.Close() + client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) + + fakePodControl := controller.FakePodControl{} + manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager.podStoreSynced = alwaysReady + manager.podControl = &fakePodControl + + labelMap := map[string]string{"foo": "bar"} + rsSpec := newReplicaSet(2, labelMap) + manager.rsStore.Store.Add(rsSpec) + newPodList(manager.podStore.Store, 1, api.PodRunning, labelMap, rsSpec, "pod") + + // Creates a replica and sets expectations + rsSpec.Status.Replicas = 1 + manager.syncReplicaSet(getKey(rsSpec, t)) + validateSyncReplicaSet(t, &fakePodControl, 1, 0) + + // Expectations prevents replicas but not an update on status + rsSpec.Status.Replicas = 0 + fakePodControl.Clear() + manager.syncReplicaSet(getKey(rsSpec, t)) + validateSyncReplicaSet(t, &fakePodControl, 0, 0) + + // Get the key for the controller + rsKey, err := controller.KeyFunc(rsSpec) + if err != nil { + t.Errorf("Couldn't get key for object %+v: %v", rsSpec, err) + } + + // Lowering expectations should lead to a sync that creates a replica, however the + // fakePodControl error will prevent this, leaving expectations at 0, 0 + manager.expectations.CreationObserved(rsKey) + rsSpec.Status.Replicas = 1 + fakePodControl.Clear() + fakePodControl.Err = fmt.Errorf("Fake Error") + + manager.syncReplicaSet(getKey(rsSpec, t)) + validateSyncReplicaSet(t, &fakePodControl, 0, 0) + + // This replica should not need a Lowering of expectations, since the previous create failed + fakePodControl.Err = nil + manager.syncReplicaSet(getKey(rsSpec, t)) + validateSyncReplicaSet(t, &fakePodControl, 1, 0) + + // 1 PUT for the ReplicaSet status during dormancy window. + // Note that the pod creates go through pod control so they're not recorded. 
+ fakeHandler.ValidateRequestCount(t, 1) +} + +func TestPodControllerLookup(t *testing.T) { + manager := NewReplicaSetController(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}), controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager.podStoreSynced = alwaysReady + testCases := []struct { + inRSs []*extensions.ReplicaSet + pod *api.Pod + outRSName string + }{ + // pods without labels don't match any ReplicaSets + { + inRSs: []*extensions.ReplicaSet{ + {ObjectMeta: api.ObjectMeta{Name: "basic"}}}, + pod: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo1", Namespace: api.NamespaceAll}}, + outRSName: "", + }, + // Matching labels, not namespace + { + inRSs: []*extensions.ReplicaSet{ + { + ObjectMeta: api.ObjectMeta{Name: "foo"}, + Spec: extensions.ReplicaSetSpec{ + Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}, + }, + }, + }, + pod: &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "foo2", Namespace: "ns", Labels: map[string]string{"foo": "bar"}}}, + outRSName: "", + }, + // Matching ns and labels returns the key to the ReplicaSet, not the ReplicaSet name + { + inRSs: []*extensions.ReplicaSet{ + { + ObjectMeta: api.ObjectMeta{Name: "bar", Namespace: "ns"}, + Spec: extensions.ReplicaSetSpec{ + Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}, + }, + }, + }, + pod: &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "foo3", Namespace: "ns", Labels: map[string]string{"foo": "bar"}}}, + outRSName: "bar", + }, + } + for _, c := range testCases { + for _, r := range c.inRSs { + manager.rsStore.Add(r) + } + if rs := manager.getPodReplicaSet(c.pod); rs != nil { + if c.outRSName != rs.Name { + t.Errorf("Got replica set %+v expected %+v", rs.Name, c.outRSName) + } + } else if c.outRSName != "" { + t.Errorf("Expected a replica set %v pod %v, found none", c.outRSName, c.pod.Name) + } + } +} + +type FakeWatcher struct { + w *watch.FakeWatcher + *fake.Clientset +} + +func TestWatchControllers(t *testing.T) { + fakeWatch := watch.NewFake() + client := &fake.Clientset{} + client.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil)) + manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager.podStoreSynced = alwaysReady + + var testRSSpec extensions.ReplicaSet + received := make(chan string) + + // The update sent through the fakeWatcher should make its way into the workqueue, + // and eventually into the syncHandler. The handler validates the received controller + // and closes the received channel to indicate that the test can finish. + manager.syncHandler = func(key string) error { + + obj, exists, err := manager.rsStore.Store.GetByKey(key) + if !exists || err != nil { + t.Errorf("Expected to find replica set under key %v", key) + } + rsSpec := *obj.(*extensions.ReplicaSet) + if !api.Semantic.DeepDerivative(rsSpec, testRSSpec) { + t.Errorf("Expected %#v, but got %#v", testRSSpec, rsSpec) + } + close(received) + return nil + } + // Start only the ReplicaSet watcher and the workqueue, send a watch event, + // and make sure it hits the sync method. 
+ stopCh := make(chan struct{}) + defer close(stopCh) + go manager.rsController.Run(stopCh) + go wait.Until(manager.worker, 10*time.Millisecond, stopCh) + + testRSSpec.Name = "foo" + fakeWatch.Add(&testRSSpec) + + select { + case <-received: + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("Expected 1 call but got 0") + } +} + +func TestWatchPods(t *testing.T) { + fakeWatch := watch.NewFake() + client := &fake.Clientset{} + client.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil)) + manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager.podStoreSynced = alwaysReady + + // Put one ReplicaSet and one pod into the controller's stores + labelMap := map[string]string{"foo": "bar"} + testRSSpec := newReplicaSet(1, labelMap) + manager.rsStore.Store.Add(testRSSpec) + received := make(chan string) + // The pod update sent through the fakeWatcher should figure out the managing ReplicaSet and + // send it into the syncHandler. + manager.syncHandler = func(key string) error { + + obj, exists, err := manager.rsStore.Store.GetByKey(key) + if !exists || err != nil { + t.Errorf("Expected to find replica set under key %v", key) + } + rsSpec := obj.(*extensions.ReplicaSet) + if !api.Semantic.DeepDerivative(rsSpec, testRSSpec) { + t.Errorf("\nExpected %#v,\nbut got %#v", testRSSpec, rsSpec) + } + close(received) + return nil + } + // Start only the pod watcher and the workqueue, send a watch event, + // and make sure it hits the sync method for the right ReplicaSet. + stopCh := make(chan struct{}) + defer close(stopCh) + go manager.podController.Run(stopCh) + go wait.Until(manager.worker, 10*time.Millisecond, stopCh) + + pods := newPodList(nil, 1, api.PodRunning, labelMap, testRSSpec, "pod") + testPod := pods.Items[0] + testPod.Status.Phase = api.PodFailed + fakeWatch.Add(&testPod) + + select { + case <-received: + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("Expected 1 call but got 0") + } +} + +func TestUpdatePods(t *testing.T) { + manager := NewReplicaSetController(fake.NewSimpleClientset(), controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager.podStoreSynced = alwaysReady + + received := make(chan string) + + manager.syncHandler = func(key string) error { + obj, exists, err := manager.rsStore.Store.GetByKey(key) + if !exists || err != nil { + t.Errorf("Expected to find replica set under key %v", key) + } + received <- obj.(*extensions.ReplicaSet).Name + return nil + } + + stopCh := make(chan struct{}) + defer close(stopCh) + go wait.Until(manager.worker, 10*time.Millisecond, stopCh) + + // Put 2 ReplicaSets and one pod into the controller's stores + labelMap1 := map[string]string{"foo": "bar"} + testRSSpec1 := newReplicaSet(1, labelMap1) + manager.rsStore.Store.Add(testRSSpec1) + testRSSpec2 := *testRSSpec1 + labelMap2 := map[string]string{"bar": "foo"} + testRSSpec2.Spec.Selector = &unversioned.LabelSelector{MatchLabels: labelMap2} + testRSSpec2.Name = "barfoo" + manager.rsStore.Store.Add(&testRSSpec2) + + // Put one pod in the podStore + pod1 := newPodList(manager.podStore.Store, 1, api.PodRunning, labelMap1, testRSSpec1, "pod").Items[0] + pod2 := pod1 + pod2.Labels = labelMap2 + + // Send an update of the same pod with modified labels, and confirm we get a sync request for + // both controllers + manager.updatePod(&pod1, &pod2) + + expected := sets.NewString(testRSSpec1.Name, testRSSpec2.Name) + for _, name := range expected.List() { + t.Logf("Expecting update for %+v", name) + select { + case got := <-received: + if 
!expected.Has(got) { + t.Errorf("Expected keys %#v got %v", expected, got) + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("Expected update notifications for replica sets within 100ms each") + } + } +} + +func TestControllerUpdateRequeue(t *testing.T) { + // This server should force a requeue of the controller because it fails to update status.Replicas. + fakeHandler := utiltesting.FakeHandler{ + StatusCode: 500, + ResponseBody: "", + } + testServer := httptest.NewServer(&fakeHandler) + defer testServer.Close() + + client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) + manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager.podStoreSynced = alwaysReady + + labelMap := map[string]string{"foo": "bar"} + rs := newReplicaSet(1, labelMap) + manager.rsStore.Store.Add(rs) + rs.Status = extensions.ReplicaSetStatus{Replicas: 2} + newPodList(manager.podStore.Store, 1, api.PodRunning, labelMap, rs, "pod") + + fakePodControl := controller.FakePodControl{} + manager.podControl = &fakePodControl + + manager.syncReplicaSet(getKey(rs, t)) + + ch := make(chan interface{}) + go func() { + item, _ := manager.queue.Get() + ch <- item + }() + select { + case key := <-ch: + expectedKey := getKey(rs, t) + if key != expectedKey { + t.Errorf("Expected requeue of replica set with key %s got %s", expectedKey, key) + } + case <-time.After(wait.ForeverTestTimeout): + manager.queue.ShutDown() + t.Errorf("Expected to find a ReplicaSet in the queue, found none.") + } + // 1 Update and 1 GET, both of which fail + fakeHandler.ValidateRequestCount(t, 2) +} + +func TestControllerUpdateStatusWithFailure(t *testing.T) { + rs := newReplicaSet(1, map[string]string{"foo": "bar"}) + fakeClient := &fake.Clientset{} + fakeClient.AddReactor("get", "replicasets", func(action core.Action) (bool, runtime.Object, error) { return true, rs, nil }) + fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) { + return true, &extensions.ReplicaSet{}, fmt.Errorf("Fake error") + }) + fakeRSClient := fakeClient.Extensions().ReplicaSets("default") + numReplicas := 10 + updateReplicaCount(fakeRSClient, *rs, numReplicas, 0) + updates, gets := 0, 0 + for _, a := range fakeClient.Actions() { + if a.GetResource() != "replicasets" { + t.Errorf("Unexpected action %+v", a) + continue + } + + switch action := a.(type) { + case testclient.GetAction: + gets++ + // Make sure the get is for the right ReplicaSet even though the update failed. + if action.GetName() != rs.Name { + t.Errorf("Expected get for ReplicaSet %v, got %+v instead", rs.Name, action.GetName()) + } + case testclient.UpdateAction: + updates++ + // Confirm that the update has the right status.Replicas even though the Get + // returned a ReplicaSet with replicas=1. + if c, ok := action.GetObject().(*extensions.ReplicaSet); !ok { + t.Errorf("Expected a ReplicaSet as the argument to update, got %T", c) + } else if c.Status.Replicas != numReplicas { + t.Errorf("Expected update for ReplicaSet to contain replicas %v, got %v instead", + numReplicas, c.Status.Replicas) + } + default: + t.Errorf("Unexpected action %+v", a) + break + } + } + if gets != 1 || updates != 2 { + t.Errorf("Expected 1 get and 2 updates, got %d gets %d updates", gets, updates) + } +} + +// TODO: This test is too hairy for a unittest. It should be moved to an E2E suite. 
+func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) { + client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) + fakePodControl := controller.FakePodControl{} + manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, burstReplicas, 0) + manager.podStoreSynced = alwaysReady + manager.podControl = &fakePodControl + + labelMap := map[string]string{"foo": "bar"} + rsSpec := newReplicaSet(numReplicas, labelMap) + manager.rsStore.Store.Add(rsSpec) + + expectedPods := 0 + pods := newPodList(nil, numReplicas, api.PodPending, labelMap, rsSpec, "pod") + + rsKey, err := controller.KeyFunc(rsSpec) + if err != nil { + t.Errorf("Couldn't get key for object %+v: %v", rsSpec, err) + } + + // Size up the controller, then size it down, and confirm the expected create/delete pattern + for _, replicas := range []int{numReplicas, 0} { + + rsSpec.Spec.Replicas = replicas + manager.rsStore.Store.Add(rsSpec) + + for i := 0; i < numReplicas; i += burstReplicas { + manager.syncReplicaSet(getKey(rsSpec, t)) + + // The store accrues active pods. It's also used by the ReplicaSet to determine how many + // replicas to create. + activePods := len(manager.podStore.Store.List()) + if replicas != 0 { + // This is the number of pods currently "in flight". They were created by the + // ReplicaSet controller above, which then puts the ReplicaSet to sleep till + // all of them have been observed. + expectedPods = replicas - activePods + if expectedPods > burstReplicas { + expectedPods = burstReplicas + } + // This validates the ReplicaSet manager sync actually created pods + validateSyncReplicaSet(t, &fakePodControl, expectedPods, 0) + + // This simulates the watch events for all but 1 of the expected pods. + // None of these should wake the controller because it has expectations==BurstReplicas. + for i := 0; i < expectedPods-1; i++ { + manager.podStore.Store.Add(&pods.Items[i]) + manager.addPod(&pods.Items[i]) + } + + podExp, exists, err := manager.expectations.GetExpectations(rsKey) + if !exists || err != nil { + t.Fatalf("Did not find expectations for rc.") + } + if add, _ := podExp.GetExpectations(); add != 1 { + t.Fatalf("Expectations are wrong %v", podExp) + } + } else { + expectedPods = (replicas - activePods) * -1 + if expectedPods > burstReplicas { + expectedPods = burstReplicas + } + validateSyncReplicaSet(t, &fakePodControl, 0, expectedPods) + + // To accurately simulate a watch we must delete the exact pods + // the rs is waiting for. + expectedDels := manager.expectations.GetUIDs(getKey(rsSpec, t)) + podsToDelete := []*api.Pod{} + for _, key := range expectedDels.List() { + nsName := strings.Split(key, "/") + podsToDelete = append(podsToDelete, &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: nsName[1], + Namespace: nsName[0], + Labels: rsSpec.Spec.Selector.MatchLabels, + }, + }) + } + // Don't delete all pods because we confirm that the last pod + // has exactly one expectation at the end, to verify that we + // don't double delete. 
+ for i := range podsToDelete[1:] { + manager.podStore.Delete(podsToDelete[i]) + manager.deletePod(podsToDelete[i]) + } + podExp, exists, err := manager.expectations.GetExpectations(rsKey) + if !exists || err != nil { + t.Fatalf("Did not find expectations for ReplicaSet.") + } + if _, del := podExp.GetExpectations(); del != 1 { + t.Fatalf("Expectations are wrong %v", podExp) + } + } + + // Check that the ReplicaSet didn't take any action for all the above pods + fakePodControl.Clear() + manager.syncReplicaSet(getKey(rsSpec, t)) + validateSyncReplicaSet(t, &fakePodControl, 0, 0) + + // Create/Delete the last pod + // The last add pod will decrease the expectation of the ReplicaSet to 0, + // which will cause it to create/delete the remaining replicas up to burstReplicas. + if replicas != 0 { + manager.podStore.Store.Add(&pods.Items[expectedPods-1]) + manager.addPod(&pods.Items[expectedPods-1]) + } else { + expectedDel := manager.expectations.GetUIDs(getKey(rsSpec, t)) + if expectedDel.Len() != 1 { + t.Fatalf("Waiting on unexpected number of deletes.") + } + nsName := strings.Split(expectedDel.List()[0], "/") + lastPod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: nsName[1], + Namespace: nsName[0], + Labels: rsSpec.Spec.Selector.MatchLabels, + }, + } + manager.podStore.Store.Delete(lastPod) + manager.deletePod(lastPod) + } + pods.Items = pods.Items[expectedPods:] + } + + // Confirm that we've created the right number of replicas + activePods := len(manager.podStore.Store.List()) + if activePods != rsSpec.Spec.Replicas { + t.Fatalf("Unexpected number of active pods, expected %d, got %d", rsSpec.Spec.Replicas, activePods) + } + // Replenish the pod list, since we cut it down sizing up + pods = newPodList(nil, replicas, api.PodRunning, labelMap, rsSpec, "pod") + } +} + +func TestControllerBurstReplicas(t *testing.T) { + doTestControllerBurstReplicas(t, 5, 30) + doTestControllerBurstReplicas(t, 5, 12) + doTestControllerBurstReplicas(t, 3, 2) +} + +type FakeRSExpectations struct { + *controller.ControllerExpectations + satisfied bool + expSatisfied func() +} + +func (fe FakeRSExpectations) SatisfiedExpectations(controllerKey string) bool { + fe.expSatisfied() + return fe.satisfied +} + +// TestRSSyncExpectations tests that a pod cannot sneak in between counting active pods +// and checking expectations. +func TestRSSyncExpectations(t *testing.T) { + client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) + fakePodControl := controller.FakePodControl{} + manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, 2, 0) + manager.podStoreSynced = alwaysReady + manager.podControl = &fakePodControl + + labelMap := map[string]string{"foo": "bar"} + rsSpec := newReplicaSet(2, labelMap) + manager.rsStore.Store.Add(rsSpec) + pods := newPodList(nil, 2, api.PodPending, labelMap, rsSpec, "pod") + manager.podStore.Store.Add(&pods.Items[0]) + postExpectationsPod := pods.Items[1] + + manager.expectations = controller.NewUIDTrackingControllerExpectations(FakeRSExpectations{ + controller.NewControllerExpectations(), true, func() { + // If we check active pods before checking expectataions, the + // ReplicaSet will create a new replica because it doesn't see + // this pod, but has fulfilled its expectations. 
+ manager.podStore.Store.Add(&postExpectationsPod) + }, + }) + manager.syncReplicaSet(getKey(rsSpec, t)) + validateSyncReplicaSet(t, &fakePodControl, 0, 0) +} + +func TestDeleteControllerAndExpectations(t *testing.T) { + client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) + manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, 10, 0) + manager.podStoreSynced = alwaysReady + + rs := newReplicaSet(1, map[string]string{"foo": "bar"}) + manager.rsStore.Store.Add(rs) + + fakePodControl := controller.FakePodControl{} + manager.podControl = &fakePodControl + + // This should set expectations for the ReplicaSet + manager.syncReplicaSet(getKey(rs, t)) + validateSyncReplicaSet(t, &fakePodControl, 1, 0) + fakePodControl.Clear() + + // Get the ReplicaSet key + rsKey, err := controller.KeyFunc(rs) + if err != nil { + t.Errorf("Couldn't get key for object %+v: %v", rs, err) + } + + // This is to simulate a concurrent addPod, that has a handle on the expectations + // as the controller deletes it. + podExp, exists, err := manager.expectations.GetExpectations(rsKey) + if !exists || err != nil { + t.Errorf("No expectations found for ReplicaSet") + } + manager.rsStore.Delete(rs) + manager.syncReplicaSet(getKey(rs, t)) + + if _, exists, err = manager.expectations.GetExpectations(rsKey); exists { + t.Errorf("Found expectaions, expected none since the ReplicaSet has been deleted.") + } + + // This should have no effect, since we've deleted the ReplicaSet. + podExp.Add(-1, 0) + manager.podStore.Store.Replace(make([]interface{}, 0), "0") + manager.syncReplicaSet(getKey(rs, t)) + validateSyncReplicaSet(t, &fakePodControl, 0, 0) +} + +func TestRSManagerNotReady(t *testing.T) { + client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) + fakePodControl := controller.FakePodControl{} + manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, 2, 0) + manager.podControl = &fakePodControl + manager.podStoreSynced = func() bool { return false } + + // Simulates the ReplicaSet reflector running before the pod reflector. We don't + // want to end up creating replicas in this case until the pod reflector + // has synced, so the ReplicaSet controller should just requeue the ReplicaSet. + rsSpec := newReplicaSet(1, map[string]string{"foo": "bar"}) + manager.rsStore.Store.Add(rsSpec) + + rsKey := getKey(rsSpec, t) + manager.syncReplicaSet(rsKey) + validateSyncReplicaSet(t, &fakePodControl, 0, 0) + queueRS, _ := manager.queue.Get() + if queueRS != rsKey { + t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS) + } + + manager.podStoreSynced = alwaysReady + manager.syncReplicaSet(rsKey) + validateSyncReplicaSet(t, &fakePodControl, 1, 0) +} + +// shuffle returns a new shuffled list of container controllers. 
+func shuffle(controllers []*extensions.ReplicaSet) []*extensions.ReplicaSet {
+	numControllers := len(controllers)
+	randIndexes := rand.Perm(numControllers)
+	shuffled := make([]*extensions.ReplicaSet, numControllers)
+	for i := 0; i < numControllers; i++ {
+		shuffled[i] = controllers[randIndexes[i]]
+	}
+	return shuffled
+}
+
+func TestOverlappingRSs(t *testing.T) {
+	client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
+	labelMap := map[string]string{"foo": "bar"}
+
+	for i := 0; i < 5; i++ {
+		manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, 10, 0)
+		manager.podStoreSynced = alwaysReady
+
+		// Create several ReplicaSets, shuffle them randomly, and insert them into the ReplicaSet controller's store
+		var controllers []*extensions.ReplicaSet
+		for j := 1; j < 10; j++ {
+			rsSpec := newReplicaSet(1, labelMap)
+			rsSpec.CreationTimestamp = unversioned.Date(2014, time.December, j, 0, 0, 0, 0, time.Local)
+			rsSpec.Name = string(util.NewUUID())
+			controllers = append(controllers, rsSpec)
+		}
+		shuffledControllers := shuffle(controllers)
+		for j := range shuffledControllers {
+			manager.rsStore.Store.Add(shuffledControllers[j])
+		}
+		// Add a pod and make sure only the oldest ReplicaSet is synced
+		pods := newPodList(nil, 1, api.PodPending, labelMap, controllers[0], "pod")
+		rsKey := getKey(controllers[0], t)
+
+		manager.addPod(&pods.Items[0])
+		queueRS, _ := manager.queue.Get()
+		if queueRS != rsKey {
+			t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS)
+		}
+	}
+}
+
+func TestDeletionTimestamp(t *testing.T) {
+	c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
+	labelMap := map[string]string{"foo": "bar"}
+	manager := NewReplicaSetController(c, controller.NoResyncPeriodFunc, 10, 0)
+	manager.podStoreSynced = alwaysReady
+
+	rs := newReplicaSet(1, labelMap)
+	manager.rsStore.Store.Add(rs)
+	rsKey, err := controller.KeyFunc(rs)
+	if err != nil {
+		t.Errorf("Couldn't get key for object %+v: %v", rs, err)
+	}
+	pod := newPodList(nil, 1, api.PodPending, labelMap, rs, "pod").Items[0]
+	pod.DeletionTimestamp = &unversioned.Time{Time: time.Now()}
+	manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(&pod)})
+
+	// A pod added with a deletion timestamp should decrement deletions, not creations.
+	manager.addPod(&pod)
+
+	queueRC, _ := manager.queue.Get()
+	if queueRC != rsKey {
+		t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRC)
+	}
+	manager.queue.Done(rsKey)
+
+	podExp, exists, err := manager.expectations.GetExpectations(rsKey)
+	if !exists || err != nil || !podExp.Fulfilled() {
+		t.Fatalf("Wrong expectations %+v", podExp)
+	}
+
+	// An update from no deletion timestamp to having one should be treated
+	// as a deletion.
+	oldPod := newPodList(nil, 1, api.PodPending, labelMap, rs, "pod").Items[0]
+	manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(&pod)})
+	manager.updatePod(&oldPod, &pod)
+
+	queueRC, _ = manager.queue.Get()
+	if queueRC != rsKey {
+		t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRC)
+	}
+	manager.queue.Done(rsKey)
+
+	podExp, exists, err = manager.expectations.GetExpectations(rsKey)
+	if !exists || err != nil || !podExp.Fulfilled() {
+		t.Fatalf("Wrong expectations %+v", podExp)
+	}
+
+	// An update to the pod (including an update to the deletion timestamp)
+	// should not be counted as a second delete.
+	secondPod := &api.Pod{
+		ObjectMeta: api.ObjectMeta{
+			Namespace: pod.Namespace,
+			Name:      "secondPod",
+			Labels:    pod.Labels,
+		},
+	}
+	manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(secondPod)})
+	oldPod.DeletionTimestamp = &unversioned.Time{Time: time.Now()}
+	manager.updatePod(&oldPod, &pod)
+
+	podExp, exists, err = manager.expectations.GetExpectations(rsKey)
+	if !exists || err != nil || podExp.Fulfilled() {
+		t.Fatalf("Wrong expectations %+v", podExp)
+	}
+
+	// A pod with a non-nil deletion timestamp should also be ignored by the
+	// delete handler, because it's already been counted in the update.
+	manager.deletePod(&pod)
+	podExp, exists, err = manager.expectations.GetExpectations(rsKey)
+	if !exists || err != nil || podExp.Fulfilled() {
+		t.Fatalf("Wrong expectations %+v", podExp)
+	}
+
+	// Deleting the second pod should clear expectations.
+	manager.deletePod(secondPod)
+
+	queueRC, _ = manager.queue.Get()
+	if queueRC != rsKey {
+		t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRC)
+	}
+	manager.queue.Done(rsKey)
+
+	podExp, exists, err = manager.expectations.GetExpectations(rsKey)
+	if !exists || err != nil || !podExp.Fulfilled() {
+		t.Fatalf("Wrong expectations %+v", podExp)
+	}
+}
+*/
diff --git a/federation/pkg/federated-controller/cluster/doc.go b/federation/pkg/federated-controller/cluster/doc.go
new file mode 100644
index 00000000000..b4fef97c5b0
--- /dev/null
+++ b/federation/pkg/federated-controller/cluster/doc.go
@@ -0,0 +1,19 @@
+/*
+Copyright 2014 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package cluster contains code for the cluster controller, which syncs the
+// clusters registered with the federation control plane
+package cluster
diff --git a/federation/pkg/federated-controller/doc.go b/federation/pkg/federated-controller/doc.go
new file mode 100644
index 00000000000..1e310b466f5
--- /dev/null
+++ b/federation/pkg/federated-controller/doc.go
@@ -0,0 +1,19 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package controller contains code for federated controllers (like the
+// cluster controller).
+package controller
diff --git a/federation/pkg/federated-controller/lookup_cache.go b/federation/pkg/federated-controller/lookup_cache.go
new file mode 100644
index 00000000000..5d82908be01
--- /dev/null
+++ b/federation/pkg/federated-controller/lookup_cache.go
@@ -0,0 +1,90 @@
+/*
+Copyright 2016 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+	"hash/adler32"
+	"sync"
+
+	"github.com/golang/groupcache/lru"
+	"k8s.io/kubernetes/pkg/api/meta"
+	hashutil "k8s.io/kubernetes/pkg/util/hash"
+)
+
+type objectWithMeta interface {
+	meta.Object
+}
+
+// keyFunc returns the key of an object, which is used to look up its matching object in the cache.
+// Since we match objects by namespace and labels/selector, two objects with the same namespace and labels
+// will have the same key.
+func keyFunc(obj objectWithMeta) uint64 {
+	hash := adler32.New()
+	hashutil.DeepHashObject(hash, &equivalenceLabelObj{
+		namespace: obj.GetNamespace(),
+		labels:    obj.GetLabels(),
+	})
+	return uint64(hash.Sum32())
+}
+
+type equivalenceLabelObj struct {
+	namespace string
+	labels    map[string]string
+}
+
+// MatchingCache saves the label and selector matching relationship.
+type MatchingCache struct {
+	mutex sync.RWMutex
+	cache *lru.Cache
+}
+
+// NewMatchingCache returns a MatchingCache, which saves the label and selector matching relationship.
+func NewMatchingCache(maxCacheEntries int) *MatchingCache {
+	return &MatchingCache{
+		cache: lru.New(maxCacheEntries),
+	}
+}
+
+// Add adds the matching information to the cache.
+func (c *MatchingCache) Add(labelObj objectWithMeta, selectorObj objectWithMeta) {
+	key := keyFunc(labelObj)
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+	c.cache.Add(key, selectorObj)
+}
+
+// GetMatchingObject looks up the matching object for a given object.
+// Note: the cached information may be stale because the matched controller may have been deleted or updated;
+// callers must re-verify the result against the live object to ensure the cached data is not dirty.
+func (c *MatchingCache) GetMatchingObject(labelObj objectWithMeta) (controller interface{}, exists bool) {
+	key := keyFunc(labelObj)
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+	return c.cache.Get(key)
+}
+
+// Update updates the cached matching information.
+func (c *MatchingCache) Update(labelObj objectWithMeta, selectorObj objectWithMeta) {
+	c.Add(labelObj, selectorObj)
+}
+
+// InvalidateAll invalidates the whole cache.
+func (c *MatchingCache) InvalidateAll() {
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+	c.cache = lru.New(c.cache.MaxEntries)
+}
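
Not part of the diff above, but as a reading aid: a minimal sketch of how a caller might use MatchingCache, assuming the import path introduced in this change and the api.Pod / extensions.ReplicaSet types already used by the tests. The object names, labels, and the main wrapper are illustrative only, and as the GetMatchingObject comment says, a real controller must re-verify any cache hit before acting on it.

package main

import (
	"fmt"

	fedcontroller "k8s.io/kubernetes/federation/pkg/federated-controller"
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/apis/extensions"
)

func main() {
	// Cache up to 100 label->selector relationships.
	cache := fedcontroller.NewMatchingCache(100)

	// Hypothetical objects: a ReplicaSet and a pod it selects.
	rs := &extensions.ReplicaSet{
		ObjectMeta: api.ObjectMeta{Name: "frontend", Namespace: "default"},
	}
	pod := &api.Pod{
		ObjectMeta: api.ObjectMeta{
			Name:      "frontend-abc12",
			Namespace: "default",
			Labels:    map[string]string{"app": "frontend"},
		},
	}

	// Record that objects with this namespace and label set map to this ReplicaSet.
	cache.Add(pod, rs)

	// Any later lookup by an object with the same namespace and labels hits the cache.
	if obj, ok := cache.GetMatchingObject(pod); ok {
		// The entry may be stale (the ReplicaSet could have been updated or
		// deleted), so a real caller re-checks the match against the live object.
		fmt.Println("cached match:", obj.(*extensions.ReplicaSet).Name)
	}
}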