diff --git a/federation/pkg/federation-controller/replicaset/replicasetcontroller.go b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go
index 2f32bbd7316..9efdf42f6c7 100644
--- a/federation/pkg/federation-controller/replicaset/replicasetcontroller.go
+++ b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go
@@ -50,9 +50,10 @@ const (
 )
 
 var (
-	replicaSetReviewDelay   = 10 * time.Second
-	clusterAvailableDelay   = 20 * time.Second
-	clusterUnavailableDelay = 60 * time.Second
+	replicaSetReviewDelay    = 10 * time.Second
+	clusterAvailableDelay    = 20 * time.Second
+	clusterUnavailableDelay  = 60 * time.Second
+	allReplicaSetReviewDealy = 2 * time.Minute
 )
 
 func parseFederationReplicaSetReference(frs *extensionsv1.ReplicaSet) (*fed.FederatedReplicaSetPreferences, error) {
@@ -121,12 +122,14 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe
 			&extensionsv1.ReplicaSet{},
 			controller.NoResyncPeriodFunc(),
 			fedutil.NewTriggerOnAllChanges(
-				func(obj runtime.Object) { frsc.deliverLocalReplicaSet(obj, replicaSetReviewDelay) },
+				func(obj runtime.Object) { frsc.deliverLocalReplicaSet(obj, allReplicaSetReviewDealy) },
 			),
 		)
 	}
 	clusterLifecycle := fedutil.ClusterLifecycleHandlerFuncs{
-		ClusterAvailable: func(cluster *fedv1.Cluster) { /* no rebalancing for now */ },
+		ClusterAvailable: func(cluster *fedv1.Cluster) {
+			frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterUnavailableDelay)
+		},
 		ClusterUnavailable: func(cluster *fedv1.Cluster, _ []interface{}) {
 			frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterUnavailableDelay)
 		},
@@ -147,7 +150,7 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe
 			controller.NoResyncPeriodFunc(),
 			fedutil.NewTriggerOnAllChanges(
 				func(obj runtime.Object) {
-					//frsc.deliverLocalReplicaSet(obj, replicaSetReviewDelay)
+					frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterUnavailableDelay)
 				},
 			),
 		)
