controller: poll replica sets from the cache

This commit is contained in:
Michail Kargakis 2017-02-10 17:43:30 +01:00
parent ce998a9fa7
commit 7bbf7b0473
2 changed files with 13 additions and 8 deletions

View File

@ -180,7 +180,9 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs *extensions.ReplicaSet)
} }
// Make sure rs pod template is updated so that it won't create pods without the new label (orphaned pods). // Make sure rs pod template is updated so that it won't create pods without the new label (orphaned pods).
if updatedRS.Generation > updatedRS.Status.ObservedGeneration { if updatedRS.Generation > updatedRS.Status.ObservedGeneration {
if err = deploymentutil.WaitForReplicaSetUpdated(dc.client, updatedRS.Generation, updatedRS.Namespace, updatedRS.Name); err != nil { // TODO: Revisit if we really need to wait here as opposed to returning and
// potentially unblocking this worker (can wait up to 1min before timing out).
if err = deploymentutil.WaitForReplicaSetUpdated(dc.rsLister, updatedRS.Generation, updatedRS.Namespace, updatedRS.Name); err != nil {
return nil, fmt.Errorf("error waiting for replica set %s/%s to be observed by controller: %v", updatedRS.Namespace, updatedRS.Name, err) return nil, fmt.Errorf("error waiting for replica set %s/%s to be observed by controller: %v", updatedRS.Namespace, updatedRS.Name, err)
} }
glog.V(4).Infof("Observed the update of replica set %s/%s's pod template with hash %s.", rs.Namespace, rs.Name, hash) glog.V(4).Infof("Observed the update of replica set %s/%s's pod template with hash %s.", rs.Namespace, rs.Name, hash)
@ -213,7 +215,9 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs *extensions.ReplicaSet)
// WaitForReplicaSetUpdated, the replicaset controller should have dropped // WaitForReplicaSetUpdated, the replicaset controller should have dropped
// FullyLabeledReplicas to 0 already, we only need to wait it to increase // FullyLabeledReplicas to 0 already, we only need to wait it to increase
// back to the number of replicas in the spec. // back to the number of replicas in the spec.
if err := deploymentutil.WaitForPodsHashPopulated(dc.client, updatedRS.Generation, updatedRS.Namespace, updatedRS.Name); err != nil { // TODO: Revisit if we really need to wait here as opposed to returning and
// potentially unblocking this worker (can wait up to 1min before timing out).
if err := deploymentutil.WaitForPodsHashPopulated(dc.rsLister, updatedRS.Generation, updatedRS.Namespace, updatedRS.Name); err != nil {
return nil, fmt.Errorf("Replica set %s/%s: error waiting for replicaset controller to observe pods being labeled with template hash: %v", updatedRS.Namespace, updatedRS.Name, err) return nil, fmt.Errorf("Replica set %s/%s: error waiting for replicaset controller to observe pods being labeled with template hash: %v", updatedRS.Namespace, updatedRS.Name, err)
} }

View File

@ -41,6 +41,7 @@ import (
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
extensionslisters "k8s.io/kubernetes/pkg/client/listers/extensions/v1beta1"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
labelsutil "k8s.io/kubernetes/pkg/util/labels" labelsutil "k8s.io/kubernetes/pkg/util/labels"
) )
@ -661,9 +662,9 @@ func FindOldReplicaSets(deployment *extensions.Deployment, rsList []*extensions.
} }
// WaitForReplicaSetUpdated polls the replica set until it is updated. // WaitForReplicaSetUpdated polls the replica set until it is updated.
func WaitForReplicaSetUpdated(c clientset.Interface, desiredGeneration int64, namespace, name string) error { func WaitForReplicaSetUpdated(c extensionslisters.ReplicaSetLister, desiredGeneration int64, namespace, name string) error {
return wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) { return wait.PollImmediate(1*time.Second, 1*time.Minute, func() (bool, error) {
rs, err := c.Extensions().ReplicaSets(namespace).Get(name, metav1.GetOptions{}) rs, err := c.ReplicaSets(namespace).Get(name)
if err != nil { if err != nil {
return false, err return false, err
} }
@ -672,9 +673,9 @@ func WaitForReplicaSetUpdated(c clientset.Interface, desiredGeneration int64, na
} }
// WaitForPodsHashPopulated polls the replica set until updated and fully labeled. // WaitForPodsHashPopulated polls the replica set until updated and fully labeled.
func WaitForPodsHashPopulated(c clientset.Interface, desiredGeneration int64, namespace, name string) error { func WaitForPodsHashPopulated(c extensionslisters.ReplicaSetLister, desiredGeneration int64, namespace, name string) error {
return wait.Poll(1*time.Second, 1*time.Minute, func() (bool, error) { return wait.PollImmediate(1*time.Second, 1*time.Minute, func() (bool, error) {
rs, err := c.Extensions().ReplicaSets(namespace).Get(name, metav1.GetOptions{}) rs, err := c.ReplicaSets(namespace).Get(name)
if err != nil { if err != nil {
return false, err return false, err
} }