From a491580597a729da91809cfe6dcda38e8b32d2c7 Mon Sep 17 00:00:00 2001 From: jianhuiz Date: Tue, 16 Aug 2016 01:29:38 -0700 Subject: [PATCH 1/3] add federation replicaset controller --- .../app/controllermanager.go | 5 + .../app/options/options.go | 20 +- .../replicaset/replicasetcontroller.go | 380 ++++++++++++++++++ .../replicaset/replicasetcontroller_test.go | 134 ++++++ 4 files changed, 532 insertions(+), 7 deletions(-) create mode 100644 federation/pkg/federation-controller/replicaset/replicasetcontroller.go create mode 100644 federation/pkg/federation-controller/replicaset/replicasetcontroller_test.go diff --git a/federation/cmd/federation-controller-manager/app/controllermanager.go b/federation/cmd/federation-controller-manager/app/controllermanager.go index 74a58d0a67e..6612330f8a2 100644 --- a/federation/cmd/federation-controller-manager/app/controllermanager.go +++ b/federation/cmd/federation-controller-manager/app/controllermanager.go @@ -31,6 +31,7 @@ import ( "k8s.io/kubernetes/federation/pkg/dnsprovider" clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster" namespacecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/namespace" + replicasetcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/replicaset" secretcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/secret" servicecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service" "k8s.io/kubernetes/federation/pkg/federation-controller/util" @@ -155,5 +156,9 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err secretcontroller := secretcontroller.NewSecretController(secretcontrollerClientset) secretcontroller.Run(wait.NeverStop) + replicaSetClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, replicasetcontroller.UserAgentName)) + replicaSetController := replicasetcontroller.NewReplicaSetController(replicaSetClientset) + go replicaSetController.Run(s.ConcurrentReplicaSetSyncs, wait.NeverStop) + select {} } diff --git a/federation/cmd/federation-controller-manager/app/options/options.go b/federation/cmd/federation-controller-manager/app/options/options.go index 8e3a64ef778..091497110b1 100644 --- a/federation/cmd/federation-controller-manager/app/options/options.go +++ b/federation/cmd/federation-controller-manager/app/options/options.go @@ -47,6 +47,10 @@ type ControllerManagerConfiguration struct { // allowed to sync concurrently. Larger number = more responsive service // management, but more CPU (and network) load. ConcurrentServiceSyncs int `json:"concurrentServiceSyncs"` + // concurrentReplicaSetSyncs is the number of ReplicaSets that are + // allowed to sync concurrently. Larger number = more responsive replica + // set management, but more CPU (and network) load. + ConcurrentReplicaSetSyncs int `json:"concurrentReplicaSetSyncs"` // clusterMonitorPeriod is the period for syncing ClusterStatus in cluster controller. ClusterMonitorPeriod unversioned.Duration `json:"clusterMonitorPeriod"` // APIServerQPS is the QPS to use while talking with federation apiserver. 
@@ -78,13 +82,14 @@ const ( func NewCMServer() *CMServer { s := CMServer{ ControllerManagerConfiguration: ControllerManagerConfiguration{ - Port: FederatedControllerManagerPort, - Address: "0.0.0.0", - ConcurrentServiceSyncs: 10, - ClusterMonitorPeriod: unversioned.Duration{Duration: 40 * time.Second}, - APIServerQPS: 20.0, - APIServerBurst: 30, - LeaderElection: leaderelection.DefaultLeaderElectionConfiguration(), + Port: FederatedControllerManagerPort, + Address: "0.0.0.0", + ConcurrentServiceSyncs: 10, + ConcurrentReplicaSetSyncs: 10, + ClusterMonitorPeriod: unversioned.Duration{Duration: 40 * time.Second}, + APIServerQPS: 20.0, + APIServerBurst: 30, + LeaderElection: leaderelection.DefaultLeaderElectionConfiguration(), }, } return &s @@ -97,6 +102,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.FederationName, "federation-name", s.FederationName, "Federation name.") fs.StringVar(&s.ZoneName, "zone-name", s.ZoneName, "Zone name, like example.com.") fs.IntVar(&s.ConcurrentServiceSyncs, "concurrent-service-syncs", s.ConcurrentServiceSyncs, "The number of service syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load") + fs.IntVar(&s.ConcurrentReplicaSetSyncs, "concurrent-replicaset-syncs", s.ConcurrentReplicaSetSyncs, "The number of ReplicaSet syncing operations that will be done concurrently. Larger number = faster replica set updating, but more CPU (and network) load") fs.DurationVar(&s.ClusterMonitorPeriod.Duration, "cluster-monitor-period", s.ClusterMonitorPeriod.Duration, "The period for syncing ClusterStatus in ClusterController.") fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/") fs.StringVar(&s.Master, "master", s.Master, "The address of the federation API server (overrides any value in kubeconfig)") diff --git a/federation/pkg/federation-controller/replicaset/replicasetcontroller.go b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go new file mode 100644 index 00000000000..39ce4753cc2 --- /dev/null +++ b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go @@ -0,0 +1,380 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package replicaset + +import ( + "reflect" + "time" + + "github.com/golang/glog" + + fed "k8s.io/kubernetes/federation/apis/federation" + fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" + fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4" + //kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4" + planner "k8s.io/kubernetes/federation/pkg/federation-controller/replicaset/planner" + fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" + "k8s.io/kubernetes/pkg/api" + apiv1 "k8s.io/kubernetes/pkg/api/v1" + extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/flowcontrol" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/util/workqueue" + "k8s.io/kubernetes/pkg/watch" +) + +const ( + // the schedule result is stored in an annotation in the format "clusterName:replicas[/clusterName:replicas]..." + ExpectedReplicasAnnotation = "kubernetes.io/expected-replicas" + allClustersKey = "THE_ALL_CLUSTER_KEY" + UserAgentName = "Federation-replicaset-Controller" +) + +var ( + replicaSetReviewDelay = 10 * time.Second + clusterAvailableDelay = 20 * time.Second + clusterUnavailableDelay = 60 * time.Second +) + +func parseFederationReplicaSetReference(frs *extensionsv1.ReplicaSet) (*fed.FederatedReplicaSetPreferences, error) { + return nil, nil +} + +type ReplicaSetController struct { + fedClient fedclientset.Interface + + replicaSetController *framework.Controller + replicaSetStore cache.StoreToReplicaSetLister + + fedInformer fedutil.FederatedInformer + + replicasetDeliverer *fedutil.DelayingDeliverer + clusterDeliverer *fedutil.DelayingDeliverer + replicasetWorkQueue workqueue.Interface + + replicaSetBackoff *flowcontrol.Backoff + + planner *planner.Planner +} + +// NewReplicaSetController returns a new federated replicaset controller +func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSetController { + frsc := &ReplicaSetController{ + fedClient: federationClient, + replicasetDeliverer: fedutil.NewDelayingDeliverer(), + clusterDeliverer: fedutil.NewDelayingDeliverer(), + replicasetWorkQueue: workqueue.New(), + replicaSetBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute), + planner: planner.NewPlanner(&fed.FederatedReplicaSetPreferences{ + Clusters: map[string]fed.ClusterReplicaSetPreferences{ + "*": {Weight: 1}, + }, + }), + } + + replicaSetFedInformerFactory := func(cluster *fedv1.Cluster, clientset fedclientset.Interface) (cache.Store, framework.ControllerInterface) { + return framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return clientset.Extensions().ReplicaSets(apiv1.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return clientset.Extensions().ReplicaSets(apiv1.NamespaceAll).Watch(options) + }, + }, + &extensionsv1.ReplicaSet{}, + controller.NoResyncPeriodFunc(), + fedutil.NewTriggerOnAllChanges( + func(obj runtime.Object) { frsc.deliverLocalReplicaSet(obj, replicaSetReviewDelay) }, + ), + ) + } + clusterLifecycle := fedutil.ClusterLifecycleHandlerFuncs{ + ClusterAvailable: func(cluster *fedv1.Cluster) { /* no rebalancing for now */ }, + ClusterUnavailable: func(cluster *fedv1.Cluster, _ []interface{}) { + frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, 
clusterUnavailableDelay) + }, + } + frsc.fedInformer = fedutil.NewFederatedInformer(federationClient, replicaSetFedInformerFactory, &clusterLifecycle) + + frsc.replicaSetStore.Store, frsc.replicaSetController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return frsc.fedClient.Extensions().ReplicaSets(apiv1.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return frsc.fedClient.Extensions().ReplicaSets(apiv1.NamespaceAll).Watch(options) + }, + }, + &extensionsv1.ReplicaSet{}, + controller.NoResyncPeriodFunc(), + fedutil.NewTriggerOnMetaAndSpecChanges( + func(obj runtime.Object) { frsc.deliverFedReplicaSetObj(obj, replicaSetReviewDelay) }, + ), + ) + + return frsc +} + +func (frsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { + go frsc.replicaSetController.Run(stopCh) + frsc.fedInformer.Start() + + for !frsc.isSynced() { + time.Sleep(5 * time.Millisecond) + } + + frsc.replicasetDeliverer.StartWithHandler(func(item *fedutil.DelayingDelivererItem) { + frsc.replicasetWorkQueue.Add(item.Key) + }) + frsc.clusterDeliverer.StartWithHandler(func(_ *fedutil.DelayingDelivererItem) { + frsc.reconcileNamespacesOnClusterChange() + }) + + for i := 0; i < workers; i++ { + go wait.Until(frsc.worker, time.Second, stopCh) + } + go func() { + select { + case <-time.After(time.Minute): + frsc.replicaSetBackoff.GC() + case <-stopCh: + return + } + }() + + <-stopCh + glog.Infof("Shutting down ReplicaSetController") + frsc.replicasetDeliverer.Stop() + frsc.clusterDeliverer.Stop() + frsc.replicasetWorkQueue.ShutDown() +} + +func (frsc *ReplicaSetController) isSynced() bool { + if !frsc.fedInformer.ClustersSynced() { + glog.V(2).Infof("Cluster list not synced") + return false + } + clusters, err := frsc.fedInformer.GetReadyClusters() + if err != nil { + glog.Errorf("Failed to get ready clusters: %v", err) + return false + } + if !frsc.fedInformer.GetTargetStore().ClustersSynced(clusters) { + return false + } + if !frsc.replicaSetController.HasSynced() { + glog.V(2).Infof("federation replicaset list not synced") + return false + } + return true +} + +func (frsc *ReplicaSetController) deliverLocalReplicaSet(obj interface{}, duration time.Duration) { + key, err := controller.KeyFunc(obj) + if err != nil { + glog.Errorf("Couldn't get key for object %v: %v", obj, err) + return + } + _, exists, err := frsc.replicaSetStore.GetByKey(key) + if err != nil { + glog.Errorf("Couldn't get federation replicaset %v: %v", key, err) + return + } + if exists { // ignore replicasets that exist only in local k8s + frsc.deliverReplicaSetByKey(key, duration, false) + } +} + +func (frsc *ReplicaSetController) deliverFedReplicaSetObj(obj interface{}, delay time.Duration) { + key, err := controller.KeyFunc(obj) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", obj, err) + return + } + frsc.deliverReplicaSetByKey(key, delay, false) +} + +func (frsc *ReplicaSetController) deliverReplicaSetByKey(key string, delay time.Duration, failed bool) { + if failed { + frsc.replicaSetBackoff.Next(key, time.Now()) + delay = delay + frsc.replicaSetBackoff.Get(key) + } else { + frsc.replicaSetBackoff.Reset(key) + } + frsc.replicasetDeliverer.DeliverAfter(key, nil, delay) +} + +func (frsc *ReplicaSetController) worker() { + for { + item, quit := frsc.replicasetWorkQueue.Get() + if quit { + return + } + key := item.(string) + err := frsc.reconcileReplicaSet(key) + frsc.replicasetWorkQueue.Done(item) + 
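// a failed reconcile is redelivered with a growing flowcontrol backoff (see deliverReplicaSetByKey); a later successful delivery resets the backoff + 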
if err != nil { + glog.Errorf("Error syncing replicaset controller: %v", err) + frsc.deliverReplicaSetByKey(key, 0, true) + } + } +} + +func (frsc *ReplicaSetController) schedule(frs *extensionsv1.ReplicaSet, clusters []*fedv1.Cluster, + expected map[string]int64, actual map[string]int64) map[string]int64 { + // TODO: integrate real scheduler + + plnr := frsc.planner + frsPref, _ := parseFederationReplicaSetReference(frs) + if frsPref != nil { // create a new planner if user specified a preference + plnr = planner.NewPlanner(frsPref) + } + + replicas := int64(*frs.Spec.Replicas) + var clusterNames []string + for _, cluster := range clusters { + clusterNames = append(clusterNames, cluster.Name) + } + scheduleResult := plnr.Plan(replicas, clusterNames) + // make sure the result contains all clusters that need their replicas zeroed + result := make(map[string]int64) + for clusterName := range expected { + result[clusterName] = 0 + } + for clusterName, replicas := range scheduleResult { + result[clusterName] = replicas + } + return result +} + +func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error { + if !frsc.isSynced() { + frsc.deliverReplicaSetByKey(key, clusterAvailableDelay, false) + return nil + } + + glog.Infof("Start reconcile replicaset %q", key) + startTime := time.Now() + defer glog.Infof("Finished reconcile replicaset %q (%v)", key, time.Now().Sub(startTime)) + + obj, exists, err := frsc.replicaSetStore.Store.GetByKey(key) + if err != nil { + return err + } + if !exists { + // don't delete local replicasets for now + return nil + } + frs := obj.(*extensionsv1.ReplicaSet) + + clusters, err := frsc.fedInformer.GetReadyClusters() + if err != nil { + return err + } + + // collect current status and do schedule + expected := make(map[string]int64) + actual := make(map[string]int64) + for _, cluster := range clusters { + lrsObj, exists, err := frsc.fedInformer.GetTargetStore().GetByKey(cluster.Name, key) + if err != nil { + return err + } + if exists { + lrs := lrsObj.(*extensionsv1.ReplicaSet) + expected[cluster.Name] = int64(*lrs.Spec.Replicas) + // will get this via pod status + actual[cluster.Name] = int64(lrs.Status.Replicas) + } + } + scheduleResult := frsc.schedule(frs, clusters, expected, actual) + + glog.Infof("Start syncing local replicaset %v", scheduleResult) + + fedStatus := extensionsv1.ReplicaSetStatus{ObservedGeneration: frs.Generation} + for clusterName, replicas := range scheduleResult { + // TODO: updater or parallelizer doesn't help as results are needed for updating fed rs status + clusterClient, err := frsc.fedInformer.GetClientsetForCluster(clusterName) + if err != nil { + return err + } + lrsObj, exists, err := frsc.fedInformer.GetTargetStore().GetByKey(clusterName, key) + if err != nil { + return err + } else if !exists { + if replicas > 0 { + lrs := &extensionsv1.ReplicaSet{ + ObjectMeta: apiv1.ObjectMeta{ + Name: frs.Name, + Namespace: frs.Namespace, + Labels: frs.Labels, + Annotations: frs.Annotations, + }, + Spec: frs.Spec, + } + specReplicas := int32(replicas) + lrs.Spec.Replicas = &specReplicas + lrs, err = clusterClient.Extensions().ReplicaSets(frs.Namespace).Create(lrs) + if err != nil { + return err + } + fedStatus.Replicas += lrs.Status.Replicas + fedStatus.FullyLabeledReplicas += lrs.Status.FullyLabeledReplicas + } + } else { + lrs := lrsObj.(*extensionsv1.ReplicaSet) + lrsExpectedSpec := frs.Spec + specReplicas := int32(replicas) + lrsExpectedSpec.Replicas = &specReplicas + if !reflect.DeepEqual(lrs.Spec, lrsExpectedSpec) { + lrs.Spec = 
lrsExpectedSpec + lrs, err = clusterClient.Extensions().ReplicaSets(frs.Namespace).Update(lrs) + if err != nil { + return err + } + } + fedStatus.Replicas += lrs.Status.Replicas + fedStatus.FullyLabeledReplicas += lrs.Status.FullyLabeledReplicas + // leave the replicaset even if the replicas dropped to 0 + } + } + if fedStatus.Replicas != frs.Status.Replicas || fedStatus.FullyLabeledReplicas != frs.Status.FullyLabeledReplicas { + frs.Status = fedStatus + _, err = frsc.fedClient.Extensions().ReplicaSets(frs.Namespace).UpdateStatus(frs) + if err != nil { + return err + } + } + + return nil +} + +func (frsc *ReplicaSetController) reconcileNamespacesOnClusterChange() { + if !frsc.isSynced() { + frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay) + } + rss, _ := frsc.replicaSetStore.List() + for _, rs := range rss { + key, _ := controller.KeyFunc(rs) + frsc.deliverReplicaSetByKey(key, 0, false) + } +} diff --git a/federation/pkg/federation-controller/replicaset/replicasetcontroller_test.go b/federation/pkg/federation-controller/replicaset/replicasetcontroller_test.go new file mode 100644 index 00000000000..b5e87714429 --- /dev/null +++ b/federation/pkg/federation-controller/replicaset/replicasetcontroller_test.go @@ -0,0 +1,134 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package replicaset + +import ( + "fmt" + "github.com/stretchr/testify/assert" + fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" + fedclientfake "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4/fake" + apiv1 "k8s.io/kubernetes/pkg/api/v1" + extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" + "testing" + "time" + //kubeclientfake "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4/fake" + "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4" + fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" + "k8s.io/kubernetes/pkg/client/testing/core" + "k8s.io/kubernetes/pkg/watch" +) + +func TestReplicaSetController(t *testing.T) { + + replicaSetReviewDelay = 10 * time.Millisecond + clusterAvailableDelay = 20 * time.Millisecond + clusterUnavailableDelay = 60 * time.Millisecond + + fedclientset := fedclientfake.NewSimpleClientset() + fedrswatch := watch.NewFake() + fedclientset.PrependWatchReactor("replicasets", core.DefaultWatchReactor(fedrswatch, nil)) + + fedclientset.Federation().Clusters().Create(mkCluster("k8s-1", apiv1.ConditionTrue)) + fedclientset.Federation().Clusters().Create(mkCluster("k8s-2", apiv1.ConditionTrue)) + + kube1clientset := fedclientfake.NewSimpleClientset() + kube1rswatch := watch.NewFake() + kube1clientset.PrependWatchReactor("replicasets", core.DefaultWatchReactor(kube1rswatch, nil)) + kube2clientset := fedclientfake.NewSimpleClientset() + kube2rswatch := watch.NewFake() + kube2clientset.PrependWatchReactor("replicasets", core.DefaultWatchReactor(kube2rswatch, nil)) + + stopChan := make(chan struct{}) + replicaSetController := NewReplicaSetController(fedclientset) + informer := toFederatedInformerForTestOnly(replicaSetController.fedInformer) + informer.SetClientFactory(func(cluster *fedv1.Cluster) (federation_release_1_4.Interface, error) { + switch cluster.Name { + case "k8s-1": + return kube1clientset, nil + case "k8s-2": + return kube2clientset, nil + default: + return nil, fmt.Errorf("Unknown cluster: %v", cluster.Name) + } + }) + + go replicaSetController.Run(1, stopChan) + + rs := mkReplicaSet("rs", 9) + rs, _ = fedclientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Create(rs) + fedrswatch.Add(rs) + time.Sleep(1 * time.Second) + + rs1, _ := kube1clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name) + rs1.Status.Replicas = *rs1.Spec.Replicas + rs1, _ = kube1clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).UpdateStatus(rs1) + kube1rswatch.Modify(rs1) + + rs2, _ := kube2clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name) + rs2.Status.Replicas = *rs2.Spec.Replicas + rs2, _ = kube2clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).UpdateStatus(rs2) + kube2rswatch.Modify(rs2) + + time.Sleep(1 * time.Second) + rs, _ = fedclientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name) + assert.Equal(t, *rs.Spec.Replicas, *rs1.Spec.Replicas+*rs2.Spec.Replicas) + assert.Equal(t, rs.Status.Replicas, rs1.Status.Replicas+rs2.Status.Replicas) + + var replicas int32 = 20 + rs.Spec.Replicas = &replicas + rs, _ = fedclientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Update(rs) + fedrswatch.Modify(rs) + + time.Sleep(2 * time.Second) + rs, _ = fedclientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name) + rs1, _ = kube1clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name) + rs2, _ = kube2clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name) + 
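// the federated spec and status should now equal the sums of the per-cluster spec and status replica counts + 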
assert.Equal(t, *rs.Spec.Replicas, *rs1.Spec.Replicas+*rs2.Spec.Replicas) + assert.Equal(t, rs.Status.Replicas, rs1.Status.Replicas+rs2.Status.Replicas) + + close(stopChan) +} + +func toFederatedInformerForTestOnly(informer fedutil.FederatedInformer) fedutil.FederatedInformerForTestOnly { + inter := informer.(interface{}) + return inter.(fedutil.FederatedInformerForTestOnly) +} + +func mkCluster(name string, readyStatus apiv1.ConditionStatus) *fedv1.Cluster { + return &fedv1.Cluster{ + ObjectMeta: apiv1.ObjectMeta{ + Name: name, + }, + Status: fedv1.ClusterStatus{ + Conditions: []fedv1.ClusterCondition{ + {Type: fedv1.ClusterReady, Status: readyStatus}, + }, + }, + } +} + +func mkReplicaSet(name string, replicas int32) *extensionsv1.ReplicaSet { + return &extensionsv1.ReplicaSet{ + ObjectMeta: apiv1.ObjectMeta{ + Name: name, + Namespace: apiv1.NamespaceDefault, + }, + Spec: extensionsv1.ReplicaSetSpec{ + Replicas: &replicas, + }, + } +} From 7598d43db03c7192523b62f9f0e17defc990e4ea Mon Sep 17 00:00:00 2001 From: jianhuiz Date: Fri, 19 Aug 2016 22:22:23 -0700 Subject: [PATCH 2/3] use planner v2 and pod analyzer --- .../replicaset/replicasetcontroller.go | 128 ++++++++++++++---- .../replicaset/replicasetcontroller_test.go | 69 ++++++++-- 2 files changed, 155 insertions(+), 42 deletions(-) diff --git a/federation/pkg/federation-controller/replicaset/replicasetcontroller.go b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go index 39ce4753cc2..2f32bbd7316 100644 --- a/federation/pkg/federation-controller/replicaset/replicasetcontroller.go +++ b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go @@ -17,6 +17,7 @@ limitations under the License. package replicaset import ( + "encoding/json" "reflect" "time" @@ -25,13 +26,14 @@ import ( fed "k8s.io/kubernetes/federation/apis/federation" fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4" - //kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4" planner "k8s.io/kubernetes/federation/pkg/federation-controller/replicaset/planner" fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/meta" apiv1 "k8s.io/kubernetes/pkg/api/v1" extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" "k8s.io/kubernetes/pkg/client/cache" + kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/runtime" @@ -42,10 +44,9 @@ import ( ) const ( - // the schedule result is stored in an annotation in the format "clusterName:replicas[/clusterName:replicas]..." 
- ExpectedReplicasAnnotation = "kubernetes.io/expected-replicas" - allClustersKey = "THE_ALL_CLUSTER_KEY" - UserAgentName = "Federation-replicaset-Controller" + FedReplicaSetPreferencesAnnotation = "federation.kubernetes.io/replica-set-preferences" + allClustersKey = "THE_ALL_CLUSTER_KEY" + UserAgentName = "Federation-replicaset-Controller" ) var ( @@ -55,7 +56,23 @@ var ( ) func parseFederationReplicaSetReference(frs *extensionsv1.ReplicaSet) (*fed.FederatedReplicaSetPreferences, error) { - return nil, nil + accessor, err := meta.Accessor(frs) + if err != nil { + return nil, err + } + anno := accessor.GetAnnotations() + if anno == nil { + return nil, nil + } + frsPrefString, found := anno[FedReplicaSetPreferencesAnnotation] + if !found { + return nil, nil + } + var frsPref fed.FederatedReplicaSetPreferences + if err := json.Unmarshal([]byte(frsPrefString), &frsPref); err != nil { + return nil, err + } + return &frsPref, nil } type ReplicaSetController struct { @@ -64,7 +81,8 @@ type ReplicaSetController struct { replicaSetController *framework.Controller replicaSetStore cache.StoreToReplicaSetLister - fedInformer fedutil.FederatedInformer + fedReplicaSetInformer fedutil.FederatedInformer + fedPodInformer fedutil.FederatedInformer replicasetDeliverer *fedutil.DelayingDeliverer clusterDeliverer *fedutil.DelayingDeliverer @@ -72,7 +90,7 @@ type ReplicaSetController struct { replicaSetBackoff *flowcontrol.Backoff - planner *planner.Planner + defaultPlanner *planner.Planner } // NewReplicaSetController returns a new federated replicaset controller @@ -83,14 +101,14 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe clusterDeliverer: fedutil.NewDelayingDeliverer(), replicasetWorkQueue: workqueue.New(), replicaSetBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute), - planner: planner.NewPlanner(&fed.FederatedReplicaSetPreferences{ + defaultPlanner: planner.NewPlanner(&fed.FederatedReplicaSetPreferences{ Clusters: map[string]fed.ClusterReplicaSetPreferences{ "*": {Weight: 1}, }, }), } - replicaSetFedInformerFactory := func(cluster *fedv1.Cluster, clientset fedclientset.Interface) (cache.Store, framework.ControllerInterface) { + replicaSetFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, framework.ControllerInterface) { return framework.NewInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { @@ -113,7 +131,28 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterUnavailableDelay) }, } - frsc.fedInformer = fedutil.NewFederatedInformer(federationClient, replicaSetFedInformerFactory, &clusterLifecycle) + frsc.fedReplicaSetInformer = fedutil.NewFederatedInformer(federationClient, replicaSetFedInformerFactory, &clusterLifecycle) + + podFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, framework.ControllerInterface) { + return framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return clientset.Core().Pods(apiv1.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return clientset.Core().Pods(apiv1.NamespaceAll).Watch(options) + }, + }, + &apiv1.Pod{}, + controller.NoResyncPeriodFunc(), + fedutil.NewTriggerOnAllChanges( + func(obj runtime.Object) { + //frsc.deliverLocalReplicaSet(obj, replicaSetReviewDelay) + }, + ), + ) + } + frsc.fedPodInformer = 
fedutil.NewFederatedInformer(federationClient, podFedInformerFactory, &fedutil.ClusterLifecycleHandlerFuncs{}) frsc.replicaSetStore.Store, frsc.replicaSetController = framework.NewInformer( &cache.ListWatch{ @@ -136,11 +175,8 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe func (frsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { go frsc.replicaSetController.Run(stopCh) - frsc.fedInformer.Start() - - for !frsc.isSynced() { - time.Sleep(5 * time.Millisecond) - } + frsc.fedReplicaSetInformer.Start() + frsc.fedPodInformer.Start() frsc.replicasetDeliverer.StartWithHandler(func(item *fedutil.DelayingDelivererItem) { frsc.replicasetWorkQueue.Add(item.Key) @@ -149,9 +185,14 @@ func (frsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { frsc.reconcileNamespacesOnClusterChange() }) + for !frsc.isSynced() { + time.Sleep(5 * time.Millisecond) + } + for i := 0; i < workers; i++ { go wait.Until(frsc.worker, time.Second, stopCh) } + go func() { select { case <-time.After(time.Minute): @@ -166,21 +207,37 @@ func (frsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { frsc.replicasetDeliverer.Stop() frsc.clusterDeliverer.Stop() frsc.replicasetWorkQueue.ShutDown() + frsc.fedReplicaSetInformer.Stop() + frsc.fedPodInformer.Stop() } func (frsc *ReplicaSetController) isSynced() bool { - if !frsc.fedInformer.ClustersSynced() { + if !frsc.fedReplicaSetInformer.ClustersSynced() { glog.V(2).Infof("Cluster list not synced") return false } - clusters, err := frsc.fedInformer.GetReadyClusters() + clusters, err := frsc.fedReplicaSetInformer.GetReadyClusters() if err != nil { glog.Errorf("Failed to get ready clusters: %v", err) return false } - if !frsc.fedInformer.GetTargetStore().ClustersSynced(clusters) { + if !frsc.fedReplicaSetInformer.GetTargetStore().ClustersSynced(clusters) { return false } + + if !frsc.fedPodInformer.ClustersSynced() { + glog.V(2).Infof("Cluster list not synced") + return false + } + clusters, err = frsc.fedPodInformer.GetReadyClusters() + if err != nil { + glog.Errorf("Failed to get ready clusters: %v", err) + return false + } + if !frsc.fedPodInformer.GetTargetStore().ClustersSynced(clusters) { + return false + } + if !frsc.replicaSetController.HasSynced() { glog.V(2).Infof("federation replicaset list not synced") return false @@ -194,7 +251,7 @@ func (frsc *ReplicaSetController) deliverLocalReplicaSet(obj interface{}, durati glog.Errorf("Couldn't get key for object %v: %v", obj, err) return } - _, exists, err := frsc.replicaSetStore.GetByKey(key) + _, exists, err := frsc.replicaSetStore.Store.GetByKey(key) if err != nil { glog.Errorf("Couldn't get federation replicaset %v: %v", key, err) return @@ -243,8 +300,11 @@ func (frsc *ReplicaSetController) schedule(frs *extensionsv1.ReplicaSet, cluster expected map[string]int64, actual map[string]int64) map[string]int64 { // TODO: integrate real scheduler - plnr := frsc.planner - frsPref, _ := parseFederationReplicaSetReference(frs) + plnr := frsc.defaultPlanner + frsPref, err := parseFederationReplicaSetReference(frs) + if err != nil { + glog.Infof("Invalid ReplicaSet specific preference, using default. 
rs: %v, err: %v", frs, err) + } if frsPref != nil { // create a new planner if user specified a preference plnr = planner.NewPlanner(frsPref) } @@ -254,7 +314,7 @@ func (frsc *ReplicaSetController) schedule(frs *extensionsv1.ReplicaSet, cluster for _, cluster := range clusters { clusterNames = append(clusterNames, cluster.Name) } - scheduleResult := plnr.Plan(replicas, clusterNames) + scheduleResult, _ := plnr.Plan(replicas, clusterNames, expected, actual) // make sure the return contains clusters need to zero the replicas result := make(map[string]int64) for clusterName := range expected { @@ -286,26 +346,34 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error { } frs := obj.(*extensionsv1.ReplicaSet) - clusters, err := frsc.fedInformer.GetReadyClusters() + clusters, err := frsc.fedReplicaSetInformer.GetReadyClusters() if err != nil { return err } // collect current status and do schedule + allPods, err := frsc.fedPodInformer.GetTargetStore().List() + if err != nil { + return err + } + podStatus, err := AnalysePods(frs, allPods, time.Now()) expected := make(map[string]int64) actual := make(map[string]int64) for _, cluster := range clusters { - lrsObj, exists, err := frsc.fedInformer.GetTargetStore().GetByKey(cluster.Name, key) + lrsObj, exists, err := frsc.fedReplicaSetInformer.GetTargetStore().GetByKey(cluster.Name, key) if err != nil { return err } if exists { lrs := lrsObj.(*extensionsv1.ReplicaSet) expected[cluster.Name] = int64(*lrs.Spec.Replicas) - // will get this via pod status - actual[cluster.Name] = int64(lrs.Status.Replicas) + unscheduleable := int64(podStatus[cluster.Name].Unschedulable) + if unscheduleable > 0 { + actual[cluster.Name] = int64(*lrs.Spec.Replicas) + } } } + scheduleResult := frsc.schedule(frs, clusters, expected, actual) glog.Infof("Start syncing local replicaset %v", scheduleResult) @@ -313,11 +381,11 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error { fedStatus := extensionsv1.ReplicaSetStatus{ObservedGeneration: frs.Generation} for clusterName, replicas := range scheduleResult { // TODO: updater or parallelizer doesnn't help as results are needed for updating fed rs status - clusterClient, err := frsc.fedInformer.GetClientsetForCluster(clusterName) + clusterClient, err := frsc.fedReplicaSetInformer.GetClientsetForCluster(clusterName) if err != nil { return err } - lrsObj, exists, err := frsc.fedInformer.GetTargetStore().GetByKey(clusterName, key) + lrsObj, exists, err := frsc.fedReplicaSetInformer.GetTargetStore().GetByKey(clusterName, key) if err != nil { return err } else if !exists { @@ -372,7 +440,7 @@ func (frsc *ReplicaSetController) reconcileNamespacesOnClusterChange() { if !frsc.isSynced() { frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay) } - rss, _ := frsc.replicaSetStore.List() + rss := frsc.replicaSetStore.Store.List() for _, rs := range rss { key, _ := controller.KeyFunc(rs) frsc.deliverReplicaSetByKey(key, 0, false) diff --git a/federation/pkg/federation-controller/replicaset/replicasetcontroller_test.go b/federation/pkg/federation-controller/replicaset/replicasetcontroller_test.go index b5e87714429..5ab0aa7ea81 100644 --- a/federation/pkg/federation-controller/replicaset/replicasetcontroller_test.go +++ b/federation/pkg/federation-controller/replicaset/replicasetcontroller_test.go @@ -17,22 +17,60 @@ limitations under the License. 
package replicaset import ( + "flag" "fmt" "github.com/stretchr/testify/assert" fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" fedclientfake "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4/fake" + fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" + "k8s.io/kubernetes/pkg/api/meta" apiv1 "k8s.io/kubernetes/pkg/api/v1" extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" - "testing" - "time" - //kubeclientfake "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4/fake" - "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4" - fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" + kube_release_1_4 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4" + kubeclientfake "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4/fake" "k8s.io/kubernetes/pkg/client/testing/core" "k8s.io/kubernetes/pkg/watch" + "testing" + "time" ) +func TestParseFederationReplicaSetReference(t *testing.T) { + successPrefs := []string{ + `{"rebalance": true, + "clusters": { + "k8s-1": {"minReplicas": 10, "maxReplicas": 20, "weight": 2}, + "*": {"weight": 1} + }}`, + } + failedPrefs := []string{ + `{`, // bad json + } + + rs := mkReplicaSet("rs-1", 100) + accessor, _ := meta.Accessor(rs) + anno := accessor.GetAnnotations() + if anno == nil { + anno = make(map[string]string) + accessor.SetAnnotations(anno) + } + for _, prefString := range successPrefs { + anno[FedReplicaSetPreferencesAnnotation] = prefString + pref, err := parseFederationReplicaSetReference(rs) + assert.NotNil(t, pref) + assert.Nil(t, err) + } + for _, prefString := range failedPrefs { + anno[FedReplicaSetPreferencesAnnotation] = prefString + pref, err := parseFederationReplicaSetReference(rs) + assert.Nil(t, pref) + assert.NotNil(t, err) + } +} + func TestReplicaSetController(t *testing.T) { + flag.Set("logtostderr", "true") + flag.Set("v", "5") + flag.Parse() replicaSetReviewDelay = 10 * time.Millisecond clusterAvailableDelay = 20 * time.Millisecond @@ -45,17 +83,18 @@ func TestReplicaSetController(t *testing.T) { fedclientset.Federation().Clusters().Create(mkCluster("k8s-1", apiv1.ConditionTrue)) fedclientset.Federation().Clusters().Create(mkCluster("k8s-2", apiv1.ConditionTrue)) - kube1clientset := fedclientfake.NewSimpleClientset() + kube1clientset := kubeclientfake.NewSimpleClientset() kube1rswatch := watch.NewFake() kube1clientset.PrependWatchReactor("replicasets", core.DefaultWatchReactor(kube1rswatch, nil)) + kube1Podwatch := watch.NewFake() + kube1clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(kube1Podwatch, nil)) - kube2clientset := fedclientfake.NewSimpleClientset() + kube2clientset := kubeclientfake.NewSimpleClientset() kube2rswatch := watch.NewFake() kube2clientset.PrependWatchReactor("replicasets", core.DefaultWatchReactor(kube2rswatch, nil)) + kube2Podwatch := watch.NewFake() + kube2clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(kube2Podwatch, nil)) - stopChan := make(chan struct{}) - replicaSetController := NewReplicaSetController(fedclientset) - informer := toFederatedInformerForTestOnly(replicaSetController.fedInformer) - informer.SetClientFactory(func(cluster *fedv1.Cluster) (federation_release_1_4.Interface, error) { + fedInformerClientFactory := func(cluster *fedv1.Cluster) (kube_release_1_4.Interface, error) { switch cluster.Name { case "k8s-1": return kube1clientset, nil @@ -64,7 +103,13 @@ func TestReplicaSetController(t *testing.T) { default: return nil, 
fmt.Errorf("Unknown cluster: %v", cluster.Name) } - }) + } + stopChan := make(chan struct{}) + replicaSetController := NewReplicaSetController(fedclientset) + rsFedinformer := toFederatedInformerForTestOnly(replicaSetController.fedReplicaSetInformer) + rsFedinformer.SetClientFactory(fedInformerClientFactory) + podFedinformer := toFederatedInformerForTestOnly(replicaSetController.fedPodInformer) + podFedinformer.SetClientFactory(fedInformerClientFactory) go replicaSetController.Run(1, stopChan) From 257bda7e1c5461a2a09c557afddb24a4b228879b Mon Sep 17 00:00:00 2001 From: jianhuiz Date: Sat, 20 Aug 2016 13:55:11 -0700 Subject: [PATCH 3/3] review fix --- .../replicaset/replicasetcontroller.go | 47 ++++++++++--------- .../replicaset/replicasetcontroller_test.go | 33 ++++++++----- 2 files changed, 46 insertions(+), 34 deletions(-) diff --git a/federation/pkg/federation-controller/replicaset/replicasetcontroller.go b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go index 2f32bbd7316..9efdf42f6c7 100644 --- a/federation/pkg/federation-controller/replicaset/replicasetcontroller.go +++ b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go @@ -50,9 +50,10 @@ const ( ) var ( - replicaSetReviewDelay = 10 * time.Second - clusterAvailableDelay = 20 * time.Second - clusterUnavailableDelay = 60 * time.Second + replicaSetReviewDelay = 10 * time.Second + clusterAvailableDelay = 20 * time.Second + clusterUnavailableDelay = 60 * time.Second + allReplicaSetReviewDealy = 2 * time.Minute ) func parseFederationReplicaSetReference(frs *extensionsv1.ReplicaSet) (*fed.FederatedReplicaSetPreferences, error) { @@ -121,12 +122,14 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe &extensionsv1.ReplicaSet{}, controller.NoResyncPeriodFunc(), fedutil.NewTriggerOnAllChanges( - func(obj runtime.Object) { frsc.deliverLocalReplicaSet(obj, replicaSetReviewDelay) }, + func(obj runtime.Object) { frsc.deliverLocalReplicaSet(obj, allReplicaSetReviewDealy) }, ), ) } clusterLifecycle := fedutil.ClusterLifecycleHandlerFuncs{ - ClusterAvailable: func(cluster *fedv1.Cluster) { /* no rebalancing for now */ }, + ClusterAvailable: func(cluster *fedv1.Cluster) { + frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterUnavailableDelay) + }, ClusterUnavailable: func(cluster *fedv1.Cluster, _ []interface{}) { frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterUnavailableDelay) }, @@ -147,7 +150,7 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe controller.NoResyncPeriodFunc(), fedutil.NewTriggerOnAllChanges( func(obj runtime.Object) { - //frsc.deliverLocalReplicaSet(obj, replicaSetReviewDelay) + frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterUnavailableDelay) }, ), ) @@ -182,7 +185,7 @@ func (frsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { frsc.replicasetWorkQueue.Add(item.Key) }) frsc.clusterDeliverer.StartWithHandler(func(_ *fedutil.DelayingDelivererItem) { - frsc.reconcileNamespacesOnClusterChange() + frsc.reconcileReplicaSetsOnClusterChange() }) for !frsc.isSynced() { @@ -229,11 +232,6 @@ func (frsc *ReplicaSetController) isSynced() bool { glog.V(2).Infof("Cluster list not synced") return false } - clusters, err = frsc.fedPodInformer.GetReadyClusters() - if err != nil { - glog.Errorf("Failed to get ready clusters: %v", err) - return false - } if !frsc.fedPodInformer.GetTargetStore().ClustersSynced(clusters) { return false } @@ -297,7 +295,7 @@ func (frsc 
*ReplicaSetController) worker() { } func (frsc *ReplicaSetController) schedule(frs *extensionsv1.ReplicaSet, clusters []*fedv1.Cluster, - expected map[string]int64, actual map[string]int64) map[string]int64 { + current map[string]int64, estimatedCapacity map[string]int64) map[string]int64 { // TODO: integrate real scheduler plnr := frsc.defaultPlanner @@ -314,15 +312,18 @@ func (frsc *ReplicaSetController) schedule(frs *extensionsv1.ReplicaSet, cluster for _, cluster := range clusters { clusterNames = append(clusterNames, cluster.Name) } - scheduleResult, _ := plnr.Plan(replicas, clusterNames, expected, actual) + scheduleResult, overflow := plnr.Plan(replicas, clusterNames, current, estimatedCapacity) // make sure the result contains all clusters that need their replicas zeroed result := make(map[string]int64) - for clusterName := range expected { + for clusterName := range current { result[clusterName] = 0 } for clusterName, replicas := range scheduleResult { result[clusterName] = replicas } + for clusterName, replicas := range overflow { + result[clusterName] += replicas + } return result } @@ -357,8 +358,8 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error { return err } podStatus, err := AnalysePods(frs, allPods, time.Now()) - expected := make(map[string]int64) - actual := make(map[string]int64) + current := make(map[string]int64) + estimatedCapacity := make(map[string]int64) for _, cluster := range clusters { lrsObj, exists, err := frsc.fedReplicaSetInformer.GetTargetStore().GetByKey(cluster.Name, key) if err != nil { @@ -366,15 +367,15 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error { } if exists { lrs := lrsObj.(*extensionsv1.ReplicaSet) - expected[cluster.Name] = int64(*lrs.Spec.Replicas) - unscheduleable := int64(podStatus[cluster.Name].Unschedulable) - if unscheduleable > 0 { - actual[cluster.Name] = int64(*lrs.Spec.Replicas) + current[cluster.Name] = int64(podStatus[cluster.Name].RunningAndReady) // include pending as well? 
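+ // a cluster reporting unschedulable pods is treated as at capacity: it can hold only the replicas that actually fit, so the planner can try to place the remainder in other clusters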
+ unschedulable := int64(podStatus[cluster.Name].Unschedulable) + if unschedulable > 0 { + estimatedCapacity[cluster.Name] = int64(*lrs.Spec.Replicas) - unschedulable } } } - scheduleResult := frsc.schedule(frs, clusters, expected, actual) + scheduleResult := frsc.schedule(frs, clusters, current, estimatedCapacity) glog.Infof("Start syncing local replicaset %v", scheduleResult) @@ -436,7 +437,7 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error { return nil } -func (frsc *ReplicaSetController) reconcileNamespacesOnClusterChange() { +func (frsc *ReplicaSetController) reconcileReplicaSetsOnClusterChange() { if !frsc.isSynced() { frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay) } diff --git a/federation/pkg/federation-controller/replicaset/replicasetcontroller_test.go b/federation/pkg/federation-controller/replicaset/replicasetcontroller_test.go index 5ab0aa7ea81..3a194a40d19 100644 --- a/federation/pkg/federation-controller/replicaset/replicasetcontroller_test.go +++ b/federation/pkg/federation-controller/replicaset/replicasetcontroller_test.go @@ -46,7 +46,7 @@ func TestParseFederationReplicaSetReference(t *testing.T) { `{`, // bad json } - rs := mkReplicaSet("rs-1", 100) + rs := newReplicaSetWithReplicas("rs-1", 100) accessor, _ := meta.Accessor(rs) anno := accessor.GetAnnotations() if anno == nil { @@ -75,13 +75,14 @@ func TestReplicaSetController(t *testing.T) { replicaSetReviewDelay = 10 * time.Millisecond clusterAvailableDelay = 20 * time.Millisecond clusterUnavailableDelay = 60 * time.Millisecond + allReplicaSetReviewDelay = 120 * time.Millisecond fedclientset := fedclientfake.NewSimpleClientset() fedrswatch := watch.NewFake() fedclientset.PrependWatchReactor("replicasets", core.DefaultWatchReactor(fedrswatch, nil)) - fedclientset.Federation().Clusters().Create(mkCluster("k8s-1", apiv1.ConditionTrue)) - fedclientset.Federation().Clusters().Create(mkCluster("k8s-2", apiv1.ConditionTrue)) + fedclientset.Federation().Clusters().Create(newClusterWithReadyStatus("k8s-1", apiv1.ConditionTrue)) + fedclientset.Federation().Clusters().Create(newClusterWithReadyStatus("k8s-2", apiv1.ConditionTrue)) kube1clientset := kubeclientfake.NewSimpleClientset() kube1rswatch := watch.NewFake() @@ -104,26 +105,29 @@ func TestReplicaSetController(t *testing.T) { return nil, fmt.Errorf("Unknown cluster: %v", cluster.Name) } } - stopChan := make(chan struct{}) replicaSetController := NewReplicaSetController(fedclientset) rsFedinformer := toFederatedInformerForTestOnly(replicaSetController.fedReplicaSetInformer) rsFedinformer.SetClientFactory(fedInformerClientFactory) podFedinformer := toFederatedInformerForTestOnly(replicaSetController.fedPodInformer) podFedinformer.SetClientFactory(fedInformerClientFactory) + stopChan := make(chan struct{}) + defer close(stopChan) go replicaSetController.Run(1, stopChan) - rs := mkReplicaSet("rs", 9) + rs := newReplicaSetWithReplicas("rs", 9) rs, _ = fedclientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Create(rs) fedrswatch.Add(rs) time.Sleep(1 * time.Second) rs1, _ := kube1clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name) + kube1rswatch.Add(rs1) rs1.Status.Replicas = *rs1.Spec.Replicas rs1, _ = kube1clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).UpdateStatus(rs1) kube1rswatch.Modify(rs1) rs2, _ := kube2clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name) + kube2rswatch.Add(rs2) rs2.Status.Replicas = *rs2.Spec.Replicas rs2, _ = 
kube2clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).UpdateStatus(rs2) kube2rswatch.Modify(rs2) @@ -137,15 +141,22 @@ func TestReplicaSetController(t *testing.T) { rs.Spec.Replicas = &replicas rs, _ = fedclientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Update(rs) fedrswatch.Modify(rs) + time.Sleep(1 * time.Second) - time.Sleep(2 * time.Second) - rs, _ = fedclientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name) rs1, _ = kube1clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name) + rs1.Status.Replicas = *rs1.Spec.Replicas + rs1, _ = kube1clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).UpdateStatus(rs1) + kube1rswatch.Modify(rs1) + rs2, _ = kube2clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name) + rs2.Status.Replicas = *rs2.Spec.Replicas + rs2, _ = kube2clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).UpdateStatus(rs2) + kube2rswatch.Modify(rs2) + + time.Sleep(1 * time.Second) + rs, _ = fedclientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name) assert.Equal(t, *rs.Spec.Replicas, *rs1.Spec.Replicas+*rs2.Spec.Replicas) assert.Equal(t, rs.Status.Replicas, rs1.Status.Replicas+rs2.Status.Replicas) - - close(stopChan) } func toFederatedInformerForTestOnly(informer fedutil.FederatedInformer) fedutil.FederatedInformerForTestOnly { @@ -153,7 +164,7 @@ func toFederatedInformerForTestOnly(informer fedutil.FederatedInformer) fedutil. return inter.(fedutil.FederatedInformerForTestOnly) } -func mkCluster(name string, readyStatus apiv1.ConditionStatus) *fedv1.Cluster { +func newClusterWithReadyStatus(name string, readyStatus apiv1.ConditionStatus) *fedv1.Cluster { return &fedv1.Cluster{ ObjectMeta: apiv1.ObjectMeta{ Name: name, @@ -166,7 +177,7 @@ func mkCluster(name string, readyStatus apiv1.ConditionStatus) *fedv1.Cluster { } } -func mkReplicaSet(name string, replicas int32) *extensionsv1.ReplicaSet { +func newReplicaSetWithReplicas(name string, replicas int32) *extensionsv1.ReplicaSet { return &extensionsv1.ReplicaSet{ ObjectMeta: apiv1.ObjectMeta{ Name: name,