diff --git a/federation/pkg/federation-controller/replicaset/replicasetcontroller.go b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go index 82646297544..cbd6f3a1969 100644 --- a/federation/pkg/federation-controller/replicaset/replicasetcontroller.go +++ b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go @@ -56,6 +56,7 @@ var ( clusterAvailableDelay = 20 * time.Second clusterUnavailableDelay = 60 * time.Second allReplicaSetReviewDealy = 2 * time.Minute + updateTimeout = 30 * time.Second ) func parseFederationReplicaSetReference(frs *extensionsv1.ReplicaSet) (*fed.FederatedReplicaSetPreferences, error) { @@ -85,6 +86,8 @@ type ReplicaSetController struct { replicasetDeliverer *fedutil.DelayingDeliverer clusterDeliverer *fedutil.DelayingDeliverer replicasetWorkQueue workqueue.Interface + // For updating members of federation. + fedUpdater fedutil.FederatedUpdater replicaSetBackoff *flowcontrol.Backoff @@ -170,6 +173,23 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe ), ) + frsc.fedUpdater = fedutil.NewFederatedUpdater(frsc.fedReplicaSetInformer, + func(client kubeclientset.Interface, obj runtime.Object) error { + rs := obj.(*extensionsv1.ReplicaSet) + _, err := client.Extensions().ReplicaSets(rs.Namespace).Create(rs) + return err + }, + func(client kubeclientset.Interface, obj runtime.Object) error { + rs := obj.(*extensionsv1.ReplicaSet) + _, err := client.Extensions().ReplicaSets(rs.Namespace).Update(rs) + return err + }, + func(client kubeclientset.Interface, obj runtime.Object) error { + rs := obj.(*extensionsv1.ReplicaSet) + err := client.Extensions().ReplicaSets(rs.Namespace).Delete(rs.Name, &api.DeleteOptions{}) + return err + }) + return frsc } @@ -293,11 +313,25 @@ func (frsc *ReplicaSetController) worker() { return } key := item.(string) - err := frsc.reconcileReplicaSet(key) + status, err := frsc.reconcileReplicaSet(key) frsc.replicasetWorkQueue.Done(item) if err != nil { glog.Errorf("Error syncing cluster controller: %v", err) frsc.deliverReplicaSetByKey(key, 0, true) + } else { + switch status { + case statusAllOk: + break + case statusError: + frsc.deliverReplicaSetByKey(key, 0, true) + case statusNeedRecheck: + frsc.deliverReplicaSetByKey(key, replicaSetReviewDelay, false) + case statusNotSynced: + frsc.deliverReplicaSetByKey(key, clusterAvailableDelay, false) + default: + glog.Errorf("Unhandled reconciliation status: %s", status) + frsc.deliverReplicaSetByKey(key, replicaSetReviewDelay, false) + } } } } @@ -352,10 +386,18 @@ func (frsc *ReplicaSetController) schedule(frs *extensionsv1.ReplicaSet, cluster return result } -func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error { +type reconciliationStatus string + +const ( + statusAllOk = reconciliationStatus("ALL_OK") + statusNeedRecheck = reconciliationStatus("RECHECK") + statusError = reconciliationStatus("ERROR") + statusNotSynced = reconciliationStatus("NOSYNC") +) + +func (frsc *ReplicaSetController) reconcileReplicaSet(key string) (reconciliationStatus, error) { if !frsc.isSynced() { - frsc.deliverReplicaSetByKey(key, clusterAvailableDelay, false) - return nil + return statusNotSynced, nil } glog.V(4).Infof("Start reconcile replicaset %q", key) @@ -364,23 +406,23 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error { obj, exists, err := frsc.replicaSetStore.Store.GetByKey(key) if err != nil { - return err + return statusError, err } if !exists { - // don't delete local replicasets for now - return nil + // don't delete local replicasets for now. Do not reconcile it anymore. + return statusAllOk, nil } frs := obj.(*extensionsv1.ReplicaSet) clusters, err := frsc.fedReplicaSetInformer.GetReadyClusters() if err != nil { - return err + return statusError, err } // collect current status and do schedule allPods, err := frsc.fedPodInformer.GetTargetStore().List() if err != nil { - return err + return statusError, err } podStatus, err := AnalysePods(frs, allPods, time.Now()) current := make(map[string]int64) @@ -388,7 +430,7 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error { for _, cluster := range clusters { lrsObj, exists, err := frsc.fedReplicaSetInformer.GetTargetStore().GetByKey(cluster.Name, key) if err != nil { - return err + return statusError, err } if exists { lrs := lrsObj.(*extensionsv1.ReplicaSet) @@ -405,49 +447,42 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error { glog.V(4).Infof("Start syncing local replicaset %s: %v", key, scheduleResult) fedStatus := extensionsv1.ReplicaSetStatus{ObservedGeneration: frs.Generation} + operations := make([]fedutil.FederatedOperation, 0) for clusterName, replicas := range scheduleResult { - // TODO: updater or parallelizer doesnn't help as results are needed for updating fed rs status - clusterClient, err := frsc.fedReplicaSetInformer.GetClientsetForCluster(clusterName) - if err != nil { - return err - } + lrsObj, exists, err := frsc.fedReplicaSetInformer.GetTargetStore().GetByKey(clusterName, key) if err != nil { - return err - } else if !exists { + return statusError, err + } + + lrs := &extensionsv1.ReplicaSet{ + ObjectMeta: fedutil.CopyObjectMeta(frs.ObjectMeta), + Spec: frs.Spec, + } + specReplicas := int32(replicas) + lrs.Spec.Replicas = &specReplicas + + if !exists { if replicas > 0 { - lrs := &extensionsv1.ReplicaSet{ - ObjectMeta: apiv1.ObjectMeta{ - Name: frs.Name, - Namespace: frs.Namespace, - Labels: frs.Labels, - Annotations: frs.Annotations, - }, - Spec: frs.Spec, - } - specReplicas := int32(replicas) - lrs.Spec.Replicas = &specReplicas - lrs, err = clusterClient.Extensions().ReplicaSets(frs.Namespace).Create(lrs) - if err != nil { - return err - } - fedStatus.Replicas += lrs.Status.Replicas - fedStatus.FullyLabeledReplicas += lrs.Status.FullyLabeledReplicas + operations = append(operations, fedutil.FederatedOperation{ + Type: fedutil.OperationTypeAdd, + Obj: lrs, + ClusterName: clusterName, + }) } } else { - lrs := lrsObj.(*extensionsv1.ReplicaSet) - lrsExpectedSpec := frs.Spec - specReplicas := int32(replicas) - lrsExpectedSpec.Replicas = &specReplicas - if !reflect.DeepEqual(lrs.Spec, lrsExpectedSpec) { - lrs.Spec = lrsExpectedSpec - lrs, err = clusterClient.Extensions().ReplicaSets(frs.Namespace).Update(lrs) - if err != nil { - return err - } + currentLrs := lrsObj.(*extensionsv1.ReplicaSet) + // Update existing replica set, if needed. + if !fedutil.ObjectMetaEquivalent(lrs.ObjectMeta, currentLrs.ObjectMeta) || + !reflect.DeepEqual(lrs.Spec, currentLrs.Spec) { + operations = append(operations, fedutil.FederatedOperation{ + Type: fedutil.OperationTypeUpdate, + Obj: lrs, + ClusterName: clusterName, + }) } - fedStatus.Replicas += lrs.Status.Replicas - fedStatus.FullyLabeledReplicas += lrs.Status.FullyLabeledReplicas + fedStatus.Replicas += currentLrs.Status.Replicas + fedStatus.FullyLabeledReplicas += currentLrs.Status.FullyLabeledReplicas // leave the replicaset even the replicas dropped to 0 } } @@ -455,11 +490,22 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error { frs.Status = fedStatus _, err = frsc.fedClient.Extensions().ReplicaSets(frs.Namespace).UpdateStatus(frs) if err != nil { - return err + return statusError, err } } - return nil + if len(operations) == 0 { + // Everything is in order + return statusAllOk, nil + } + err = frsc.fedUpdater.Update(operations, updateTimeout) + if err != nil { + glog.Errorf("Failed to execute updates for %s: %v", key, err) + return statusError, err + } + + // Some operations were made, reconcile after a while. + return statusNeedRecheck, nil } func (frsc *ReplicaSetController) reconcileReplicaSetsOnClusterChange() {