mirror of
				https://github.com/k3s-io/kubernetes.git
				synced 2025-10-25 01:20:18 +00:00 
			
		
		
		
	This change makes us use the generic workqueue throughout the project in order to improve type safety and readability of the code.
		
			
				
	
	
		
			681 lines
		
	
	
		
			24 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			681 lines
		
	
	
		
			24 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
| Copyright 2015 The Kubernetes Authors.
 | |
| 
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| 
 | |
|     http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
| Unless required by applicable law or agreed to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License.
 | |
| */
 | |
| 
 | |
| // Package deployment contains all the logic for handling Kubernetes Deployments.
 | |
| // It implements a set of strategies (rolling, recreate) for deploying an application,
 | |
| // the means to rollback to previous versions, proportional scaling for mitigating
 | |
| // risk, cleanup policy, and other useful features of Deployments.
 | |
| package deployment
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"reflect"
 | |
| 	"time"
 | |
| 
 | |
| 	apps "k8s.io/api/apps/v1"
 | |
| 	v1 "k8s.io/api/core/v1"
 | |
| 	"k8s.io/apimachinery/pkg/api/errors"
 | |
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | |
| 	"k8s.io/apimachinery/pkg/labels"
 | |
| 	"k8s.io/apimachinery/pkg/types"
 | |
| 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 | |
| 	"k8s.io/apimachinery/pkg/util/wait"
 | |
| 	appsinformers "k8s.io/client-go/informers/apps/v1"
 | |
| 	coreinformers "k8s.io/client-go/informers/core/v1"
 | |
| 	clientset "k8s.io/client-go/kubernetes"
 | |
| 	"k8s.io/client-go/kubernetes/scheme"
 | |
| 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
 | |
| 	appslisters "k8s.io/client-go/listers/apps/v1"
 | |
| 	corelisters "k8s.io/client-go/listers/core/v1"
 | |
| 	"k8s.io/client-go/tools/cache"
 | |
| 	"k8s.io/client-go/tools/record"
 | |
| 	"k8s.io/client-go/util/workqueue"
 | |
| 	"k8s.io/klog/v2"
 | |
| 	"k8s.io/kubernetes/pkg/controller"
 | |
| 	"k8s.io/kubernetes/pkg/controller/deployment/util"
 | |
| )
 | |
| 
 | |
// maxRetries caps how many times a deployment is retried before it is dropped
// out of the queue. With the current rate-limiter in use
// (5ms*2^(maxRetries-1)) a deployment is requeued after the following delays:
//
// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
const maxRetries = 15
 | |
| 
 | |
| // controllerKind contains the schema.GroupVersionKind for this controller type.
 | |
| var controllerKind = apps.SchemeGroupVersion.WithKind("Deployment")
 | |
| 
 | |
| // DeploymentController is responsible for synchronizing Deployment objects stored
 | |
| // in the system with actual running replica sets and pods.
 | |
| type DeploymentController struct {
 | |
| 	// rsControl is used for adopting/releasing replica sets.
 | |
| 	rsControl controller.RSControlInterface
 | |
| 	client    clientset.Interface
 | |
| 
 | |
| 	eventBroadcaster record.EventBroadcaster
 | |
| 	eventRecorder    record.EventRecorder
 | |
| 
 | |
| 	// To allow injection of syncDeployment for testing.
 | |
| 	syncHandler func(ctx context.Context, dKey string) error
 | |
| 	// used for unit testing
 | |
| 	enqueueDeployment func(deployment *apps.Deployment)
 | |
| 
 | |
| 	// dLister can list/get deployments from the shared informer's store
 | |
| 	dLister appslisters.DeploymentLister
 | |
| 	// rsLister can list/get replica sets from the shared informer's store
 | |
| 	rsLister appslisters.ReplicaSetLister
 | |
| 	// podLister can list/get pods from the shared informer's store
 | |
| 	podLister corelisters.PodLister
 | |
| 
 | |
| 	// dListerSynced returns true if the Deployment store has been synced at least once.
 | |
| 	// Added as a member to the struct to allow injection for testing.
 | |
| 	dListerSynced cache.InformerSynced
 | |
| 	// rsListerSynced returns true if the ReplicaSet store has been synced at least once.
 | |
| 	// Added as a member to the struct to allow injection for testing.
 | |
| 	rsListerSynced cache.InformerSynced
 | |
| 	// podListerSynced returns true if the pod store has been synced at least once.
 | |
| 	// Added as a member to the struct to allow injection for testing.
 | |
| 	podListerSynced cache.InformerSynced
 | |
| 
 | |
| 	// Deployments that need to be synced
 | |
| 	queue workqueue.TypedRateLimitingInterface[string]
 | |
| }
 | |
