From 9eab226947d73a77cbf8474188f216cd64cd5fef Mon Sep 17 00:00:00 2001
From: Michail Kargakis
Date: Sat, 25 Feb 2017 11:51:20 +0100
Subject: [PATCH] Enqueue controllers after minreadyseconds when all pods are ready

---
 pkg/controller/replicaset/replica_set.go       |  9 ++++++-
 .../replicaset/replica_set_utils.go            | 25 ++++++++++++-------
 .../replication/replication_controller.go      | 10 ++++++--
 .../replication_controller_utils.go            | 23 +++++++++++------
 4 files changed, 47 insertions(+), 20 deletions(-)

diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go
index 02073a2055c..af61a6a4501 100644
--- a/pkg/controller/replicaset/replica_set.go
+++ b/pkg/controller/replicaset/replica_set.go
@@ -632,10 +632,17 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
 	newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
 
 	// Always updates status as pods come up or die.
-	if err := updateReplicaSetStatus(rsc.kubeClient.Extensions().ReplicaSets(rs.Namespace), rs, newStatus); err != nil {
+	updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.Extensions().ReplicaSets(rs.Namespace), rs, newStatus)
+	if err != nil {
 		// Multiple things could lead to this update failing. Requeuing the replica set ensures
 		// Returning an error causes a requeue without forcing a hotloop
 		return err
 	}
+	// Resync the ReplicaSet after MinReadySeconds as a last line of defense to guard against clock-skew.
+	if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
+		updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
+		updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
+		rsc.enqueueReplicaSetAfter(updatedRS, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
+	}
 	return manageReplicasErr
 }
diff --git a/pkg/controller/replicaset/replica_set_utils.go b/pkg/controller/replicaset/replica_set_utils.go
index c3c394206b5..84bf935b703 100644
--- a/pkg/controller/replicaset/replica_set_utils.go
+++ b/pkg/controller/replicaset/replica_set_utils.go
@@ -33,7 +33,7 @@ import (
 )
 
 // updateReplicaSetStatus attempts to update the Status.Replicas of the given ReplicaSet, with a single GET/PUT retry.
-func updateReplicaSetStatus(c unversionedextensions.ReplicaSetInterface, rs *extensions.ReplicaSet, newStatus extensions.ReplicaSetStatus) (updateErr error) {
+func updateReplicaSetStatus(c unversionedextensions.ReplicaSetInterface, rs *extensions.ReplicaSet, newStatus extensions.ReplicaSetStatus) (*extensions.ReplicaSet, error) {
 	// This is the steady state. It happens when the ReplicaSet doesn't have any expectations, since
 	// we do a periodic relist every 30s. If the generations differ but the replicas are
 	// the same, a caller might've resized to the same replica count.
@@ -43,14 +43,14 @@ func updateReplicaSetStatus(c unversionedextensions.ReplicaSetInterface, rs *ext
 		rs.Status.AvailableReplicas == newStatus.AvailableReplicas &&
 		rs.Generation == rs.Status.ObservedGeneration &&
 		reflect.DeepEqual(rs.Status.Conditions, newStatus.Conditions) {
-		return nil
+		return rs, nil
 	}
 
 	// deep copy to avoid mutation now.
 	// TODO this method need some work. Retry on conflict probably, though I suspect this is stomping status to something it probably shouldn't
 	copyObj, err := api.Scheme.DeepCopy(rs)
 	if err != nil {
-		return err
+		return nil, err
 	}
 	rs = copyObj.(*extensions.ReplicaSet)
 
@@ -60,9 +60,10 @@ func updateReplicaSetStatus(c unversionedextensions.ReplicaSetInterface, rs *ext
 	// same status.
 	newStatus.ObservedGeneration = rs.Generation
 
-	var getErr error
+	var getErr, updateErr error
+	var updatedRS *extensions.ReplicaSet
 	for i, rs := 0, rs; ; i++ {
-		glog.V(4).Infof(fmt.Sprintf("Updating replica count for ReplicaSet: %s/%s, ", rs.Namespace, rs.Name) +
+		glog.V(4).Infof(fmt.Sprintf("Updating status for ReplicaSet: %s/%s, ", rs.Namespace, rs.Name) +
 			fmt.Sprintf("replicas %d->%d (need %d), ", rs.Status.Replicas, newStatus.Replicas, *(rs.Spec.Replicas)) +
 			fmt.Sprintf("fullyLabeledReplicas %d->%d, ", rs.Status.FullyLabeledReplicas, newStatus.FullyLabeledReplicas) +
 			fmt.Sprintf("readyReplicas %d->%d, ", rs.Status.ReadyReplicas, newStatus.ReadyReplicas) +
@@ -70,17 +71,23 @@ func updateReplicaSetStatus(c unversionedextensions.ReplicaSetInterface, rs *ext
 			fmt.Sprintf("sequence No: %v->%v", rs.Status.ObservedGeneration, newStatus.ObservedGeneration))
 
 		rs.Status = newStatus
-		_, updateErr = c.UpdateStatus(rs)
-		if updateErr == nil || i >= statusUpdateRetries {
-			return updateErr
+		updatedRS, updateErr = c.UpdateStatus(rs)
+		if updateErr == nil {
+			return updatedRS, nil
+		}
+		// Stop retrying if we exceed statusUpdateRetries - the replicaSet will be requeued with a rate limit.
+		if i >= statusUpdateRetries {
+			break
 		}
 		// Update the ReplicaSet with the latest resource version for the next poll
 		if rs, getErr = c.Get(rs.Name, metav1.GetOptions{}); getErr != nil {
 			// If the GET fails we can't trust status.Replicas anymore. This error
 			// is bound to be more interesting than the update failure.
-			return getErr
+			return nil, getErr
 		}
 	}
+
+	return nil, updateErr
 }
 
 func calculateStatus(rs *extensions.ReplicaSet, filteredPods []*v1.Pod, manageReplicasErr error) extensions.ReplicaSetStatus {
diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go
index 3a85c6143f9..c3d0f4b70f8 100644
--- a/pkg/controller/replication/replication_controller.go
+++ b/pkg/controller/replication/replication_controller.go
@@ -652,10 +652,16 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
 	newStatus := calculateStatus(rc, filteredPods, manageReplicasErr)
 
 	// Always updates status as pods come up or die.
-	if err := updateReplicationControllerStatus(rm.kubeClient.Core().ReplicationControllers(rc.Namespace), *rc, newStatus); err != nil {
+	updatedRC, err := updateReplicationControllerStatus(rm.kubeClient.Core().ReplicationControllers(rc.Namespace), *rc, newStatus)
+	if err != nil {
 		// Multiple things could lead to this update failing. Returning an error causes a requeue without forcing a hotloop
 		return err
 	}
-
+	// Resync the ReplicationController after MinReadySeconds as a last line of defense to guard against clock-skew.
+	if manageReplicasErr == nil && updatedRC.Spec.MinReadySeconds > 0 &&
+		updatedRC.Status.ReadyReplicas == *(updatedRC.Spec.Replicas) &&
+		updatedRC.Status.AvailableReplicas != *(updatedRC.Spec.Replicas) {
+		rm.enqueueControllerAfter(updatedRC, time.Duration(updatedRC.Spec.MinReadySeconds)*time.Second)
+	}
 	return manageReplicasErr
 }
diff --git a/pkg/controller/replication/replication_controller_utils.go b/pkg/controller/replication/replication_controller_utils.go
index 166a3c14a9a..214da4ec5e1 100644
--- a/pkg/controller/replication/replication_controller_utils.go
+++ b/pkg/controller/replication/replication_controller_utils.go
@@ -30,7 +30,7 @@ import (
 )
 
 // updateReplicationControllerStatus attempts to update the Status.Replicas of the given controller, with a single GET/PUT retry.
-func updateReplicationControllerStatus(c v1core.ReplicationControllerInterface, rc v1.ReplicationController, newStatus v1.ReplicationControllerStatus) (updateErr error) {
+func updateReplicationControllerStatus(c v1core.ReplicationControllerInterface, rc v1.ReplicationController, newStatus v1.ReplicationControllerStatus) (*v1.ReplicationController, error) {
 	// This is the steady state. It happens when the rc doesn't have any expectations, since
 	// we do a periodic relist every 30s. If the generations differ but the replicas are
 	// the same, a caller might've resized to the same replica count.
@@ -40,7 +40,7 @@ func updateReplicationControllerStatus(c v1core.ReplicationControllerInterface,
 		rc.Status.AvailableReplicas == newStatus.AvailableReplicas &&
 		rc.Generation == rc.Status.ObservedGeneration &&
 		reflect.DeepEqual(rc.Status.Conditions, newStatus.Conditions) {
-		return nil
+		return &rc, nil
 	}
 	// Save the generation number we acted on, otherwise we might wrongfully indicate
 	// that we've seen a spec update when we retry.
@@ -48,9 +48,10 @@ func updateReplicationControllerStatus(c v1core.ReplicationControllerInterface,
 	// same status.
 	newStatus.ObservedGeneration = rc.Generation
 
-	var getErr error
+	var getErr, updateErr error
+	var updatedRC *v1.ReplicationController
 	for i, rc := 0, &rc; ; i++ {
-		glog.V(4).Infof(fmt.Sprintf("Updating replica count for rc: %s/%s, ", rc.Namespace, rc.Name) +
+		glog.V(4).Infof(fmt.Sprintf("Updating status for rc: %s/%s, ", rc.Namespace, rc.Name) +
 			fmt.Sprintf("replicas %d->%d (need %d), ", rc.Status.Replicas, newStatus.Replicas, *(rc.Spec.Replicas)) +
 			fmt.Sprintf("fullyLabeledReplicas %d->%d, ", rc.Status.FullyLabeledReplicas, newStatus.FullyLabeledReplicas) +
 			fmt.Sprintf("readyReplicas %d->%d, ", rc.Status.ReadyReplicas, newStatus.ReadyReplicas) +
@@ -58,17 +59,23 @@ func updateReplicationControllerStatus(c v1core.ReplicationControllerInterface,
 			fmt.Sprintf("sequence No: %v->%v", rc.Status.ObservedGeneration, newStatus.ObservedGeneration))
 
 		rc.Status = newStatus
-		_, updateErr = c.UpdateStatus(rc)
-		if updateErr == nil || i >= statusUpdateRetries {
-			return updateErr
+		updatedRC, updateErr = c.UpdateStatus(rc)
+		if updateErr == nil {
+			return updatedRC, nil
+		}
+		// Stop retrying if we exceed statusUpdateRetries - the replicationController will be requeued with a rate limit.
+		if i >= statusUpdateRetries {
+			break
 		}
 		// Update the controller with the latest resource version for the next poll
 		if rc, getErr = c.Get(rc.Name, metav1.GetOptions{}); getErr != nil {
 			// If the GET fails we can't trust status.Replicas anymore. This error
 			// is bound to be more interesting than the update failure.
-			return getErr
+			return nil, getErr
 		}
 	}
+
+	return nil, updateErr
 }
 
 // OverlappingControllers sorts a list of controllers by creation timestamp, using their names as a tie breaker.
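Note: the sketch below is not part of the patch. It is a minimal illustration of the delayed-requeue pattern the new enqueueReplicaSetAfter/enqueueControllerAfter calls rely on, assuming client-go's rate-limited workqueue and its AddAfter method. The helper name resyncAfterMinReadySeconds, the plain int32 parameters (the real spec uses *int32 for Replicas), and the key "default/frontend" are hypothetical simplifications.

// Illustrative sketch of the MinReadySeconds delayed-resync guard.
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

// resyncAfterMinReadySeconds (hypothetical helper) mirrors the patch's guard:
// when every replica is ready but not yet available, re-enqueue the object's
// key after MinReadySeconds so the controller re-checks availability even if
// no further pod events arrive (the "last line of defense" against clock-skew).
func resyncAfterMinReadySeconds(q workqueue.RateLimitingInterface, key string,
	minReadySeconds, replicas, readyReplicas, availableReplicas int32) {
	if minReadySeconds > 0 && readyReplicas == replicas && availableReplicas != replicas {
		q.AddAfter(key, time.Duration(minReadySeconds)*time.Second)
	}
}

func main() {
	q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	defer q.ShutDown()

	// All 3 replicas are ready but none has been available for MinReadySeconds yet,
	// so "default/frontend" is scheduled to come back after 30s.
	resyncAfterMinReadySeconds(q, "default/frontend", 30, 3, 3, 0)
	fmt.Println("items visible before the delay elapses:", q.Len())
}

Requeuing only when all replicas are ready but not yet available gives the controller a guaranteed second look once the readiness grace period can have elapsed, without hot-looping in the meantime.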