diff --git a/federation/cmd/federation-controller-manager/app/controllermanager.go b/federation/cmd/federation-controller-manager/app/controllermanager.go index 74a58d0a67e..6612330f8a2 100644 --- a/federation/cmd/federation-controller-manager/app/controllermanager.go +++ b/federation/cmd/federation-controller-manager/app/controllermanager.go @@ -31,6 +31,7 @@ import ( "k8s.io/kubernetes/federation/pkg/dnsprovider" clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster" namespacecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/namespace" + replicasetcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/replicaset" secretcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/secret" servicecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service" "k8s.io/kubernetes/federation/pkg/federation-controller/util" @@ -155,5 +156,9 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err secretcontroller := secretcontroller.NewSecretController(secretcontrollerClientset) secretcontroller.Run(wait.NeverStop) + replicaSetClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, replicasetcontroller.UserAgentName)) + replicaSetController := replicasetcontroller.NewReplicaSetController(replicaSetClientset) + go replicaSetController.Run(s.ConcurrentReplicaSetSyncs, wait.NeverStop) + select {} } diff --git a/federation/cmd/federation-controller-manager/app/options/options.go b/federation/cmd/federation-controller-manager/app/options/options.go index 8e3a64ef778..091497110b1 100644 --- a/federation/cmd/federation-controller-manager/app/options/options.go +++ b/federation/cmd/federation-controller-manager/app/options/options.go @@ -47,6 +47,10 @@ type ControllerManagerConfiguration struct { // allowed to sync concurrently. Larger number = more responsive service // management, but more CPU (and network) load. ConcurrentServiceSyncs int `json:"concurrentServiceSyncs"` + // concurrentReplicaSetSyncs is the number of ReplicaSets that are + // allowed to sync concurrently. Larger number = more responsive service + // management, but more CPU (and network) load. + ConcurrentReplicaSetSyncs int `json:"concurrentReplicaSetSyncs"` // clusterMonitorPeriod is the period for syncing ClusterStatus in cluster controller. ClusterMonitorPeriod unversioned.Duration `json:"clusterMonitorPeriod"` // APIServerQPS is the QPS to use while talking with federation apiserver. @@ -78,13 +82,14 @@ const ( func NewCMServer() *CMServer { s := CMServer{ ControllerManagerConfiguration: ControllerManagerConfiguration{ - Port: FederatedControllerManagerPort, - Address: "0.0.0.0", - ConcurrentServiceSyncs: 10, - ClusterMonitorPeriod: unversioned.Duration{Duration: 40 * time.Second}, - APIServerQPS: 20.0, - APIServerBurst: 30, - LeaderElection: leaderelection.DefaultLeaderElectionConfiguration(), + Port: FederatedControllerManagerPort, + Address: "0.0.0.0", + ConcurrentServiceSyncs: 10, + ConcurrentReplicaSetSyncs: 10, + ClusterMonitorPeriod: unversioned.Duration{Duration: 40 * time.Second}, + APIServerQPS: 20.0, + APIServerBurst: 30, + LeaderElection: leaderelection.DefaultLeaderElectionConfiguration(), }, } return &s @@ -97,6 +102,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.FederationName, "federation-name", s.FederationName, "Federation name.") fs.StringVar(&s.ZoneName, "zone-name", s.ZoneName, "Zone name, like example.com.") fs.IntVar(&s.ConcurrentServiceSyncs, "concurrent-service-syncs", s.ConcurrentServiceSyncs, "The number of service syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load") + fs.IntVar(&s.ConcurrentReplicaSetSyncs, "concurrent-replicaset-syncs", s.ConcurrentReplicaSetSyncs, "The number of ReplicaSets syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load") fs.DurationVar(&s.ClusterMonitorPeriod.Duration, "cluster-monitor-period", s.ClusterMonitorPeriod.Duration, "The period for syncing ClusterStatus in ClusterController.") fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/") fs.StringVar(&s.Master, "master", s.Master, "The address of the federation API server (overrides any value in kubeconfig)") diff --git a/federation/pkg/federation-controller/replicaset/replicasetcontroller.go b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go new file mode 100644 index 00000000000..39ce4753cc2 --- /dev/null +++ b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go @@ -0,0 +1,380 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package replicaset + +import ( + "reflect" + "time" + + "github.com/golang/glog" + + fed "k8s.io/kubernetes/federation/apis/federation" + fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" + fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4" + //kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4" + planner "k8s.io/kubernetes/federation/pkg/federation-controller/replicaset/planner" + fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" + "k8s.io/kubernetes/pkg/api" + apiv1 "k8s.io/kubernetes/pkg/api/v1" + extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/flowcontrol" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/util/workqueue" + "k8s.io/kubernetes/pkg/watch" +) + +const ( + // schedule result was put into annotation in a format of "clusterName:replicas[/clusterName:replicas]..." + ExpectedReplicasAnnotation = "kubernetes.io/expected-replicas" + allClustersKey = "THE_ALL_CLUSTER_KEY" + UserAgentName = "Federation-replicaset-Controller" +) + +var ( + replicaSetReviewDelay = 10 * time.Second + clusterAvailableDelay = 20 * time.Second + clusterUnavailableDelay = 60 * time.Second +) + +func parseFederationReplicaSetReference(frs *extensionsv1.ReplicaSet) (*fed.FederatedReplicaSetPreferences, error) { + return nil, nil +} + +type ReplicaSetController struct { + fedClient fedclientset.Interface + + replicaSetController *framework.Controller + replicaSetStore cache.StoreToReplicaSetLister + + fedInformer fedutil.FederatedInformer + + replicasetDeliverer *fedutil.DelayingDeliverer + clusterDeliverer *fedutil.DelayingDeliverer + replicasetWorkQueue workqueue.Interface + + replicaSetBackoff *flowcontrol.Backoff + + planner *planner.Planner +} + +// NewclusterController returns a new cluster controller +func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSetController { + frsc := &ReplicaSetController{ + fedClient: federationClient, + replicasetDeliverer: fedutil.NewDelayingDeliverer(), + clusterDeliverer: fedutil.NewDelayingDeliverer(), + replicasetWorkQueue: workqueue.New(), + replicaSetBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute), + planner: planner.NewPlanner(&fed.FederatedReplicaSetPreferences{ + Clusters: map[string]fed.ClusterReplicaSetPreferences{ + "*": {Weight: 1}, + }, + }), + } + + replicaSetFedInformerFactory := func(cluster *fedv1.Cluster, clientset fedclientset.Interface) (cache.Store, framework.ControllerInterface) { + return framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return clientset.Extensions().ReplicaSets(apiv1.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return clientset.Extensions().ReplicaSets(apiv1.NamespaceAll).Watch(options) + }, + }, + &extensionsv1.ReplicaSet{}, + controller.NoResyncPeriodFunc(), + fedutil.NewTriggerOnAllChanges( + func(obj runtime.Object) { frsc.deliverLocalReplicaSet(obj, replicaSetReviewDelay) }, + ), + ) + } + clusterLifecycle := fedutil.ClusterLifecycleHandlerFuncs{ + ClusterAvailable: func(cluster *fedv1.Cluster) { /* no rebalancing for now */ }, + ClusterUnavailable: func(cluster *fedv1.Cluster, _ []interface{}) { + frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterUnavailableDelay) + }, + } + frsc.fedInformer = fedutil.NewFederatedInformer(federationClient, replicaSetFedInformerFactory, &clusterLifecycle) + + frsc.replicaSetStore.Store, frsc.replicaSetController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return frsc.fedClient.Extensions().ReplicaSets(apiv1.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return frsc.fedClient.Extensions().ReplicaSets(apiv1.NamespaceAll).Watch(options) + }, + }, + &extensionsv1.ReplicaSet{}, + controller.NoResyncPeriodFunc(), + fedutil.NewTriggerOnMetaAndSpecChanges( + func(obj runtime.Object) { frsc.deliverFedReplicaSetObj(obj, replicaSetReviewDelay) }, + ), + ) + + return frsc +} + +func (frsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { + go frsc.replicaSetController.Run(stopCh) + frsc.fedInformer.Start() + + for !frsc.isSynced() { + time.Sleep(5 * time.Millisecond) + } + + frsc.replicasetDeliverer.StartWithHandler(func(item *fedutil.DelayingDelivererItem) { + frsc.replicasetWorkQueue.Add(item.Key) + }) + frsc.clusterDeliverer.StartWithHandler(func(_ *fedutil.DelayingDelivererItem) { + frsc.reconcileNamespacesOnClusterChange() + }) + + for i := 0; i < workers; i++ { + go wait.Until(frsc.worker, time.Second, stopCh) + } + go func() { + select { + case <-time.After(time.Minute): + frsc.replicaSetBackoff.GC() + case <-stopCh: + return + } + }() + + <-stopCh + glog.Infof("Shutting down ReplicaSetController") + frsc.replicasetDeliverer.Stop() + frsc.clusterDeliverer.Stop() + frsc.replicasetWorkQueue.ShutDown() +} + +func (frsc *ReplicaSetController) isSynced() bool { + if !frsc.fedInformer.ClustersSynced() { + glog.V(2).Infof("Cluster list not synced") + return false + } + clusters, err := frsc.fedInformer.GetReadyClusters() + if err != nil { + glog.Errorf("Failed to get ready clusters: %v", err) + return false + } + if !frsc.fedInformer.GetTargetStore().ClustersSynced(clusters) { + return false + } + if !frsc.replicaSetController.HasSynced() { + glog.V(2).Infof("federation replicaset list not synced") + return false + } + return true +} + +func (frsc *ReplicaSetController) deliverLocalReplicaSet(obj interface{}, duration time.Duration) { + key, err := controller.KeyFunc(obj) + if err != nil { + glog.Errorf("Couldn't get key for object %v: %v", obj, err) + return + } + _, exists, err := frsc.replicaSetStore.GetByKey(key) + if err != nil { + glog.Errorf("Couldn't get federation replicaset %v: %v", key, err) + return + } + if exists { // ignore replicasets exists only in local k8s + frsc.deliverReplicaSetByKey(key, duration, false) + } +} + +func (frsc *ReplicaSetController) deliverFedReplicaSetObj(obj interface{}, delay time.Duration) { + key, err := controller.KeyFunc(obj) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", obj, err) + return + } + frsc.deliverReplicaSetByKey(key, delay, false) +} + +func (frsc *ReplicaSetController) deliverReplicaSetByKey(key string, delay time.Duration, failed bool) { + if failed { + frsc.replicaSetBackoff.Next(key, time.Now()) + delay = delay + frsc.replicaSetBackoff.Get(key) + } else { + frsc.replicaSetBackoff.Reset(key) + } + frsc.replicasetDeliverer.DeliverAfter(key, nil, delay) +} + +func (frsc *ReplicaSetController) worker() { + for { + item, quit := frsc.replicasetWorkQueue.Get() + if quit { + return + } + key := item.(string) + err := frsc.reconcileReplicaSet(key) + frsc.replicasetWorkQueue.Done(item) + if err != nil { + glog.Errorf("Error syncing cluster controller: %v", err) + frsc.deliverReplicaSetByKey(key, 0, true) + } + } +} + +func (frsc *ReplicaSetController) schedule(frs *extensionsv1.ReplicaSet, clusters []*fedv1.Cluster, + expected map[string]int64, actual map[string]int64) map[string]int64 { + // TODO: integrate real scheduler + + plnr := frsc.planner + frsPref, _ := parseFederationReplicaSetReference(frs) + if frsPref != nil { // create a new planner if user specified a preference + plnr = planner.NewPlanner(frsPref) + } + + replicas := int64(*frs.Spec.Replicas) + var clusterNames []string + for _, cluster := range clusters { + clusterNames = append(clusterNames, cluster.Name) + } + scheduleResult := plnr.Plan(replicas, clusterNames) + // make sure the return contains clusters need to zero the replicas + result := make(map[string]int64) + for clusterName := range expected { + result[clusterName] = 0 + } + for clusterName, replicas := range scheduleResult { + result[clusterName] = replicas + } + return result +} + +func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error { + if !frsc.isSynced() { + frsc.deliverReplicaSetByKey(key, clusterAvailableDelay, false) + return nil + } + + glog.Infof("Start reconcile replicaset %q", key) + startTime := time.Now() + defer glog.Infof("Finished reconcile replicaset %q (%v)", key, time.Now().Sub(startTime)) + + obj, exists, err := frsc.replicaSetStore.Store.GetByKey(key) + if err != nil { + return err + } + if !exists { + // don't delete local replicasets for now + return nil + } + frs := obj.(*extensionsv1.ReplicaSet) + + clusters, err := frsc.fedInformer.GetReadyClusters() + if err != nil { + return err + } + + // collect current status and do schedule + expected := make(map[string]int64) + actual := make(map[string]int64) + for _, cluster := range clusters { + lrsObj, exists, err := frsc.fedInformer.GetTargetStore().GetByKey(cluster.Name, key) + if err != nil { + return err + } + if exists { + lrs := lrsObj.(*extensionsv1.ReplicaSet) + expected[cluster.Name] = int64(*lrs.Spec.Replicas) + // will get this via pod status + actual[cluster.Name] = int64(lrs.Status.Replicas) + } + } + scheduleResult := frsc.schedule(frs, clusters, expected, actual) + + glog.Infof("Start syncing local replicaset %v", scheduleResult) + + fedStatus := extensionsv1.ReplicaSetStatus{ObservedGeneration: frs.Generation} + for clusterName, replicas := range scheduleResult { + // TODO: updater or parallelizer doesnn't help as results are needed for updating fed rs status + clusterClient, err := frsc.fedInformer.GetClientsetForCluster(clusterName) + if err != nil { + return err + } + lrsObj, exists, err := frsc.fedInformer.GetTargetStore().GetByKey(clusterName, key) + if err != nil { + return err + } else if !exists { + if replicas > 0 { + lrs := &extensionsv1.ReplicaSet{ + ObjectMeta: apiv1.ObjectMeta{ + Name: frs.Name, + Namespace: frs.Namespace, + Labels: frs.Labels, + Annotations: frs.Annotations, + }, + Spec: frs.Spec, + } + specReplicas := int32(replicas) + lrs.Spec.Replicas = &specReplicas + lrs, err = clusterClient.Extensions().ReplicaSets(frs.Namespace).Create(lrs) + if err != nil { + return err + } + fedStatus.Replicas += lrs.Status.Replicas + fedStatus.FullyLabeledReplicas += lrs.Status.FullyLabeledReplicas + } + } else { + lrs := lrsObj.(*extensionsv1.ReplicaSet) + lrsExpectedSpec := frs.Spec + specReplicas := int32(replicas) + lrsExpectedSpec.Replicas = &specReplicas + if !reflect.DeepEqual(lrs.Spec, lrsExpectedSpec) { + lrs.Spec = lrsExpectedSpec + lrs, err = clusterClient.Extensions().ReplicaSets(frs.Namespace).Update(lrs) + if err != nil { + return err + } + } + fedStatus.Replicas += lrs.Status.Replicas + fedStatus.FullyLabeledReplicas += lrs.Status.FullyLabeledReplicas + // leave the replicaset even the replicas dropped to 0 + } + } + if fedStatus.Replicas != frs.Status.Replicas || fedStatus.FullyLabeledReplicas != frs.Status.FullyLabeledReplicas { + frs.Status = fedStatus + _, err = frsc.fedClient.Extensions().ReplicaSets(frs.Namespace).UpdateStatus(frs) + if err != nil { + return err + } + } + + return nil +} + +func (frsc *ReplicaSetController) reconcileNamespacesOnClusterChange() { + if !frsc.isSynced() { + frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay) + } + rss, _ := frsc.replicaSetStore.List() + for _, rs := range rss { + key, _ := controller.KeyFunc(rs) + frsc.deliverReplicaSetByKey(key, 0, false) + } +} diff --git a/federation/pkg/federation-controller/replicaset/replicasetcontroller_test.go b/federation/pkg/federation-controller/replicaset/replicasetcontroller_test.go new file mode 100644 index 00000000000..b5e87714429 --- /dev/null +++ b/federation/pkg/federation-controller/replicaset/replicasetcontroller_test.go @@ -0,0 +1,134 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package replicaset + +import ( + "fmt" + "github.com/stretchr/testify/assert" + fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" + fedclientfake "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4/fake" + apiv1 "k8s.io/kubernetes/pkg/api/v1" + extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" + "testing" + "time" + //kubeclientfake "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4/fake" + "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4" + fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" + "k8s.io/kubernetes/pkg/client/testing/core" + "k8s.io/kubernetes/pkg/watch" +) + +func TestReplicaSetController(t *testing.T) { + + replicaSetReviewDelay = 10 * time.Millisecond + clusterAvailableDelay = 20 * time.Millisecond + clusterUnavailableDelay = 60 * time.Millisecond + + fedclientset := fedclientfake.NewSimpleClientset() + fedrswatch := watch.NewFake() + fedclientset.PrependWatchReactor("replicasets", core.DefaultWatchReactor(fedrswatch, nil)) + + fedclientset.Federation().Clusters().Create(mkCluster("k8s-1", apiv1.ConditionTrue)) + fedclientset.Federation().Clusters().Create(mkCluster("k8s-2", apiv1.ConditionTrue)) + + kube1clientset := fedclientfake.NewSimpleClientset() + kube1rswatch := watch.NewFake() + kube1clientset.PrependWatchReactor("replicasets", core.DefaultWatchReactor(kube1rswatch, nil)) + kube2clientset := fedclientfake.NewSimpleClientset() + kube2rswatch := watch.NewFake() + kube2clientset.PrependWatchReactor("replicasets", core.DefaultWatchReactor(kube2rswatch, nil)) + + stopChan := make(chan struct{}) + replicaSetController := NewReplicaSetController(fedclientset) + informer := toFederatedInformerForTestOnly(replicaSetController.fedInformer) + informer.SetClientFactory(func(cluster *fedv1.Cluster) (federation_release_1_4.Interface, error) { + switch cluster.Name { + case "k8s-1": + return kube1clientset, nil + case "k8s-2": + return kube2clientset, nil + default: + return nil, fmt.Errorf("Unknown cluster: %v", cluster.Name) + } + }) + + go replicaSetController.Run(1, stopChan) + + rs := mkReplicaSet("rs", 9) + rs, _ = fedclientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Create(rs) + fedrswatch.Add(rs) + time.Sleep(1 * time.Second) + + rs1, _ := kube1clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name) + rs1.Status.Replicas = *rs1.Spec.Replicas + rs1, _ = kube1clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).UpdateStatus(rs1) + kube1rswatch.Modify(rs1) + + rs2, _ := kube2clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name) + rs2.Status.Replicas = *rs2.Spec.Replicas + rs2, _ = kube2clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).UpdateStatus(rs2) + kube2rswatch.Modify(rs2) + + time.Sleep(1 * time.Second) + rs, _ = fedclientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name) + assert.Equal(t, *rs.Spec.Replicas, *rs1.Spec.Replicas+*rs2.Spec.Replicas) + assert.Equal(t, rs.Status.Replicas, rs1.Status.Replicas+rs2.Status.Replicas) + + var replicas int32 = 20 + rs.Spec.Replicas = &replicas + rs, _ = fedclientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Update(rs) + fedrswatch.Modify(rs) + + time.Sleep(2 * time.Second) + rs, _ = fedclientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name) + rs1, _ = kube1clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name) + rs2, _ = kube2clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name) + assert.Equal(t, *rs.Spec.Replicas, *rs1.Spec.Replicas+*rs2.Spec.Replicas) + assert.Equal(t, rs.Status.Replicas, rs1.Status.Replicas+rs2.Status.Replicas) + + close(stopChan) +} + +func toFederatedInformerForTestOnly(informer fedutil.FederatedInformer) fedutil.FederatedInformerForTestOnly { + inter := informer.(interface{}) + return inter.(fedutil.FederatedInformerForTestOnly) +} + +func mkCluster(name string, readyStatus apiv1.ConditionStatus) *fedv1.Cluster { + return &fedv1.Cluster{ + ObjectMeta: apiv1.ObjectMeta{ + Name: name, + }, + Status: fedv1.ClusterStatus{ + Conditions: []fedv1.ClusterCondition{ + {Type: fedv1.ClusterReady, Status: readyStatus}, + }, + }, + } +} + +func mkReplicaSet(name string, replicas int32) *extensionsv1.ReplicaSet { + return &extensionsv1.ReplicaSet{ + ObjectMeta: apiv1.ObjectMeta{ + Name: name, + Namespace: apiv1.NamespaceDefault, + }, + Spec: extensionsv1.ReplicaSetSpec{ + Replicas: &replicas, + }, + } +}