| 
 | |
| // NewDeploymentController creates a new DeploymentController.
 | |
| func NewDeploymentController(ctx context.Context, dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
 | |
| 	eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
 | |
| 	logger := klog.FromContext(ctx)
 | |
| 	dc := &DeploymentController{
 | |
| 		client:           client,
 | |
| 		eventBroadcaster: eventBroadcaster,
 | |
| 		eventRecorder:    eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
 | |
| 		queue: workqueue.NewTypedRateLimitingQueueWithConfig(
 | |
| 			workqueue.DefaultTypedControllerRateLimiter[string](),
 | |
| 			workqueue.TypedRateLimitingQueueConfig[string]{
 | |
| 				Name: "deployment",
 | |
| 			},
 | |
| 		),
 | |
| 	}
 | |
| 	dc.rsControl = controller.RealRSControl{
 | |
| 		KubeClient: client,
 | |
| 		Recorder:   dc.eventRecorder,
 | |
| 	}
 | |
| 
 | |
| 	dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 | |
| 		AddFunc: func(obj interface{}) {
 | |
| 			dc.addDeployment(logger, obj)
 | |
| 		},
 | |
| 		UpdateFunc: func(oldObj, newObj interface{}) {
 | |
| 			dc.updateDeployment(logger, oldObj, newObj)
 | |
| 		},
 | |
| 		// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
 | |
| 		DeleteFunc: func(obj interface{}) {
 | |
| 			dc.deleteDeployment(logger, obj)
 | |
| 		},
 | |
| 	})
 | |
| 	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 | |
| 		AddFunc: func(obj interface{}) {
 | |
| 			dc.addReplicaSet(logger, obj)
 | |
| 		},
 | |
| 		UpdateFunc: func(oldObj, newObj interface{}) {
 | |
| 			dc.updateReplicaSet(logger, oldObj, newObj)
 | |
| 		},
 | |
| 		DeleteFunc: func(obj interface{}) {
 | |
| 			dc.deleteReplicaSet(logger, obj)
 | |
| 		},
 | |
| 	})
 | |
| 	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 | |
| 		DeleteFunc: func(obj interface{}) {
 | |
| 			dc.deletePod(logger, obj)
 | |
| 		},
 | |
| 	})
 | |
| 
 | |
| 	dc.syncHandler = dc.syncDeployment
 | |
| 	dc.enqueueDeployment = dc.enqueue
 | |
| 
 | |
| 	dc.dLister = dInformer.Lister()
 | |
| 	dc.rsLister = rsInformer.Lister()
 | |
| 	dc.podLister = podInformer.Lister()
 | |
| 	dc.dListerSynced = dInformer.Informer().HasSynced
 | |
| 	dc.rsListerSynced = rsInformer.Informer().HasSynced
 | |
| 	dc.podListerSynced = podInformer.Informer().HasSynced
 | |
| 	return dc, nil
 | |
| }
 | |
| 
 | |
| // Run begins watching and syncing.
 | |
| func (dc *DeploymentController) Run(ctx context.Context, workers int) {
 | |
| 	defer utilruntime.HandleCrash()
 | |
| 
 | |
| 	// Start events processing pipeline.
 | |
| 	dc.eventBroadcaster.StartStructuredLogging(3)
 | |
| 	dc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.client.CoreV1().Events("")})
 | |
