Merge pull request #31744 from mwielgus/rs-fix-2

Automatic merge from submit-queue

Clean-up and fixes in federated replica set

* Create and update are now consistent with other controllers. Previously, an annotation update on a federated rs would not trigger a local rs update.
* Use of federatedUpdater. The previous code talked to clusters manually, assuming that the stats values in the local rs would be automatically and immediately updated. These stats are updated by controllers, so they are not immediately available; the currently existing stats must instead be used for building the federated rs stats.
* Trigger a rs recheck after some operations are executed.

cc: @quinton-hoole @jianhuiz @wojtek-t @kubernetes/sig-cluster-federation
This commit is contained in:
Kubernetes Submit Queue 2016-08-31 11:25:01 -07:00 committed by GitHub
commit ae940c09f3

View File

@ -56,6 +56,7 @@ var (
clusterAvailableDelay = 20 * time.Second
clusterUnavailableDelay = 60 * time.Second
allReplicaSetReviewDealy = 2 * time.Minute
updateTimeout = 30 * time.Second
)
func parseFederationReplicaSetReference(frs *extensionsv1.ReplicaSet) (*fed.FederatedReplicaSetPreferences, error) {
@ -85,6 +86,8 @@ type ReplicaSetController struct {
replicasetDeliverer *fedutil.DelayingDeliverer
clusterDeliverer *fedutil.DelayingDeliverer
replicasetWorkQueue workqueue.Interface
// For updating members of federation.
fedUpdater fedutil.FederatedUpdater
replicaSetBackoff *flowcontrol.Backoff
@ -170,6 +173,23 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe
),
)
frsc.fedUpdater = fedutil.NewFederatedUpdater(frsc.fedReplicaSetInformer,
func(client kubeclientset.Interface, obj runtime.Object) error {
rs := obj.(*extensionsv1.ReplicaSet)
_, err := client.Extensions().ReplicaSets(rs.Namespace).Create(rs)
return err
},
func(client kubeclientset.Interface, obj runtime.Object) error {
rs := obj.(*extensionsv1.ReplicaSet)
_, err := client.Extensions().ReplicaSets(rs.Namespace).Update(rs)
return err
},
func(client kubeclientset.Interface, obj runtime.Object) error {
rs := obj.(*extensionsv1.ReplicaSet)
err := client.Extensions().ReplicaSets(rs.Namespace).Delete(rs.Name, &api.DeleteOptions{})
return err
})
return frsc
}
@ -293,11 +313,25 @@ func (frsc *ReplicaSetController) worker() {
return
}
key := item.(string)
err := frsc.reconcileReplicaSet(key)
status, err := frsc.reconcileReplicaSet(key)
frsc.replicasetWorkQueue.Done(item)
if err != nil {
glog.Errorf("Error syncing cluster controller: %v", err)
frsc.deliverReplicaSetByKey(key, 0, true)
} else {
switch status {
case statusAllOk:
break
case statusError:
frsc.deliverReplicaSetByKey(key, 0, true)
case statusNeedRecheck:
frsc.deliverReplicaSetByKey(key, replicaSetReviewDelay, false)
case statusNotSynced:
frsc.deliverReplicaSetByKey(key, clusterAvailableDelay, false)
default:
glog.Errorf("Unhandled reconciliation status: %s", status)
frsc.deliverReplicaSetByKey(key, replicaSetReviewDelay, false)
}
}
}
}
@ -352,10 +386,18 @@ func (frsc *ReplicaSetController) schedule(frs *extensionsv1.ReplicaSet, cluster
return result
}
func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error {
// reconciliationStatus is the outcome of a single reconcileReplicaSet pass;
// the worker loop maps it to a re-queue decision for the replica set key.
type reconciliationStatus string
// Possible reconciliation outcomes.
const (
// statusAllOk: everything is in the desired state; no re-queue is scheduled.
statusAllOk = reconciliationStatus("ALL_OK")
// statusNeedRecheck: update operations were executed; re-deliver the key
// after replicaSetReviewDelay to verify they took effect.
statusNeedRecheck = reconciliationStatus("RECHECK")
// statusError: reconciliation failed; the key is re-delivered immediately
// with the failure flag set (backoff applies).
statusError = reconciliationStatus("ERROR")
// statusNotSynced: informers/cluster stores are not synced yet; retry
// after clusterAvailableDelay.
statusNotSynced = reconciliationStatus("NOSYNC")
)
func (frsc *ReplicaSetController) reconcileReplicaSet(key string) (reconciliationStatus, error) {
if !frsc.isSynced() {
frsc.deliverReplicaSetByKey(key, clusterAvailableDelay, false)
return nil
return statusNotSynced, nil
}
glog.V(4).Infof("Start reconcile replicaset %q", key)
@ -364,23 +406,23 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error {
obj, exists, err := frsc.replicaSetStore.Store.GetByKey(key)
if err != nil {
return err
return statusError, err
}
if !exists {
// don't delete local replicasets for now
return nil
// don't delete local replicasets for now. Do not reconcile it anymore.
return statusAllOk, nil
}
frs := obj.(*extensionsv1.ReplicaSet)
clusters, err := frsc.fedReplicaSetInformer.GetReadyClusters()
if err != nil {
return err
return statusError, err
}
// collect current status and do schedule
allPods, err := frsc.fedPodInformer.GetTargetStore().List()
if err != nil {
return err
return statusError, err
}
podStatus, err := AnalysePods(frs, allPods, time.Now())
current := make(map[string]int64)
@ -388,7 +430,7 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error {
for _, cluster := range clusters {
lrsObj, exists, err := frsc.fedReplicaSetInformer.GetTargetStore().GetByKey(cluster.Name, key)
if err != nil {
return err
return statusError, err
}
if exists {
lrs := lrsObj.(*extensionsv1.ReplicaSet)
@ -405,49 +447,42 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error {
glog.V(4).Infof("Start syncing local replicaset %s: %v", key, scheduleResult)
fedStatus := extensionsv1.ReplicaSetStatus{ObservedGeneration: frs.Generation}
operations := make([]fedutil.FederatedOperation, 0)
for clusterName, replicas := range scheduleResult {
// TODO: updater or parallelizer doesnn't help as results are needed for updating fed rs status
clusterClient, err := frsc.fedReplicaSetInformer.GetClientsetForCluster(clusterName)
if err != nil {
return err
}
lrsObj, exists, err := frsc.fedReplicaSetInformer.GetTargetStore().GetByKey(clusterName, key)
if err != nil {
return err
} else if !exists {
return statusError, err
}
lrs := &extensionsv1.ReplicaSet{
ObjectMeta: fedutil.CopyObjectMeta(frs.ObjectMeta),
Spec: frs.Spec,
}
specReplicas := int32(replicas)
lrs.Spec.Replicas = &specReplicas
if !exists {
if replicas > 0 {
lrs := &extensionsv1.ReplicaSet{
ObjectMeta: apiv1.ObjectMeta{
Name: frs.Name,
Namespace: frs.Namespace,
Labels: frs.Labels,
Annotations: frs.Annotations,
},
Spec: frs.Spec,
}
specReplicas := int32(replicas)
lrs.Spec.Replicas = &specReplicas
lrs, err = clusterClient.Extensions().ReplicaSets(frs.Namespace).Create(lrs)
if err != nil {
return err
}
fedStatus.Replicas += lrs.Status.Replicas
fedStatus.FullyLabeledReplicas += lrs.Status.FullyLabeledReplicas
operations = append(operations, fedutil.FederatedOperation{
Type: fedutil.OperationTypeAdd,
Obj: lrs,
ClusterName: clusterName,
})
}
} else {
lrs := lrsObj.(*extensionsv1.ReplicaSet)
lrsExpectedSpec := frs.Spec
specReplicas := int32(replicas)
lrsExpectedSpec.Replicas = &specReplicas
if !reflect.DeepEqual(lrs.Spec, lrsExpectedSpec) {
lrs.Spec = lrsExpectedSpec
lrs, err = clusterClient.Extensions().ReplicaSets(frs.Namespace).Update(lrs)
if err != nil {
return err
}
currentLrs := lrsObj.(*extensionsv1.ReplicaSet)
// Update existing replica set, if needed.
if !fedutil.ObjectMetaEquivalent(lrs.ObjectMeta, currentLrs.ObjectMeta) ||
!reflect.DeepEqual(lrs.Spec, currentLrs.Spec) {
operations = append(operations, fedutil.FederatedOperation{
Type: fedutil.OperationTypeUpdate,
Obj: lrs,
ClusterName: clusterName,
})
}
fedStatus.Replicas += lrs.Status.Replicas
fedStatus.FullyLabeledReplicas += lrs.Status.FullyLabeledReplicas
fedStatus.Replicas += currentLrs.Status.Replicas
fedStatus.FullyLabeledReplicas += currentLrs.Status.FullyLabeledReplicas
// leave the replicaset even the replicas dropped to 0
}
}
@ -455,11 +490,22 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error {
frs.Status = fedStatus
_, err = frsc.fedClient.Extensions().ReplicaSets(frs.Namespace).UpdateStatus(frs)
if err != nil {
return err
return statusError, err
}
}
return nil
if len(operations) == 0 {
// Everything is in order
return statusAllOk, nil
}
err = frsc.fedUpdater.Update(operations, updateTimeout)
if err != nil {
glog.Errorf("Failed to execute updates for %s: %v", key, err)
return statusError, err
}
// Some operations were made, reconcile after a while.
return statusNeedRecheck, nil
}
func (frsc *ReplicaSetController) reconcileReplicaSetsOnClusterChange() {