mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 21:47:07 +00:00
Merge pull request #20696 from janetkuo/deployment-new-rc-replicas
Auto commit by PR queue bot
This commit is contained in:
commit
f618cb471f
@ -441,7 +441,7 @@ func (dc *DeploymentController) syncDeployment(key string) error {
|
|||||||
|
|
||||||
// Rolling back to a revision; no-op if the toRevision is deployment's current revision
|
// Rolling back to a revision; no-op if the toRevision is deployment's current revision
|
||||||
func (dc *DeploymentController) rollback(deployment *extensions.Deployment, toRevision *int64) (*extensions.Deployment, error) {
|
func (dc *DeploymentController) rollback(deployment *extensions.Deployment, toRevision *int64) (*extensions.Deployment, error) {
|
||||||
newRS, allOldRSs, err := dc.getAllReplicaSets(*deployment)
|
newRS, allOldRSs, err := dc.getAllReplicaSets(*deployment, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -493,7 +493,8 @@ func (dc *DeploymentController) updateDeploymentAndClearRollbackTo(deployment *e
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (dc *DeploymentController) syncRecreateDeployment(deployment extensions.Deployment) error {
|
func (dc *DeploymentController) syncRecreateDeployment(deployment extensions.Deployment) error {
|
||||||
newRS, oldRSs, err := dc.getAllReplicaSets(deployment)
|
// Don't create a new RS if not already existed, so that we avoid scaling up before scaling down
|
||||||
|
newRS, oldRSs, err := dc.getAllReplicaSets(deployment, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -509,6 +510,16 @@ func (dc *DeploymentController) syncRecreateDeployment(deployment extensions.Dep
|
|||||||
return dc.updateDeploymentStatus(allRSs, newRS, deployment)
|
return dc.updateDeploymentStatus(allRSs, newRS, deployment)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If we need to create a new RS, create it now
|
||||||
|
// TODO: Create a new RS without re-listing all RSs.
|
||||||
|
if newRS == nil {
|
||||||
|
newRS, oldRSs, err = dc.getAllReplicaSets(deployment, true)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
allRSs = append(oldRSs, newRS)
|
||||||
|
}
|
||||||
|
|
||||||
// scale up new replica set
|
// scale up new replica set
|
||||||
scaledUp, err := dc.scaleUpNewReplicaSetForRecreate(newRS, deployment)
|
scaledUp, err := dc.scaleUpNewReplicaSetForRecreate(newRS, deployment)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -529,7 +540,7 @@ func (dc *DeploymentController) syncRecreateDeployment(deployment extensions.Dep
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (dc *DeploymentController) syncRollingUpdateDeployment(deployment extensions.Deployment) error {
|
func (dc *DeploymentController) syncRollingUpdateDeployment(deployment extensions.Deployment) error {
|
||||||
newRS, oldRSs, err := dc.getAllReplicaSets(deployment)
|
newRS, oldRSs, err := dc.getAllReplicaSets(deployment, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -577,7 +588,7 @@ func (dc *DeploymentController) syncDeploymentStatus(allRSs []*extensions.Replic
|
|||||||
}
|
}
|
||||||
|
|
||||||
// getAllReplicaSets returns all the replica sets for the provided deployment (new and all old).
|
// getAllReplicaSets returns all the replica sets for the provided deployment (new and all old).
|
||||||
func (dc *DeploymentController) getAllReplicaSets(deployment extensions.Deployment) (*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
|
func (dc *DeploymentController) getAllReplicaSets(deployment extensions.Deployment, createIfNotExisted bool) (*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
|
||||||
_, allOldRSs, err := dc.getOldReplicaSets(deployment)
|
_, allOldRSs, err := dc.getOldReplicaSets(deployment)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
@ -586,13 +597,13 @@ func (dc *DeploymentController) getAllReplicaSets(deployment extensions.Deployme
|
|||||||
maxOldV := maxRevision(allOldRSs)
|
maxOldV := maxRevision(allOldRSs)
|
||||||
|
|
||||||
// Get new replica set with the updated revision number
|
// Get new replica set with the updated revision number
|
||||||
newRS, err := dc.getNewReplicaSet(deployment, maxOldV)
|
newRS, err := dc.getNewReplicaSet(deployment, maxOldV, allOldRSs, createIfNotExisted)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sync deployment's revision number with new replica set
|
// Sync deployment's revision number with new replica set
|
||||||
if newRS.Annotations != nil && len(newRS.Annotations[deploymentutil.RevisionAnnotation]) > 0 &&
|
if newRS != nil && newRS.Annotations != nil && len(newRS.Annotations[deploymentutil.RevisionAnnotation]) > 0 &&
|
||||||
(deployment.Annotations == nil || deployment.Annotations[deploymentutil.RevisionAnnotation] != newRS.Annotations[deploymentutil.RevisionAnnotation]) {
|
(deployment.Annotations == nil || deployment.Annotations[deploymentutil.RevisionAnnotation] != newRS.Annotations[deploymentutil.RevisionAnnotation]) {
|
||||||
if err = dc.updateDeploymentRevision(deployment, newRS.Annotations[deploymentutil.RevisionAnnotation]); err != nil {
|
if err = dc.updateDeploymentRevision(deployment, newRS.Annotations[deploymentutil.RevisionAnnotation]); err != nil {
|
||||||
glog.V(4).Infof("Error: %v. Unable to update deployment revision, will retry later.", err)
|
glog.V(4).Infof("Error: %v. Unable to update deployment revision, will retry later.", err)
|
||||||
@ -648,7 +659,7 @@ func (dc *DeploymentController) getOldReplicaSets(deployment extensions.Deployme
|
|||||||
// Returns a replica set that matches the intent of the given deployment.
|
// Returns a replica set that matches the intent of the given deployment.
|
||||||
// It creates a new replica set if required.
|
// It creates a new replica set if required.
|
||||||
// The revision of the new replica set will be updated to maxOldRevision + 1
|
// The revision of the new replica set will be updated to maxOldRevision + 1
|
||||||
func (dc *DeploymentController) getNewReplicaSet(deployment extensions.Deployment, maxOldRevision int64) (*extensions.ReplicaSet, error) {
|
func (dc *DeploymentController) getNewReplicaSet(deployment extensions.Deployment, maxOldRevision int64, oldRSs []*extensions.ReplicaSet, createIfNotExisted bool) (*extensions.ReplicaSet, error) {
|
||||||
// Calculate revision number for this new replica set
|
// Calculate revision number for this new replica set
|
||||||
newRevision := strconv.FormatInt(maxOldRevision+1, 10)
|
newRevision := strconv.FormatInt(maxOldRevision+1, 10)
|
||||||
|
|
||||||
@ -665,6 +676,11 @@ func (dc *DeploymentController) getNewReplicaSet(deployment extensions.Deploymen
|
|||||||
}
|
}
|
||||||
return existingNewRS, nil
|
return existingNewRS, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !createIfNotExisted {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Check the replica set expectations of the deployment before creating a new one.
|
// Check the replica set expectations of the deployment before creating a new one.
|
||||||
dKey, err := controller.KeyFunc(&deployment)
|
dKey, err := controller.KeyFunc(&deployment)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -701,14 +717,22 @@ func (dc *DeploymentController) getNewReplicaSet(deployment extensions.Deploymen
|
|||||||
}
|
}
|
||||||
// Set new replica set's annotation
|
// Set new replica set's annotation
|
||||||
setNewReplicaSetAnnotations(&deployment, &newRS, newRevision)
|
setNewReplicaSetAnnotations(&deployment, &newRS, newRevision)
|
||||||
|
allRSs := append(oldRSs, &newRS)
|
||||||
|
newReplicasCount, err := deploymentutil.NewRSNewReplicas(&deployment, allRSs, &newRS)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
newRS.Spec.Replicas = newReplicasCount
|
||||||
createdRS, err := dc.client.Extensions().ReplicaSets(namespace).Create(&newRS)
|
createdRS, err := dc.client.Extensions().ReplicaSets(namespace).Create(&newRS)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
dc.rsExpectations.DeleteExpectations(dKey)
|
dc.rsExpectations.DeleteExpectations(dKey)
|
||||||
return nil, fmt.Errorf("error creating replica set: %v", err)
|
return nil, fmt.Errorf("error creating replica set: %v", err)
|
||||||
}
|
}
|
||||||
|
if newReplicasCount > 0 {
|
||||||
|
dc.eventRecorder.Eventf(&deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", "up", createdRS.Name, newReplicasCount)
|
||||||
|
}
|
||||||
|
|
||||||
err = dc.updateDeploymentRevision(deployment, newRevision)
|
return createdRS, dc.updateDeploymentRevision(deployment, newRevision)
|
||||||
return createdRS, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// setNewReplicaSetAnnotations sets new replica set's annotations appropriately by updating its revision and
|
// setNewReplicaSetAnnotations sets new replica set's annotations appropriately by updating its revision and
|
||||||
@ -752,9 +776,12 @@ func (dc *DeploymentController) updateDeploymentRevision(deployment extensions.D
|
|||||||
if deployment.Annotations == nil {
|
if deployment.Annotations == nil {
|
||||||
deployment.Annotations = make(map[string]string)
|
deployment.Annotations = make(map[string]string)
|
||||||
}
|
}
|
||||||
|
if deployment.Annotations[deploymentutil.RevisionAnnotation] != revision {
|
||||||
deployment.Annotations[deploymentutil.RevisionAnnotation] = revision
|
deployment.Annotations[deploymentutil.RevisionAnnotation] = revision
|
||||||
_, err := dc.updateDeployment(&deployment)
|
_, err := dc.updateDeployment(&deployment)
|
||||||
return err
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment extensions.Deployment) (bool, error) {
|
func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment extensions.Deployment) (bool, error) {
|
||||||
@ -764,29 +791,15 @@ func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.Repl
|
|||||||
}
|
}
|
||||||
if newRS.Spec.Replicas > deployment.Spec.Replicas {
|
if newRS.Spec.Replicas > deployment.Spec.Replicas {
|
||||||
// Scale down.
|
// Scale down.
|
||||||
_, err := dc.scaleReplicaSetAndRecordEvent(newRS, deployment.Spec.Replicas, deployment)
|
scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, deployment.Spec.Replicas, deployment)
|
||||||
return true, err
|
return scaled, err
|
||||||
}
|
}
|
||||||
// Check if we can scale up.
|
newReplicasCount, err := deploymentutil.NewRSNewReplicas(&deployment, allRSs, newRS)
|
||||||
maxSurge, err := intstrutil.GetValueFromIntOrPercent(&deployment.Spec.Strategy.RollingUpdate.MaxSurge, deployment.Spec.Replicas)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, newReplicasCount, deployment)
|
||||||
// Find the total number of pods
|
return scaled, err
|
||||||
currentPodCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
|
|
||||||
maxTotalPods := deployment.Spec.Replicas + maxSurge
|
|
||||||
if currentPodCount >= maxTotalPods {
|
|
||||||
// Cannot scale up.
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
// Scale up.
|
|
||||||
scaleUpCount := maxTotalPods - currentPodCount
|
|
||||||
// Do not exceed the number of desired replicas.
|
|
||||||
scaleUpCount = integer.IntMin(scaleUpCount, deployment.Spec.Replicas-newRS.Spec.Replicas)
|
|
||||||
newReplicasCount := newRS.Spec.Replicas + scaleUpCount
|
|
||||||
_, err = dc.scaleReplicaSetAndRecordEvent(newRS, newReplicasCount, deployment)
|
|
||||||
return true, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set expectationsCheck to false to bypass expectations check when testing
|
// Set expectationsCheck to false to bypass expectations check when testing
|
||||||
@ -903,7 +916,7 @@ func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRSs []*extensions.Re
|
|||||||
|
|
||||||
scaledDownCount := integer.IntMin(maxCleanupCount-totalScaledDown, targetRS.Spec.Replicas-readyPodCount)
|
scaledDownCount := integer.IntMin(maxCleanupCount-totalScaledDown, targetRS.Spec.Replicas-readyPodCount)
|
||||||
newReplicasCount := targetRS.Spec.Replicas - scaledDownCount
|
newReplicasCount := targetRS.Spec.Replicas - scaledDownCount
|
||||||
_, err = dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment)
|
_, _, err = dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return totalScaledDown, err
|
return totalScaledDown, err
|
||||||
}
|
}
|
||||||
@ -949,7 +962,7 @@ func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs [
|
|||||||
// Scale down.
|
// Scale down.
|
||||||
scaleDownCount := integer.IntMin(targetRS.Spec.Replicas, totalScaleDownCount-totalScaledDown)
|
scaleDownCount := integer.IntMin(targetRS.Spec.Replicas, totalScaleDownCount-totalScaledDown)
|
||||||
newReplicasCount := targetRS.Spec.Replicas - scaleDownCount
|
newReplicasCount := targetRS.Spec.Replicas - scaleDownCount
|
||||||
_, err = dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment)
|
_, _, err = dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return totalScaledDown, err
|
return totalScaledDown, err
|
||||||
}
|
}
|
||||||
@ -968,23 +981,21 @@ func (dc *DeploymentController) scaleDownOldReplicaSetsForRecreate(oldRSs []*ext
|
|||||||
if rs.Spec.Replicas == 0 {
|
if rs.Spec.Replicas == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
_, err := dc.scaleReplicaSetAndRecordEvent(rs, 0, deployment)
|
scaledRS, _, err := dc.scaleReplicaSetAndRecordEvent(rs, 0, deployment)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
if scaledRS {
|
||||||
scaled = true
|
scaled = true
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return scaled, nil
|
return scaled, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// scaleUpNewReplicaSetForRecreate scales up new replica set when deployment strategy is "Recreate"
|
// scaleUpNewReplicaSetForRecreate scales up new replica set when deployment strategy is "Recreate"
|
||||||
func (dc *DeploymentController) scaleUpNewReplicaSetForRecreate(newRS *extensions.ReplicaSet, deployment extensions.Deployment) (bool, error) {
|
func (dc *DeploymentController) scaleUpNewReplicaSetForRecreate(newRS *extensions.ReplicaSet, deployment extensions.Deployment) (bool, error) {
|
||||||
if newRS.Spec.Replicas == deployment.Spec.Replicas {
|
scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, deployment.Spec.Replicas, deployment)
|
||||||
// Scaling not required.
|
return scaled, err
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
_, err := dc.scaleReplicaSetAndRecordEvent(newRS, deployment.Spec.Replicas, deployment)
|
|
||||||
return true, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dc *DeploymentController) cleanupOldReplicaSets(oldRSs []*extensions.ReplicaSet, deployment extensions.Deployment) error {
|
func (dc *DeploymentController) cleanupOldReplicaSets(oldRSs []*extensions.ReplicaSet, deployment extensions.Deployment) error {
|
||||||
@ -1042,7 +1053,11 @@ func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet,
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *extensions.ReplicaSet, newScale int, deployment extensions.Deployment) (*extensions.ReplicaSet, error) {
|
func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *extensions.ReplicaSet, newScale int, deployment extensions.Deployment) (bool, *extensions.ReplicaSet, error) {
|
||||||
|
// No need to scale
|
||||||
|
if rs.Spec.Replicas == newScale {
|
||||||
|
return false, rs, nil
|
||||||
|
}
|
||||||
scalingOperation := "down"
|
scalingOperation := "down"
|
||||||
if rs.Spec.Replicas < newScale {
|
if rs.Spec.Replicas < newScale {
|
||||||
scalingOperation = "up"
|
scalingOperation = "up"
|
||||||
@ -1051,7 +1066,7 @@ func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *extensions.Rep
|
|||||||
if err == nil {
|
if err == nil {
|
||||||
dc.eventRecorder.Eventf(&deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale)
|
dc.eventRecorder.Eventf(&deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale)
|
||||||
}
|
}
|
||||||
return newRS, err
|
return true, newRS, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dc *DeploymentController) scaleReplicaSet(rs *extensions.ReplicaSet, newScale int) (*extensions.ReplicaSet, error) {
|
func (dc *DeploymentController) scaleReplicaSet(rs *extensions.ReplicaSet, newScale int) (*extensions.ReplicaSet, error) {
|
||||||
|
@ -26,6 +26,8 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/apis/extensions"
|
"k8s.io/kubernetes/pkg/apis/extensions"
|
||||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||||
"k8s.io/kubernetes/pkg/labels"
|
"k8s.io/kubernetes/pkg/labels"
|
||||||
|
"k8s.io/kubernetes/pkg/util/integer"
|
||||||
|
intstrutil "k8s.io/kubernetes/pkg/util/intstr"
|
||||||
labelsutil "k8s.io/kubernetes/pkg/util/labels"
|
labelsutil "k8s.io/kubernetes/pkg/util/labels"
|
||||||
podutil "k8s.io/kubernetes/pkg/util/pod"
|
podutil "k8s.io/kubernetes/pkg/util/pod"
|
||||||
)
|
)
|
||||||
@ -233,3 +235,38 @@ func Revision(rs *extensions.ReplicaSet) (int64, error) {
|
|||||||
}
|
}
|
||||||
return strconv.ParseInt(v, 10, 64)
|
return strconv.ParseInt(v, 10, 64)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func IsRollingUpdate(deployment *extensions.Deployment) bool {
|
||||||
|
return deployment.Spec.Strategy.Type == extensions.RollingUpdateDeploymentStrategyType
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewRSNewReplicas calculates the number of replicas a deployment's new RS should have.
|
||||||
|
// When one of the followings is true, we're rolling out the deployment; otherwise, we're scaling it.
|
||||||
|
// 1) The new RS is saturated: newRS's replicas == deployment's replicas
|
||||||
|
// 2) Max number of pods allowed is reached: deployment's replicas + maxSurge == all RSs' replicas
|
||||||
|
func NewRSNewReplicas(deployment *extensions.Deployment, allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet) (int, error) {
|
||||||
|
switch deployment.Spec.Strategy.Type {
|
||||||
|
case extensions.RollingUpdateDeploymentStrategyType:
|
||||||
|
// Check if we can scale up.
|
||||||
|
maxSurge, err := intstrutil.GetValueFromIntOrPercent(&deployment.Spec.Strategy.RollingUpdate.MaxSurge, deployment.Spec.Replicas)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
// Find the total number of pods
|
||||||
|
currentPodCount := GetReplicaCountForReplicaSets(allRSs)
|
||||||
|
maxTotalPods := deployment.Spec.Replicas + maxSurge
|
||||||
|
if currentPodCount >= maxTotalPods {
|
||||||
|
// Cannot scale up.
|
||||||
|
return newRS.Spec.Replicas, nil
|
||||||
|
}
|
||||||
|
// Scale up.
|
||||||
|
scaleUpCount := maxTotalPods - currentPodCount
|
||||||
|
// Do not exceed the number of desired replicas.
|
||||||
|
scaleUpCount = integer.IntMin(scaleUpCount, deployment.Spec.Replicas-newRS.Spec.Replicas)
|
||||||
|
return newRS.Spec.Replicas + scaleUpCount, nil
|
||||||
|
case extensions.RecreateDeploymentStrategyType:
|
||||||
|
return deployment.Spec.Replicas, nil
|
||||||
|
default:
|
||||||
|
return 0, fmt.Errorf("deployment type %v isn't supported", deployment.Spec.Strategy.Type)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user