@@ -182,7 +185,7 @@ func (frsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
 		frsc.replicasetWorkQueue.Add(item.Key)
 	})
 	frsc.clusterDeliverer.StartWithHandler(func(_ *fedutil.DelayingDelivererItem) {
-		frsc.reconcileNamespacesOnClusterChange()
+		frsc.reconcileReplicaSetsOnClusterChange()
 	})
 
 	for !frsc.isSynced() {
@@ -229,11 +232,6 @@ func (frsc *ReplicaSetController) isSynced() bool {
 		glog.V(2).Infof("Cluster list not synced")
 		return false
 	}
-	clusters, err = frsc.fedPodInformer.GetReadyClusters()
-	if err != nil {
-		glog.Errorf("Failed to get ready clusters: %v", err)
-		return false
-	}
 	if !frsc.fedPodInformer.GetTargetStore().ClustersSynced(clusters) {
 		return false
 	}
@@ -297,7 +295,7 @@ func (frsc *ReplicaSetController) worker() {
 }
 
 func (frsc *ReplicaSetController) schedule(frs *extensionsv1.ReplicaSet, clusters []*fedv1.Cluster,
-	expected map[string]int64, actual map[string]int64) map[string]int64 {
+	current map[string]int64, estimatedCapacity map[string]int64) map[string]int64 {
 
 	// TODO: integrate real scheduler
 	plnr := frsc.defaultPlanner
@@ -314,15 +312,18 @@ func (frsc *ReplicaSetController) schedule(frs *extensionsv1.ReplicaSet, cluster
 	for _, cluster := range clusters {
 		clusterNames = append(clusterNames, cluster.Name)
 	}
-	scheduleResult, _ := plnr.Plan(replicas, clusterNames, expected, actual)
+	scheduleResult, overflow := plnr.Plan(replicas, clusterNames, current, estimatedCapacity)
 	// make sure the return contains clusters need to zero the replicas
 	result := make(map[string]int64)
-	for clusterName := range expected {
+	for clusterName := range current {
 		result[clusterName] = 0
 	}
 	for clusterName, replicas := range scheduleResult {
 		result[clusterName] = replicas
 	}
+	for clusterName, replicas := range overflow {
+		result[clusterName] += replicas
+	}
 	return result
 }
 
@@ -357,8 +358,8 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error {
 		return err
 	}
 	podStatus, err := AnalysePods(frs, allPods, time.Now())
-	expected := make(map[string]int64)
-	actual := make(map[string]int64)
+	current := make(map[string]int64)
+	estimatedCapacity := make(map[string]int64)
 	for _, cluster := range clusters {
 		lrsObj, exists, err := frsc.fedReplicaSetInformer.GetTargetStore().GetByKey(cluster.Name, key)
 		if err != nil {
@@ -366,15 +367,15 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error {
 		}
 		if exists {
 			lrs := lrsObj.(*extensionsv1.ReplicaSet)
-			expected[cluster.Name] = int64(*lrs.Spec.Replicas)
-			unscheduleable := int64(podStatus[cluster.Name].Unschedulable)
-			if unscheduleable > 0 {
-				actual[cluster.Name] = int64(*lrs.Spec.Replicas)
+			current[cluster.Name] = int64(podStatus[cluster.Name].RunningAndReady) // include pending as well?
+			unschedulable := int64(podStatus[cluster.Name].Unschedulable)
+			if unschedulable > 0 {
+				estimatedCapacity[cluster.Name] = int64(*lrs.Spec.Replicas) - unschedulable
 			}
 		}
 	}
 
-	scheduleResult := frsc.schedule(frs, clusters, expected, actual)
+	scheduleResult := frsc.schedule(frs, clusters, current, estimatedCapacity)
 
 	glog.Infof("Start syncing local replicaset %v", scheduleResult)
 
@@ -436,7 +437,7 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error {
 	return nil
 }
 
-func (frsc *ReplicaSetController) reconcileNamespacesOnClusterChange() {
+func (frsc *ReplicaSetController) reconcileReplicaSetsOnClusterChange() {
 	if !frsc.isSynced() {
 		frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay)
 	}
diff --git a/federation/pkg/federation-controller/replicaset/replicasetcontroller_test.go b/federation/pkg/federation-controller/replicaset/replicasetcontroller_test.go
index 5ab0aa7ea81..3a194a40d19 100644
--- a/federation/pkg/federation-controller/replicaset/replicasetcontroller_test.go
+++ b/federation/pkg/federation-controller/replicaset/replicasetcontroller_test.go
@@ -46,7 +46,7 @@ func TestParseFederationReplicaSetReference(t *testing.T) {
 		`{`, // bad json
 	}
 
-	rs := mkReplicaSet("rs-1", 100)
+	rs := newReplicaSetWithReplicas("rs-1", 100)
 	accessor, _ := meta.Accessor(rs)
 	anno := accessor.GetAnnotations()
 	if anno == nil {
@@ -75,13 +75,14 @@ func TestReplicaSetController(t *testing.T) {
 	replicaSetReviewDelay = 10 * time.Millisecond
 	clusterAvailableDelay = 20 * time.Millisecond
 	clusterUnavailableDelay = 60 * time.Millisecond
+	allReplicaSetReviewDealy = 120 * time.Millisecond
 
 	fedclientset := fedclientfake.NewSimpleClientset()
 	fedrswatch := watch.NewFake()
 	fedclientset.PrependWatchReactor("replicasets", core.DefaultWatchReactor(fedrswatch, nil))
 
-	fedclientset.Federation().Clusters().Create(mkCluster("k8s-1", apiv1.ConditionTrue))
-	fedclientset.Federation().Clusters().Create(mkCluster("k8s-2", apiv1.ConditionTrue))
+	fedclientset.Federation().Clusters().Create(newClusterWithReadyStatus("k8s-1", apiv1.ConditionTrue))
+	fedclientset.Federation().Clusters().Create(newClusterWithReadyStatus("k8s-2", apiv1.ConditionTrue))
 
 	kube1clientset := kubeclientfake.NewSimpleClientset()
 	kube1rswatch := watch.NewFake()
@@ -104,26 +105,29 @@ func TestReplicaSetController(t *testing.T) {
 			return nil, fmt.Errorf("Unknown cluster: %v", cluster.Name)
 		}
 	}
 
-	stopChan := make(chan struct{})
 	replicaSetController := NewReplicaSetController(fedclientset)
 	rsFedinformer := toFederatedInformerForTestOnly(replicaSetController.fedReplicaSetInformer)
 	rsFedinformer.SetClientFactory(fedInformerClientFactory)
 	podFedinformer := toFederatedInformerForTestOnly(replicaSetController.fedPodInformer)
 	podFedinformer.SetClientFactory(fedInformerClientFactory)
+	stopChan := make(chan struct{})
+	defer close(stopChan)
 	go replicaSetController.Run(1, stopChan)
 
-	rs := mkReplicaSet("rs", 9)
+	rs := newReplicaSetWithReplicas("rs", 9)
 	rs, _ = fedclientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Create(rs)
 	fedrswatch.Add(rs)
 	time.Sleep(1 * time.Second)
 
 	rs1, _ := kube1clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name)
+	kube1rswatch.Add(rs1)
 	rs1.Status.Replicas = *rs1.Spec.Replicas
 	rs1, _ = kube1clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).UpdateStatus(rs1)
 	kube1rswatch.Modify(rs1)
 
 	rs2, _ := kube2clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name)
+	kube2rswatch.Add(rs2)
 	rs2.Status.Replicas = *rs2.Spec.Replicas
 	rs2, _ = kube2clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).UpdateStatus(rs2)
 	kube2rswatch.Modify(rs2)
@@ -137,15 +141,22 @@ func TestReplicaSetController(t *testing.T) {
 	rs.Spec.Replicas = &replicas
 	rs, _ = fedclientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Update(rs)
 	fedrswatch.Modify(rs)
+	time.Sleep(1 * time.Second)
 
-	time.Sleep(2 * time.Second)
-	rs, _ = fedclientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name)
 	rs1, _ = kube1clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name)
+	rs1.Status.Replicas = *rs1.Spec.Replicas
+	rs1, _ = kube1clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).UpdateStatus(rs1)
+	kube1rswatch.Modify(rs1)
+
 	rs2, _ = kube2clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name)
