Merge pull request #22138 from bgrant0607/bugfix1

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot
2016-02-28 02:00:40 -08:00
10 changed files with 108 additions and 192 deletions

View File

@@ -36,7 +36,21 @@ import (
"k8s.io/kubernetes/pkg/util"
)
const CreatedByAnnotation = "kubernetes.io/created-by"
const (
CreatedByAnnotation = "kubernetes.io/created-by"
// If a watch drops a delete event for a pod, it'll take this long
// before a dormant controller waiting for those packets is woken up anyway. It is
// specifically targeted at the case where some problem prevents an update
// of expectations, without it the controller could stay asleep forever. This should
// be set based on the expected latency of watch events.
//
// Currently a controller can service (create *and* observe the watch events for said
// creation) about 10 pods a second, so it takes about 1 min to service
// 500 pods. Just creation is limited to 20qps, and watching happens with ~10-30s
// latency/pod at the scale of 3000 pods over 100 nodes.
ExpectationsTimeout = 5 * time.Minute
)
var (
KeyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc
@@ -150,10 +164,9 @@ func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) boo
// TODO: Extend ExpirationCache to support explicit expiration.
// TODO: Make this possible to disable in tests.
// TODO: Parameterize timeout.
// TODO: Support injection of clock.
func (exp *ControlleeExpectations) isExpired() bool {
return util.RealClock{}.Since(exp.timestamp) > 10*time.Second
return util.RealClock{}.Since(exp.timestamp) > ExpectationsTimeout
}
// SetExpectations registers new expectations for the given controller. Forgets existing expectations.

View File