| 	defer dc.eventBroadcaster.Shutdown()
 | |
| 
 | |
| 	defer dc.queue.ShutDown()
 | |
| 
 | |
| 	logger := klog.FromContext(ctx)
 | |
| 	logger.Info("Starting controller", "controller", "deployment")
 | |
| 	defer logger.Info("Shutting down controller", "controller", "deployment")
 | |
| 
 | |
| 	if !cache.WaitForNamedCacheSync("deployment", ctx.Done(), dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	for i := 0; i < workers; i++ {
 | |
| 		go wait.UntilWithContext(ctx, dc.worker, time.Second)
 | |
| 	}
 | |
| 
 | |
| 	<-ctx.Done()
 | |
| }
 | |
| 
 | |
| func (dc *DeploymentController) addDeployment(logger klog.Logger, obj interface{}) {
 | |
| 	d := obj.(*apps.Deployment)
 | |
| 	logger.V(4).Info("Adding deployment", "deployment", klog.KObj(d))
 | |
| 	dc.enqueueDeployment(d)
 | |
| }
 | |
| 
 | |
| func (dc *DeploymentController) updateDeployment(logger klog.Logger, old, cur interface{}) {
 | |
| 	oldD := old.(*apps.Deployment)
 | |
| 	curD := cur.(*apps.Deployment)
 | |
| 	logger.V(4).Info("Updating deployment", "deployment", klog.KObj(oldD))
 | |
| 	dc.enqueueDeployment(curD)
 | |
| }
 | |
| 
 | |
