diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go
index 127fcc5a7f7..fc8d2520f8b 100644
--- a/pkg/controller/controller_utils.go
+++ b/pkg/controller/controller_utils.go
@@ -33,6 +33,7 @@ import (
 	"k8s.io/kubernetes/pkg/controller/framework"
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/runtime"
+	"k8s.io/kubernetes/pkg/util"
 )
 
 const CreatedByAnnotation = "kubernetes.io/created-by"
@@ -91,9 +92,11 @@ type ControllerExpectationsInterface interface {
 	ExpectDeletions(controllerKey string, dels int) error
 	CreationObserved(controllerKey string)
 	DeletionObserved(controllerKey string)
+	RaiseExpectations(controllerKey string, add, del int)
+	LowerExpectations(controllerKey string, add, del int)
 }
 
-// ControllerExpectations is a ttl cache mapping controllers to what they expect to see before being woken up for a sync.
+// ControllerExpectations is a cache mapping controllers to what they expect to see before being woken up for a sync.
 type ControllerExpectations struct {
 	cache.Store
 }
@@ -123,6 +126,9 @@ func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) boo
 	if exp, exists, err := r.GetExpectations(controllerKey); exists {
 		if exp.Fulfilled() {
 			return true
+		} else if exp.isExpired() {
+			glog.V(4).Infof("Controller expectations expired %#v", exp)
+			return true
 		} else {
 			glog.V(4).Infof("Controller still waiting on expectations %#v", exp)
 			return false
@@ -142,9 +148,17 @@ func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) boo
 	return true
 }
 
+// TODO: Extend ExpirationCache to support explicit expiration.
+// TODO: Make this possible to disable in tests.
+// TODO: Parameterize timeout.
+// TODO: Support injection of clock.
+func (exp *ControlleeExpectations) isExpired() bool {
+	return util.RealClock{}.Since(exp.timestamp) > 10*time.Second
+}
+
 // SetExpectations registers new expectations for the given controller. Forgets existing expectations.
 func (r *ControllerExpectations) SetExpectations(controllerKey string, add, del int) error {
-	exp := &ControlleeExpectations{add: int64(add), del: int64(del), key: controllerKey}
+	exp := &ControlleeExpectations{add: int64(add), del: int64(del), key: controllerKey, timestamp: util.RealClock{}.Now()}
 	glog.V(4).Infof("Setting expectations %+v", exp)
 	return r.Add(exp)
 }
@@ -158,22 +172,31 @@ func (r *ControllerExpectations) ExpectDeletions(controllerKey string, dels int)
 }
 
 // Decrements the expectation counts of the given controller.
-func (r *ControllerExpectations) lowerExpectations(controllerKey string, add, del int) {
+func (r *ControllerExpectations) LowerExpectations(controllerKey string, add, del int) {
 	if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
-		exp.Seen(int64(add), int64(del))
+		exp.Add(int64(-add), int64(-del))
 		// The expectations might've been modified since the update on the previous line.
-		glog.V(4).Infof("Lowering expectations %+v", exp)
+		glog.V(4).Infof("Lowered expectations %+v", exp)
+	}
+}
+
+// Increments the expectation counts of the given controller.
+func (r *ControllerExpectations) RaiseExpectations(controllerKey string, add, del int) {
+	if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
+		exp.Add(int64(add), int64(del))
+		// The expectations might've been modified since the update on the previous line.
+		glog.V(4).Infof("Raised expectations %+v", exp)
 	}
 }
 
 // CreationObserved atomically decrements the `add` expectation count of the given controller.
 func (r *ControllerExpectations) CreationObserved(controllerKey string) {
-	r.lowerExpectations(controllerKey, 1, 0)
+	r.LowerExpectations(controllerKey, 1, 0)
 }
 
 // DeletionObserved atomically decrements the `del` expectation count of the given controller.
 func (r *ControllerExpectations) DeletionObserved(controllerKey string) {
-	r.lowerExpectations(controllerKey, 0, 1)
+	r.LowerExpectations(controllerKey, 0, 1)
 }
 
 // Expectations are either fulfilled, or expire naturally.
@@ -183,15 +206,16 @@ type Expectations interface {
 
 // ControlleeExpectations track controllee creates/deletes.
 type ControlleeExpectations struct {
-	add int64
-	del int64
-	key string
+	add       int64
+	del       int64
+	key       string
+	timestamp time.Time
 }
 
-// Seen decrements the add and del counters.
-func (e *ControlleeExpectations) Seen(add, del int64) {
-	atomic.AddInt64(&e.add, -add)
-	atomic.AddInt64(&e.del, -del)
+// Add increments the add and del counters.
+func (e *ControlleeExpectations) Add(add, del int64) {
+	atomic.AddInt64(&e.add, add)
+	atomic.AddInt64(&e.del, del)
 }
 
 // Fulfilled returns true if this expectation has been fulfilled.
diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go
index 1b98031da22..c98a84f86d4 100644
--- a/pkg/controller/deployment/deployment_controller.go
+++ b/pkg/controller/deployment/deployment_controller.go
@@ -173,6 +173,8 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
 		&api.Pod{},
 		resyncPeriod(),
 		framework.ResourceEventHandlerFuncs{
+			// When pod is created, we need to update deployment's expectations
+			AddFunc: dc.addPod,
 			// When pod updates (becomes ready), we need to enqueue deployment
 			UpdateFunc: dc.updatePod,
 			// When pod is deleted, we need to update deployment's expectations
@@ -210,7 +212,8 @@ func (dc *DeploymentController) addReplicaSet(obj interface{}) {
 			glog.Errorf("Couldn't get key for deployment controller %#v: %v", d, err)
 			return
 		}
-		dc.rsExpectations.CreationObserved(dKey)
+		// Decrement expected creations
+		dc.rsExpectations.LowerExpectations(dKey, 1, 0)
 		dc.enqueueDeployment(d)
 	}
 }
@@ -302,6 +305,25 @@ func (dc *DeploymentController) getDeploymentForPod(pod *api.Pod) *extensions.De
 	return nil
 }
 
+// When a pod is created, update expectations of the controller that manages the pod.
+func (dc *DeploymentController) addPod(obj interface{}) {
+	pod, ok := obj.(*api.Pod)
+	if !ok {
+		return
+	}
+	glog.V(4).Infof("Pod %s created.", pod.Name)
+	if d := dc.getDeploymentForPod(pod); d != nil {
+		dKey, err := controller.KeyFunc(d)
+		if err != nil {
+			glog.Errorf("Couldn't get key for deployment controller %#v: %v", d, err)
+			return
+		}
+		// Decrement expected creations
+		dc.podExpectations.LowerExpectations(dKey, 1, 0)
+		dc.enqueueDeployment(d)
+	}
+}
+
 // updatePod figures out what deployment(s) manage the ReplicaSet that manages the Pod when the Pod
 // is updated and wake them up. If anything of the Pods have changed, we need to awaken both
 // the old and new deployments. old and cur must be *api.Pod types.
@@ -350,7 +372,9 @@ func (dc *DeploymentController) deletePod(obj interface{}) {
 			glog.Errorf("Couldn't get key for deployment controller %#v: %v", d, err)
 			return
 		}
-		dc.podExpectations.DeletionObserved(dKey)
+		// Decrement expected deletions
+		dc.podExpectations.LowerExpectations(dKey, 0, 1)
+		dc.enqueueDeployment(d)
 	}
 }
 
@@ -383,7 +407,7 @@ func (dc *DeploymentController) worker() {
 			defer dc.queue.Done(key)
 			err := dc.syncHandler(key.(string))
 			if err != nil {
-				glog.Errorf("Error syncing deployment: %v", err)
+				glog.Errorf("Error syncing deployment %v: %v", key, err)
 			}
 		}()
 	}
@@ -417,8 +441,26 @@ func (dc *DeploymentController) syncDeployment(key string) error {
 		dc.rsExpectations.DeleteExpectations(key)
 		return nil
 	}
+	d := *obj.(*extensions.Deployment)
+	// Note: The expectations cache is not thread-safe for a given key.
+	// Check the replica set expectations of the deployment before creating a new one.
+	// TODO: Explicitly expire expectations if we haven't sync'ed in a long time.
+	dKey, err := controller.KeyFunc(&d)
+	if err != nil {
+		return fmt.Errorf("couldn't get key for deployment %#v: %v", d, err)
+	}
+	if !dc.rsExpectations.SatisfiedExpectations(dKey) {
+		return fmt.Errorf("replicaset expectations not met yet for %v in syncDeployment", dKey)
+	}
+	if !dc.podExpectations.SatisfiedExpectations(dKey) {
+		return fmt.Errorf("pod expectations not met yet for %v in syncDeployment", dKey)
+	}
+	// Ensure that an expectations record exists and clear previous expectations.
+	dc.rsExpectations.SetExpectations(dKey, 0, 0)
+	dc.podExpectations.SetExpectations(dKey, 0, 0)
+
 	if d.Spec.Paused {
 		// TODO: Implement scaling for paused deployments.
 		// Don't take any action for paused deployment.
@@ -573,7 +615,7 @@ func (dc *DeploymentController) syncRollingUpdateDeployment(deployment extension
 	}
 
 	// Scale down, if we can.
-	scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, deployment, true)
+	scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, deployment)
 	if err != nil {
 		return err
 	}
@@ -701,15 +743,6 @@ func (dc *DeploymentController) getNewReplicaSet(deployment extensions.Deploymen
 		return nil, nil
 	}
 
-	// Check the replica set expectations of the deployment before creating a new one.
-	dKey, err := controller.KeyFunc(&deployment)
-	if err != nil {
-		return nil, fmt.Errorf("couldn't get key for deployment %#v: %v", deployment, err)
-	}
-	if !dc.rsExpectations.SatisfiedExpectations(dKey) {
-		dc.enqueueDeployment(&deployment)
-		return nil, fmt.Errorf("replica set expectations not met yet before getting new replica set\n")
-	}
 	// new ReplicaSet does not exist, create one.
 	namespace := deployment.ObjectMeta.Namespace
 	podTemplateSpecHash := podutil.GetPodTemplateSpecHash(deployment.Spec.Template)
@@ -717,12 +750,15 @@ func (dc *DeploymentController) getNewReplicaSet(deployment extensions.Deploymen
 	// Add podTemplateHash label to selector.
 	newRSSelector := labelsutil.CloneSelectorAndAddLabel(deployment.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)
 
-	// Set ReplicaSet expectations (1 ReplicaSet should be created)
-	dKey, err = controller.KeyFunc(&deployment)
+	// Set ReplicaSet expectations (1 ReplicaSet should be created).
+	// This clobbers previous expectations, but we checked that in syncDeployment.
+	// We don't set expectations for deletions of 0-replica ReplicaSets because re-setting
+	// expectations would clobber these, and redundant deletions shouldn't cause harm.
+	dKey, err := controller.KeyFunc(&deployment)
 	if err != nil {
-		return nil, fmt.Errorf("couldn't get key for deployment controller %#v: %v", deployment, err)
+		return nil, fmt.Errorf("couldn't get key for deployment %#v: %v", deployment, err)
 	}
-	dc.rsExpectations.ExpectCreations(dKey, 1)
+
 	// Create new ReplicaSet
 	newRS := extensions.ReplicaSet{
 		ObjectMeta: api.ObjectMeta{
@@ -742,11 +778,23 @@ func (dc *DeploymentController) getNewReplicaSet(deployment extensions.Deploymen
 	if err != nil {
 		return nil, err
 	}
+
+	// Increment expected creations
+	dc.rsExpectations.RaiseExpectations(dKey, 1, 0)
+	if newReplicasCount != 0 {
+		dc.podExpectations.RaiseExpectations(dKey, newReplicasCount, 0)
+	}
+
 	newRS.Spec.Replicas = newReplicasCount
 	createdRS, err := dc.client.Extensions().ReplicaSets(namespace).Create(&newRS)
 	if err != nil {
-		dc.rsExpectations.DeleteExpectations(dKey)
-		return nil, fmt.Errorf("error creating replica set: %v", err)
+		// Decrement expected creations
+		dc.rsExpectations.LowerExpectations(dKey, 1, 0)
+		if newReplicasCount != 0 {
+			dc.podExpectations.LowerExpectations(dKey, newReplicasCount, 0)
+		}
+		dc.enqueueDeployment(&deployment)
+		return nil, fmt.Errorf("error creating replica set %v: %v", dKey, err)
 	}
 	if newReplicasCount > 0 {
 		dc.eventRecorder.Eventf(&deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", "up", createdRS.Name, newReplicasCount)
@@ -822,24 +870,13 @@ func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.Repl
 	return scaled, err
 }
 
-// Set expectationsCheck to false to bypass expectations check when testing
-func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment extensions.Deployment, expectationsCheck bool) (bool, error) {
+func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment extensions.Deployment) (bool, error) {
 	oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs)
 	if oldPodsCount == 0 {
 		// Can't scale down further
 		return false, nil
 	}
 
-	// Check the expectations of deployment before reconciling
-	dKey, err := controller.KeyFunc(&deployment)
-	if err != nil {
-		return false, fmt.Errorf("Couldn't get key for deployment %#v: %v", deployment, err)
-	}
-	if expectationsCheck && !dc.podExpectations.SatisfiedExpectations(dKey) {
-		glog.V(4).Infof("Pod expectations not met yet before reconciling old replica sets\n")
-		return false, nil
-	}
-
 	minReadySeconds := deployment.Spec.MinReadySeconds
 	allPodsCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
 	newRSAvailablePodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, []*extensions.ReplicaSet{newRS}, minReadySeconds)
@@ -903,10 +940,6 @@ func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.Rep
 	}
 	totalScaledDown := cleanupCount + scaledDownCount
 
-	if expectationsCheck {
-		dc.podExpectations.ExpectDeletions(dKey, totalScaledDown)
-	}
-
 	return totalScaledDown > 0, nil
 }
 
@@ -1081,13 +1114,35 @@ func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *extensions.Rep
 	if rs.Spec.Replicas == newScale {
 		return false, rs, nil
 	}
-	scalingOperation := "down"
+	dKey, err := controller.KeyFunc(&deployment)
+	if err != nil {
+		return false, nil, fmt.Errorf("couldn't get key for deployment %#v: %v", deployment, err)
+	}
+	var scalingOperation string
+	// Set expectations first: if the update succeeds, the pod event handlers may start lowering them asynchronously right away.
 	if rs.Spec.Replicas < newScale {
 		scalingOperation = "up"
+		// Increment expected creations
+		dc.podExpectations.RaiseExpectations(dKey, newScale-rs.Spec.Replicas, 0)
+	} else {
+		scalingOperation = "down"
+		// Increment expected deletions
+		dc.podExpectations.RaiseExpectations(dKey, 0, rs.Spec.Replicas-newScale)
 	}
 	newRS, err := dc.scaleReplicaSet(rs, newScale)
 	if err == nil {
 		dc.eventRecorder.Eventf(&deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale)
+	} else {
+		// Back out the expectation changes. If we observed a failure even though the update succeeded, this will be wrong.
+		if rs.Spec.Replicas < newScale {
+			// Decrement expected creations
+			dc.podExpectations.LowerExpectations(dKey, newScale-rs.Spec.Replicas, 0)
+			dc.enqueueDeployment(&deployment)
+		} else {
+			// Decrement expected deletions
+			dc.podExpectations.LowerExpectations(dKey, 0, rs.Spec.Replicas-newScale)
+			dc.enqueueDeployment(&deployment)
+		}
 	}
 	return true, newRS, err
 }
diff --git a/pkg/controller/deployment/deployment_controller_test.go b/pkg/controller/deployment/deployment_controller_test.go
index 01599d1cf76..1c14b80e0c0 100644
--- a/pkg/controller/deployment/deployment_controller_test.go
+++ b/pkg/controller/deployment/deployment_controller_test.go
@@ -93,8 +93,10 @@ func TestDeploymentController_reconcileNewReplicaSet(t *testing.T) {
 		deployment := deployment("foo", test.deploymentReplicas, test.maxSurge, intstr.FromInt(0))
 		fake := fake.Clientset{}
 		controller := &DeploymentController{
-			client:        &fake,
-			eventRecorder: &record.FakeRecorder{},
+			client:          &fake,
+			eventRecorder:   &record.FakeRecorder{},
+			podExpectations: controller.NewControllerExpectations(),
+			rsExpectations:  controller.NewControllerExpectations(),
 		}
 		scaled, err := controller.reconcileNewReplicaSet(allRSs, newRS, deployment)
 		if err != nil {
@@ -267,11 +269,13 @@ func TestDeploymentController_reconcileOldReplicaSets(t *testing.T) {
 			return false, nil, nil
 		})
 
 		controller := &DeploymentController{
-			client:        &fakeClientset,
-			eventRecorder: &record.FakeRecorder{},
+			client:          &fakeClientset,
+			eventRecorder:   &record.FakeRecorder{},
+			podExpectations: controller.NewControllerExpectations(),
+			rsExpectations:  controller.NewControllerExpectations(),
 		}
-		scaled, err := controller.reconcileOldReplicaSets(allRSs, oldRSs, newRS, deployment, false)
+		scaled, err := controller.reconcileOldReplicaSets(allRSs, oldRSs, newRS, deployment)
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
 			continue
@@ -371,8 +375,10 @@ func TestDeploymentController_cleanupUnhealthyReplicas(t *testing.T) {
 		})
 
 		controller := &DeploymentController{
-			client:        &fakeClientset,
-			eventRecorder: &record.FakeRecorder{},
+			client:          &fakeClientset,
+			eventRecorder:   &record.FakeRecorder{},
+			podExpectations: controller.NewControllerExpectations(),
+			rsExpectations:  controller.NewControllerExpectations(),
 		}
 		cleanupCount, err := controller.cleanupUnhealthyReplicas(oldRSs, deployment, test.maxCleanupCount)
 		if err != nil {
@@ -458,8 +464,10 @@ func TestDeploymentController_scaleDownOldReplicaSetsForRollingUpdate(t *testing
 			return false, nil, nil
 		})
 		controller := &DeploymentController{
-			client:        &fakeClientset,
-			eventRecorder: &record.FakeRecorder{},
+			client:          &fakeClientset,
+			eventRecorder:   &record.FakeRecorder{},
+			podExpectations: controller.NewControllerExpectations(),
+			rsExpectations:  controller.NewControllerExpectations(),
 		}
 		scaled, err := controller.scaleDownOldReplicaSetsForRollingUpdate(allRSs, oldRSs, deployment)
 		if err != nil {
diff --git a/pkg/controller/replicaset/replica_set_test.go b/pkg/controller/replicaset/replica_set_test.go
index 200c117f7c4..0258a39a6e0 100644
--- a/pkg/controller/replicaset/replica_set_test.go
+++ b/pkg/controller/replicaset/replica_set_test.go
@@ -836,7 +836,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
 	}
 
 	// This should have no effect, since we've deleted the ReplicaSet.
-	podExp.Seen(1, 0)
+	podExp.Add(-1, 0)
 	manager.podStore.Store.Replace(make([]interface{}, 0), "0")
 	manager.syncReplicaSet(getKey(rs, t))
 	validateSyncReplicaSet(t, &fakePodControl, 0, 0)
diff --git a/pkg/controller/replication/replication_controller_test.go b/pkg/controller/replication/replication_controller_test.go
index 019cf9d76f8..5f54eb543fa 100644
--- a/pkg/controller/replication/replication_controller_test.go
+++ b/pkg/controller/replication/replication_controller_test.go
@@ -819,7 +819,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
 	}
 
 	// This should have no effect, since we've deleted the rc.
-	podExp.Seen(1, 0)
+	podExp.Add(-1, 0)
 	manager.podStore.Store.Replace(make([]interface{}, 0), "0")
 	manager.syncReplicationController(getKey(rc, t))
 	validateSyncReplication(t, &fakePodControl, 0, 0)
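
Editor's note on the pattern: the diff replaces the deployment controller's one-shot ExpectCreations/ExpectDeletions records with counters that are raised just before creates/deletes are issued, lowered by the watch event handlers (or to back out a failed API call), and treated as satisfied once they reach zero or the record goes stale. The standalone Go sketch below illustrates that life cycle under simplified assumptions: the names (Set/Raise/Lower/Satisfied), the plain map cache, and the hard-coded 10-second TTL only mirror the diff, and the real ControllerExpectations is backed by cache.Store rather than a map.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// expectation holds one controller's outstanding create/delete counts.
type expectation struct {
	add, del  int64
	timestamp time.Time
}

// Expectations is a TTL-bounded map of controller key -> expectation.
type Expectations struct {
	mu    sync.Mutex
	cache map[string]*expectation
	ttl   time.Duration
}

func NewExpectations(ttl time.Duration) *Expectations {
	return &Expectations{cache: map[string]*expectation{}, ttl: ttl}
}

// Set clobbers any previous record, like SetExpectations in the diff.
func (e *Expectations) Set(key string, add, del int64) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.cache[key] = &expectation{add: add, del: del, timestamp: time.Now()}
}

func (e *Expectations) get(key string) (*expectation, bool) {
	e.mu.Lock()
	defer e.mu.Unlock()
	exp, ok := e.cache[key]
	return exp, ok
}

// Raise is called just before issuing creates/deletes.
func (e *Expectations) Raise(key string, add, del int64) {
	if exp, ok := e.get(key); ok {
		atomic.AddInt64(&exp.add, add)
		atomic.AddInt64(&exp.del, del)
	}
}

// Lower is called by watch event handlers, and to back out a failed API call.
func (e *Expectations) Lower(key string, add, del int64) {
	if exp, ok := e.get(key); ok {
		atomic.AddInt64(&exp.add, -add)
		atomic.AddInt64(&exp.del, -del)
	}
}

// Satisfied reports whether a sync may proceed: everything expected has been
// observed, or the record is stale (e.g. a watch event was dropped).
func (e *Expectations) Satisfied(key string) bool {
	exp, ok := e.get(key)
	if !ok {
		return true // no record yet; a fresh controller should just sync
	}
	fulfilled := atomic.LoadInt64(&exp.add) <= 0 && atomic.LoadInt64(&exp.del) <= 0
	return fulfilled || time.Since(exp.timestamp) > e.ttl
}

func main() {
	exps := NewExpectations(10 * time.Second)
	key := "default/my-deployment" // hypothetical controller key

	exps.Set(key, 0, 0)              // start of a sync: clear previous expectations
	exps.Raise(key, 3, 0)            // about to create 3 pods
	fmt.Println(exps.Satisfied(key)) // false: creations not observed yet
	for i := 0; i < 3; i++ {
		exps.Lower(key, 1, 0) // pod-add event handler fires
	}
	fmt.Println(exps.Satisfied(key)) // true: the next sync may proceed
}

Checking Satisfied at the top of each sync and immediately re-Setting the record to zero, as syncDeployment does in the diff, guarantees a record with a fresh timestamp always exists, so a dropped watch event can stall the controller for at most one TTL instead of forever.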