mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-03 09:22:44 +00:00
Update replica annotations every time they are out of sync
This commit is contained in:
parent
de8214ad4d
commit
f52ea8fc67
@ -438,31 +438,35 @@ func (dc *DeploymentController) scale(deployment *extensions.Deployment, newRS *
|
|||||||
// drives what happens in case we are trying to scale replica sets of the same size.
|
// drives what happens in case we are trying to scale replica sets of the same size.
|
||||||
// In such a case when scaling up, we should scale up newer replica sets first, and
|
// In such a case when scaling up, we should scale up newer replica sets first, and
|
||||||
// when scaling down, we should scale down older replica sets first.
|
// when scaling down, we should scale down older replica sets first.
|
||||||
scalingOperation := "up"
|
var scalingOperation string
|
||||||
switch {
|
switch {
|
||||||
case deploymentReplicasToAdd > 0:
|
case deploymentReplicasToAdd > 0:
|
||||||
sort.Sort(controller.ReplicaSetsBySizeNewer(allRSs))
|
sort.Sort(controller.ReplicaSetsBySizeNewer(allRSs))
|
||||||
|
scalingOperation = "up"
|
||||||
|
|
||||||
case deploymentReplicasToAdd < 0:
|
case deploymentReplicasToAdd < 0:
|
||||||
sort.Sort(controller.ReplicaSetsBySizeOlder(allRSs))
|
sort.Sort(controller.ReplicaSetsBySizeOlder(allRSs))
|
||||||
scalingOperation = "down"
|
scalingOperation = "down"
|
||||||
|
|
||||||
default: /* deploymentReplicasToAdd == 0 */
|
|
||||||
// Nothing to add.
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Iterate over all active replica sets and estimate proportions for each of them.
|
// Iterate over all active replica sets and estimate proportions for each of them.
|
||||||
// The absolute value of deploymentReplicasAdded should never exceed the absolute
|
// The absolute value of deploymentReplicasAdded should never exceed the absolute
|
||||||
// value of deploymentReplicasToAdd.
|
// value of deploymentReplicasToAdd.
|
||||||
deploymentReplicasAdded := int32(0)
|
deploymentReplicasAdded := int32(0)
|
||||||
|
nameToSize := make(map[string]int32)
|
||||||
for i := range allRSs {
|
for i := range allRSs {
|
||||||
rs := allRSs[i]
|
rs := allRSs[i]
|
||||||
|
|
||||||
|
// Estimate proportions if we have replicas to add, otherwise simply populate
|
||||||
|
// nameToSize with the current sizes for each replica set.
|
||||||
|
if deploymentReplicasToAdd != 0 {
|
||||||
proportion := deploymentutil.GetProportion(rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded)
|
proportion := deploymentutil.GetProportion(rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded)
|
||||||
|
|
||||||
rs.Spec.Replicas += proportion
|
nameToSize[rs.Name] = rs.Spec.Replicas + proportion
|
||||||
deploymentReplicasAdded += proportion
|
deploymentReplicasAdded += proportion
|
||||||
|
} else {
|
||||||
|
nameToSize[rs.Name] = rs.Spec.Replicas
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update all replica sets
|
// Update all replica sets
|
||||||
@ -470,15 +474,16 @@ func (dc *DeploymentController) scale(deployment *extensions.Deployment, newRS *
|
|||||||
rs := allRSs[i]
|
rs := allRSs[i]
|
||||||
|
|
||||||
// Add/remove any leftovers to the largest replica set.
|
// Add/remove any leftovers to the largest replica set.
|
||||||
if i == 0 {
|
if i == 0 && deploymentReplicasToAdd != 0 {
|
||||||
leftover := deploymentReplicasToAdd - deploymentReplicasAdded
|
leftover := deploymentReplicasToAdd - deploymentReplicasAdded
|
||||||
rs.Spec.Replicas += leftover
|
nameToSize[rs.Name] = nameToSize[rs.Name] + leftover
|
||||||
if rs.Spec.Replicas < 0 {
|
if nameToSize[rs.Name] < 0 {
|
||||||
rs.Spec.Replicas = 0
|
nameToSize[rs.Name] = 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := dc.scaleReplicaSet(rs, rs.Spec.Replicas, deployment, scalingOperation); err != nil {
|
// TODO: Use transactions when we have them.
|
||||||
|
if _, err := dc.scaleReplicaSet(rs, nameToSize[rs.Name], deployment, scalingOperation); err != nil {
|
||||||
// Return as soon as we fail, the deployment is requeued
|
// Return as soon as we fail, the deployment is requeued
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -503,13 +508,22 @@ func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *extensions.Rep
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (dc *DeploymentController) scaleReplicaSet(rs *extensions.ReplicaSet, newScale int32, deployment *extensions.Deployment, scalingOperation string) (*extensions.ReplicaSet, error) {
|
func (dc *DeploymentController) scaleReplicaSet(rs *extensions.ReplicaSet, newScale int32, deployment *extensions.Deployment, scalingOperation string) (*extensions.ReplicaSet, error) {
|
||||||
// NOTE: This mutates the ReplicaSet passed in. Not sure if that's a good idea.
|
objCopy, err := api.Scheme.Copy(rs)
|
||||||
rs.Spec.Replicas = newScale
|
if err != nil {
|
||||||
deploymentutil.SetReplicasAnnotations(rs, deployment.Spec.Replicas, deployment.Spec.Replicas+deploymentutil.MaxSurge(*deployment))
|
return nil, err
|
||||||
rs, err := dc.client.Extensions().ReplicaSets(rs.Namespace).Update(rs)
|
}
|
||||||
if err == nil {
|
rsCopy := objCopy.(*extensions.ReplicaSet)
|
||||||
|
|
||||||
|
sizeNeedsUpdate := rsCopy.Spec.Replicas != newScale
|
||||||
|
annotationsNeedUpdate := deploymentutil.SetReplicasAnnotations(rsCopy, deployment.Spec.Replicas, deployment.Spec.Replicas+deploymentutil.MaxSurge(*deployment))
|
||||||
|
|
||||||
|
if sizeNeedsUpdate || annotationsNeedUpdate {
|
||||||
|
rsCopy.Spec.Replicas = newScale
|
||||||
|
rs, err = dc.client.Extensions().ReplicaSets(rsCopy.Namespace).Update(rsCopy)
|
||||||
|
if err == nil && sizeNeedsUpdate {
|
||||||
dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %q to %d", scalingOperation, rs.Name, newScale)
|
dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %q to %d", scalingOperation, rs.Name, newScale)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return rs, err
|
return rs, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/apis/extensions"
|
"k8s.io/kubernetes/pkg/apis/extensions"
|
||||||
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
|
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
|
||||||
"k8s.io/kubernetes/pkg/client/record"
|
"k8s.io/kubernetes/pkg/client/record"
|
||||||
|
testclient "k8s.io/kubernetes/pkg/client/testing/core"
|
||||||
"k8s.io/kubernetes/pkg/controller"
|
"k8s.io/kubernetes/pkg/controller"
|
||||||
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
|
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
|
||||||
"k8s.io/kubernetes/pkg/controller/informers"
|
"k8s.io/kubernetes/pkg/controller/informers"
|
||||||
@ -56,6 +57,7 @@ func TestScale(t *testing.T) {
|
|||||||
|
|
||||||
expectedNew *extensions.ReplicaSet
|
expectedNew *extensions.ReplicaSet
|
||||||
expectedOld []*extensions.ReplicaSet
|
expectedOld []*extensions.ReplicaSet
|
||||||
|
wasntUpdated map[string]bool
|
||||||
|
|
||||||
desiredReplicasAnnotations map[string]int32
|
desiredReplicasAnnotations map[string]int32
|
||||||
}{
|
}{
|
||||||
@ -195,6 +197,7 @@ func TestScale(t *testing.T) {
|
|||||||
|
|
||||||
expectedNew: rs("foo-v3", 6, nil, newTimestamp),
|
expectedNew: rs("foo-v3", 6, nil, newTimestamp),
|
||||||
expectedOld: []*extensions.ReplicaSet{rs("foo-v2", 0, nil, oldTimestamp), rs("foo-v1", 0, nil, olderTimestamp)},
|
expectedOld: []*extensions.ReplicaSet{rs("foo-v2", 0, nil, oldTimestamp), rs("foo-v1", 0, nil, olderTimestamp)},
|
||||||
|
wasntUpdated: map[string]bool{"foo-v2": true, "foo-v1": true},
|
||||||
},
|
},
|
||||||
// Scenario: deployment.spec.replicas == 3 ( foo-v1.spec.replicas == foo-v2.spec.replicas == foo-v3.spec.replicas == 1 )
|
// Scenario: deployment.spec.replicas == 3 ( foo-v1.spec.replicas == foo-v2.spec.replicas == foo-v3.spec.replicas == 1 )
|
||||||
// Deployment is scaled to 5. foo-v3.spec.replicas and foo-v2.spec.replicas should increment by 1 but foo-v2 fails to
|
// Deployment is scaled to 5. foo-v3.spec.replicas and foo-v2.spec.replicas should increment by 1 but foo-v2 fails to
|
||||||
@ -209,6 +212,7 @@ func TestScale(t *testing.T) {
|
|||||||
|
|
||||||
expectedNew: rs("foo-v3", 2, nil, newTimestamp),
|
expectedNew: rs("foo-v3", 2, nil, newTimestamp),
|
||||||
expectedOld: []*extensions.ReplicaSet{rs("foo-v2", 2, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)},
|
expectedOld: []*extensions.ReplicaSet{rs("foo-v2", 2, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)},
|
||||||
|
wasntUpdated: map[string]bool{"foo-v3": true, "foo-v1": true},
|
||||||
|
|
||||||
desiredReplicasAnnotations: map[string]int32{"foo-v2": int32(3)},
|
desiredReplicasAnnotations: map[string]int32{"foo-v2": int32(3)},
|
||||||
},
|
},
|
||||||
@ -279,8 +283,28 @@ func TestScale(t *testing.T) {
|
|||||||
t.Errorf("%s: unexpected error: %v", test.name, err)
|
t.Errorf("%s: unexpected error: %v", test.name, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if test.expectedNew != nil && test.newRS != nil && test.expectedNew.Spec.Replicas != test.newRS.Spec.Replicas {
|
|
||||||
t.Errorf("%s: expected new replicas: %d, got: %d", test.name, test.expectedNew.Spec.Replicas, test.newRS.Spec.Replicas)
|
// Construct the nameToSize map that will hold all the sizes we got our of tests
|
||||||
|
// Skip updating the map if the replica set wasn't updated since there will be
|
||||||
|
// no update action for it.
|
||||||
|
nameToSize := make(map[string]int32)
|
||||||
|
if test.newRS != nil {
|
||||||
|
nameToSize[test.newRS.Name] = test.newRS.Spec.Replicas
|
||||||
|
}
|
||||||
|
for i := range test.oldRSs {
|
||||||
|
rs := test.oldRSs[i]
|
||||||
|
nameToSize[rs.Name] = rs.Spec.Replicas
|
||||||
|
}
|
||||||
|
// Get all the UPDATE actions and update nameToSize with all the updated sizes.
|
||||||
|
for _, action := range fake.Actions() {
|
||||||
|
rs := action.(testclient.UpdateAction).GetObject().(*extensions.ReplicaSet)
|
||||||
|
if !test.wasntUpdated[rs.Name] {
|
||||||
|
nameToSize[rs.Name] = rs.Spec.Replicas
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if test.expectedNew != nil && test.newRS != nil && test.expectedNew.Spec.Replicas != nameToSize[test.newRS.Name] {
|
||||||
|
t.Errorf("%s: expected new replicas: %d, got: %d", test.name, test.expectedNew.Spec.Replicas, nameToSize[test.newRS.Name])
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if len(test.expectedOld) != len(test.oldRSs) {
|
if len(test.expectedOld) != len(test.oldRSs) {
|
||||||
@ -290,8 +314,8 @@ func TestScale(t *testing.T) {
|
|||||||
for n := range test.oldRSs {
|
for n := range test.oldRSs {
|
||||||
rs := test.oldRSs[n]
|
rs := test.oldRSs[n]
|
||||||
expected := test.expectedOld[n]
|
expected := test.expectedOld[n]
|
||||||
if expected.Spec.Replicas != rs.Spec.Replicas {
|
if expected.Spec.Replicas != nameToSize[rs.Name] {
|
||||||
t.Errorf("%s: expected old (%s) replicas: %d, got: %d", test.name, rs.Name, expected.Spec.Replicas, rs.Spec.Replicas)
|
t.Errorf("%s: expected old (%s) replicas: %d, got: %d", test.name, rs.Name, expected.Spec.Replicas, nameToSize[rs.Name])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user