review fix

Author: jianhuiz
Date: 2016-08-20 13:55:11 -07:00
parent 7598d43db0
commit 257bda7e1c
2 changed files with 46 additions and 34 deletions


@@ -50,9 +50,10 @@ const (
)
var (
replicaSetReviewDelay = 10 * time.Second
clusterAvailableDelay = 20 * time.Second
clusterUnavailableDelay = 60 * time.Second
replicaSetReviewDelay = 10 * time.Second
clusterAvailableDelay = 20 * time.Second
clusterUnavailableDelay = 60 * time.Second
allReplicaSetReviewDealy = 2 * time.Minute
)
func parseFederationReplicaSetReference(frs *extensionsv1.ReplicaSet) (*fed.FederatedReplicaSetPreferences, error) {
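The delays above are package-level variables rather than constants, which is what lets the unit test later in this commit shrink them to milliseconds. A minimal standalone sketch of that pattern, reusing the identifiers from the diff (including the allReplicaSetReviewDealy spelling) but otherwise illustrative only:

package main

import (
    "fmt"
    "time"
)

// Tunable delay knobs, mirroring the var block above. Because they are vars,
// a test can override them before starting the controller.
var (
    replicaSetReviewDelay    = 10 * time.Second
    clusterAvailableDelay    = 20 * time.Second
    clusterUnavailableDelay  = 60 * time.Second
    allReplicaSetReviewDealy = 2 * time.Minute
)

func main() {
    // The test file below sets much shorter values so the controller reacts
    // within a few seconds of wall-clock time.
    replicaSetReviewDelay = 10 * time.Millisecond
    clusterAvailableDelay = 20 * time.Millisecond
    clusterUnavailableDelay = 60 * time.Millisecond
    allReplicaSetReviewDealy = 120 * time.Millisecond
    fmt.Println(replicaSetReviewDelay, clusterAvailableDelay, clusterUnavailableDelay, allReplicaSetReviewDealy)
}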
@@ -121,12 +122,14 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe
&extensionsv1.ReplicaSet{},
controller.NoResyncPeriodFunc(),
fedutil.NewTriggerOnAllChanges(
func(obj runtime.Object) { frsc.deliverLocalReplicaSet(obj, replicaSetReviewDelay) },
func(obj runtime.Object) { frsc.deliverLocalReplicaSet(obj, allReplicaSetReviewDealy) },
),
)
}
clusterLifecycle := fedutil.ClusterLifecycleHandlerFuncs{
ClusterAvailable: func(cluster *fedv1.Cluster) { /* no rebalancing for now */ },
ClusterAvailable: func(cluster *fedv1.Cluster) {
frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterUnavailableDelay)
},
ClusterUnavailable: func(cluster *fedv1.Cluster, _ []interface{}) {
frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterUnavailableDelay)
},
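With this change ClusterAvailable is no longer a no-op: cluster-available and cluster-unavailable events both ask the cluster deliverer to run a full reconciliation after a delay, so replicas can be rebalanced when a cluster joins as well as when one drops out. A rough standalone sketch of that delayed-trigger pattern; the type, the method name, and the coalescing behavior are assumptions of this sketch, not the actual fedutil.DelayingDeliverer API:

package main

import (
    "fmt"
    "sync"
    "time"
)

// delayedTrigger arms a timer on every request and runs the handler once the
// delay elapses; a newer request resets the timer, coalescing bursts of
// cluster events into a single "reconcile all replica sets" pass.
type delayedTrigger struct {
    mu    sync.Mutex
    timer *time.Timer
}

func (d *delayedTrigger) DeliverAfter(delay time.Duration, handler func()) {
    d.mu.Lock()
    defer d.mu.Unlock()
    if d.timer != nil {
        d.timer.Stop()
    }
    d.timer = time.AfterFunc(delay, handler)
}

func main() {
    trigger := &delayedTrigger{}
    reconcileAll := func() { fmt.Println("reconcile all replica sets") }

    // A cluster becoming available and another becoming unavailable in quick
    // succession still result in one full reconciliation.
    trigger.DeliverAfter(50*time.Millisecond, reconcileAll)
    trigger.DeliverAfter(50*time.Millisecond, reconcileAll)
    time.Sleep(200 * time.Millisecond)
}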
@@ -147,7 +150,7 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe
controller.NoResyncPeriodFunc(),
fedutil.NewTriggerOnAllChanges(
func(obj runtime.Object) {
//frsc.deliverLocalReplicaSet(obj, replicaSetReviewDelay)
frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterUnavailableDelay)
},
),
)
@@ -182,7 +185,7 @@ func (frsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
frsc.replicasetWorkQueue.Add(item.Key)
})
frsc.clusterDeliverer.StartWithHandler(func(_ *fedutil.DelayingDelivererItem) {
frsc.reconcileNamespacesOnClusterChange()
frsc.reconcileReplicaSetsOnClusterChange()
})
for !frsc.isSynced() {
@@ -229,11 +232,6 @@ func (frsc *ReplicaSetController) isSynced() bool {
glog.V(2).Infof("Cluster list not synced")
return false
}
clusters, err = frsc.fedPodInformer.GetReadyClusters()
if err != nil {
glog.Errorf("Failed to get ready clusters: %v", err)
return false
}
if !frsc.fedPodInformer.GetTargetStore().ClustersSynced(clusters) {
return false
}
@@ -297,7 +295,7 @@ func (frsc *ReplicaSetController) worker() {
}
func (frsc *ReplicaSetController) schedule(frs *extensionsv1.ReplicaSet, clusters []*fedv1.Cluster,
expected map[string]int64, actual map[string]int64) map[string]int64 {
current map[string]int64, estimatedCapacity map[string]int64) map[string]int64 {
// TODO: integrate real scheduler
plnr := frsc.defaultPlanner
@@ -314,15 +312,18 @@ func (frsc *ReplicaSetController) schedule(frs *extensionsv1.ReplicaSet, cluster
for _, cluster := range clusters {
clusterNames = append(clusterNames, cluster.Name)
}
scheduleResult, _ := plnr.Plan(replicas, clusterNames, expected, actual)
scheduleResult, overflow := plnr.Plan(replicas, clusterNames, current, estimatedCapacity)
// make sure the return contains clusters need to zero the replicas
result := make(map[string]int64)
for clusterName := range expected {
for clusterName := range current {
result[clusterName] = 0
}
for clusterName, replicas := range scheduleResult {
result[clusterName] = replicas
}
for clusterName, replicas := range overflow {
result[clusterName] += replicas
}
return result
}
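The scheduling change stops discarding the planner's second return value: clusters that previously held replicas are zeroed first (so scale-downs still reach them), the planned counts are written on top, and any overflow, that is, replicas a capacity-limited cluster could not absorb, is added back in. A self-contained sketch of that merge with made-up inputs; the real planner and its Plan signature are not reproduced here:

package main

import "fmt"

// mergeSchedule reproduces the merge step above: start every previously known
// cluster at zero, apply the planner's primary assignment, then add overflow
// replicas that capped clusters could not absorb. Inputs are illustrative.
func mergeSchedule(current, planned, overflow map[string]int64) map[string]int64 {
    result := make(map[string]int64)
    for clusterName := range current {
        result[clusterName] = 0 // clusters losing all replicas must still appear
    }
    for clusterName, replicas := range planned {
        result[clusterName] = replicas
    }
    for clusterName, replicas := range overflow {
        result[clusterName] += replicas
    }
    return result
}

func main() {
    current := map[string]int64{"k8s-1": 5, "k8s-2": 4}
    planned := map[string]int64{"k8s-1": 6}  // hypothetical planner output
    overflow := map[string]int64{"k8s-1": 3} // replicas k8s-2 could not take
    fmt.Println(mergeSchedule(current, planned, overflow)) // map[k8s-1:9 k8s-2:0]
}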
@@ -357,8 +358,8 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error {
return err
}
podStatus, err := AnalysePods(frs, allPods, time.Now())
expected := make(map[string]int64)
actual := make(map[string]int64)
current := make(map[string]int64)
estimatedCapacity := make(map[string]int64)
for _, cluster := range clusters {
lrsObj, exists, err := frsc.fedReplicaSetInformer.GetTargetStore().GetByKey(cluster.Name, key)
if err != nil {
@@ -366,15 +367,15 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error {
}
if exists {
lrs := lrsObj.(*extensionsv1.ReplicaSet)
expected[cluster.Name] = int64(*lrs.Spec.Replicas)
unscheduleable := int64(podStatus[cluster.Name].Unschedulable)
if unscheduleable > 0 {
actual[cluster.Name] = int64(*lrs.Spec.Replicas)
current[cluster.Name] = int64(podStatus[cluster.Name].RunningAndReady) // include pending as well?
unschedulable := int64(podStatus[cluster.Name].Unschedulable)
if unschedulable > 0 {
estimatedCapacity[cluster.Name] = int64(*lrs.Spec.Replicas) - unschedulable
}
}
}
scheduleResult := frsc.schedule(frs, clusters, expected, actual)
scheduleResult := frsc.schedule(frs, clusters, current, estimatedCapacity)
glog.Infof("Start syncing local replicaset %v", scheduleResult)
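The reconcile loop now feeds the scheduler two different per-cluster views: current counts pods that are actually running and ready, while estimatedCapacity is recorded only when pods are unschedulable and caps the cluster at its local spec replicas minus the unschedulable count. A small sketch of that bookkeeping under assumed types; the real AnalysePods result and the informer store lookups are simplified away:

package main

import "fmt"

// podStatus is a stand-in for the per-cluster pod analysis used above; the
// real AnalysePods result carries more fields.
type podStatus struct {
    RunningAndReady int64
    Unschedulable   int64
}

// capacityInputs mirrors the loop above: current counts ready pods, and
// estimatedCapacity is recorded only when a cluster has unschedulable pods,
// as the local spec replicas minus the unschedulable count.
func capacityInputs(specReplicas map[string]int64, status map[string]podStatus) (current, estimatedCapacity map[string]int64) {
    current = make(map[string]int64)
    estimatedCapacity = make(map[string]int64)
    for cluster, spec := range specReplicas {
        st := status[cluster]
        current[cluster] = st.RunningAndReady
        if st.Unschedulable > 0 {
            estimatedCapacity[cluster] = spec - st.Unschedulable
        }
    }
    return current, estimatedCapacity
}

func main() {
    spec := map[string]int64{"k8s-1": 5, "k8s-2": 4}
    status := map[string]podStatus{
        "k8s-1": {RunningAndReady: 5},
        "k8s-2": {RunningAndReady: 1, Unschedulable: 3},
    }
    current, estimatedCapacity := capacityInputs(spec, status)
    fmt.Println(current, estimatedCapacity) // map[k8s-1:5 k8s-2:1] map[k8s-2:1]
}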
@@ -436,7 +437,7 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error {
return nil
}
func (frsc *ReplicaSetController) reconcileNamespacesOnClusterChange() {
func (frsc *ReplicaSetController) reconcileReplicaSetsOnClusterChange() {
if !frsc.isSynced() {
frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay)
}


@@ -46,7 +46,7 @@ func TestParseFederationReplicaSetReference(t *testing.T) {
`{`, // bad json
}
rs := mkReplicaSet("rs-1", 100)
rs := newReplicaSetWithReplicas("rs-1", 100)
accessor, _ := meta.Accessor(rs)
anno := accessor.GetAnnotations()
if anno == nil {
@@ -75,13 +75,14 @@ func TestReplicaSetController(t *testing.T) {
replicaSetReviewDelay = 10 * time.Millisecond
clusterAvailableDelay = 20 * time.Millisecond
clusterUnavailableDelay = 60 * time.Millisecond
allReplicaSetReviewDealy = 120 * time.Millisecond
fedclientset := fedclientfake.NewSimpleClientset()
fedrswatch := watch.NewFake()
fedclientset.PrependWatchReactor("replicasets", core.DefaultWatchReactor(fedrswatch, nil))
fedclientset.Federation().Clusters().Create(mkCluster("k8s-1", apiv1.ConditionTrue))
fedclientset.Federation().Clusters().Create(mkCluster("k8s-2", apiv1.ConditionTrue))
fedclientset.Federation().Clusters().Create(newClusterWithReadyStatus("k8s-1", apiv1.ConditionTrue))
fedclientset.Federation().Clusters().Create(newClusterWithReadyStatus("k8s-2", apiv1.ConditionTrue))
kube1clientset := kubeclientfake.NewSimpleClientset()
kube1rswatch := watch.NewFake()
@@ -104,26 +105,29 @@ func TestReplicaSetController(t *testing.T) {
return nil, fmt.Errorf("Unknown cluster: %v", cluster.Name)
}
}
stopChan := make(chan struct{})
replicaSetController := NewReplicaSetController(fedclientset)
rsFedinformer := toFederatedInformerForTestOnly(replicaSetController.fedReplicaSetInformer)
rsFedinformer.SetClientFactory(fedInformerClientFactory)
podFedinformer := toFederatedInformerForTestOnly(replicaSetController.fedPodInformer)
podFedinformer.SetClientFactory(fedInformerClientFactory)
stopChan := make(chan struct{})
defer close(stopChan)
go replicaSetController.Run(1, stopChan)
rs := mkReplicaSet("rs", 9)
rs := newReplicaSetWithReplicas("rs", 9)
rs, _ = fedclientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Create(rs)
fedrswatch.Add(rs)
time.Sleep(1 * time.Second)
rs1, _ := kube1clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name)
kube1rswatch.Add(rs1)
rs1.Status.Replicas = *rs1.Spec.Replicas
rs1, _ = kube1clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).UpdateStatus(rs1)
kube1rswatch.Modify(rs1)
rs2, _ := kube2clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name)
kube2rswatch.Add(rs2)
rs2.Status.Replicas = *rs2.Spec.Replicas
rs2, _ = kube2clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).UpdateStatus(rs2)
kube2rswatch.Modify(rs2)
@@ -137,15 +141,22 @@ func TestReplicaSetController(t *testing.T) {
rs.Spec.Replicas = &replicas
rs, _ = fedclientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Update(rs)
fedrswatch.Modify(rs)
time.Sleep(1 * time.Second)
time.Sleep(2 * time.Second)
rs, _ = fedclientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name)
rs1, _ = kube1clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name)
rs1.Status.Replicas = *rs1.Spec.Replicas
rs1, _ = kube1clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).UpdateStatus(rs1)
kube1rswatch.Modify(rs1)
rs2, _ = kube2clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name)
rs2.Status.Replicas = *rs2.Spec.Replicas
rs2, _ = kube2clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).UpdateStatus(rs2)
kube2rswatch.Modify(rs2)
time.Sleep(1 * time.Second)
rs, _ = fedclientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name)
assert.Equal(t, *rs.Spec.Replicas, *rs1.Spec.Replicas+*rs2.Spec.Replicas)
assert.Equal(t, rs.Status.Replicas, rs1.Status.Replicas+rs2.Status.Replicas)
close(stopChan)
}
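Besides the existing spec check, the test now drives per-cluster status through the fake watches and asserts that the federated object's status is the sum of the member statuses. The invariant being asserted, as a tiny standalone sketch with hypothetical numbers:

package main

import "fmt"

// clusterRS models only the fields the assertions above look at.
type clusterRS struct {
    SpecReplicas   int32
    StatusReplicas int32
}

// federatedTotals sums spec and status replicas over member clusters; the
// federated ReplicaSet is expected to report exactly these totals.
func federatedTotals(members []clusterRS) (spec, status int32) {
    for _, rs := range members {
        spec += rs.SpecReplicas
        status += rs.StatusReplicas
    }
    return spec, status
}

func main() {
    members := []clusterRS{
        {SpecReplicas: 2, StatusReplicas: 2}, // e.g. k8s-1's local replica set
        {SpecReplicas: 1, StatusReplicas: 1}, // e.g. k8s-2's local replica set
    }
    spec, status := federatedTotals(members)
    fmt.Println(spec, status) // both must match the federated object's spec/status
}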
func toFederatedInformerForTestOnly(informer fedutil.FederatedInformer) fedutil.FederatedInformerForTestOnly {
@@ -153,7 +164,7 @@ func toFederatedInformerForTestOnly(informer fedutil.FederatedInformer) fedutil.
return inter.(fedutil.FederatedInformerForTestOnly)
}
func mkCluster(name string, readyStatus apiv1.ConditionStatus) *fedv1.Cluster {
func newClusterWithReadyStatus(name string, readyStatus apiv1.ConditionStatus) *fedv1.Cluster {
return &fedv1.Cluster{
ObjectMeta: apiv1.ObjectMeta{
Name: name,
@@ -166,7 +177,7 @@ func mkCluster(name string, readyStatus apiv1.ConditionStatus) *fedv1.Cluster {
}
}
func mkReplicaSet(name string, replicas int32) *extensionsv1.ReplicaSet {
func newReplicaSetWithReplicas(name string, replicas int32) *extensionsv1.ReplicaSet {
return &extensionsv1.ReplicaSet{
ObjectMeta: apiv1.ObjectMeta{
Name: name,