| func (dc *DeploymentController) deleteDeployment(logger klog.Logger, obj interface{}) {
 | |
| 	d, ok := obj.(*apps.Deployment)
 | |
| 	if !ok {
 | |
| 		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
 | |
| 		if !ok {
 | |
| 			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
 | |
| 			return
 | |
| 		}
 | |
| 		d, ok = tombstone.Obj.(*apps.Deployment)
 | |
| 		if !ok {
 | |
| 			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Deployment %#v", obj))
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| 	logger.V(4).Info("Deleting deployment", "deployment", klog.KObj(d))
 | |
| 	dc.enqueueDeployment(d)
 | |
| }
 | |
| 
 | |
| // addReplicaSet enqueues the deployment that manages a ReplicaSet when the ReplicaSet is created.
 | |
| func (dc *DeploymentController) addReplicaSet(logger klog.Logger, obj interface{}) {
 | |
| 	rs := obj.(*apps.ReplicaSet)
 | |
| 
 | |
| 	if rs.DeletionTimestamp != nil {
 | |
| 		// On a restart of the controller manager, it's possible for an object to
 | |
| 		// show up in a state that is already pending deletion.
 | |
| 		dc.deleteReplicaSet(logger, rs)
 | |
| 		return
 | |
| 	}
 | |
| 	// If it has a ControllerRef, that's all that matters.
 | |
| 	if controllerRef := metav1.GetControllerOf(rs); controllerRef != nil {
 | |
| 		d := dc.resolveControllerRef(rs.Namespace, controllerRef)
 | |
| 		if d == nil {
 | |
| 			return
 | |
| 		}
 | |
| 		logger.V(4).Info("ReplicaSet added", "replicaSet", klog.KObj(rs))
 | |
| 		dc.enqueueDeployment(d)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// Otherwise, it's an orphan. Get a list of all matching Deployments and sync
 | |
| 	// them to see if anyone wants to adopt it.
 | |
| 	ds := dc.getDeploymentsForReplicaSet(logger, rs)
 | |
| 	if len(ds) == 0 {
 | |
| 		return
 | |
| 	}
 | |
| 	logger.V(4).Info("Orphan ReplicaSet added", "replicaSet", klog.KObj(rs))
 | |
| 	for _, d := range ds {
 | |
| 		dc.enqueueDeployment(d)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // getDeploymentsForReplicaSet returns a list of Deployments that potentially
 | |
| // match a ReplicaSet.
 | |
| func (dc *DeploymentController) getDeploymentsForReplicaSet(logger klog.Logger, rs *apps.ReplicaSet) []*apps.Deployment {
 | |
| 	deployments, err := util.GetDeploymentsForReplicaSet(dc.dLister, rs)
 | |
| 	if err != nil || len(deployments) == 0 {
 | |
| 		return nil
 | |
| 	}
 | |
| 	// Because all ReplicaSet's belonging to a deployment should have a unique label key,
 | |
| 	// there should never be more than one deployment returned by the above method.
 | |
| 	// If that happens we should probably dynamically repair the situation by ultimately
 | |
| 	// trying to clean up one of the controllers, for now we just return the older one
 | |
| 	if len(deployments) > 1 {
 | |
| 		// ControllerRef will ensure we don't do anything crazy, but more than one
 | |
| 		// item in this list nevertheless constitutes user error.
 | |
| 		logger.V(4).Info("user error! more than one deployment is selecting replica set",
 | |
| 			"replicaSet", klog.KObj(rs), "labels", rs.Labels, "deployment", klog.KObj(deployments[0]))
 | |
| 	}
 | |
| 	return deployments
 | |
| }
 | |
| 
 | |
| // updateReplicaSet figures out what deployment(s) manage a ReplicaSet when the ReplicaSet
 | |
| // is updated and wake them up. If the anything of the ReplicaSets have changed, we need to
 | |
| // awaken both the old and new deployments. old and cur must be *apps.ReplicaSet
 | |
| // types.
 | |
| func (dc *DeploymentController) updateReplicaSet(logger klog.Logger, old, cur interface{}) {
 | |
| 	curRS := cur.(*apps.ReplicaSet)
 | |
| 	oldRS := old.(*apps.ReplicaSet)
 | |
| 	if curRS.ResourceVersion == oldRS.ResourceVersion {
 | |
| 		// Periodic resync will send update events for all known replica sets.
 | |
| 		// Two different versions of the same replica set will always have different RVs.
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	curControllerRef := metav1.GetControllerOf(curRS)
 | |
| 	oldControllerRef := metav1.GetControllerOf(oldRS)
 | |
| 	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
 | |
| 	if controllerRefChanged && oldControllerRef != nil {
 | |
| 		// The ControllerRef was changed. Sync the old controller, if any.
 | |
| 		if d := dc.resolveControllerRef(oldRS.Namespace, oldControllerRef); d != nil {
 | |
| 			dc.enqueueDeployment(d)
 | |
| 		}
 | |
| 	}
 | |
| 	// If it has a ControllerRef, that's all that matters.
 | |
| 	if curControllerRef != nil {
 | |
| 		d := dc.resolveControllerRef(curRS.Namespace, curControllerRef)
 | |
| 		if d == nil {
 | |
| 			return
 | |
| 		}
 | |
| 		logger.V(4).Info("ReplicaSet updated", "replicaSet", klog.KObj(curRS))
 | |
| 		dc.enqueueDeployment(d)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// Otherwise, it's an orphan. If anything changed, sync matching controllers
 | |
| 	// to see if anyone wants to adopt it now.
 | |
| 	labelChanged := !reflect.DeepEqual(curRS.Labels, oldRS.Labels)
 | |
| 	if labelChanged || controllerRefChanged {
 | |
| 		ds := dc.getDeploymentsForReplicaSet(logger, curRS)
 | |
| 		if len(ds) == 0 {
 | |
| 			return
 | |
| 		}
 | |
| 		logger.V(4).Info("Orphan ReplicaSet updated", "replicaSet", klog.KObj(curRS))
 | |
| 		for _, d := range ds {
 | |
| 			dc.enqueueDeployment(d)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // deleteReplicaSet enqueues the deployment that manages a ReplicaSet when
 | |
| // the ReplicaSet is deleted. obj could be an *apps.ReplicaSet, or
 | |
| // a DeletionFinalStateUnknown marker item.
 | |
| func (dc *DeploymentController) deleteReplicaSet(logger klog.Logger, obj interface{}) {
 | |
| 	rs, ok := obj.(*apps.ReplicaSet)
 | |
| 
 | |
| 	// When a delete is dropped, the relist will notice a pod in the store not
 | |
| 	// in the list, leading to the insertion of a tombstone object which contains
 | |
| 	// the deleted key/value. Note that this value might be stale. If the ReplicaSet
 | |
| 	// changed labels the new deployment will not be woken up till the periodic resync.
 | |
| 	if !ok {
 | |
| 		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
 | |
| 		if !ok {
 | |
| 			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
 | |
| 			return
 | |
| 		}
 | |
| 		rs, ok = tombstone.Obj.(*apps.ReplicaSet)
 | |
| 		if !ok {
 | |
| 			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a ReplicaSet %#v", obj))
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	controllerRef := metav1.GetControllerOf(rs)
 | |
| 	if controllerRef == nil {
 | |
| 		// No controller should care about orphans being deleted.
 | |
| 		return
 | |
| 	}
 | |
| 	d := dc.resolveControllerRef(rs.Namespace, controllerRef)
 | |
| 	if d == nil {
 | |
| 		return
 | |
| 	}
 | |
| 	logger.V(4).Info("ReplicaSet deleted", "replicaSet", klog.KObj(rs))
 | |
| 	dc.enqueueDeployment(d)
 | |
| }
 | |
| 
 | |
| // deletePod will enqueue a Recreate Deployment once all of its pods have stopped running.
 | |
| func (dc *DeploymentController) deletePod(logger klog.Logger, obj interface{}) {
 | |
| 	pod, ok := obj.(*v1.Pod)
 | |
| 
 | |
| 	// When a delete is dropped, the relist will notice a pod in the store not
 | |
| 	// in the list, leading to the insertion of a tombstone object which contains
 | |
| 	// the deleted key/value. Note that this value might be stale. If the Pod
 | |
| 	// changed labels the new deployment will not be woken up till the periodic resync.
 | |
| 	if !ok {
 | |
| 		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
 | |
| 		if !ok {
 | |
| 			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
 | |
| 			return
 | |
| 		}
 | |
| 		pod, ok = tombstone.Obj.(*v1.Pod)
 | |
| 		if !ok {
 | |
| 			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| 	d := dc.getDeploymentForPod(logger, pod)
 | |
| 	if d == nil {
 | |
| 		return
 | |
| 	}
 | |
| 	logger.V(4).Info("Pod deleted", "pod", klog.KObj(pod))
 | |
| 	if d.Spec.Strategy.Type == apps.RecreateDeploymentStrategyType {
 | |
| 		// Sync if this Deployment now has no more Pods.
 | |
| 		rsList, err := util.ListReplicaSets(d, util.RsListFromClient(dc.client.AppsV1()))
 | |
| 		if err != nil {
 | |
| 			return
 | |
| 		}
 | |
| 		podMap, err := dc.getPodMapForDeployment(d, rsList)
 | |
| 		if err != nil {
 | |
| 			return
 | |
| 		}
 | |
| 		numPods := 0
 | |
| 		for _, podList := range podMap {
 | |
| 			numPods += len(podList)
 | |
| 		}
 | |
| 		if numPods == 0 {
 | |
| 			dc.enqueueDeployment(d)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (dc *DeploymentController) enqueue(deployment *apps.Deployment) {
 | |
| 	key, err := controller.KeyFunc(deployment)
 | |
| 	if err != nil {
 | |
| 		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err))
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	dc.queue.Add(key)
 | |
| }
 | |
| 
 | |
| func (dc *DeploymentController) enqueueRateLimited(deployment *apps.Deployment) {
 | |
| 	key, err := controller.KeyFunc(deployment)
 | |
| 	if err != nil {
 | |
| 		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err))
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	dc.queue.AddRateLimited(key)
 | |
| }
 | |
| 
 | |
| // enqueueAfter will enqueue a deployment after the provided amount of time.
 | |
| func (dc *DeploymentController) enqueueAfter(deployment *apps.Deployment, after time.Duration) {
 | |
| 	key, err := controller.KeyFunc(deployment)
 | |
| 	if err != nil {
 | |
| 		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err))
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	dc.queue.AddAfter(key, after)
 | |
| }
 | |
| 
 | |
| // getDeploymentForPod returns the deployment managing the given Pod.
 | |
| func (dc *DeploymentController) getDeploymentForPod(logger klog.Logger, pod *v1.Pod) *apps.Deployment {
 | |
| 	// Find the owning replica set
 | |
| 	var rs *apps.ReplicaSet
 | |
| 	var err error
 | |
| 	controllerRef := metav1.GetControllerOf(pod)
 | |
| 	if controllerRef == nil {
 | |
| 		// No controller owns this Pod.
 | |
| 		return nil
 | |
| 	}
 | |
| 	if controllerRef.Kind != apps.SchemeGroupVersion.WithKind("ReplicaSet").Kind {
 | |
| 		// Not a pod owned by a replica set.
 | |
| 		return nil
 | |
| 	}
 | |
| 	rs, err = dc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name)
 | |
| 	if err != nil || rs.UID != controllerRef.UID {
 | |
| 		logger.V(4).Info("Cannot get replicaset for pod", "ownerReference", controllerRef.Name, "pod", klog.KObj(pod), "err", err)
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// Now find the Deployment that owns that ReplicaSet.
 | |
| 	controllerRef = metav1.GetControllerOf(rs)
 | |
| 	if controllerRef == nil {
 | |
| 		return nil
 | |
| 	}
 | |
| 	return dc.resolveControllerRef(rs.Namespace, controllerRef)
 | |
| }
 | |
| 
 | |
| // resolveControllerRef returns the controller referenced by a ControllerRef,
 | |
| // or nil if the ControllerRef could not be resolved to a matching controller
 | |
| // of the correct Kind.
 | |
| func (dc *DeploymentController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.Deployment {
 | |
| 	// We can't look up by UID, so look up by Name and then verify UID.
 | |
| 	// Don't even try to look up by Name if it's the wrong Kind.
 | |
| 	if controllerRef.Kind != controllerKind.Kind {
 | |
| 		return nil
 | |
| 	}
 | |
| 	d, err := dc.dLister.Deployments(namespace).Get(controllerRef.Name)
 | |
| 	if err != nil {
 | |
| 		return nil
 | |
| 	}
 | |
| 	if d.UID != controllerRef.UID {
 | |
| 		// The controller we found with this Name is not the same one that the
 | |
| 		// ControllerRef points to.
 | |
| 		return nil
 | |
| 	}
 | |
| 	return d
 | |
| }
 | |
| 
 | |
| // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 | |
| // It enforces that the syncHandler is never invoked concurrently with the same key.
 | |
| func (dc *DeploymentController) worker(ctx context.Context) {
 | |
| 	for dc.processNextWorkItem(ctx) {
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (dc *DeploymentController) processNextWorkItem(ctx context.Context) bool {
 | |
| 	key, quit := dc.queue.Get()
 | |
| 	if quit {
 | |
| 		return false
 | |
| 	}
 | |
| 	defer dc.queue.Done(key)
 | |
| 
 | |
| 	err := dc.syncHandler(ctx, key)
 | |
| 	dc.handleErr(ctx, err, key)
 | |
| 
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| func (dc *DeploymentController) handleErr(ctx context.Context, err error, key string) {
 | |
| 	logger := klog.FromContext(ctx)
 | |
| 	if err == nil || errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
 | |
| 		dc.queue.Forget(key)
 | |
| 		return
 | |
| 	}
 | |
| 	ns, name, keyErr := cache.SplitMetaNamespaceKey(key)
 | |
| 	if keyErr != nil {
 | |
| 		logger.Error(err, "Failed to split meta namespace cache key", "cacheKey", key)
 | |
| 	}
 | |
| 
 | |
| 	if dc.queue.NumRequeues(key) < maxRetries {
 | |
| 		logger.V(2).Info("Error syncing deployment", "deployment", klog.KRef(ns, name), "err", err)
 | |
| 		dc.queue.AddRateLimited(key)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	utilruntime.HandleError(err)
 | |
| 	logger.V(2).Info("Dropping deployment out of the queue", "deployment", klog.KRef(ns, name), "err", err)
 | |
| 	dc.queue.Forget(key)
 | |
| }
 | |
| 
 | |
| // getReplicaSetsForDeployment uses ControllerRefManager to reconcile
 | |
| // ControllerRef by adopting and orphaning.
 | |
| // It returns the list of ReplicaSets that this Deployment should manage.
 | |
| func (dc *DeploymentController) getReplicaSetsForDeployment(ctx context.Context, d *apps.Deployment) ([]*apps.ReplicaSet, error) {
 | |
| 	// List all ReplicaSets to find those we own but that no longer match our
 | |
| 	// selector. They will be orphaned by ClaimReplicaSets().
 | |
| 	rsList, err := dc.rsLister.ReplicaSets(d.Namespace).List(labels.Everything())
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	deploymentSelector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("deployment %s/%s has invalid label selector: %v", d.Namespace, d.Name, err)
 | |
| 	}
 | |
| 	// If any adoptions are attempted, we should first recheck for deletion with
 | |
| 	// an uncached quorum read sometime after listing ReplicaSets (see #42639).
 | |
| 	canAdoptFunc := controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) {
 | |
| 		fresh, err := dc.client.AppsV1().Deployments(d.Namespace).Get(ctx, d.Name, metav1.GetOptions{})
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		if fresh.UID != d.UID {
 | |
| 			return nil, fmt.Errorf("original Deployment %v/%v is gone: got uid %v, wanted %v", d.Namespace, d.Name, fresh.UID, d.UID)
 | |
| 		}
 | |
| 		return fresh, nil
 | |
| 	})
 | |
| 	cm := controller.NewReplicaSetControllerRefManager(dc.rsControl, d, deploymentSelector, controllerKind, canAdoptFunc)
 | |
| 	return cm.ClaimReplicaSets(ctx, rsList)
 | |
| }
 | |
| 
 | |
| // getPodMapForDeployment returns the Pods managed by a Deployment.
 | |
| //
 | |
| // It returns a map from ReplicaSet UID to a list of Pods controlled by that RS,
 | |
| // according to the Pod's ControllerRef.
 | |
| // NOTE: The pod pointers returned by this method point the pod objects in the cache and thus
 | |
| // shouldn't be modified in any way.
 | |
| func (dc *DeploymentController) getPodMapForDeployment(d *apps.Deployment, rsList []*apps.ReplicaSet) (map[types.UID][]*v1.Pod, error) {
 | |
| 	// Get all Pods that potentially belong to this Deployment.
 | |
| 	selector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	pods, err := dc.podLister.Pods(d.Namespace).List(selector)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	// Group Pods by their controller (if it's in rsList).
 | |
| 	podMap := make(map[types.UID][]*v1.Pod, len(rsList))
 | |
| 	for _, rs := range rsList {
 | |
| 		podMap[rs.UID] = []*v1.Pod{}
 | |
| 	}
 | |
| 	for _, pod := range pods {
 | |
| 		// Do not ignore inactive Pods because Recreate Deployments need to verify that no
 | |
| 		// Pods from older versions are running before spinning up new Pods.
 | |
| 		controllerRef := metav1.GetControllerOf(pod)
 | |
| 		if controllerRef == nil {
 | |
| 			continue
 | |
| 		}
 | |
| 		// Only append if we care about this UID.
 | |
| 		if _, ok := podMap[controllerRef.UID]; ok {
 | |
| 			podMap[controllerRef.UID] = append(podMap[controllerRef.UID], pod)
 | |
| 		}
 | |
| 	}
 | |
| 	return podMap, nil
 | |
| }
 | |
| 
 | |
| // syncDeployment will sync the deployment with the given key.
 | |
| // This function is not meant to be invoked concurrently with the same key.
 | |
| func (dc *DeploymentController) syncDeployment(ctx context.Context, key string) error {
 | |
| 	logger := klog.FromContext(ctx)
 | |
| 	namespace, name, err := cache.SplitMetaNamespaceKey(key)
 | |
| 	if err != nil {
 | |
| 		logger.Error(err, "Failed to split meta namespace cache key", "cacheKey", key)
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	startTime := time.Now()
 | |
| 	logger.V(4).Info("Started syncing deployment", "deployment", klog.KRef(namespace, name), "startTime", startTime)
 | |
| 	defer func() {
 | |
| 		logger.V(4).Info("Finished syncing deployment", "deployment", klog.KRef(namespace, name), "duration", time.Since(startTime))
 | |
| 	}()
 | |
| 
 | |
| 	deployment, err := dc.dLister.Deployments(namespace).Get(name)
 | |
| 	if errors.IsNotFound(err) {
 | |
| 		logger.V(2).Info("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
 | |
| 		return nil
 | |
| 	}
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// Deep-copy otherwise we are mutating our cache.
 | |
| 	// TODO: Deep-copy only when needed.
 | |
| 	d := deployment.DeepCopy()
 | |
| 
 | |
| 	everything := metav1.LabelSelector{}
 | |
| 	if reflect.DeepEqual(d.Spec.Selector, &everything) {
 | |
| 		dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
 | |
| 		if d.Status.ObservedGeneration < d.Generation {
 | |
| 			d.Status.ObservedGeneration = d.Generation
 | |
| 			dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
 | |
| 		}
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// List ReplicaSets owned by this Deployment, while reconciling ControllerRef
 | |
| 	// through adoption/orphaning.
 | |
| 	rsList, err := dc.getReplicaSetsForDeployment(ctx, d)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	// List all Pods owned by this Deployment, grouped by their ReplicaSet.
 | |
| 	// Current uses of the podMap are:
 | |
| 	//
 | |
| 	// * check if a Pod is labeled correctly with the pod-template-hash label.
 | |
| 	// * check that no old Pods are running in the middle of Recreate Deployments.
 | |
| 	podMap, err := dc.getPodMapForDeployment(d, rsList)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	if d.DeletionTimestamp != nil {
 | |
| 		return dc.syncStatusOnly(ctx, d, rsList)
 | |
| 	}
 | |
| 
 | |
| 	// Update deployment conditions with an Unknown condition when pausing/resuming
 | |
| 	// a deployment. In this way, we can be sure that we won't timeout when a user
 | |
| 	// resumes a Deployment with a set progressDeadlineSeconds.
 | |
| 	if err = dc.checkPausedConditions(ctx, d); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	if d.Spec.Paused {
 | |
| 		return dc.sync(ctx, d, rsList)
 | |
| 	}
 | |
| 
 | |
| 	// rollback is not re-entrant in case the underlying replica sets are updated with a new
 | |
| 	// revision so we should ensure that we won't proceed to update replica sets until we
 | |
| 	// make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
 | |
| 	if getRollbackTo(d) != nil {
 | |
| 		return dc.rollback(ctx, d, rsList)
 | |
| 	}
 | |
| 
 | |
| 	scalingEvent, err := dc.isScalingEvent(ctx, d, rsList)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if scalingEvent {
 | |
| 		return dc.sync(ctx, d, rsList)
 | |
| 	}
 | |
| 
 | |
| 	switch d.Spec.Strategy.Type {
 | |
| 	case apps.RecreateDeploymentStrategyType:
 | |
| 		return dc.rolloutRecreate(ctx, d, rsList, podMap)
 | |
| 	case apps.RollingUpdateDeploymentStrategyType:
 | |
| 		return dc.rolloutRolling(ctx, d, rsList)
 | |
| 	}
 | |
| 	return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
 | |
| }
 |