Merge pull request #105377 from damemi/wire-contexts-apps
Wire contexts to Apps controllers
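The change is mechanical but broad: a context.Context now flows from each controller's Run method through the worker loop and sync handlers down to every client-go call that previously passed context.TODO(). This excerpt covers the deployment controller. Condensed from the diff itself, the heart of the migration:

	// Before: shutdown was signalled by closing a stop channel.
	go wait.Until(dc.worker, time.Second, stopCh)
	<-stopCh

	// After: shutdown is signalled by cancelling a context, and that same
	// context now reaches every API request the workers issue.
	go wait.UntilWithContext(ctx, dc.worker, time.Second)
	<-ctx.Done()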
pkg/controller/deployment/deployment_controller.go

@@ -72,7 +72,7 @@ type DeploymentController struct {
 	eventRecorder record.EventRecorder
 
 	// To allow injection of syncDeployment for testing.
-	syncHandler func(dKey string) error
+	syncHandler func(ctx context.Context, dKey string) error
 	// used for unit testing
 	enqueueDeployment func(deployment *apps.Deployment)
 
@@ -146,22 +146,22 @@ func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInfor
 }
 
 // Run begins watching and syncing.
-func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
+func (dc *DeploymentController) Run(ctx context.Context, workers int) {
 	defer utilruntime.HandleCrash()
 	defer dc.queue.ShutDown()
 
 	klog.InfoS("Starting controller", "controller", "deployment")
 	defer klog.InfoS("Shutting down controller", "controller", "deployment")
 
-	if !cache.WaitForNamedCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
+	if !cache.WaitForNamedCacheSync("deployment", ctx.Done(), dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
 		return
 	}
 
 	for i := 0; i < workers; i++ {
-		go wait.Until(dc.worker, time.Second, stopCh)
+		go wait.UntilWithContext(ctx, dc.worker, time.Second)
 	}
 
-	<-stopCh
+	<-ctx.Done()
 }
 
 func (dc *DeploymentController) addDeployment(obj interface{}) {
@@ -457,19 +457,19 @@ func (dc *DeploymentController) resolveControllerRef(namespace string, controlle
 
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
-func (dc *DeploymentController) worker() {
-	for dc.processNextWorkItem() {
+func (dc *DeploymentController) worker(ctx context.Context) {
+	for dc.processNextWorkItem(ctx) {
 	}
 }
 
-func (dc *DeploymentController) processNextWorkItem() bool {
+func (dc *DeploymentController) processNextWorkItem(ctx context.Context) bool {
 	key, quit := dc.queue.Get()
 	if quit {
 		return false
 	}
 	defer dc.queue.Done(key)
 
-	err := dc.syncHandler(key.(string))
+	err := dc.syncHandler(ctx, key.(string))
 	dc.handleErr(err, key)
 
 	return true
@@ -500,7 +500,7 @@ func (dc *DeploymentController) handleErr(err error, key interface{}) {
 // getReplicaSetsForDeployment uses ControllerRefManager to reconcile
 // ControllerRef by adopting and orphaning.
 // It returns the list of ReplicaSets that this Deployment should manage.
-func (dc *DeploymentController) getReplicaSetsForDeployment(d *apps.Deployment) ([]*apps.ReplicaSet, error) {
+func (dc *DeploymentController) getReplicaSetsForDeployment(ctx context.Context, d *apps.Deployment) ([]*apps.ReplicaSet, error) {
 	// List all ReplicaSets to find those we own but that no longer match our
 	// selector. They will be orphaned by ClaimReplicaSets().
 	rsList, err := dc.rsLister.ReplicaSets(d.Namespace).List(labels.Everything())
@@ -513,8 +513,8 @@ func (dc *DeploymentController) getReplicaSetsForDeployment(d *apps.Deployment)
 	}
 	// If any adoptions are attempted, we should first recheck for deletion with
 	// an uncached quorum read sometime after listing ReplicaSets (see #42639).
-	canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
-		fresh, err := dc.client.AppsV1().Deployments(d.Namespace).Get(context.TODO(), d.Name, metav1.GetOptions{})
+	canAdoptFunc := controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) {
+		fresh, err := dc.client.AppsV1().Deployments(d.Namespace).Get(ctx, d.Name, metav1.GetOptions{})
 		if err != nil {
 			return nil, err
 		}
@@ -524,7 +524,7 @@ func (dc *DeploymentController) getReplicaSetsForDeployment(d *apps.Deployment)
 		return fresh, nil
 	})
 	cm := controller.NewReplicaSetControllerRefManager(dc.rsControl, d, deploymentSelector, controllerKind, canAdoptFunc)
-	return cm.ClaimReplicaSets(rsList)
+	return cm.ClaimReplicaSets(ctx, rsList)
 }
 
 // getPodMapForDeployment returns the Pods managed by a Deployment.
@@ -565,7 +565,7 @@ func (dc *DeploymentController) getPodMapForDeployment(d *apps.Deployment, rsLis
 
 // syncDeployment will sync the deployment with the given key.
 // This function is not meant to be invoked concurrently with the same key.
-func (dc *DeploymentController) syncDeployment(key string) error {
+func (dc *DeploymentController) syncDeployment(ctx context.Context, key string) error {
 	namespace, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
 		klog.ErrorS(err, "Failed to split meta namespace cache key", "cacheKey", key)
@@ -596,14 +596,14 @@ func (dc *DeploymentController) syncDeployment(key string) error {
 		dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
 		if d.Status.ObservedGeneration < d.Generation {
 			d.Status.ObservedGeneration = d.Generation
-			dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
+			dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
 		}
 		return nil
 	}
 
 	// List ReplicaSets owned by this Deployment, while reconciling ControllerRef
 	// through adoption/orphaning.
-	rsList, err := dc.getReplicaSetsForDeployment(d)
+	rsList, err := dc.getReplicaSetsForDeployment(ctx, d)
 	if err != nil {
 		return err
 	}
@@ -618,40 +618,40 @@ func (dc *DeploymentController) syncDeployment(key string) error {
 	}
 
 	if d.DeletionTimestamp != nil {
-		return dc.syncStatusOnly(d, rsList)
+		return dc.syncStatusOnly(ctx, d, rsList)
 	}
 
 	// Update deployment conditions with an Unknown condition when pausing/resuming
 	// a deployment. In this way, we can be sure that we won't timeout when a user
 	// resumes a Deployment with a set progressDeadlineSeconds.
-	if err = dc.checkPausedConditions(d); err != nil {
+	if err = dc.checkPausedConditions(ctx, d); err != nil {
 		return err
 	}
 
 	if d.Spec.Paused {
-		return dc.sync(d, rsList)
+		return dc.sync(ctx, d, rsList)
 	}
 
 	// rollback is not re-entrant in case the underlying replica sets are updated with a new
 	// revision so we should ensure that we won't proceed to update replica sets until we
 	// make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
	if getRollbackTo(d) != nil {
-		return dc.rollback(d, rsList)
+		return dc.rollback(ctx, d, rsList)
 	}
 
-	scalingEvent, err := dc.isScalingEvent(d, rsList)
+	scalingEvent, err := dc.isScalingEvent(ctx, d, rsList)
 	if err != nil {
 		return err
 	}
 	if scalingEvent {
-		return dc.sync(d, rsList)
+		return dc.sync(ctx, d, rsList)
 	}
 
 	switch d.Spec.Strategy.Type {
 	case apps.RecreateDeploymentStrategyType:
-		return dc.rolloutRecreate(d, rsList, podMap)
+		return dc.rolloutRecreate(ctx, d, rsList, podMap)
 	case apps.RollingUpdateDeploymentStrategyType:
-		return dc.rolloutRolling(d, rsList)
+		return dc.rolloutRolling(ctx, d, rsList)
 	}
 	return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
 }
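A hypothetical caller of the reworked entry point (not part of this diff; the context construction and worker count are illustrative assumptions):

	// Sketch: cancel() now plays the role that closing stopCh used to.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go dc.Run(ctx, 5) // 5 workers, an arbitrary example value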
pkg/controller/deployment/deployment_controller_test.go

@@ -17,6 +17,7 @@ limitations under the License.
 package deployment
 
 import (
+	"context"
 	"fmt"
 	"strconv"
 	"testing"
@@ -221,7 +222,7 @@ func (f *fixture) run_(deploymentName string, startInformers bool, expectError b
 		informers.Start(stopCh)
 	}
 
-	err = c.syncDeployment(deploymentName)
+	err = c.syncDeployment(context.TODO(), deploymentName)
 	if !expectError && err != nil {
 		f.t.Errorf("error syncing deployment: %v", err)
 	} else if expectError && err == nil {
@@ -529,7 +530,7 @@ func TestGetReplicaSetsForDeployment(t *testing.T) {
 	defer close(stopCh)
 	informers.Start(stopCh)
 
-	rsList, err := c.getReplicaSetsForDeployment(d1)
+	rsList, err := c.getReplicaSetsForDeployment(context.TODO(), d1)
 	if err != nil {
 		t.Fatalf("getReplicaSetsForDeployment() error: %v", err)
 	}
@@ -541,7 +542,7 @@ func TestGetReplicaSetsForDeployment(t *testing.T) {
 		t.Errorf("getReplicaSetsForDeployment() = %v, want [%v]", rsNames, rs1.Name)
 	}
 
-	rsList, err = c.getReplicaSetsForDeployment(d2)
+	rsList, err = c.getReplicaSetsForDeployment(context.TODO(), d2)
 	if err != nil {
 		t.Fatalf("getReplicaSetsForDeployment() error: %v", err)
 	}
@@ -579,7 +580,7 @@ func TestGetReplicaSetsForDeploymentAdoptRelease(t *testing.T) {
 	defer close(stopCh)
 	informers.Start(stopCh)
 
-	rsList, err := c.getReplicaSetsForDeployment(d)
+	rsList, err := c.getReplicaSetsForDeployment(context.TODO(), d)
 	if err != nil {
 		t.Fatalf("getReplicaSetsForDeployment() error: %v", err)
 	}
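On the test side the rewrite is uniform: each call site gains a context.TODO() argument, Go's placeholder for call paths that do not yet carry a real context. The fake clientset used by these tests does not act on the context, so test behaviour is unchanged:

	err = c.syncDeployment(context.TODO(), deploymentName)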
pkg/controller/deployment/progress.go

@@ -34,7 +34,7 @@ import (
 // cases this helper will run that cannot be prevented from the scaling detection,
 // for example a resync of the deployment after it was scaled up. In those cases,
 // we shouldn't try to estimate any progress.
-func (dc *DeploymentController) syncRolloutStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
+func (dc *DeploymentController) syncRolloutStatus(ctx context.Context, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
 	newStatus := calculateStatus(allRSs, newRS, d)
 
 	// If there is no progressDeadlineSeconds set, remove any Progressing condition.
@@ -114,7 +114,7 @@ func (dc *DeploymentController) syncRolloutStatus(allRSs []*apps.ReplicaSet, new
 
 	newDeployment := d
 	newDeployment.Status = newStatus
-	_, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(context.TODO(), newDeployment, metav1.UpdateOptions{})
+	_, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(ctx, newDeployment, metav1.UpdateOptions{})
 	return err
 }
pkg/controller/deployment/progress_test.go

@@ -17,6 +17,7 @@ limitations under the License.
 package deployment
 
 import (
+	"context"
 	"math"
 	"testing"
 	"time"
@@ -330,7 +331,7 @@ func TestSyncRolloutStatus(t *testing.T) {
 			test.allRSs = append(test.allRSs, test.newRS)
 		}
 
-		err := dc.syncRolloutStatus(test.allRSs, test.newRS, test.d)
+		err := dc.syncRolloutStatus(context.TODO(), test.allRSs, test.newRS, test.d)
 		if err != nil {
 			t.Error(err)
 		}
pkg/controller/deployment/recreate.go

@@ -17,6 +17,7 @@ limitations under the License.
 package deployment
 
 import (
+	"context"
 	apps "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
@@ -25,9 +26,9 @@ import (
 )
 
 // rolloutRecreate implements the logic for recreating a replica set.
-func (dc *DeploymentController) rolloutRecreate(d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map[types.UID][]*v1.Pod) error {
+func (dc *DeploymentController) rolloutRecreate(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map[types.UID][]*v1.Pod) error {
 	// Don't create a new RS if not already existed, so that we avoid scaling up before scaling down.
-	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
+	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
 	if err != nil {
 		return err
 	}
@@ -35,23 +36,23 @@ func (dc *DeploymentController) rolloutRecreate(d *apps.Deployment, rsList []*ap
 	activeOldRSs := controller.FilterActiveReplicaSets(oldRSs)
 
 	// scale down old replica sets.
-	scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(activeOldRSs, d)
+	scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(ctx, activeOldRSs, d)
 	if err != nil {
 		return err
 	}
 	if scaledDown {
 		// Update DeploymentStatus.
-		return dc.syncRolloutStatus(allRSs, newRS, d)
+		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
 	}
 
 	// Do not process a deployment when it has old pods running.
 	if oldPodsRunning(newRS, oldRSs, podMap) {
-		return dc.syncRolloutStatus(allRSs, newRS, d)
+		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
 	}
 
 	// If we need to create a new RS, create it now.
 	if newRS == nil {
-		newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
+		newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)
 		if err != nil {
 			return err
 		}
@@ -59,22 +60,22 @@ func (dc *DeploymentController) rolloutRecreate(d *apps.Deployment, rsList []*ap
 	}
 
 	// scale up new replica set.
-	if _, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d); err != nil {
+	if _, err := dc.scaleUpNewReplicaSetForRecreate(ctx, newRS, d); err != nil {
 		return err
 	}
 
 	if util.DeploymentComplete(d, &d.Status) {
-		if err := dc.cleanupDeployment(oldRSs, d); err != nil {
+		if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
 			return err
 		}
 	}
 
 	// Sync deployment status.
-	return dc.syncRolloutStatus(allRSs, newRS, d)
+	return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
 }
 
 // scaleDownOldReplicaSetsForRecreate scales down old replica sets when deployment strategy is "Recreate".
-func (dc *DeploymentController) scaleDownOldReplicaSetsForRecreate(oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
+func (dc *DeploymentController) scaleDownOldReplicaSetsForRecreate(ctx context.Context, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
 	scaled := false
 	for i := range oldRSs {
 		rs := oldRSs[i]
@@ -82,7 +83,7 @@ func (dc *DeploymentController) scaleDownOldReplicaSetsForRecreate(oldRSs []*app
 		if *(rs.Spec.Replicas) == 0 {
 			continue
 		}
-		scaledRS, updatedRS, err := dc.scaleReplicaSetAndRecordEvent(rs, 0, deployment)
+		scaledRS, updatedRS, err := dc.scaleReplicaSetAndRecordEvent(ctx, rs, 0, deployment)
 		if err != nil {
 			return false, err
 		}
@@ -125,7 +126,7 @@ func oldPodsRunning(newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet, podMap ma
 }
 
 // scaleUpNewReplicaSetForRecreate scales up new replica set when deployment strategy is "Recreate".
-func (dc *DeploymentController) scaleUpNewReplicaSetForRecreate(newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
-	scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, *(deployment.Spec.Replicas), deployment)
+func (dc *DeploymentController) scaleUpNewReplicaSetForRecreate(ctx context.Context, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
+	scaled, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, newRS, *(deployment.Spec.Replicas), deployment)
 	return scaled, err
 }
pkg/controller/deployment/recreate_test.go

@@ -17,6 +17,7 @@ limitations under the License.
 package deployment
 
 import (
+	"context"
 	"fmt"
 	"testing"
 
@@ -71,7 +72,7 @@ func TestScaleDownOldReplicaSets(t *testing.T) {
 		}
 		c.eventRecorder = &record.FakeRecorder{}
 
-		c.scaleDownOldReplicaSetsForRecreate(oldRSs, test.d)
+		c.scaleDownOldReplicaSetsForRecreate(context.TODO(), oldRSs, test.d)
 		for j := range oldRSs {
 			rs := oldRSs[j]
pkg/controller/deployment/rollback.go

@@ -31,8 +31,8 @@ import (
 )
 
 // rollback the deployment to the specified revision. In any case cleanup the rollback spec.
-func (dc *DeploymentController) rollback(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
-	newRS, allOldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
+func (dc *DeploymentController) rollback(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
+	newRS, allOldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)
 	if err != nil {
 		return err
 	}
@@ -45,7 +45,7 @@ func (dc *DeploymentController) rollback(d *apps.Deployment, rsList []*apps.Repl
 			// If we still can't find the last revision, gives up rollback
 			dc.emitRollbackWarningEvent(d, deploymentutil.RollbackRevisionNotFound, "Unable to find last revision.")
 			// Gives up rollback
-			return dc.updateDeploymentAndClearRollbackTo(d)
+			return dc.updateDeploymentAndClearRollbackTo(ctx, d)
 		}
 	}
 	for _, rs := range allRSs {
@@ -59,7 +59,7 @@ func (dc *DeploymentController) rollback(d *apps.Deployment, rsList []*apps.Repl
 			// rollback by copying podTemplate.Spec from the replica set
 			// revision number will be incremented during the next getAllReplicaSetsAndSyncRevision call
 			// no-op if the spec matches current deployment's podTemplate.Spec
-			performedRollback, err := dc.rollbackToTemplate(d, rs)
+			performedRollback, err := dc.rollbackToTemplate(ctx, d, rs)
 			if performedRollback && err == nil {
 				dc.emitRollbackNormalEvent(d, fmt.Sprintf("Rolled back deployment %q to revision %d", d.Name, rollbackTo.Revision))
 			}
@@ -68,13 +68,13 @@ func (dc *DeploymentController) rollback(d *apps.Deployment, rsList []*apps.Repl
 	}
 	dc.emitRollbackWarningEvent(d, deploymentutil.RollbackRevisionNotFound, "Unable to find the revision to rollback to.")
 	// Gives up rollback
-	return dc.updateDeploymentAndClearRollbackTo(d)
+	return dc.updateDeploymentAndClearRollbackTo(ctx, d)
 }
 
 // rollbackToTemplate compares the templates of the provided deployment and replica set and
 // updates the deployment with the replica set template in case they are different. It also
 // cleans up the rollback spec so subsequent requeues of the deployment won't end up in here.
-func (dc *DeploymentController) rollbackToTemplate(d *apps.Deployment, rs *apps.ReplicaSet) (bool, error) {
+func (dc *DeploymentController) rollbackToTemplate(ctx context.Context, d *apps.Deployment, rs *apps.ReplicaSet) (bool, error) {
 	performedRollback := false
 	if !deploymentutil.EqualIgnoreHash(&d.Spec.Template, &rs.Spec.Template) {
 		klog.V(4).Infof("Rolling back deployment %q to template spec %+v", d.Name, rs.Spec.Template.Spec)
@@ -98,7 +98,7 @@ func (dc *DeploymentController) rollbackToTemplate(d *apps.Deployment, rs *apps.
 		dc.emitRollbackWarningEvent(d, deploymentutil.RollbackTemplateUnchanged, eventMsg)
 	}
 
-	return performedRollback, dc.updateDeploymentAndClearRollbackTo(d)
+	return performedRollback, dc.updateDeploymentAndClearRollbackTo(ctx, d)
 }
 
 func (dc *DeploymentController) emitRollbackWarningEvent(d *apps.Deployment, reason, message string) {
@@ -112,10 +112,10 @@ func (dc *DeploymentController) emitRollbackNormalEvent(d *apps.Deployment, mess
 // updateDeploymentAndClearRollbackTo sets .spec.rollbackTo to nil and update the input deployment
 // It is assumed that the caller will have updated the deployment template appropriately (in case
 // we want to rollback).
-func (dc *DeploymentController) updateDeploymentAndClearRollbackTo(d *apps.Deployment) error {
+func (dc *DeploymentController) updateDeploymentAndClearRollbackTo(ctx context.Context, d *apps.Deployment) error {
 	klog.V(4).Infof("Cleans up rollbackTo of deployment %q", d.Name)
 	setRollbackTo(d, nil)
-	_, err := dc.client.AppsV1().Deployments(d.Namespace).Update(context.TODO(), d, metav1.UpdateOptions{})
+	_, err := dc.client.AppsV1().Deployments(d.Namespace).Update(ctx, d, metav1.UpdateOptions{})
 	return err
 }
pkg/controller/deployment/rolling.go

@@ -17,6 +17,7 @@ limitations under the License.
 package deployment
 
 import (
+	"context"
 	"fmt"
 	"sort"
 
@@ -28,62 +29,62 @@ import (
 )
 
 // rolloutRolling implements the logic for rolling a new replica set.
-func (dc *DeploymentController) rolloutRolling(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
-	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
+func (dc *DeploymentController) rolloutRolling(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
+	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)
 	if err != nil {
 		return err
 	}
 	allRSs := append(oldRSs, newRS)
 
 	// Scale up, if we can.
-	scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, d)
+	scaledUp, err := dc.reconcileNewReplicaSet(ctx, allRSs, newRS, d)
 	if err != nil {
 		return err
 	}
 	if scaledUp {
 		// Update DeploymentStatus
-		return dc.syncRolloutStatus(allRSs, newRS, d)
+		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
 	}
 
 	// Scale down, if we can.
-	scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
+	scaledDown, err := dc.reconcileOldReplicaSets(ctx, allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
 	if err != nil {
 		return err
 	}
 	if scaledDown {
 		// Update DeploymentStatus
-		return dc.syncRolloutStatus(allRSs, newRS, d)
+		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
 	}
 
 	if deploymentutil.DeploymentComplete(d, &d.Status) {
-		if err := dc.cleanupDeployment(oldRSs, d); err != nil {
+		if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
 			return err
 		}
 	}
 
 	// Sync deployment status
-	return dc.syncRolloutStatus(allRSs, newRS, d)
+	return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
 }
 
-func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
+func (dc *DeploymentController) reconcileNewReplicaSet(ctx context.Context, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
 	if *(newRS.Spec.Replicas) == *(deployment.Spec.Replicas) {
 		// Scaling not required.
 		return false, nil
 	}
 	if *(newRS.Spec.Replicas) > *(deployment.Spec.Replicas) {
 		// Scale down.
-		scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, *(deployment.Spec.Replicas), deployment)
+		scaled, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, newRS, *(deployment.Spec.Replicas), deployment)
 		return scaled, err
 	}
 	newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, newRS)
 	if err != nil {
 		return false, err
 	}
-	scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, newReplicasCount, deployment)
+	scaled, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, newRS, newReplicasCount, deployment)
 	return scaled, err
 }
 
-func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
+func (dc *DeploymentController) reconcileOldReplicaSets(ctx context.Context, allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
 	oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs)
 	if oldPodsCount == 0 {
 		// Can't scale down further
@@ -133,7 +134,7 @@ func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*apps.ReplicaSe
 
 	// Clean up unhealthy replicas first, otherwise unhealthy replicas will block deployment
 	// and cause timeout. See https://github.com/kubernetes/kubernetes/issues/16737
-	oldRSs, cleanupCount, err := dc.cleanupUnhealthyReplicas(oldRSs, deployment, maxScaledDown)
+	oldRSs, cleanupCount, err := dc.cleanupUnhealthyReplicas(ctx, oldRSs, deployment, maxScaledDown)
 	if err != nil {
 		return false, nil
 	}
@@ -141,7 +142,7 @@ func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*apps.ReplicaSe
 
 	// Scale down old replica sets, need check maxUnavailable to ensure we can scale down
 	allRSs = append(oldRSs, newRS)
-	scaledDownCount, err := dc.scaleDownOldReplicaSetsForRollingUpdate(allRSs, oldRSs, deployment)
+	scaledDownCount, err := dc.scaleDownOldReplicaSetsForRollingUpdate(ctx, allRSs, oldRSs, deployment)
 	if err != nil {
 		return false, nil
 	}
@@ -152,7 +153,7 @@ func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*apps.ReplicaSe
 }
 
 // cleanupUnhealthyReplicas will scale down old replica sets with unhealthy replicas, so that all unhealthy replicas will be deleted.
-func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRSs []*apps.ReplicaSet, deployment *apps.Deployment, maxCleanupCount int32) ([]*apps.ReplicaSet, int32, error) {
+func (dc *DeploymentController) cleanupUnhealthyReplicas(ctx context.Context, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment, maxCleanupCount int32) ([]*apps.ReplicaSet, int32, error) {
 	sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs))
 	// Safely scale down all old replica sets with unhealthy replicas. Replica set will sort the pods in the order
 	// such that not-ready < ready, unscheduled < scheduled, and pending < running. This ensures that unhealthy replicas will
@@ -177,7 +178,7 @@ func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRSs []*apps.ReplicaS
 		if newReplicasCount > *(targetRS.Spec.Replicas) {
 			return nil, 0, fmt.Errorf("when cleaning up unhealthy replicas, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, *(targetRS.Spec.Replicas), newReplicasCount)
 		}
-		_, updatedOldRS, err := dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment)
+		_, updatedOldRS, err := dc.scaleReplicaSetAndRecordEvent(ctx, targetRS, newReplicasCount, deployment)
 		if err != nil {
 			return nil, totalScaledDown, err
 		}
@@ -189,7 +190,7 @@ func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRSs []*apps.ReplicaS
 
 // scaleDownOldReplicaSetsForRollingUpdate scales down old replica sets when deployment strategy is "RollingUpdate".
 // Need check maxUnavailable to ensure availability
-func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) (int32, error) {
+func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(ctx context.Context, allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) (int32, error) {
 	maxUnavailable := deploymentutil.MaxUnavailable(*deployment)
 
 	// Check if we can scale down.
@@ -221,7 +222,7 @@ func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs [
 		if newReplicasCount > *(targetRS.Spec.Replicas) {
 			return 0, fmt.Errorf("when scaling down old RS, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, *(targetRS.Spec.Replicas), newReplicasCount)
 		}
-		_, _, err := dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment)
+		_, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, targetRS, newReplicasCount, deployment)
 		if err != nil {
 			return totalScaledDown, err
 		}
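rolling.go shows how deep the plumbing goes: ctx enters at rolloutRolling and is handed through reconcileNewReplicaSet, reconcileOldReplicaSets, cleanupUnhealthyReplicas, and scaleDownOldReplicaSetsForRollingUpdate before reaching the client. Each helper follows the usual Go convention of taking the context as its first parameter, e.g.:

	func (dc *DeploymentController) cleanupUnhealthyReplicas(ctx context.Context, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment, maxCleanupCount int32) ([]*apps.ReplicaSet, int32, error)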
pkg/controller/deployment/rolling_test.go

@@ -17,6 +17,7 @@ limitations under the License.
 package deployment
 
 import (
+	"context"
 	"testing"
 
 	apps "k8s.io/api/apps/v1"
@@ -90,7 +91,7 @@ func TestDeploymentController_reconcileNewReplicaSet(t *testing.T) {
 			client:        &fake,
 			eventRecorder: &record.FakeRecorder{},
 		}
-		scaled, err := controller.reconcileNewReplicaSet(allRSs, newRS, deployment)
+		scaled, err := controller.reconcileNewReplicaSet(context.TODO(), allRSs, newRS, deployment)
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
 			continue
@@ -197,7 +198,7 @@ func TestDeploymentController_reconcileOldReplicaSets(t *testing.T) {
 			eventRecorder: &record.FakeRecorder{},
 		}
 
-		scaled, err := controller.reconcileOldReplicaSets(allRSs, oldRSs, newRS, deployment)
+		scaled, err := controller.reconcileOldReplicaSets(context.TODO(), allRSs, oldRSs, newRS, deployment)
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
 			continue
@@ -265,7 +266,7 @@ func TestDeploymentController_cleanupUnhealthyReplicas(t *testing.T) {
 			client:        &fakeClientset,
 			eventRecorder: &record.FakeRecorder{},
 		}
-		_, cleanupCount, err := controller.cleanupUnhealthyReplicas(oldRSs, deployment, int32(test.maxCleanupCount))
+		_, cleanupCount, err := controller.cleanupUnhealthyReplicas(context.TODO(), oldRSs, deployment, int32(test.maxCleanupCount))
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
 			continue
@@ -339,7 +340,7 @@ func TestDeploymentController_scaleDownOldReplicaSetsForRollingUpdate(t *testing
 			client:        &fakeClientset,
 			eventRecorder: &record.FakeRecorder{},
 		}
-		scaled, err := controller.scaleDownOldReplicaSetsForRollingUpdate(allRSs, oldRSs, deployment)
+		scaled, err := controller.scaleDownOldReplicaSetsForRollingUpdate(context.TODO(), allRSs, oldRSs, deployment)
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
 			continue
pkg/controller/deployment/sync.go

@@ -34,24 +34,24 @@ import (
 )
 
 // syncStatusOnly only updates Deployments Status and doesn't take any mutating actions.
-func (dc *DeploymentController) syncStatusOnly(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
-	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
+func (dc *DeploymentController) syncStatusOnly(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
+	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
 	if err != nil {
 		return err
 	}
 
 	allRSs := append(oldRSs, newRS)
-	return dc.syncDeploymentStatus(allRSs, newRS, d)
+	return dc.syncDeploymentStatus(ctx, allRSs, newRS, d)
 }
 
 // sync is responsible for reconciling deployments on scaling events or when they
 // are paused.
-func (dc *DeploymentController) sync(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
-	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
+func (dc *DeploymentController) sync(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
+	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
 	if err != nil {
 		return err
 	}
-	if err := dc.scale(d, newRS, oldRSs); err != nil {
+	if err := dc.scale(ctx, d, newRS, oldRSs); err != nil {
 		// If we get an error while trying to scale, the deployment will be requeued
 		// so we can abort this resync
 		return err
@@ -59,19 +59,19 @@ func (dc *DeploymentController) sync(d *apps.Deployment, rsList []*apps.ReplicaS
 
 	// Clean up the deployment when it's paused and no rollback is in flight.
 	if d.Spec.Paused && getRollbackTo(d) == nil {
-		if err := dc.cleanupDeployment(oldRSs, d); err != nil {
+		if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
 			return err
 		}
 	}
 
 	allRSs := append(oldRSs, newRS)
-	return dc.syncDeploymentStatus(allRSs, newRS, d)
+	return dc.syncDeploymentStatus(ctx, allRSs, newRS, d)
 }
 
 // checkPausedConditions checks if the given deployment is paused or not and adds an appropriate condition.
 // These conditions are needed so that we won't accidentally report lack of progress for resumed deployments
 // that were paused for longer than progressDeadlineSeconds.
-func (dc *DeploymentController) checkPausedConditions(d *apps.Deployment) error {
+func (dc *DeploymentController) checkPausedConditions(ctx context.Context, d *apps.Deployment) error {
 	if !deploymentutil.HasProgressDeadline(d) {
 		return nil
 	}
@@ -98,7 +98,7 @@ func (dc *DeploymentController) checkPausedConditions(d *apps.Deployment) error
 	}
 
 	var err error
-	_, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
+	_, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
 	return err
 }
 
@@ -113,11 +113,11 @@ func (dc *DeploymentController) checkPausedConditions(d *apps.Deployment) error
 //
 // Note that currently the deployment controller is using caches to avoid querying the server for reads.
 // This may lead to stale reads of replica sets, thus incorrect deployment status.
-func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(d *apps.Deployment, rsList []*apps.ReplicaSet, createIfNotExisted bool) (*apps.ReplicaSet, []*apps.ReplicaSet, error) {
+func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet, createIfNotExisted bool) (*apps.ReplicaSet, []*apps.ReplicaSet, error) {
 	_, allOldRSs := deploymentutil.FindOldReplicaSets(d, rsList)
 
 	// Get new replica set with the updated revision number
-	newRS, err := dc.getNewReplicaSet(d, rsList, allOldRSs, createIfNotExisted)
+	newRS, err := dc.getNewReplicaSet(ctx, d, rsList, allOldRSs, createIfNotExisted)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -135,7 +135,7 @@ const (
 // 2. If there's existing new RS, update its revision number if it's smaller than (maxOldRevision + 1), where maxOldRevision is the max revision number among all old RSes.
 // 3. If there's no existing new RS and createIfNotExisted is true, create one with appropriate revision number (maxOldRevision + 1) and replicas.
 // Note that the pod-template-hash will be added to adopted RSes and pods.
-func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, oldRSs []*apps.ReplicaSet, createIfNotExisted bool) (*apps.ReplicaSet, error) {
+func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.Deployment, rsList, oldRSs []*apps.ReplicaSet, createIfNotExisted bool) (*apps.ReplicaSet, error) {
 	existingNewRS := deploymentutil.FindNewReplicaSet(d, rsList)
 
 	// Calculate the max revision number among all old RSes
@@ -155,7 +155,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old
 		minReadySecondsNeedsUpdate := rsCopy.Spec.MinReadySeconds != d.Spec.MinReadySeconds
 		if annotationsUpdated || minReadySecondsNeedsUpdate {
 			rsCopy.Spec.MinReadySeconds = d.Spec.MinReadySeconds
-			return dc.client.AppsV1().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(context.TODO(), rsCopy, metav1.UpdateOptions{})
+			return dc.client.AppsV1().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{})
 		}
 
 		// Should use the revision in existingNewRS's annotation, since it set by before
@@ -173,7 +173,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old
 
 		if needsUpdate {
 			var err error
-			if _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{}); err != nil {
+			if _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}); err != nil {
 				return nil, err
 			}
 		}
@@ -220,7 +220,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old
 	// hash collisions. If there is any other error, we need to report it in the status of
 	// the Deployment.
 	alreadyExists := false
-	createdRS, err := dc.client.AppsV1().ReplicaSets(d.Namespace).Create(context.TODO(), &newRS, metav1.CreateOptions{})
+	createdRS, err := dc.client.AppsV1().ReplicaSets(d.Namespace).Create(ctx, &newRS, metav1.CreateOptions{})
 	switch {
 	// We may end up hitting this due to a slow cache or a fast resync of the Deployment.
 	case errors.IsAlreadyExists(err):
@@ -252,7 +252,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old
 		*d.Status.CollisionCount++
 		// Update the collisionCount for the Deployment and let it requeue by returning the original
 		// error.
-		_, dErr := dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
+		_, dErr := dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
 		if dErr == nil {
 			klog.V(2).Infof("Found a hash collision for deployment %q - bumping collisionCount (%d->%d) to resolve it", d.Name, preCollisionCount, *d.Status.CollisionCount)
 		}
@@ -268,7 +268,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old
 			// We don't really care about this error at this point, since we have a bigger issue to report.
 			// TODO: Identify which errors are permanent and switch DeploymentIsFailed to take into account
 			// these reasons as well. Related issue: https://github.com/kubernetes/kubernetes/issues/18568
-			_, _ = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
+			_, _ = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
 		}
 		dc.eventRecorder.Eventf(d, v1.EventTypeWarning, deploymentutil.FailedRSCreateReason, msg)
 		return nil, err
@@ -285,7 +285,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old
 		needsUpdate = true
 	}
 	if needsUpdate {
-		_, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
+		_, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
 	}
 	return createdRS, err
 }
@@ -295,14 +295,14 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old
 // have the effect of hastening the rollout progress, which could produce a higher proportion of unavailable
 // replicas in the event of a problem with the rolled out template. Should run only on scaling events or
 // when a deployment is paused and not during the normal rollout process.
-func (dc *DeploymentController) scale(deployment *apps.Deployment, newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet) error {
+func (dc *DeploymentController) scale(ctx context.Context, deployment *apps.Deployment, newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet) error {
 	// If there is only one active replica set then we should scale that up to the full count of the
 	// deployment. If there is no active replica set, then we should scale up the newest replica set.
 	if activeOrLatest := deploymentutil.FindActiveOrLatest(newRS, oldRSs); activeOrLatest != nil {
 		if *(activeOrLatest.Spec.Replicas) == *(deployment.Spec.Replicas) {
 			return nil
 		}
-		_, _, err := dc.scaleReplicaSetAndRecordEvent(activeOrLatest, *(deployment.Spec.Replicas), deployment)
+		_, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, activeOrLatest, *(deployment.Spec.Replicas), deployment)
 		return err
 	}
 
@@ -310,7 +310,7 @@ func (dc *DeploymentController) scale(deployment *apps.Deployment, newRS *apps.R
 	// This case handles replica set adoption during a saturated new replica set.
 	if deploymentutil.IsSaturated(deployment, newRS) {
 		for _, old := range controller.FilterActiveReplicaSets(oldRSs) {
-			if _, _, err := dc.scaleReplicaSetAndRecordEvent(old, 0, deployment); err != nil {
+			if _, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, old, 0, deployment); err != nil {
 				return err
 			}
 		}
@@ -384,7 +384,7 @@ func (dc *DeploymentController) scale(deployment *apps.Deployment, newRS *apps.R
 		}
 
 		// TODO: Use transactions when we have them.
-		if _, _, err := dc.scaleReplicaSet(rs, nameToSize[rs.Name], deployment, scalingOperation); err != nil {
+		if _, _, err := dc.scaleReplicaSet(ctx, rs, nameToSize[rs.Name], deployment, scalingOperation); err != nil {
 			// Return as soon as we fail, the deployment is requeued
 			return err
 		}
@@ -393,7 +393,7 @@ func (dc *DeploymentController) scale(deployment *apps.Deployment, newRS *apps.R
 	return nil
 }
 
-func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment) (bool, *apps.ReplicaSet, error) {
+func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(ctx context.Context, rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment) (bool, *apps.ReplicaSet, error) {
 	// No need to scale
 	if *(rs.Spec.Replicas) == newScale {
 		return false, rs, nil
@@ -404,11 +404,11 @@ func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *apps.ReplicaSe
 	} else {
 		scalingOperation = "down"
 	}
-	scaled, newRS, err := dc.scaleReplicaSet(rs, newScale, deployment, scalingOperation)
+	scaled, newRS, err := dc.scaleReplicaSet(ctx, rs, newScale, deployment, scalingOperation)
 	return scaled, newRS, err
 }
 
-func (dc *DeploymentController) scaleReplicaSet(rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment, scalingOperation string) (bool, *apps.ReplicaSet, error) {
+func (dc *DeploymentController) scaleReplicaSet(ctx context.Context, rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment, scalingOperation string) (bool, *apps.ReplicaSet, error) {
 
 	sizeNeedsUpdate := *(rs.Spec.Replicas) != newScale
 
@@ -420,7 +420,7 @@ func (dc *DeploymentController) scaleReplicaSet(rs *apps.ReplicaSet, newScale in
 		rsCopy := rs.DeepCopy()
 		*(rsCopy.Spec.Replicas) = newScale
 		deploymentutil.SetReplicasAnnotations(rsCopy, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(*deployment))
-		rs, err = dc.client.AppsV1().ReplicaSets(rsCopy.Namespace).Update(context.TODO(), rsCopy, metav1.UpdateOptions{})
+		rs, err = dc.client.AppsV1().ReplicaSets(rsCopy.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{})
 		if err == nil && sizeNeedsUpdate {
 			scaled = true
 			dc.eventRecorder.Eventf(deployment, v1.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale)
@@ -432,7 +432,7 @@ func (dc *DeploymentController) scaleReplicaSet(rs *apps.ReplicaSet, newScale in
 // cleanupDeployment is responsible for cleaning up a deployment ie. retains all but the latest N old replica sets
 // where N=d.Spec.RevisionHistoryLimit. Old replica sets are older versions of the podtemplate of a deployment kept
 // around by default 1) for historical reasons and 2) for the ability to rollback a deployment.
-func (dc *DeploymentController) cleanupDeployment(oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) error {
+func (dc *DeploymentController) cleanupDeployment(ctx context.Context, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) error {
 	if !deploymentutil.HasRevisionHistoryLimit(deployment) {
 		return nil
 	}
@@ -458,7 +458,7 @@ func (dc *DeploymentController) cleanupDeployment(oldRSs []*apps.ReplicaSet, dep
 			continue
 		}
 		klog.V(4).Infof("Trying to cleanup replica set %q for deployment %q", rs.Name, deployment.Name)
-		if err := dc.client.AppsV1().ReplicaSets(rs.Namespace).Delete(context.TODO(), rs.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
+		if err := dc.client.AppsV1().ReplicaSets(rs.Namespace).Delete(ctx, rs.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
 			// Return error instead of aggregating and continuing DELETEs on the theory
 			// that we may be overloading the api server.
 			return err
@@ -469,7 +469,7 @@ func (dc *DeploymentController) cleanupDeployment(oldRSs []*apps.ReplicaSet, dep
 }
 
 // syncDeploymentStatus checks if the status is up-to-date and sync it if necessary
-func (dc *DeploymentController) syncDeploymentStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
+func (dc *DeploymentController) syncDeploymentStatus(ctx context.Context, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
 	newStatus := calculateStatus(allRSs, newRS, d)
 
 	if reflect.DeepEqual(d.Status, newStatus) {
@@ -478,7 +478,7 @@ func (dc *DeploymentController) syncDeploymentStatus(allRSs []*apps.ReplicaSet,
 
 	newDeployment := d
 	newDeployment.Status = newStatus
-	_, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(context.TODO(), newDeployment, metav1.UpdateOptions{})
+	_, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(ctx, newDeployment, metav1.UpdateOptions{})
 	return err
 }
 
@@ -525,8 +525,8 @@ func calculateStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployme
 // by looking at the desired-replicas annotation in the active replica sets of the deployment.
 //
 // rsList should come from getReplicaSetsForDeployment(d).
-func (dc *DeploymentController) isScalingEvent(d *apps.Deployment, rsList []*apps.ReplicaSet) (bool, error) {
-	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
+func (dc *DeploymentController) isScalingEvent(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) (bool, error) {
+	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
 	if err != nil {
 		return false, err
 	}
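sync.go is where the plumbing pays off: the generated clientset methods have accepted a context since Kubernetes 1.18, but passing context.TODO() cut them off from controller shutdown. With the wired ctx, cancelling the context that drives Run also cancels in-flight requests such as:

	_, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(ctx, newDeployment, metav1.UpdateOptions{})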
pkg/controller/deployment/sync_test.go

@@ -17,6 +17,7 @@ limitations under the License.
 package deployment
 
 import (
+	"context"
 	"math"
 	"testing"
 	"time"
@@ -297,7 +298,7 @@ func TestScale(t *testing.T) {
 				deploymentutil.SetReplicasAnnotations(rs, desiredReplicas, desiredReplicas+deploymentutil.MaxSurge(*test.oldDeployment))
 			}
 
-			if err := dc.scale(test.deployment, test.newRS, test.oldRSs); err != nil {
+			if err := dc.scale(context.TODO(), test.deployment, test.newRS, test.oldRSs); err != nil {
 				t.Errorf("%s: unexpected error: %v", test.name, err)
 				return
 			}
@@ -433,7 +434,7 @@ func TestDeploymentController_cleanupDeployment(t *testing.T) {
 
 			t.Logf(" &test.revisionHistoryLimit: %d", test.revisionHistoryLimit)
 			d := newDeployment("foo", 1, &test.revisionHistoryLimit, nil, nil, map[string]string{"foo": "bar"})
-			controller.cleanupDeployment(test.oldRSs, d)
+			controller.cleanupDeployment(context.TODO(), test.oldRSs, d)
 
 			gotDeletions := 0
 			for _, action := range fake.Actions() {
@@ -565,7 +566,7 @@ func TestDeploymentController_cleanupDeploymentOrder(t *testing.T) {
 			informers.Start(stopCh)
 
 			d := newDeployment("foo", 1, &test.revisionHistoryLimit, nil, nil, map[string]string{"foo": "bar"})
-			controller.cleanupDeployment(test.oldRSs, d)
+			controller.cleanupDeployment(context.TODO(), test.oldRSs, d)
 
 			deletedRSs := sets.String{}
 			for _, action := range fake.Actions() {