diff --git a/federation/cmd/federation-controller-manager/app/controllermanager.go b/federation/cmd/federation-controller-manager/app/controllermanager.go index 74a58d0a67e..6612330f8a2 100644 --- a/federation/cmd/federation-controller-manager/app/controllermanager.go +++ b/federation/cmd/federation-controller-manager/app/controllermanager.go @@ -31,6 +31,7 @@ import ( "k8s.io/kubernetes/federation/pkg/dnsprovider" clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster" namespacecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/namespace" + replicasetcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/replicaset" secretcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/secret" servicecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service" "k8s.io/kubernetes/federation/pkg/federation-controller/util" @@ -155,5 +156,9 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err secretcontroller := secretcontroller.NewSecretController(secretcontrollerClientset) secretcontroller.Run(wait.NeverStop) + replicaSetClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, replicasetcontroller.UserAgentName)) + replicaSetController := replicasetcontroller.NewReplicaSetController(replicaSetClientset) + go replicaSetController.Run(s.ConcurrentReplicaSetSyncs, wait.NeverStop) + select {} } diff --git a/federation/cmd/federation-controller-manager/app/options/options.go b/federation/cmd/federation-controller-manager/app/options/options.go index 8e3a64ef778..091497110b1 100644 --- a/federation/cmd/federation-controller-manager/app/options/options.go +++ b/federation/cmd/federation-controller-manager/app/options/options.go @@ -47,6 +47,10 @@ type ControllerManagerConfiguration struct { // allowed to sync concurrently. Larger number = more responsive service // management, but more CPU (and network) load. ConcurrentServiceSyncs int `json:"concurrentServiceSyncs"` + // concurrentReplicaSetSyncs is the number of ReplicaSets that are + // allowed to sync concurrently. Larger number = more responsive service + // management, but more CPU (and network) load. + ConcurrentReplicaSetSyncs int `json:"concurrentReplicaSetSyncs"` // clusterMonitorPeriod is the period for syncing ClusterStatus in cluster controller. ClusterMonitorPeriod unversioned.Duration `json:"clusterMonitorPeriod"` // APIServerQPS is the QPS to use while talking with federation apiserver. @@ -78,13 +82,14 @@ const ( func NewCMServer() *CMServer { s := CMServer{ ControllerManagerConfiguration: ControllerManagerConfiguration{ - Port: FederatedControllerManagerPort, - Address: "0.0.0.0", - ConcurrentServiceSyncs: 10, - ClusterMonitorPeriod: unversioned.Duration{Duration: 40 * time.Second}, - APIServerQPS: 20.0, - APIServerBurst: 30, - LeaderElection: leaderelection.DefaultLeaderElectionConfiguration(), + Port: FederatedControllerManagerPort, + Address: "0.0.0.0", + ConcurrentServiceSyncs: 10, + ConcurrentReplicaSetSyncs: 10, + ClusterMonitorPeriod: unversioned.Duration{Duration: 40 * time.Second}, + APIServerQPS: 20.0, + APIServerBurst: 30, + LeaderElection: leaderelection.DefaultLeaderElectionConfiguration(), }, } return &s @@ -97,6 +102,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.FederationName, "federation-name", s.FederationName, "Federation name.") fs.StringVar(&s.ZoneName, "zone-name", s.ZoneName, "Zone name, like example.com.") fs.IntVar(&s.ConcurrentServiceSyncs, "concurrent-service-syncs", s.ConcurrentServiceSyncs, "The number of service syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load") + fs.IntVar(&s.ConcurrentReplicaSetSyncs, "concurrent-replicaset-syncs", s.ConcurrentReplicaSetSyncs, "The number of ReplicaSets syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load") fs.DurationVar(&s.ClusterMonitorPeriod.Duration, "cluster-monitor-period", s.ClusterMonitorPeriod.Duration, "The period for syncing ClusterStatus in ClusterController.") fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/") fs.StringVar(&s.Master, "master", s.Master, "The address of the federation API server (overrides any value in kubeconfig)") diff --git a/federation/pkg/federation-controller/replicaset/replicasetcontroller.go b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go new file mode 100644 index 00000000000..9efdf42f6c7 --- /dev/null +++ b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go @@ -0,0 +1,449 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package replicaset + +import ( + "encoding/json" + "reflect" + "time" + + "github.com/golang/glog" + + fed "k8s.io/kubernetes/federation/apis/federation" + fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" + fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4" + planner "k8s.io/kubernetes/federation/pkg/federation-controller/replicaset/planner" + fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/meta" + apiv1 "k8s.io/kubernetes/pkg/api/v1" + extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" + "k8s.io/kubernetes/pkg/client/cache" + kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/flowcontrol" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/util/workqueue" + "k8s.io/kubernetes/pkg/watch" +) + +const ( + FedReplicaSetPreferencesAnnotation = "" + allClustersKey = "THE_ALL_CLUSTER_KEY" + UserAgentName = "Federation-replicaset-Controller" +) + +var ( + replicaSetReviewDelay = 10 * time.Second + clusterAvailableDelay = 20 * time.Second + clusterUnavailableDelay = 60 * time.Second + allReplicaSetReviewDealy = 2 * time.Minute +) + +func parseFederationReplicaSetReference(frs *extensionsv1.ReplicaSet) (*fed.FederatedReplicaSetPreferences, error) { + accessor, err := meta.Accessor(frs) + if err != nil { + return nil, err + } + anno := accessor.GetAnnotations() + if anno == nil { + return nil, nil + } + frsPrefString, found := anno[FedReplicaSetPreferencesAnnotation] + if !found { + return nil, nil + } + var frsPref fed.FederatedReplicaSetPreferences + if err := json.Unmarshal([]byte(frsPrefString), &frsPref); err != nil { + return nil, err + } + return &frsPref, nil +} + +type ReplicaSetController struct { + fedClient fedclientset.Interface + + replicaSetController *framework.Controller + replicaSetStore cache.StoreToReplicaSetLister + + fedReplicaSetInformer fedutil.FederatedInformer + fedPodInformer fedutil.FederatedInformer + + replicasetDeliverer *fedutil.DelayingDeliverer + clusterDeliverer *fedutil.DelayingDeliverer + replicasetWorkQueue workqueue.Interface + + replicaSetBackoff *flowcontrol.Backoff + + defaultPlanner *planner.Planner +} + +// NewclusterController returns a new cluster controller +func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSetController { + frsc := &ReplicaSetController{ + fedClient: federationClient, + replicasetDeliverer: fedutil.NewDelayingDeliverer(), + clusterDeliverer: fedutil.NewDelayingDeliverer(), + replicasetWorkQueue: workqueue.New(), + replicaSetBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute), + defaultPlanner: planner.NewPlanner(&fed.FederatedReplicaSetPreferences{ + Clusters: map[string]fed.ClusterReplicaSetPreferences{ + "*": {Weight: 1}, + }, + }), + } + + replicaSetFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, framework.ControllerInterface) { + return framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return clientset.Extensions().ReplicaSets(apiv1.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return clientset.Extensions().ReplicaSets(apiv1.NamespaceAll).Watch(options) + }, + }, + &extensionsv1.ReplicaSet{}, + controller.NoResyncPeriodFunc(), + fedutil.NewTriggerOnAllChanges( + func(obj runtime.Object) { frsc.deliverLocalReplicaSet(obj, allReplicaSetReviewDealy) }, + ), + ) + } + clusterLifecycle := fedutil.ClusterLifecycleHandlerFuncs{ + ClusterAvailable: func(cluster *fedv1.Cluster) { + frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterUnavailableDelay) + }, + ClusterUnavailable: func(cluster *fedv1.Cluster, _ []interface{}) { + frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterUnavailableDelay) + }, + } + frsc.fedReplicaSetInformer = fedutil.NewFederatedInformer(federationClient, replicaSetFedInformerFactory, &clusterLifecycle) + + podFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, framework.ControllerInterface) { + return framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return clientset.Core().Pods(apiv1.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return clientset.Core().Pods(apiv1.NamespaceAll).Watch(options) + }, + }, + &apiv1.Pod{}, + controller.NoResyncPeriodFunc(), + fedutil.NewTriggerOnAllChanges( + func(obj runtime.Object) { + frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterUnavailableDelay) + }, + ), + ) + } + frsc.fedPodInformer = fedutil.NewFederatedInformer(federationClient, podFedInformerFactory, &fedutil.ClusterLifecycleHandlerFuncs{}) + + frsc.replicaSetStore.Store, frsc.replicaSetController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return frsc.fedClient.Extensions().ReplicaSets(apiv1.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return frsc.fedClient.Extensions().ReplicaSets(apiv1.NamespaceAll).Watch(options) + }, + }, + &extensionsv1.ReplicaSet{}, + controller.NoResyncPeriodFunc(), + fedutil.NewTriggerOnMetaAndSpecChanges( + func(obj runtime.Object) { frsc.deliverFedReplicaSetObj(obj, replicaSetReviewDelay) }, + ), + ) + + return frsc +} + +func (frsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { + go frsc.replicaSetController.Run(stopCh) + frsc.fedReplicaSetInformer.Start() + frsc.fedPodInformer.Start() + + frsc.replicasetDeliverer.StartWithHandler(func(item *fedutil.DelayingDelivererItem) { + frsc.replicasetWorkQueue.Add(item.Key) + }) + frsc.clusterDeliverer.StartWithHandler(func(_ *fedutil.DelayingDelivererItem) { + frsc.reconcileReplicaSetsOnClusterChange() + }) + + for !frsc.isSynced() { + time.Sleep(5 * time.Millisecond) + } + + for i := 0; i < workers; i++ { + go wait.Until(frsc.worker, time.Second, stopCh) + } + + go func() { + select { + case <-time.After(time.Minute): + frsc.replicaSetBackoff.GC() + case <-stopCh: + return + } + }() + + <-stopCh + glog.Infof("Shutting down ReplicaSetController") + frsc.replicasetDeliverer.Stop() + frsc.clusterDeliverer.Stop() + frsc.replicasetWorkQueue.ShutDown() + frsc.fedReplicaSetInformer.Stop() + frsc.fedPodInformer.Stop() +} + +func (frsc *ReplicaSetController) isSynced() bool { + if !frsc.fedReplicaSetInformer.ClustersSynced() { + glog.V(2).Infof("Cluster list not synced") + return false + } + clusters, err := frsc.fedReplicaSetInformer.GetReadyClusters() + if err != nil { + glog.Errorf("Failed to get ready clusters: %v", err) + return false + } + if !frsc.fedReplicaSetInformer.GetTargetStore().ClustersSynced(clusters) { + return false + } + + if !frsc.fedPodInformer.ClustersSynced() { + glog.V(2).Infof("Cluster list not synced") + return false + } + if !frsc.fedPodInformer.GetTargetStore().ClustersSynced(clusters) { + return false + } + + if !frsc.replicaSetController.HasSynced() { + glog.V(2).Infof("federation replicaset list not synced") + return false + } + return true +} + +func (frsc *ReplicaSetController) deliverLocalReplicaSet(obj interface{}, duration time.Duration) { + key, err := controller.KeyFunc(obj) + if err != nil { + glog.Errorf("Couldn't get key for object %v: %v", obj, err) + return + } + _, exists, err := frsc.replicaSetStore.Store.GetByKey(key) + if err != nil { + glog.Errorf("Couldn't get federation replicaset %v: %v", key, err) + return + } + if exists { // ignore replicasets exists only in local k8s + frsc.deliverReplicaSetByKey(key, duration, false) + } +} + +func (frsc *ReplicaSetController) deliverFedReplicaSetObj(obj interface{}, delay time.Duration) { + key, err := controller.KeyFunc(obj) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", obj, err) + return + } + frsc.deliverReplicaSetByKey(key, delay, false) +} + +func (frsc *ReplicaSetController) deliverReplicaSetByKey(key string, delay time.Duration, failed bool) { + if failed { + frsc.replicaSetBackoff.Next(key, time.Now()) + delay = delay + frsc.replicaSetBackoff.Get(key) + } else { + frsc.replicaSetBackoff.Reset(key) + } + frsc.replicasetDeliverer.DeliverAfter(key, nil, delay) +} + +func (frsc *ReplicaSetController) worker() { + for { + item, quit := frsc.replicasetWorkQueue.Get() + if quit { + return + } + key := item.(string) + err := frsc.reconcileReplicaSet(key) + frsc.replicasetWorkQueue.Done(item) + if err != nil { + glog.Errorf("Error syncing cluster controller: %v", err) + frsc.deliverReplicaSetByKey(key, 0, true) + } + } +} + +func (frsc *ReplicaSetController) schedule(frs *extensionsv1.ReplicaSet, clusters []*fedv1.Cluster, + current map[string]int64, estimatedCapacity map[string]int64) map[string]int64 { + // TODO: integrate real scheduler + + plnr := frsc.defaultPlanner + frsPref, err := parseFederationReplicaSetReference(frs) + if err != nil { + glog.Info("Invalid ReplicaSet specific preference, use default. rs: %v, err: %v", frs, err) + } + if frsPref != nil { // create a new planner if user specified a preference + plnr = planner.NewPlanner(frsPref) + } + + replicas := int64(*frs.Spec.Replicas) + var clusterNames []string + for _, cluster := range clusters { + clusterNames = append(clusterNames, cluster.Name) + } + scheduleResult, overflow := plnr.Plan(replicas, clusterNames, current, estimatedCapacity) + // make sure the return contains clusters need to zero the replicas + result := make(map[string]int64) + for clusterName := range current { + result[clusterName] = 0 + } + for clusterName, replicas := range scheduleResult { + result[clusterName] = replicas + } + for clusterName, replicas := range overflow { + result[clusterName] += replicas + } + return result +} + +func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error { + if !frsc.isSynced() { + frsc.deliverReplicaSetByKey(key, clusterAvailableDelay, false) + return nil + } + + glog.Infof("Start reconcile replicaset %q", key) + startTime := time.Now() + defer glog.Infof("Finished reconcile replicaset %q (%v)", key, time.Now().Sub(startTime)) + + obj, exists, err := frsc.replicaSetStore.Store.GetByKey(key) + if err != nil { + return err + } + if !exists { + // don't delete local replicasets for now + return nil + } + frs := obj.(*extensionsv1.ReplicaSet) + + clusters, err := frsc.fedReplicaSetInformer.GetReadyClusters() + if err != nil { + return err + } + + // collect current status and do schedule + allPods, err := frsc.fedPodInformer.GetTargetStore().List() + if err != nil { + return err + } + podStatus, err := AnalysePods(frs, allPods, time.Now()) + current := make(map[string]int64) + estimatedCapacity := make(map[string]int64) + for _, cluster := range clusters { + lrsObj, exists, err := frsc.fedReplicaSetInformer.GetTargetStore().GetByKey(cluster.Name, key) + if err != nil { + return err + } + if exists { + lrs := lrsObj.(*extensionsv1.ReplicaSet) + current[cluster.Name] = int64(podStatus[cluster.Name].RunningAndReady) // include pending as well? + unschedulable := int64(podStatus[cluster.Name].Unschedulable) + if unschedulable > 0 { + estimatedCapacity[cluster.Name] = int64(*lrs.Spec.Replicas) - unschedulable + } + } + } + + scheduleResult := frsc.schedule(frs, clusters, current, estimatedCapacity) + + glog.Infof("Start syncing local replicaset %v", scheduleResult) + + fedStatus := extensionsv1.ReplicaSetStatus{ObservedGeneration: frs.Generation} + for clusterName, replicas := range scheduleResult { + // TODO: updater or parallelizer doesnn't help as results are needed for updating fed rs status + clusterClient, err := frsc.fedReplicaSetInformer.GetClientsetForCluster(clusterName) + if err != nil { + return err + } + lrsObj, exists, err := frsc.fedReplicaSetInformer.GetTargetStore().GetByKey(clusterName, key) + if err != nil { + return err + } else if !exists { + if replicas > 0 { + lrs := &extensionsv1.ReplicaSet{ + ObjectMeta: apiv1.ObjectMeta{ + Name: frs.Name, + Namespace: frs.Namespace, + Labels: frs.Labels, + Annotations: frs.Annotations, + }, + Spec: frs.Spec, + } + specReplicas := int32(replicas) + lrs.Spec.Replicas = &specReplicas + lrs, err = clusterClient.Extensions().ReplicaSets(frs.Namespace).Create(lrs) + if err != nil { + return err + } + fedStatus.Replicas += lrs.Status.Replicas + fedStatus.FullyLabeledReplicas += lrs.Status.FullyLabeledReplicas + } + } else { + lrs := lrsObj.(*extensionsv1.ReplicaSet) + lrsExpectedSpec := frs.Spec + specReplicas := int32(replicas) + lrsExpectedSpec.Replicas = &specReplicas + if !reflect.DeepEqual(lrs.Spec, lrsExpectedSpec) { + lrs.Spec = lrsExpectedSpec + lrs, err = clusterClient.Extensions().ReplicaSets(frs.Namespace).Update(lrs) + if err != nil { + return err + } + } + fedStatus.Replicas += lrs.Status.Replicas + fedStatus.FullyLabeledReplicas += lrs.Status.FullyLabeledReplicas + // leave the replicaset even the replicas dropped to 0 + } + } + if fedStatus.Replicas != frs.Status.Replicas || fedStatus.FullyLabeledReplicas != frs.Status.FullyLabeledReplicas { + frs.Status = fedStatus + _, err = frsc.fedClient.Extensions().ReplicaSets(frs.Namespace).UpdateStatus(frs) + if err != nil { + return err + } + } + + return nil +} + +func (frsc *ReplicaSetController) reconcileReplicaSetsOnClusterChange() { + if !frsc.isSynced() { + frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay) + } + rss := frsc.replicaSetStore.Store.List() + for _, rs := range rss { + key, _ := controller.KeyFunc(rs) + frsc.deliverReplicaSetByKey(key, 0, false) + } +} diff --git a/federation/pkg/federation-controller/replicaset/replicasetcontroller_test.go b/federation/pkg/federation-controller/replicaset/replicasetcontroller_test.go new file mode 100644 index 00000000000..3a194a40d19 --- /dev/null +++ b/federation/pkg/federation-controller/replicaset/replicasetcontroller_test.go @@ -0,0 +1,190 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package replicaset + +import ( + "flag" + "fmt" + "github.com/stretchr/testify/assert" + fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" + fedclientfake "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4/fake" + fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" + "k8s.io/kubernetes/pkg/api/meta" + apiv1 "k8s.io/kubernetes/pkg/api/v1" + extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" + kube_release_1_4 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4" + kubeclientfake "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4/fake" + "k8s.io/kubernetes/pkg/client/testing/core" + "k8s.io/kubernetes/pkg/watch" + "testing" + "time" +) + +func TestParseFederationReplicaSetReference(t *testing.T) { + successPrefs := []string{ + `{"rebalance": true, + "clusters": { + "k8s-1": {"minReplicas": 10, "maxReplicas": 20, "weight": 2}, + "*": {"weight": 1} + }}`, + } + failedPrefes := []string{ + `{`, // bad json + } + + rs := newReplicaSetWithReplicas("rs-1", 100) + accessor, _ := meta.Accessor(rs) + anno := accessor.GetAnnotations() + if anno == nil { + anno = make(map[string]string) + accessor.SetAnnotations(anno) + } + for _, prefString := range successPrefs { + anno[FedReplicaSetPreferencesAnnotation] = prefString + pref, err := parseFederationReplicaSetReference(rs) + assert.NotNil(t, pref) + assert.Nil(t, err) + } + for _, prefString := range failedPrefes { + anno[FedReplicaSetPreferencesAnnotation] = prefString + pref, err := parseFederationReplicaSetReference(rs) + assert.Nil(t, pref) + assert.NotNil(t, err) + } +} + +func TestReplicaSetController(t *testing.T) { + flag.Set("logtostderr", "true") + flag.Set("v", "5") + flag.Parse() + + replicaSetReviewDelay = 10 * time.Millisecond + clusterAvailableDelay = 20 * time.Millisecond + clusterUnavailableDelay = 60 * time.Millisecond + allReplicaSetReviewDealy = 120 * time.Millisecond + + fedclientset := fedclientfake.NewSimpleClientset() + fedrswatch := watch.NewFake() + fedclientset.PrependWatchReactor("replicasets", core.DefaultWatchReactor(fedrswatch, nil)) + + fedclientset.Federation().Clusters().Create(newClusterWithReadyStatus("k8s-1", apiv1.ConditionTrue)) + fedclientset.Federation().Clusters().Create(newClusterWithReadyStatus("k8s-2", apiv1.ConditionTrue)) + + kube1clientset := kubeclientfake.NewSimpleClientset() + kube1rswatch := watch.NewFake() + kube1clientset.PrependWatchReactor("replicasets", core.DefaultWatchReactor(kube1rswatch, nil)) + kube1Podwatch := watch.NewFake() + kube1clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(kube1Podwatch, nil)) + kube2clientset := kubeclientfake.NewSimpleClientset() + kube2rswatch := watch.NewFake() + kube2clientset.PrependWatchReactor("replicasets", core.DefaultWatchReactor(kube2rswatch, nil)) + kube2Podwatch := watch.NewFake() + kube2clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(kube2Podwatch, nil)) + + fedInformerClientFactory := func(cluster *fedv1.Cluster) (kube_release_1_4.Interface, error) { + switch cluster.Name { + case "k8s-1": + return kube1clientset, nil + case "k8s-2": + return kube2clientset, nil + default: + return nil, fmt.Errorf("Unknown cluster: %v", cluster.Name) + } + } + replicaSetController := NewReplicaSetController(fedclientset) + rsFedinformer := toFederatedInformerForTestOnly(replicaSetController.fedReplicaSetInformer) + rsFedinformer.SetClientFactory(fedInformerClientFactory) + podFedinformer := toFederatedInformerForTestOnly(replicaSetController.fedPodInformer) + podFedinformer.SetClientFactory(fedInformerClientFactory) + + stopChan := make(chan struct{}) + defer close(stopChan) + go replicaSetController.Run(1, stopChan) + + rs := newReplicaSetWithReplicas("rs", 9) + rs, _ = fedclientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Create(rs) + fedrswatch.Add(rs) + time.Sleep(1 * time.Second) + + rs1, _ := kube1clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name) + kube1rswatch.Add(rs1) + rs1.Status.Replicas = *rs1.Spec.Replicas + rs1, _ = kube1clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).UpdateStatus(rs1) + kube1rswatch.Modify(rs1) + + rs2, _ := kube2clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name) + kube2rswatch.Add(rs2) + rs2.Status.Replicas = *rs2.Spec.Replicas + rs2, _ = kube2clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).UpdateStatus(rs2) + kube2rswatch.Modify(rs2) + + time.Sleep(1 * time.Second) + rs, _ = fedclientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name) + assert.Equal(t, *rs.Spec.Replicas, *rs1.Spec.Replicas+*rs2.Spec.Replicas) + assert.Equal(t, rs.Status.Replicas, rs1.Status.Replicas+rs2.Status.Replicas) + + var replicas int32 = 20 + rs.Spec.Replicas = &replicas + rs, _ = fedclientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Update(rs) + fedrswatch.Modify(rs) + time.Sleep(1 * time.Second) + + rs1, _ = kube1clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name) + rs1.Status.Replicas = *rs1.Spec.Replicas + rs1, _ = kube1clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).UpdateStatus(rs1) + kube1rswatch.Modify(rs1) + + rs2, _ = kube2clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name) + rs2.Status.Replicas = *rs2.Spec.Replicas + rs2, _ = kube2clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).UpdateStatus(rs2) + kube2rswatch.Modify(rs2) + + time.Sleep(1 * time.Second) + rs, _ = fedclientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name) + assert.Equal(t, *rs.Spec.Replicas, *rs1.Spec.Replicas+*rs2.Spec.Replicas) + assert.Equal(t, rs.Status.Replicas, rs1.Status.Replicas+rs2.Status.Replicas) +} + +func toFederatedInformerForTestOnly(informer fedutil.FederatedInformer) fedutil.FederatedInformerForTestOnly { + inter := informer.(interface{}) + return inter.(fedutil.FederatedInformerForTestOnly) +} + +func newClusterWithReadyStatus(name string, readyStatus apiv1.ConditionStatus) *fedv1.Cluster { + return &fedv1.Cluster{ + ObjectMeta: apiv1.ObjectMeta{ + Name: name, + }, + Status: fedv1.ClusterStatus{ + Conditions: []fedv1.ClusterCondition{ + {Type: fedv1.ClusterReady, Status: readyStatus}, + }, + }, + } +} + +func newReplicaSetWithReplicas(name string, replicas int32) *extensionsv1.ReplicaSet { + return &extensionsv1.ReplicaSet{ + ObjectMeta: apiv1.ObjectMeta{ + Name: name, + Namespace: apiv1.NamespaceDefault, + }, + Spec: extensionsv1.ReplicaSetSpec{ + Replicas: &replicas, + }, + } +}