Scale up early when deployment creates new Replica set

This commit is contained in:
Janet Kuo 2016-02-04 18:05:38 -08:00
parent 3616b4bfec
commit e9262df456
3 changed files with 76 additions and 40 deletions

View File

@ -586,7 +586,7 @@ func (dc *DeploymentController) getAllReplicaSets(deployment extensions.Deployme
maxOldV := maxRevision(allOldRSs) maxOldV := maxRevision(allOldRSs)
// Get new replica set with the updated revision number // Get new replica set with the updated revision number
newRS, err := dc.getNewReplicaSet(deployment, maxOldV) newRS, err := dc.getNewReplicaSet(deployment, maxOldV, oldRSs)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -648,7 +648,7 @@ func (dc *DeploymentController) getOldReplicaSets(deployment extensions.Deployme
// Returns a replica set that matches the intent of the given deployment. // Returns a replica set that matches the intent of the given deployment.
// It creates a new replica set if required. // It creates a new replica set if required.
// The revision of the new replica set will be updated to maxOldRevision + 1 // The revision of the new replica set will be updated to maxOldRevision + 1
func (dc *DeploymentController) getNewReplicaSet(deployment extensions.Deployment, maxOldRevision int64) (*extensions.ReplicaSet, error) { func (dc *DeploymentController) getNewReplicaSet(deployment extensions.Deployment, maxOldRevision int64, oldRSs []*extensions.ReplicaSet) (*extensions.ReplicaSet, error) {
// Calculate revision number for this new replica set // Calculate revision number for this new replica set
newRevision := strconv.FormatInt(maxOldRevision+1, 10) newRevision := strconv.FormatInt(maxOldRevision+1, 10)
@ -701,14 +701,22 @@ func (dc *DeploymentController) getNewReplicaSet(deployment extensions.Deploymen
} }
// Set new replica set's annotation // Set new replica set's annotation
setNewReplicaSetAnnotations(&deployment, &newRS, newRevision) setNewReplicaSetAnnotations(&deployment, &newRS, newRevision)
allRSs := append(oldRSs, &newRS)
newReplicasCount, err := deploymentutil.NewRSNewReplicas(&deployment, allRSs, &newRS)
if err != nil {
return nil, err
}
newRS.Spec.Replicas = newReplicasCount
createdRS, err := dc.client.Extensions().ReplicaSets(namespace).Create(&newRS) createdRS, err := dc.client.Extensions().ReplicaSets(namespace).Create(&newRS)
if err != nil { if err != nil {
dc.rsExpectations.DeleteExpectations(dKey) dc.rsExpectations.DeleteExpectations(dKey)
return nil, fmt.Errorf("error creating replica set: %v", err) return nil, fmt.Errorf("error creating replica set: %v", err)
} }
if newReplicasCount > 0 {
dc.eventRecorder.Eventf(&deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", "up", createdRS.Name, newReplicasCount)
}
err = dc.updateDeploymentRevision(deployment, newRevision) return createdRS, dc.updateDeploymentRevision(deployment, newRevision)
return createdRS, err
} }
// setNewReplicaSetAnnotations sets new replica set's annotations appropriately by updating its revision and // setNewReplicaSetAnnotations sets new replica set's annotations appropriately by updating its revision and
@ -752,9 +760,12 @@ func (dc *DeploymentController) updateDeploymentRevision(deployment extensions.D
if deployment.Annotations == nil { if deployment.Annotations == nil {
deployment.Annotations = make(map[string]string) deployment.Annotations = make(map[string]string)
} }
deployment.Annotations[deploymentutil.RevisionAnnotation] = revision if deployment.Annotations[deploymentutil.RevisionAnnotation] != revision {
_, err := dc.updateDeployment(&deployment) deployment.Annotations[deploymentutil.RevisionAnnotation] = revision
return err _, err := dc.updateDeployment(&deployment)
return err
}
return nil
} }
func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment extensions.Deployment) (bool, error) { func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment extensions.Deployment) (bool, error) {
@ -764,29 +775,15 @@ func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.Repl
} }
if newRS.Spec.Replicas > deployment.Spec.Replicas { if newRS.Spec.Replicas > deployment.Spec.Replicas {
// Scale down. // Scale down.
_, err := dc.scaleReplicaSetAndRecordEvent(newRS, deployment.Spec.Replicas, deployment) scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, deployment.Spec.Replicas, deployment)
return true, err return scaled, err
} }
// Check if we can scale up. newReplicasCount, err := deploymentutil.NewRSNewReplicas(&deployment, allRSs, newRS)
maxSurge, err := intstrutil.GetValueFromIntOrPercent(&deployment.Spec.Strategy.RollingUpdate.MaxSurge, deployment.Spec.Replicas)
if err != nil { if err != nil {
return false, err return false, err
} }
scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, newReplicasCount, deployment)
// Find the total number of pods return scaled, err
currentPodCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
maxTotalPods := deployment.Spec.Replicas + maxSurge
if currentPodCount >= maxTotalPods {
// Cannot scale up.
return false, nil
}
// Scale up.
scaleUpCount := maxTotalPods - currentPodCount
// Do not exceed the number of desired replicas.
scaleUpCount = integer.IntMin(scaleUpCount, deployment.Spec.Replicas-newRS.Spec.Replicas)
newReplicasCount := newRS.Spec.Replicas + scaleUpCount
_, err = dc.scaleReplicaSetAndRecordEvent(newRS, newReplicasCount, deployment)
return true, err
} }
// Set expectationsCheck to false to bypass expectations check when testing // Set expectationsCheck to false to bypass expectations check when testing
@ -903,7 +900,7 @@ func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRSs []*extensions.Re
scaledDownCount := integer.IntMin(maxCleanupCount-totalScaledDown, targetRS.Spec.Replicas-readyPodCount) scaledDownCount := integer.IntMin(maxCleanupCount-totalScaledDown, targetRS.Spec.Replicas-readyPodCount)
newReplicasCount := targetRS.Spec.Replicas - scaledDownCount newReplicasCount := targetRS.Spec.Replicas - scaledDownCount
_, err = dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment) _, _, err = dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment)
if err != nil { if err != nil {
return totalScaledDown, err return totalScaledDown, err
} }
@ -949,7 +946,7 @@ func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs [
// Scale down. // Scale down.
scaleDownCount := integer.IntMin(targetRS.Spec.Replicas, totalScaleDownCount-totalScaledDown) scaleDownCount := integer.IntMin(targetRS.Spec.Replicas, totalScaleDownCount-totalScaledDown)
newReplicasCount := targetRS.Spec.Replicas - scaleDownCount newReplicasCount := targetRS.Spec.Replicas - scaleDownCount
_, err = dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment) _, _, err = dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment)
if err != nil { if err != nil {
return totalScaledDown, err return totalScaledDown, err
} }
@ -968,23 +965,21 @@ func (dc *DeploymentController) scaleDownOldReplicaSetsForRecreate(oldRSs []*ext
if rs.Spec.Replicas == 0 { if rs.Spec.Replicas == 0 {
continue continue
} }
_, err := dc.scaleReplicaSetAndRecordEvent(rs, 0, deployment) scaledRS, _, err := dc.scaleReplicaSetAndRecordEvent(rs, 0, deployment)
if err != nil { if err != nil {
return false, err return false, err
} }
scaled = true if scaledRS {
scaled = true
}
} }
return scaled, nil return scaled, nil
} }
// scaleUpNewReplicaSetForRecreate scales up new replica set when deployment strategy is "Recreate" // scaleUpNewReplicaSetForRecreate scales up new replica set when deployment strategy is "Recreate"
func (dc *DeploymentController) scaleUpNewReplicaSetForRecreate(newRS *extensions.ReplicaSet, deployment extensions.Deployment) (bool, error) { func (dc *DeploymentController) scaleUpNewReplicaSetForRecreate(newRS *extensions.ReplicaSet, deployment extensions.Deployment) (bool, error) {
if newRS.Spec.Replicas == deployment.Spec.Replicas { scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, deployment.Spec.Replicas, deployment)
// Scaling not required. return scaled, err
return false, nil
}
_, err := dc.scaleReplicaSetAndRecordEvent(newRS, deployment.Spec.Replicas, deployment)
return true, err
} }
func (dc *DeploymentController) cleanupOldReplicaSets(oldRSs []*extensions.ReplicaSet, deployment extensions.Deployment) error { func (dc *DeploymentController) cleanupOldReplicaSets(oldRSs []*extensions.ReplicaSet, deployment extensions.Deployment) error {
@ -1042,7 +1037,11 @@ func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet,
return return
} }
func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *extensions.ReplicaSet, newScale int, deployment extensions.Deployment) (*extensions.ReplicaSet, error) { func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *extensions.ReplicaSet, newScale int, deployment extensions.Deployment) (bool, *extensions.ReplicaSet, error) {
// No need to scale
if rs.Spec.Replicas == newScale {
return false, rs, nil
}
scalingOperation := "down" scalingOperation := "down"
if rs.Spec.Replicas < newScale { if rs.Spec.Replicas < newScale {
scalingOperation = "up" scalingOperation = "up"
@ -1051,7 +1050,7 @@ func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *extensions.Rep
if err == nil { if err == nil {
dc.eventRecorder.Eventf(&deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale) dc.eventRecorder.Eventf(&deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale)
} }
return newRS, err return true, newRS, err
} }
func (dc *DeploymentController) scaleReplicaSet(rs *extensions.ReplicaSet, newScale int) (*extensions.ReplicaSet, error) { func (dc *DeploymentController) scaleReplicaSet(rs *extensions.ReplicaSet, newScale int) (*extensions.ReplicaSet, error) {

View File

@ -26,6 +26,8 @@ import (
"k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/apis/extensions"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/integer"
intstrutil "k8s.io/kubernetes/pkg/util/intstr"
labelsutil "k8s.io/kubernetes/pkg/util/labels" labelsutil "k8s.io/kubernetes/pkg/util/labels"
podutil "k8s.io/kubernetes/pkg/util/pod" podutil "k8s.io/kubernetes/pkg/util/pod"
) )
@ -233,3 +235,38 @@ func Revision(rs *extensions.ReplicaSet) (int64, error) {
} }
return strconv.ParseInt(v, 10, 64) return strconv.ParseInt(v, 10, 64)
} }
func IsRollingUpdate(deployment *extensions.Deployment) bool {
return deployment.Spec.Strategy.Type == extensions.RollingUpdateDeploymentStrategyType
}
// NewRSNewReplicas calculates the number of replicas a deployment's new RS should have.
// When one of the followings is true, we're rolling out the deployment; otherwise, we're scaling it.
// 1) The new RS is saturated: newRS's replicas == deployment's replicas
// 2) Max number of pods allowed is reached: deployment's replicas + maxSurge == all RSs' replicas
func NewRSNewReplicas(deployment *extensions.Deployment, allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet) (int, error) {
switch deployment.Spec.Strategy.Type {
case extensions.RollingUpdateDeploymentStrategyType:
// Check if we can scale up.
maxSurge, err := intstrutil.GetValueFromIntOrPercent(&deployment.Spec.Strategy.RollingUpdate.MaxSurge, deployment.Spec.Replicas)
if err != nil {
return 0, err
}
// Find the total number of pods
currentPodCount := GetReplicaCountForReplicaSets(allRSs)
maxTotalPods := deployment.Spec.Replicas + maxSurge
if currentPodCount >= maxTotalPods {
// Cannot scale up.
return newRS.Spec.Replicas, nil
}
// Scale up.
scaleUpCount := maxTotalPods - currentPodCount
// Do not exceed the number of desired replicas.
scaleUpCount = integer.IntMin(scaleUpCount, deployment.Spec.Replicas-newRS.Spec.Replicas)
return newRS.Spec.Replicas + scaleUpCount, nil
case extensions.RecreateDeploymentStrategyType:
return deployment.Spec.Replicas, nil
default:
return 0, fmt.Errorf("deployment type %v isn't supported", deployment.Spec.Strategy.Type)
}
}

View File

@ -386,8 +386,8 @@ func testRecreateDeployment(f *Framework) {
newRS, err := deploymentutil.GetNewReplicaSet(*deployment, c) newRS, err := deploymentutil.GetNewReplicaSet(*deployment, c)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(newRS).NotTo(Equal(nil)) Expect(newRS).NotTo(Equal(nil))
Expect(events.Items[0].Message).Should(Equal(fmt.Sprintf("Scaled down replica set %s to 0", rsName))) Expect(events.Items[0].Message).Should(Equal(fmt.Sprintf("Scaled up replica set %s to 3", newRS.Name)))
Expect(events.Items[1].Message).Should(Equal(fmt.Sprintf("Scaled up replica set %s to 3", newRS.Name))) Expect(events.Items[1].Message).Should(Equal(fmt.Sprintf("Scaled down replica set %s to 0", rsName)))
// Check if it's updated to revision 1 correctly // Check if it's updated to revision 1 correctly
checkDeploymentRevision(c, ns, deploymentName, "1", "redis", "redis") checkDeploymentRevision(c, ns, deploymentName, "1", "redis", "redis")