Clean-up and fixes in federated replica set

This commit is contained in:
Marcin Wielgus 2016-08-30 23:23:04 +02:00
parent 3fd14d97fb
commit 4c55babcf8

View File

@ -56,6 +56,7 @@ var (
clusterAvailableDelay = 20 * time.Second
clusterUnavailableDelay = 60 * time.Second
allReplicaSetReviewDealy = 2 * time.Minute
updateTimeout = 30 * time.Second
)
func parseFederationReplicaSetReference(frs *extensionsv1.ReplicaSet) (*fed.FederatedReplicaSetPreferences, error) {
@ -85,6 +86,8 @@ type ReplicaSetController struct {
replicasetDeliverer *fedutil.DelayingDeliverer
clusterDeliverer *fedutil.DelayingDeliverer
replicasetWorkQueue workqueue.Interface
// For updating members of federation.
fedUpdater fedutil.FederatedUpdater
replicaSetBackoff *flowcontrol.Backoff
@ -170,6 +173,23 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe
),
)
frsc.fedUpdater = fedutil.NewFederatedUpdater(frsc.fedReplicaSetInformer,
func(client kubeclientset.Interface, obj runtime.Object) error {
rs := obj.(*extensionsv1.ReplicaSet)
_, err := client.Extensions().ReplicaSets(rs.Namespace).Create(rs)
return err
},
func(client kubeclientset.Interface, obj runtime.Object) error {
rs := obj.(*extensionsv1.ReplicaSet)
_, err := client.Extensions().ReplicaSets(rs.Namespace).Update(rs)
return err
},
func(client kubeclientset.Interface, obj runtime.Object) error {
rs := obj.(*extensionsv1.ReplicaSet)
err := client.Extensions().ReplicaSets(rs.Namespace).Delete(rs.Name, &api.DeleteOptions{})
return err
})
return frsc
}
@ -293,11 +313,25 @@ func (frsc *ReplicaSetController) worker() {
return
}
key := item.(string)
err := frsc.reconcileReplicaSet(key)
status, err := frsc.reconcileReplicaSet(key)
frsc.replicasetWorkQueue.Done(item)
if err != nil {
glog.Errorf("Error syncing cluster controller: %v", err)
frsc.deliverReplicaSetByKey(key, 0, true)
} else {
switch status {
case statusAllOk:
break
case statusError:
frsc.deliverReplicaSetByKey(key, 0, true)
case statusNeedRecheck:
frsc.deliverReplicaSetByKey(key, replicaSetReviewDelay, false)
case statusNotSynced:
frsc.deliverReplicaSetByKey(key, clusterAvailableDelay, false)
default:
glog.Errorf("Unhandled reconciliation status: %s", status)
frsc.deliverReplicaSetByKey(key, replicaSetReviewDelay, false)
}
}
}
}
@ -352,10 +386,18 @@ func (frsc *ReplicaSetController) schedule(frs *extensionsv1.ReplicaSet, cluster
return result
}
func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error {
type reconciliationStatus string
const (
statusAllOk = reconciliationStatus("ALL_OK")
statusNeedRecheck = reconciliationStatus("RECHECK")
statusError = reconciliationStatus("ERROR")
statusNotSynced = reconciliationStatus("NOSYNC")
)
func (frsc *ReplicaSetController) reconcileReplicaSet(key string) (reconciliationStatus, error) {
if !frsc.isSynced() {
frsc.deliverReplicaSetByKey(key, clusterAvailableDelay, false)
return nil
return statusNotSynced, nil
}
glog.V(4).Infof("Start reconcile replicaset %q", key)
@ -364,23 +406,23 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error {
obj, exists, err := frsc.replicaSetStore.Store.GetByKey(key)
if err != nil {
return err
return statusError, err
}
if !exists {
// don't delete local replicasets for now
return nil
// don't delete local replicasets for now. Do not reconcile it anymore.
return statusAllOk, nil
}
frs := obj.(*extensionsv1.ReplicaSet)
clusters, err := frsc.fedReplicaSetInformer.GetReadyClusters()
if err != nil {
return err
return statusError, err
}
// collect current status and do schedule
allPods, err := frsc.fedPodInformer.GetTargetStore().List()
if err != nil {
return err
return statusError, err
}
podStatus, err := AnalysePods(frs, allPods, time.Now())
current := make(map[string]int64)
@ -388,7 +430,7 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error {
for _, cluster := range clusters {
lrsObj, exists, err := frsc.fedReplicaSetInformer.GetTargetStore().GetByKey(cluster.Name, key)
if err != nil {
return err
return statusError, err
}
if exists {
lrs := lrsObj.(*extensionsv1.ReplicaSet)
@ -405,49 +447,42 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error {
glog.V(4).Infof("Start syncing local replicaset %s: %v", key, scheduleResult)
fedStatus := extensionsv1.ReplicaSetStatus{ObservedGeneration: frs.Generation}
operations := make([]fedutil.FederatedOperation, 0)
for clusterName, replicas := range scheduleResult {
// TODO: updater or parallelizer doesnn't help as results are needed for updating fed rs status
clusterClient, err := frsc.fedReplicaSetInformer.GetClientsetForCluster(clusterName)
if err != nil {
return err
}
lrsObj, exists, err := frsc.fedReplicaSetInformer.GetTargetStore().GetByKey(clusterName, key)
if err != nil {
return err
} else if !exists {
return statusError, err
}
lrs := &extensionsv1.ReplicaSet{
ObjectMeta: fedutil.CopyObjectMeta(frs.ObjectMeta),
Spec: frs.Spec,
}
specReplicas := int32(replicas)
lrs.Spec.Replicas = &specReplicas
if !exists {
if replicas > 0 {
lrs := &extensionsv1.ReplicaSet{
ObjectMeta: apiv1.ObjectMeta{
Name: frs.Name,
Namespace: frs.Namespace,
Labels: frs.Labels,
Annotations: frs.Annotations,
},
Spec: frs.Spec,
}
specReplicas := int32(replicas)
lrs.Spec.Replicas = &specReplicas
lrs, err = clusterClient.Extensions().ReplicaSets(frs.Namespace).Create(lrs)
if err != nil {
return err
}
fedStatus.Replicas += lrs.Status.Replicas
fedStatus.FullyLabeledReplicas += lrs.Status.FullyLabeledReplicas
operations = append(operations, fedutil.FederatedOperation{
Type: fedutil.OperationTypeAdd,
Obj: lrs,
ClusterName: clusterName,
})
}
} else {
lrs := lrsObj.(*extensionsv1.ReplicaSet)
lrsExpectedSpec := frs.Spec
specReplicas := int32(replicas)
lrsExpectedSpec.Replicas = &specReplicas
if !reflect.DeepEqual(lrs.Spec, lrsExpectedSpec) {
lrs.Spec = lrsExpectedSpec
lrs, err = clusterClient.Extensions().ReplicaSets(frs.Namespace).Update(lrs)
if err != nil {
return err
}
currentLrs := lrsObj.(*extensionsv1.ReplicaSet)
// Update existing replica set, if needed.
if !fedutil.ObjectMetaEquivalent(lrs.ObjectMeta, currentLrs.ObjectMeta) ||
!reflect.DeepEqual(lrs.Spec, currentLrs.Spec) {
operations = append(operations, fedutil.FederatedOperation{
Type: fedutil.OperationTypeUpdate,
Obj: lrs,
ClusterName: clusterName,
})
}
fedStatus.Replicas += lrs.Status.Replicas
fedStatus.FullyLabeledReplicas += lrs.Status.FullyLabeledReplicas
fedStatus.Replicas += currentLrs.Status.Replicas
fedStatus.FullyLabeledReplicas += currentLrs.Status.FullyLabeledReplicas
// leave the replicaset even the replicas dropped to 0
}
}
@ -455,11 +490,22 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error {
frs.Status = fedStatus
_, err = frsc.fedClient.Extensions().ReplicaSets(frs.Namespace).UpdateStatus(frs)
if err != nil {
return err
return statusError, err
}
}
return nil
if len(operations) == 0 {
// Everything is in order
return statusAllOk, nil
}
err = frsc.fedUpdater.Update(operations, updateTimeout)
if err != nil {
glog.Errorf("Failed to execute updates for %s: %v", key, err)
return statusError, err
}
// Some operations were made, reconcile after a while.
return statusNeedRecheck, nil
}
func (frsc *ReplicaSetController) reconcileReplicaSetsOnClusterChange() {