Enqueue controllers after minreadyseconds when all pods are ready

Michail Kargakis 2017-02-25 11:51:20 +01:00
parent 77733c2afd
commit 9eab226947
4 changed files with 47 additions and 20 deletions

View File

@@ -632,10 +632,17 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
 	newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
 	// Always updates status as pods come up or die.
-	if err := updateReplicaSetStatus(rsc.kubeClient.Extensions().ReplicaSets(rs.Namespace), rs, newStatus); err != nil {
+	updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.Extensions().ReplicaSets(rs.Namespace), rs, newStatus)
+	if err != nil {
 		// Multiple things could lead to this update failing. Requeuing the replica set ensures
 		// Returning an error causes a requeue without forcing a hotloop
 		return err
 	}
+	// Resync the ReplicaSet after MinReadySeconds as a last line of defense to guard against clock-skew.
+	if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
+		updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
+		updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
+		rsc.enqueueReplicaSetAfter(updatedRS, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
+	}
 	return manageReplicasErr
 }
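Note: the enqueueReplicaSetAfter helper is not part of this diff. Below is a minimal, hypothetical sketch of the delayed-requeue mechanism such a helper typically delegates to, using client-go's delaying workqueue (the queue, key, and import path are illustrative assumptions, not code from this commit). The point is that AddAfter re-adds the object's key once MinReadySeconds have elapsed, forcing one more sync in which availableReplicas can catch up with readyReplicas.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Rate-limited queue of the kind controllers use for their sync keys.
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	defer queue.ShutDown()

	minReadySeconds := int32(10)
	// Re-add the object's key after MinReadySeconds so the controller syncs it
	// again once ready pods have had a chance to become available.
	queue.AddAfter("default/frontend", time.Duration(minReadySeconds)*time.Second)

	key, _ := queue.Get() // blocks until the delay has elapsed
	fmt.Println("resyncing", key)
	queue.Done(key)
}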

View File

@@ -33,7 +33,7 @@ import (
 )
 // updateReplicaSetStatus attempts to update the Status.Replicas of the given ReplicaSet, with a single GET/PUT retry.
-func updateReplicaSetStatus(c unversionedextensions.ReplicaSetInterface, rs *extensions.ReplicaSet, newStatus extensions.ReplicaSetStatus) (updateErr error) {
+func updateReplicaSetStatus(c unversionedextensions.ReplicaSetInterface, rs *extensions.ReplicaSet, newStatus extensions.ReplicaSetStatus) (*extensions.ReplicaSet, error) {
 	// This is the steady state. It happens when the ReplicaSet doesn't have any expectations, since
 	// we do a periodic relist every 30s. If the generations differ but the replicas are
 	// the same, a caller might've resized to the same replica count.
@@ -43,14 +43,14 @@ func updateReplicaSetStatus(c unversionedextensions.ReplicaSetInterface, rs *ext
 		rs.Status.AvailableReplicas == newStatus.AvailableReplicas &&
 		rs.Generation == rs.Status.ObservedGeneration &&
 		reflect.DeepEqual(rs.Status.Conditions, newStatus.Conditions) {
-		return nil
+		return rs, nil
 	}
 	// deep copy to avoid mutation now.
 	// TODO this method need some work. Retry on conflict probably, though I suspect this is stomping status to something it probably shouldn't
 	copyObj, err := api.Scheme.DeepCopy(rs)
 	if err != nil {
-		return err
+		return nil, err
 	}
 	rs = copyObj.(*extensions.ReplicaSet)
@@ -60,9 +60,10 @@ func updateReplicaSetStatus(c unversionedextensions.ReplicaSetInterface, rs *ext
 	// same status.
 	newStatus.ObservedGeneration = rs.Generation
-	var getErr error
+	var getErr, updateErr error
+	var updatedRS *extensions.ReplicaSet
 	for i, rs := 0, rs; ; i++ {
-		glog.V(4).Infof(fmt.Sprintf("Updating replica count for ReplicaSet: %s/%s, ", rs.Namespace, rs.Name) +
+		glog.V(4).Infof(fmt.Sprintf("Updating status for ReplicaSet: %s/%s, ", rs.Namespace, rs.Name) +
			fmt.Sprintf("replicas %d->%d (need %d), ", rs.Status.Replicas, newStatus.Replicas, *(rs.Spec.Replicas)) +
			fmt.Sprintf("fullyLabeledReplicas %d->%d, ", rs.Status.FullyLabeledReplicas, newStatus.FullyLabeledReplicas) +
			fmt.Sprintf("readyReplicas %d->%d, ", rs.Status.ReadyReplicas, newStatus.ReadyReplicas) +
@@ -70,17 +71,23 @@ func updateReplicaSetStatus(c unversionedextensions.ReplicaSetInterface, rs *ext
			fmt.Sprintf("sequence No: %v->%v", rs.Status.ObservedGeneration, newStatus.ObservedGeneration))
 		rs.Status = newStatus
-		_, updateErr = c.UpdateStatus(rs)
-		if updateErr == nil || i >= statusUpdateRetries {
-			return updateErr
+		updatedRS, updateErr = c.UpdateStatus(rs)
+		if updateErr == nil {
+			return updatedRS, nil
 		}
+		// Stop retrying if we exceed statusUpdateRetries - the replicaSet will be requeued with a rate limit.
+		if i >= statusUpdateRetries {
+			break
+		}
 		// Update the ReplicaSet with the latest resource version for the next poll
 		if rs, getErr = c.Get(rs.Name, metav1.GetOptions{}); getErr != nil {
 			// If the GET fails we can't trust status.Replicas anymore. This error
 			// is bound to be more interesting than the update failure.
-			return getErr
+			return nil, getErr
 		}
 	}
+	return nil, updateErr
 }
 func calculateStatus(rs *extensions.ReplicaSet, filteredPods []*v1.Pod, manageReplicasErr error) extensions.ReplicaSetStatus {
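For readers skimming the new control flow: on success the loop now returns the object the API server handed back, and after statusUpdateRetries failed PUTs it breaks out and returns the last update error instead of returning mid-loop. A minimal stand-alone sketch of that shape, with plain strings standing in for the Kubernetes client and types (the helper and its callbacks are invented for illustration):

package main

import (
	"errors"
	"fmt"
)

// Mirrors the "single GET/PUT retry" described in the function comment above.
const statusUpdateRetries = 1

// updateWithRetry PUTs obj; on failure it re-GETs the latest version and tries
// again, at most statusUpdateRetries extra times. On success it returns what the
// server returned; after the final failure it returns the last update error so
// the caller can requeue with a rate limit instead of hot-looping.
func updateWithRetry(get func() (string, error), put func(string) (string, error), obj string) (string, error) {
	var updateErr error
	for i := 0; ; i++ {
		updated, err := put(obj)
		if err == nil {
			return updated, nil
		}
		updateErr = err
		if i >= statusUpdateRetries {
			break
		}
		if obj, err = get(); err != nil {
			return "", err
		}
	}
	return "", updateErr
}

func main() {
	calls := 0
	put := func(s string) (string, error) {
		calls++
		if calls == 1 {
			return "", errors.New("conflict: object has been modified")
		}
		return s + " (status updated)", nil
	}
	get := func() (string, error) { return "rs at latest resourceVersion", nil }

	fmt.Println(updateWithRetry(get, put, "rs at stale resourceVersion"))
}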

View File

@@ -652,10 +652,16 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
 	newStatus := calculateStatus(rc, filteredPods, manageReplicasErr)
 	// Always updates status as pods come up or die.
-	if err := updateReplicationControllerStatus(rm.kubeClient.Core().ReplicationControllers(rc.Namespace), *rc, newStatus); err != nil {
+	updatedRC, err := updateReplicationControllerStatus(rm.kubeClient.Core().ReplicationControllers(rc.Namespace), *rc, newStatus)
+	if err != nil {
 		// Multiple things could lead to this update failing. Returning an error causes a requeue without forcing a hotloop
 		return err
 	}
+	// Resync the ReplicationController after MinReadySeconds as a last line of defense to guard against clock-skew.
+	if manageReplicasErr == nil && updatedRC.Spec.MinReadySeconds > 0 &&
+		updatedRC.Status.ReadyReplicas == *(updatedRC.Spec.Replicas) &&
+		updatedRC.Status.AvailableReplicas != *(updatedRC.Spec.Replicas) {
+		rm.enqueueControllerAfter(updatedRC, time.Duration(updatedRC.Spec.MinReadySeconds)*time.Second)
+	}
 	return manageReplicasErr
 }
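The "last line of defense to guard against clock-skew" comment is easiest to see with the availability rule itself. A simplified sketch (an assumption that mirrors the upstream pod-availability check in spirit, not letter): a Ready pod only counts toward availableReplicas once it has been Ready for MinReadySeconds, as seen by the controller's clock. The sync triggered by the pod turning Ready therefore still reports it unavailable, and nothing else is guaranteed to trigger another sync, so the controller schedules one itself after MinReadySeconds.

package main

import (
	"fmt"
	"time"
)

// available is a simplified stand-in for the availability check: a pod that has
// been Ready since readySince becomes Available only after minReadySeconds have
// elapsed, measured against the controller's clock (now).
func available(readySince time.Time, minReadySeconds int32, now time.Time) bool {
	return minReadySeconds == 0 ||
		!readySince.Add(time.Duration(minReadySeconds)*time.Second).After(now)
}

func main() {
	readySince := time.Now()
	const minReadySeconds = int32(30)

	// Sync triggered by the pod turning Ready: not yet available.
	fmt.Println(available(readySince, minReadySeconds, readySince)) // false

	// Delayed resync MinReadySeconds later: the count can finally flip, even if
	// no other pod or controller event arrives in between.
	fmt.Println(available(readySince, minReadySeconds, readySince.Add(30*time.Second))) // true
}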

View File

@@ -30,7 +30,7 @@ import (
 )
 // updateReplicationControllerStatus attempts to update the Status.Replicas of the given controller, with a single GET/PUT retry.
-func updateReplicationControllerStatus(c v1core.ReplicationControllerInterface, rc v1.ReplicationController, newStatus v1.ReplicationControllerStatus) (updateErr error) {
+func updateReplicationControllerStatus(c v1core.ReplicationControllerInterface, rc v1.ReplicationController, newStatus v1.ReplicationControllerStatus) (*v1.ReplicationController, error) {
 	// This is the steady state. It happens when the rc doesn't have any expectations, since
 	// we do a periodic relist every 30s. If the generations differ but the replicas are
 	// the same, a caller might've resized to the same replica count.
@@ -40,7 +40,7 @@ func updateReplicationControllerStatus(c v1core.ReplicationControllerInterface,
 		rc.Status.AvailableReplicas == newStatus.AvailableReplicas &&
 		rc.Generation == rc.Status.ObservedGeneration &&
 		reflect.DeepEqual(rc.Status.Conditions, newStatus.Conditions) {
-		return nil
+		return &rc, nil
 	}
 	// Save the generation number we acted on, otherwise we might wrongfully indicate
 	// that we've seen a spec update when we retry.
@@ -48,9 +48,10 @@ func updateReplicationControllerStatus(c v1core.ReplicationControllerInterface,
 	// same status.
 	newStatus.ObservedGeneration = rc.Generation
-	var getErr error
+	var getErr, updateErr error
+	var updatedRC *v1.ReplicationController
 	for i, rc := 0, &rc; ; i++ {
-		glog.V(4).Infof(fmt.Sprintf("Updating replica count for rc: %s/%s, ", rc.Namespace, rc.Name) +
+		glog.V(4).Infof(fmt.Sprintf("Updating status for rc: %s/%s, ", rc.Namespace, rc.Name) +
			fmt.Sprintf("replicas %d->%d (need %d), ", rc.Status.Replicas, newStatus.Replicas, *(rc.Spec.Replicas)) +
			fmt.Sprintf("fullyLabeledReplicas %d->%d, ", rc.Status.FullyLabeledReplicas, newStatus.FullyLabeledReplicas) +
			fmt.Sprintf("readyReplicas %d->%d, ", rc.Status.ReadyReplicas, newStatus.ReadyReplicas) +
@@ -58,17 +59,23 @@ func updateReplicationControllerStatus(c v1core.ReplicationControllerInterface,
			fmt.Sprintf("sequence No: %v->%v", rc.Status.ObservedGeneration, newStatus.ObservedGeneration))
 		rc.Status = newStatus
-		_, updateErr = c.UpdateStatus(rc)
-		if updateErr == nil || i >= statusUpdateRetries {
-			return updateErr
+		updatedRC, updateErr = c.UpdateStatus(rc)
+		if updateErr == nil {
+			return updatedRC, nil
 		}
+		// Stop retrying if we exceed statusUpdateRetries - the replicationController will be requeued with a rate limit.
+		if i >= statusUpdateRetries {
+			break
+		}
 		// Update the controller with the latest resource version for the next poll
 		if rc, getErr = c.Get(rc.Name, metav1.GetOptions{}); getErr != nil {
 			// If the GET fails we can't trust status.Replicas anymore. This error
 			// is bound to be more interesting than the update failure.
-			return getErr
+			return nil, getErr
 		}
 	}
+	return nil, updateErr
 }
 // OverlappingControllers sorts a list of controllers by creation timestamp, using their names as a tie breaker.
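The replication controller path mirrors the ReplicaSet change line for line. As a compact restatement of the guard both sync loops now share, here is a hypothetical helper over plain counts (not code from this commit; the real guard additionally requires manageReplicasErr == nil, i.e. no failed pod creations or deletions in the current sync):

package main

import "fmt"

// needsMinReadySecondsResync reports whether a delayed resync is the only thing
// left that can change status: every replica is Ready, but not all of them have
// been Ready long enough to count as Available.
func needsMinReadySecondsResync(minReadySeconds, specReplicas, readyReplicas, availableReplicas int32) bool {
	return minReadySeconds > 0 &&
		readyReplicas == specReplicas &&
		availableReplicas != specReplicas
}

func main() {
	fmt.Println(needsMinReadySecondsResync(30, 3, 3, 1)) // true: requeue after 30s
	fmt.Println(needsMinReadySecondsResync(30, 3, 3, 3)) // false: already fully available
	fmt.Println(needsMinReadySecondsResync(0, 3, 3, 1))  // false: no MinReadySeconds set
}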