+	rs2.Status.Replicas = *rs2.Spec.Replicas
+	rs2, _ = kube2clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).UpdateStatus(rs2)
+	kube2rswatch.Modify(rs2)
+
+	time.Sleep(1 * time.Second)
+	rs, _ = fedclientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name)
 
 	assert.Equal(t, *rs.Spec.Replicas, *rs1.Spec.Replicas+*rs2.Spec.Replicas)
 	assert.Equal(t, rs.Status.Replicas, rs1.Status.Replicas+rs2.Status.Replicas)
-
-	close(stopChan)
 }
 
@@ -153,7 +164,7 @@ func toFederatedInformerForTestOnly(informer fedutil.FederatedInformer) fedutil.
 	return inter.(fedutil.FederatedInformerForTestOnly)
 }
 
-func mkCluster(name string, readyStatus apiv1.ConditionStatus) *fedv1.Cluster {
+func newClusterWithReadyStatus(name string, readyStatus apiv1.ConditionStatus) *fedv1.Cluster {
 	return &fedv1.Cluster{
 		ObjectMeta: apiv1.ObjectMeta{
 			Name: name,
@@ -166,7 +177,7 @@ func mkCluster(name string, readyStatus apiv1.ConditionStatus) *fedv1.Cluster {
 	}
 }
 
-func mkReplicaSet(name string, replicas int32) *extensionsv1.ReplicaSet {
+func newReplicaSetWithReplicas(name string, replicas int32) *extensionsv1.ReplicaSet {
 	return &extensionsv1.ReplicaSet{
 		ObjectMeta: apiv1.ObjectMeta{
 			Name: name,