@@ -49,7 +49,7 @@ import (
const (
// FullDeploymentResyncPeriod means we'll attempt to recompute the required replicas
// of all deployments that have fulfilled their expectations at least this often.
// of all deployments.
// This recomputation happens based on contents in the local caches.
FullDeploymentResyncPeriod = 30 * time.Second
// We must avoid creating new replica set / counting pods until the replica set / pods store has synced.
@@ -85,13 +85,6 @@ type DeploymentController struct {
// Added as a member to the struct to allow injection for testing.
podStoreSynced func() bool
// A TTLCache of pod creates/deletes each deployment expects to see
podExpectations controller.ControllerExpectationsInterface
// A TTLCache of ReplicaSet creates/deletes each deployment it expects to see
// TODO: make expectation model understand (ReplicaSet) updates (besides adds and deletes)
rsExpectations controller.ControllerExpectationsInterface
// Deployments that need to be synced
queue *workqueue.Type
}
@@ -104,11 +97,9 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{client.Core().Events("")})
dc := &DeploymentController{
client: client,
eventRecorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "deployment-controller"}),
queue: workqueue.New(),
podExpectations: controller.NewControllerExpectations(),
rsExpectations: controller.NewControllerExpectations(),
client: client,
eventRecorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "deployment-controller"}),
queue: workqueue.New(),
}
dc.dStore.Store, dc.dController = framework.NewInformer(
@@ -126,19 +117,19 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
AddFunc: func(obj interface{}) {
d := obj.(*extensions.Deployment)
glog.V(4).Infof("Adding deployment %s", d.Name)
dc.enqueueDeployment(obj)
dc.enqueueDeployment(d)
},
UpdateFunc: func(old, cur interface{}) {
oldD := old.(*extensions.Deployment)
glog.V(4).Infof("Updating deployment %s", oldD.Name)
// Resync on deployment object relist.
dc.enqueueDeployment(cur)
dc.enqueueDeployment(cur.(*extensions.Deployment))
},
// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
DeleteFunc: func(obj interface{}) {
d := obj.(*extensions.Deployment)
glog.V(4).Infof("Deleting deployment %s", d.Name)
dc.enqueueDeployment(obj)
dc.enqueueDeployment(d)
},
},
)
@@ -173,11 +164,8 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
&api.Pod{},
resyncPeriod(),
framework.ResourceEventHandlerFuncs{
// When pod is created, we need to update deployment's expectations
AddFunc: dc.addPod,
// When pod updates (becomes ready), we need to enqueue deployment
AddFunc: dc.addPod,
UpdateFunc: dc.updatePod,
// When pod is deleted, we need to update deployment's expectations
DeleteFunc: dc.deletePod,
},
)
@@ -207,13 +195,6 @@ func (dc *DeploymentController) addReplicaSet(obj interface{}) {
rs := obj.(*extensions.ReplicaSet)
glog.V(4).Infof("ReplicaSet %s added.", rs.Name)
if d := dc.getDeploymentForReplicaSet(rs); d != nil {
dKey, err := controller.KeyFunc(d)
if err != nil {
glog.Errorf("Couldn't get key for deployment controller %#v: %v", d, err)
return
}
// Decrement expected creations
dc.rsExpectations.LowerExpectations(dKey, 1, 0)
dc.enqueueDeployment(d)
}
}
@@ -305,7 +286,7 @@ func (dc *DeploymentController) getDeploymentForPod(pod *api.Pod) *extensions.De
return nil
}
// When a pod is created, update expectations of the controller that manages the pod.
// When a pod is created, ensure its controller syncs
func (dc *DeploymentController) addPod(obj interface{}) {
pod, ok := obj.(*api.Pod)
if !ok {
@@ -313,13 +294,6 @@ func (dc *DeploymentController) addPod(obj interface{}) {
}
glog.V(4).Infof("Pod %s created.", pod.Name)
if d := dc.getDeploymentForPod(pod); d != nil {
dKey, err := controller.KeyFunc(d)
if err != nil {
glog.Errorf("Couldn't get key for deployment controller %#v: %v", d, err)
return
}
// Decrement expected creations
dc.podExpectations.LowerExpectations(dKey, 1, 0)
dc.enqueueDeployment(d)
}
}
@@ -344,7 +318,7 @@ func (dc *DeploymentController) updatePod(old, cur interface{}) {
}
}
// When a pod is deleted, update expectations of the controller that manages the pod.
// When a pod is deleted, ensure its controller syncs.
// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
func (dc *DeploymentController) deletePod(obj interface{}) {
pod, ok := obj.(*api.Pod)
@@ -367,22 +341,14 @@ func (dc *DeploymentController) deletePod(obj interface{}) {
}
glog.V(4).Infof("Pod %s deleted.", pod.Name)
if d := dc.getDeploymentForPod(pod); d != nil {
dKey, err := controller.KeyFunc(d)
if err != nil {
glog.Errorf("Couldn't get key for deployment controller %#v: %v", d, err)
return
}
// Decrement expected deletions
dc.podExpectations.LowerExpectations(dKey, 0, 1)
dc.enqueueDeployment(d)
}
}
// obj could be an *api.Deployment, or a DeletionFinalStateUnknown marker item.
func (dc *DeploymentController) enqueueDeployment(obj interface{}) {
key, err := controller.KeyFunc(obj)
func (dc *DeploymentController) enqueueDeployment(deployment *extensions.Deployment) {
key, err := controller.KeyFunc(deployment)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
glog.Errorf("Couldn't get key for object %+v: %v", deployment, err)
return
}
@@ -437,29 +403,10 @@ func (dc *DeploymentController) syncDeployment(key string) error {
}
if !exists {
glog.Infof("Deployment has been deleted %v", key)
dc.podExpectations.DeleteExpectations(key)
dc.rsExpectations.DeleteExpectations(key)
return nil
}
d := *obj.(*extensions.Deployment)
// Note: The expectations cache is not thread-safe for a given key.
// Check the replica set expectations of the deployment before creating a new one.
// TODO: Explicitly expire expectations if we haven't sync'ed in a long time.
dKey, err := controller.KeyFunc(&d)
if err != nil {
return fmt.Errorf("couldn't get key for deployment %#v: %v", d, err)
}
if !dc.rsExpectations.SatisfiedExpectations(dKey) {
return fmt.Errorf("replicaset expectations not met yet for %v in syncDeployment", dKey)
}
if !dc.podExpectations.SatisfiedExpectations(dKey) {
return fmt.Errorf("pod expectations not met yet for %v in syncDeployment", dKey)
}
// Ensure that an expectations record exists and clear previous expectations.
dc.rsExpectations.SetExpectations(dKey, 0, 0)
dc.podExpectations.SetExpectations(dKey, 0, 0)
d := obj.(*extensions.Deployment)
if d.Spec.Paused {
// TODO: Implement scaling for paused deployments.
@@ -467,11 +414,11 @@ func (dc *DeploymentController) syncDeployment(key string) error {
// But keep the status up-to-date.
// Ignore paused deployments
glog.V(4).Infof("Updating status only for paused deployment %s/%s", d.Namespace, d.Name)
return dc.syncPausedDeploymentStatus(&d)
return dc.syncPausedDeploymentStatus(d)
}
if d.Spec.RollbackTo != nil {
revision := d.Spec.RollbackTo.Revision
if _, err = dc.rollback(&d, &revision); err != nil {
if _, err = dc.rollback(d, &revision); err != nil {
return err
}
}
@@ -487,19 +434,19 @@ func (dc *DeploymentController) syncDeployment(key string) error {
// Updates the status of a paused deployment
func (dc *DeploymentController) syncPausedDeploymentStatus(deployment *extensions.Deployment) error {
newRS, oldRSs, err := dc.getAllReplicaSets(*deployment, false)
newRS, oldRSs, err := dc.getAllReplicaSets(deployment, false)
if err != nil {
return err
}
allRSs := append(controller.FilterActiveReplicaSets(oldRSs), newRS)
// Sync deployment status
return dc.syncDeploymentStatus(allRSs, newRS, *deployment)
return dc.syncDeploymentStatus(allRSs, newRS, deployment)
}
// Rolling back to a revision; no-op if the toRevision is deployment's current revision
func (dc *DeploymentController) rollback(deployment *extensions.Deployment, toRevision *int64) (*extensions.Deployment, error) {
newRS, allOldRSs, err := dc.getAllReplicaSets(*deployment, true)
newRS, allOldRSs, err := dc.getAllReplicaSets(deployment, true)
if err != nil {
return nil, err
}
@@ -550,7 +497,7 @@ func (dc *DeploymentController) updateDeploymentAndClearRollbackTo(deployment *e
return dc.updateDeployment(deployment)
}
func (dc *DeploymentController) syncRecreateDeployment(deployment extensions.Deployment) error {
func (dc *DeploymentController) syncRecreateDeployment(deployment *extensions.Deployment) error {
// Don't create a new RS if not already existed, so that we avoid scaling up before scaling down
newRS, oldRSs, err := dc.getAllReplicaSets(deployment, false)
if err != nil {
@@ -597,7 +544,7 @@ func (dc *DeploymentController) syncRecreateDeployment(deployment extensions.Dep
return dc.syncDeploymentStatus(allRSs, newRS, deployment)
}
func (dc *DeploymentController) syncRollingUpdateDeployment(deployment extensions.Deployment) error {
func (dc *DeploymentController) syncRollingUpdateDeployment(deployment *extensions.Deployment) error {
newRS, oldRSs, err := dc.getAllReplicaSets(deployment, true)
if err != nil {
return err
@@ -634,7 +581,7 @@ func (dc *DeploymentController) syncRollingUpdateDeployment(deployment extension
}
// syncDeploymentStatus checks if the status is up-to-date and sync it if necessary
func (dc *DeploymentController) syncDeploymentStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, d extensions.Deployment) error {
func (dc *DeploymentController) syncDeploymentStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, d *extensions.Deployment) error {
totalActualReplicas, updatedReplicas, availableReplicas, _, err := dc.calculateStatus(allRSs, newRS, d)
if err != nil {
return err
@@ -646,7 +593,7 @@ func (dc *DeploymentController) syncDeploymentStatus(allRSs []*extensions.Replic
}
// getAllReplicaSets returns all the replica sets for the provided deployment (new and all old).
func (dc *DeploymentController) getAllReplicaSets(deployment extensions.Deployment, createIfNotExisted bool) (*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
func (dc *DeploymentController) getAllReplicaSets(deployment *extensions.Deployment, createIfNotExisted bool) (*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
_, allOldRSs, err := dc.getOldReplicaSets(deployment)
if err != nil {
return nil, nil, err
@@ -703,7 +650,7 @@ func lastRevision(allRSs []*extensions.ReplicaSet) int64 {
// getOldReplicaSets returns two sets of old replica sets of the deployment. The first set of old replica sets doesn't include
// the ones with no pods, and the second set of old replica sets include all old replica sets.
func (dc *DeploymentController) getOldReplicaSets(deployment extensions.Deployment) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
func (dc *DeploymentController) getOldReplicaSets(deployment *extensions.Deployment) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
return deploymentutil.GetOldReplicaSetsFromLists(deployment, dc.client,
func(namespace string, options api.ListOptions) (*api.PodList, error) {
podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
@@ -717,7 +664,7 @@ func (dc *DeploymentController) getOldReplicaSets(deployment extensions.Deployme
// Returns a replica set that matches the intent of the given deployment.
// It creates a new replica set if required.
// The revision of the new replica set will be updated to maxOldRevision + 1
func (dc *DeploymentController) getNewReplicaSet(deployment extensions.Deployment, maxOldRevision int64, oldRSs []*extensions.ReplicaSet, createIfNotExisted bool) (*extensions.ReplicaSet, error) {
func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployment, maxOldRevision int64, oldRSs []*extensions.ReplicaSet, createIfNotExisted bool) (*extensions.ReplicaSet, error) {
// Calculate revision number for this new replica set
newRevision := strconv.FormatInt(maxOldRevision+1, 10)
@@ -733,7 +680,7 @@ func (dc *DeploymentController) getNewReplicaSet(deployment extensions.Deploymen
return nil, err
} else if existingNewRS != nil {
// Set existing new replica set's annotation
if setNewReplicaSetAnnotations(&deployment, existingNewRS, newRevision) {
if setNewReplicaSetAnnotations(deployment, existingNewRS, newRevision) {
return dc.client.Extensions().ReplicaSets(deployment.ObjectMeta.Namespace).Update(existingNewRS)
}
return existingNewRS, nil
@@ -750,20 +697,12 @@ func (dc *DeploymentController) getNewReplicaSet(deployment extensions.Deploymen
// Add podTemplateHash label to selector.
newRSSelector := labelsutil.CloneSelectorAndAddLabel(deployment.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)
// Set ReplicaSet expectations (1 ReplicaSet should be created).
// This clobbers previous expectations, but we checked that in syncDeployment.
// We don't set expectations for deletions of 0-replica ReplicaSets because re-setting
// expectations would clobber these, and redundant deletions shouldn't cause harm.
dKey, err := controller.KeyFunc(&deployment)
if err != nil {
return nil, fmt.Errorf("couldn't get key for deployment %#v: %v", deployment, err)
}
// Create new ReplicaSet
newRS := extensions.ReplicaSet{
ObjectMeta: api.ObjectMeta{
GenerateName: deployment.Name + "-",
Namespace: namespace,
// Make the name deterministic, to ensure idempotence
Name: deployment.Name + "-" + fmt.Sprintf("%d", podTemplateSpecHash),
Namespace: namespace,
},
Spec: extensions.ReplicaSetSpec{
Replicas: 0,
@@ -772,32 +711,21 @@ func (dc *DeploymentController) getNewReplicaSet(deployment extensions.Deploymen
},
}
// Set new replica set's annotation
setNewReplicaSetAnnotations(&deployment, &newRS, newRevision)
setNewReplicaSetAnnotations(deployment, &newRS, newRevision)
allRSs := append(oldRSs, &newRS)
newReplicasCount, err := deploymentutil.NewRSNewReplicas(&deployment, allRSs, &newRS)
newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, &newRS)
if err != nil {
return nil, err
}
// Increment expected creations
dc.rsExpectations.RaiseExpectations(dKey, 1, 0)
if newReplicasCount != 0 {
dc.podExpectations.RaiseExpectations(dKey, newReplicasCount, 0)
}
newRS.Spec.Replicas = newReplicasCount
createdRS, err := dc.client.Extensions().ReplicaSets(namespace).Create(&newRS)
if err != nil {
// Decrement expected creations
dc.rsExpectations.LowerExpectations(dKey, 1, 0)
if newReplicasCount != 0 {
dc.podExpectations.LowerExpectations(dKey, newReplicasCount, 0)
}
dc.enqueueDeployment(deployment)
return nil, fmt.Errorf("error creating replica set %v: %v", dKey, err)
return nil, fmt.Errorf("error creating replica set %v: %v", deployment.Name, err)
}
if newReplicasCount > 0 {
dc.eventRecorder.Eventf(&deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", "up", createdRS.Name, newReplicasCount)
dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", "up", createdRS.Name, newReplicasCount)
}
return createdRS, dc.updateDeploymentRevision(deployment, newRevision)
@@ -840,19 +768,19 @@ func copyDeploymentAnnotationsToReplicaSet(deployment *extensions.Deployment, rs
return rsAnnotationsChanged
}
func (dc *DeploymentController) updateDeploymentRevision(deployment extensions.Deployment, revision string) error {
func (dc *DeploymentController) updateDeploymentRevision(deployment *extensions.Deployment, revision string) error {
if deployment.Annotations == nil {
deployment.Annotations = make(map[string]string)
}
if deployment.Annotations[deploymentutil.RevisionAnnotation] != revision {
deployment.Annotations[deploymentutil.RevisionAnnotation] = revision
_, err := dc.updateDeployment(&deployment)
_, err := dc.updateDeployment(deployment)
return err
}
return nil
}
func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment extensions.Deployment) (bool, error) {
func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) {
if newRS.Spec.Replicas == deployment.Spec.Replicas {
// Scaling not required.
return false, nil
@@ -862,7 +790,7 @@ func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.Repl
scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, deployment.Spec.Replicas, deployment)
return scaled, err
}
newReplicasCount, err := deploymentutil.NewRSNewReplicas(&deployment, allRSs, newRS)
newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, newRS)
if err != nil {
return false, err
}
@@ -870,7 +798,7 @@ func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.Repl
return scaled, err
}
func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment extensions.Deployment) (bool, error) {
func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) {
oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs)
if oldPodsCount == 0 {
// Can't scale down further
@@ -944,7 +872,7 @@ func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.Rep
}
// cleanupUnhealthyReplicas will scale down old replica sets with unhealthy replicas, so that all unhealthy replicas will be deleted.
func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRSs []*extensions.ReplicaSet, deployment extensions.Deployment, maxCleanupCount int) (int, error) {
func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment, maxCleanupCount int) (int, error) {
sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs))
// Safely scale down all old replica sets with unhealthy replicas. Replica set will sort the pods in the order
// such that not-ready < ready, unscheduled < scheduled, and pending < running. This ensures that unhealthy replicas will
@@ -980,7 +908,7 @@ func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRSs []*extensions.Re
// scaleDownOldReplicaSetsForRollingUpdate scales down old replica sets when deployment strategy is "RollingUpdate".
// Need check maxUnavailable to ensure availability
func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs []*extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, deployment extensions.Deployment) (int, error) {
func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs []*extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment) (int, error) {
maxUnavailable, err := intstrutil.GetValueFromIntOrPercent(&deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, deployment.Spec.Replicas, false)
if err != nil {
return 0, err
@@ -1027,7 +955,7 @@ func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs [
}
// scaleDownOldReplicaSetsForRecreate scales down old replica sets when deployment strategy is "Recreate"
func (dc *DeploymentController) scaleDownOldReplicaSetsForRecreate(oldRSs []*extensions.ReplicaSet, deployment extensions.Deployment) (bool, error) {
func (dc *DeploymentController) scaleDownOldReplicaSetsForRecreate(oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) {
scaled := false
for _, rs := range oldRSs {
// Scaling not required.
@@ -1046,12 +974,12 @@ func (dc *DeploymentController) scaleDownOldReplicaSetsForRecreate(oldRSs []*ext
}
// scaleUpNewReplicaSetForRecreate scales up new replica set when deployment strategy is "Recreate"
func (dc *DeploymentController) scaleUpNewReplicaSetForRecreate(newRS *extensions.ReplicaSet, deployment extensions.Deployment) (bool, error) {
func (dc *DeploymentController) scaleUpNewReplicaSetForRecreate(newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) {
scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, deployment.Spec.Replicas, deployment)
return scaled, err
}
func (dc *DeploymentController) cleanupOldReplicaSets(oldRSs []*extensions.ReplicaSet, deployment extensions.Deployment) error {
func (dc *DeploymentController) cleanupOldReplicaSets(oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment) error {
diff := len(oldRSs) - *deployment.Spec.RevisionHistoryLimit
if diff <= 0 {
return nil
@@ -1076,12 +1004,12 @@ func (dc *DeploymentController) cleanupOldReplicaSets(oldRSs []*extensions.Repli
return utilerrors.NewAggregate(errList)
}
func (dc *DeploymentController) updateDeploymentStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment extensions.Deployment) error {
func (dc *DeploymentController) updateDeploymentStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) error {
totalActualReplicas, updatedReplicas, availableReplicas, unavailableReplicas, err := dc.calculateStatus(allRSs, newRS, deployment)
if err != nil {
return err
}
newDeployment := deployment
newDeployment := *deployment
// TODO: Reconcile this with API definition. API definition talks about ready pods, while this just computes created pods.
newDeployment.Status = extensions.DeploymentStatus{
// TODO: Ensure that if we start retrying status updates, we won't pick up a new Generation value.
@@ -1095,7 +1023,7 @@ func (dc *DeploymentController) updateDeploymentStatus(allRSs []*extensions.Repl
return err
}
func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment extensions.Deployment) (totalActualReplicas, updatedReplicas, availableReplicas, unavailableReplicas int, err error) {
func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (totalActualReplicas, updatedReplicas, availableReplicas, unavailableReplicas int, err error) {
totalActualReplicas = deploymentutil.GetActualReplicaCountForReplicaSets(allRSs)
updatedReplicas = deploymentutil.GetActualReplicaCountForReplicaSets([]*extensions.ReplicaSet{newRS})
minReadySeconds := deployment.Spec.MinReadySeconds
@@ -1109,46 +1037,29 @@ func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet,
return
}
func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *extensions.ReplicaSet, newScale int, deployment extensions.Deployment) (bool, *extensions.ReplicaSet, error) {
func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *extensions.ReplicaSet, newScale int, deployment *extensions.Deployment) (bool, *extensions.ReplicaSet, error) {
// No need to scale
if rs.Spec.Replicas == newScale {
return false, rs, nil
}
dKey, err := controller.KeyFunc(&deployment)
if err != nil {
return false, nil, fmt.Errorf("couldn't get key for deployment %#v: %v", deployment, err)
}
var scalingOperation string
// Set expectations first, because if the update is successful, the expectations will be handled asynchronously immediately.
if rs.Spec.Replicas < newScale {
scalingOperation = "up"
// Increment expected creations
dc.podExpectations.RaiseExpectations(dKey, newScale-rs.Spec.Replicas, 0)
} else {
scalingOperation = "down"
// Increment expected deletions
dc.podExpectations.RaiseExpectations(dKey, 0, rs.Spec.Replicas-newScale)
}
newRS, err := dc.scaleReplicaSet(rs, newScale)
if err == nil {
dc.eventRecorder.Eventf(&deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale)
dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale)
} else {
// Back out the expectation changes. If we observed a failure even though the update succeeded, this will be wrong.
if rs.Spec.Replicas < newScale {
// Decrement expected creations
dc.podExpectations.LowerExpectations(dKey, newScale-rs.Spec.Replicas, 0)
dc.enqueueDeployment(deployment)
} else {
// Decrement expected deletions
dc.podExpectations.LowerExpectations(dKey, 0, rs.Spec.Replicas-newScale)
dc.enqueueDeployment(deployment)
}
dc.enqueueDeployment(deployment)
}
return true, newRS, err
}
func (dc *DeploymentController) scaleReplicaSet(rs *extensions.ReplicaSet, newScale int) (*extensions.ReplicaSet, error) {
// TODO: Using client for now, update to use store when it is ready.
// NOTE: This mutates the ReplicaSet passed in. Not sure if that's a good idea.
rs.Spec.Replicas = newScale
return dc.client.Extensions().ReplicaSets(rs.ObjectMeta.Namespace).Update(rs)
}
@@ -1159,7 +1070,7 @@ func (dc *DeploymentController) updateDeployment(deployment *extensions.Deployme
}
func (dc *DeploymentController) rollbackToTemplate(deployment *extensions.Deployment, rs *extensions.ReplicaSet) (d *extensions.Deployment, performedRollback bool, err error) {
if !reflect.DeepEqual(deploymentutil.GetNewReplicaSetTemplate(*deployment), *rs.Spec.Template) {
if !reflect.DeepEqual(deploymentutil.GetNewReplicaSetTemplate(deployment), *rs.Spec.Template) {
glog.Infof("Rolling back deployment %s to template spec %+v", deployment.Name, rs.Spec.Template.Spec)
deploymentutil.SetFromReplicaSetTemplate(deployment, *rs.Spec.Template)
performedRollback = true

View File

@@ -93,12 +93,10 @@ func TestDeploymentController_reconcileNewReplicaSet(t *testing.T) {
deployment := deployment("foo", test.deploymentReplicas, test.maxSurge, intstr.FromInt(0))
fake := fake.Clientset{}
controller := &DeploymentController{
client: &fake,
eventRecorder: &record.FakeRecorder{},
podExpectations: controller.NewControllerExpectations(),
rsExpectations: controller.NewControllerExpectations(),
client: &fake,
eventRecorder: &record.FakeRecorder{},
}
scaled, err := controller.reconcileNewReplicaSet(allRSs, newRS, deployment)
scaled, err := controller.reconcileNewReplicaSet(allRSs, newRS, &deployment)
if err != nil {
t.Errorf("unexpected error: %v", err)
continue
@@ -269,13 +267,11 @@ func TestDeploymentController_reconcileOldReplicaSets(t *testing.T) {
return false, nil, nil
})
controller := &DeploymentController{
client: &fakeClientset,
eventRecorder: &record.FakeRecorder{},
podExpectations: controller.NewControllerExpectations(),
rsExpectations: controller.NewControllerExpectations(),
client: &fakeClientset,
eventRecorder: &record.FakeRecorder{},
}
scaled, err := controller.reconcileOldReplicaSets(allRSs, oldRSs, newRS, deployment)
scaled, err := controller.reconcileOldReplicaSets(allRSs, oldRSs, newRS, &deployment)
if err != nil {
t.Errorf("unexpected error: %v", err)
continue
@@ -375,12 +371,10 @@ func TestDeploymentController_cleanupUnhealthyReplicas(t *testing.T) {
})
controller := &DeploymentController{
client: &fakeClientset,
eventRecorder: &record.FakeRecorder{},
podExpectations: controller.NewControllerExpectations(),
rsExpectations: controller.NewControllerExpectations(),
client: &fakeClientset,
eventRecorder: &record.FakeRecorder{},
}
cleanupCount, err := controller.cleanupUnhealthyReplicas(oldRSs, deployment, test.maxCleanupCount)
cleanupCount, err := controller.cleanupUnhealthyReplicas(oldRSs, &deployment, test.maxCleanupCount)
if err != nil {
t.Errorf("unexpected error: %v", err)
continue
@@ -464,12 +458,10 @@ func TestDeploymentController_scaleDownOldReplicaSetsForRollingUpdate(t *testing
return false, nil, nil
})
controller := &DeploymentController{
client: &fakeClientset,
eventRecorder: &record.FakeRecorder{},
podExpectations: controller.NewControllerExpectations(),
rsExpectations: controller.NewControllerExpectations(),
client: &fakeClientset,
eventRecorder: &record.FakeRecorder{},
}
scaled, err := controller.scaleDownOldReplicaSetsForRollingUpdate(allRSs, oldRSs, deployment)
scaled, err := controller.scaleDownOldReplicaSetsForRollingUpdate(allRSs, oldRSs, &deployment)
if err != nil {
t.Errorf("unexpected error: %v", err)
continue
@@ -555,7 +547,7 @@ func TestDeploymentController_cleanupOldReplicaSets(t *testing.T) {
}
d := newDeployment(1, &tests[i].revisionHistoryLimit)
controller.cleanupOldReplicaSets(test.oldRSs, *d)
controller.cleanupOldReplicaSets(test.oldRSs, d)
gotDeletions := 0
for _, action := range fake.Actions() {