Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-29 06:27:05 +00:00)
Merge pull request #36648 from kargakis/follow-up-to-perma-failed
Automatic merge from submit-queue (batch tested with PRs 38377, 36365, 36648, 37691, 38339)

controller: sync stuck deployments in a secondary queue

@kubernetes/deployment this makes Deployments not depend on a tight resync interval in order to estimate progress.
This commit is contained in commit 3519ba4099.
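The diff below wires a second, delayed workqueue next to the main sync queue: enqueueAfter pushes a key with AddAfter, and a dedicated progress worker requeues it on the main queue only if the deployment still has not progressed. The following is a minimal, self-contained sketch of that two-queue pattern, not code from this PR; it assumes the current k8s.io/client-go/util/workqueue import path, and checkProgress is a hypothetical stand-in for the controller's real progress check.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

// checkProgress is a hypothetical stand-in for the controller's real
// progress check; it reports whether the deployment still needs a sync.
func checkProgress(key string) bool { return true }

func main() {
	// Main sync queue and a secondary queue for delayed progress checks.
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment")
	progressQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "progress-check")
	defer queue.ShutDown()
	defer progressQueue.ShutDown()

	// A sync that saw no progress schedules a re-check instead of waiting
	// for the next informer resync.
	progressQueue.AddAfter("default/my-deployment", 2*time.Second)

	// The progress worker blocks until the delay elapses, then requeues the
	// item on the main queue only if it still has not progressed.
	key, quit := progressQueue.Get()
	if quit {
		return
	}
	defer progressQueue.Done(key)
	if checkProgress(key.(string)) {
		queue.AddRateLimited(key)
	}
	progressQueue.Forget(key)
	fmt.Printf("re-queued %v for sync\n", key)
}
```

Because the item sits in the delaying queue until its deadline, the controller no longer depends on a short informer resync interval to notice stuck rollouts.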
@@ -85,6 +85,8 @@ type DeploymentController struct {
 	// Deployments that need to be synced
 	queue workqueue.RateLimitingInterface
+	// Deployments that need to be checked for progress.
+	progressQueue workqueue.RateLimitingInterface
 }
 
 // NewDeploymentController creates a new DeploymentController.
@@ -101,13 +103,14 @@ func NewDeploymentController(dInformer informers.DeploymentInformer, rsInformer
 		client: client,
 		eventRecorder: eventBroadcaster.NewRecorder(v1.EventSource{Component: "deployment-controller"}),
 		queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
+		progressQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "progress-check"),
 	}
 
 	dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-		AddFunc: dc.addDeploymentNotification,
-		UpdateFunc: dc.updateDeploymentNotification,
+		AddFunc: dc.addDeployment,
+		UpdateFunc: dc.updateDeployment,
 		// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
-		DeleteFunc: dc.deleteDeploymentNotification,
+		DeleteFunc: dc.deleteDeployment,
 	})
 	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: dc.addReplicaSet,
@@ -129,6 +132,7 @@ func NewDeploymentController(dInformer informers.DeploymentInformer, rsInformer
 func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
 	defer dc.queue.ShutDown()
+	defer dc.progressQueue.ShutDown()
 
 	glog.Infof("Starting deployment controller")
 
@@ -139,25 +143,26 @@ func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
 	for i := 0; i < workers; i++ {
 		go wait.Until(dc.worker, time.Second, stopCh)
 	}
+	go wait.Until(dc.progressWorker, time.Second, stopCh)
 
 	<-stopCh
 	glog.Infof("Shutting down deployment controller")
 }
 
-func (dc *DeploymentController) addDeploymentNotification(obj interface{}) {
+func (dc *DeploymentController) addDeployment(obj interface{}) {
 	d := obj.(*extensions.Deployment)
 	glog.V(4).Infof("Adding deployment %s", d.Name)
 	dc.enqueueDeployment(d)
 }
 
-func (dc *DeploymentController) updateDeploymentNotification(old, cur interface{}) {
+func (dc *DeploymentController) updateDeployment(old, cur interface{}) {
 	oldD := old.(*extensions.Deployment)
 	glog.V(4).Infof("Updating deployment %s", oldD.Name)
 	// Resync on deployment object relist.
 	dc.enqueueDeployment(cur.(*extensions.Deployment))
 }
 
-func (dc *DeploymentController) deleteDeploymentNotification(obj interface{}) {
+func (dc *DeploymentController) deleteDeployment(obj interface{}) {
 	d, ok := obj.(*extensions.Deployment)
 	if !ok {
 		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
@@ -266,27 +271,37 @@ func (dc *DeploymentController) enqueueDeployment(deployment *extensions.Deploym
 	dc.queue.Add(key)
 }
 
+// enqueueAfter will enqueue a deployment after the provided amount of time in a secondary queue.
+// Once the deployment is popped out of the secondary queue, it is checked for progress and requeued
+// back to the main queue iff it has failed progressing.
+func (dc *DeploymentController) enqueueAfter(deployment *extensions.Deployment, after time.Duration) {
+	key, err := controller.KeyFunc(deployment)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err))
+		return
+	}
+
+	dc.progressQueue.AddAfter(key, after)
+}
+
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
 func (dc *DeploymentController) worker() {
-	work := func() bool {
-		key, quit := dc.queue.Get()
-		if quit {
-			return true
-		}
-		defer dc.queue.Done(key)
-
-		err := dc.syncHandler(key.(string))
-		dc.handleErr(err, key)
-
+	for dc.processNextWorkItem() {
+	}
+}
+
+func (dc *DeploymentController) processNextWorkItem() bool {
+	key, quit := dc.queue.Get()
+	if quit {
 		return false
 	}
-
-	for {
-		if quit := work(); quit {
-			return
-		}
-	}
+	defer dc.queue.Done(key)
+
+	err := dc.syncHandler(key.(string))
+	dc.handleErr(err, key)
+
+	return true
 }
 
 func (dc *DeploymentController) handleErr(err error, key interface{}) {
@@ -310,6 +325,7 @@ func (dc *DeploymentController) handleErr(err error, key interface{}) {
 // This function is not meant to be invoked concurrently with the same key.
 func (dc *DeploymentController) syncDeployment(key string) error {
 	startTime := time.Now()
+	glog.V(4).Infof("Started syncing deployment %q (%v)", key, startTime)
 	defer func() {
 		glog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Now().Sub(startTime))
 	}()
@@ -453,3 +469,57 @@ func (dc *DeploymentController) clearDeploymentOverlap(deployment *extensions.De
 	delete(deployment.Annotations, util.OverlapAnnotation)
 	return dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(deployment)
 }
+
+// progressWorker runs a worker thread that pops items out of a secondary queue, checks if they
+// have failed progressing and if so it adds them back to the main queue.
+func (dc *DeploymentController) progressWorker() {
+	for dc.checkNextItemForProgress() {
+	}
+}
+
+// checkNextItemForProgress checks if a deployment has failed progressing and if so it adds it back
+// to the main queue.
+func (dc *DeploymentController) checkNextItemForProgress() bool {
+	key, quit := dc.progressQueue.Get()
+	if quit {
+		return false
+	}
+	defer dc.progressQueue.Done(key)
+
+	needsResync, err := dc.checkForProgress(key.(string))
+	if err != nil {
+		utilruntime.HandleError(err)
+	}
+	if err == nil && needsResync {
+		dc.queue.AddRateLimited(key)
+	}
+	dc.progressQueue.Forget(key)
+	return true
+}
+
+// checkForProgress checks the progress for the provided deployment. Meant to be called
+// by the progressWorker and work on items synced in a secondary queue.
+func (dc *DeploymentController) checkForProgress(key string) (bool, error) {
+	obj, exists, err := dc.dLister.Indexer.GetByKey(key)
+	if err != nil {
+		glog.V(2).Infof("Cannot retrieve deployment %q found in the secondary queue: %#v", key, err)
+		return false, err
+	}
+	if !exists {
+		return false, nil
+	}
+	deployment := obj.(*extensions.Deployment)
+	cond := util.GetDeploymentCondition(deployment.Status, extensions.DeploymentProgressing)
+	// Already marked with a terminal reason - no need to add it back to the main queue.
+	if cond != nil && (cond.Reason == util.TimedOutReason || cond.Reason == util.NewRSAvailableReason) {
+		return false, nil
+	}
+	// Deep-copy otherwise we may mutate our cache.
+	// TODO: Remove deep-copying from here. This worker does not need to sync the annotations
+	// in the deployment.
+	d, err := util.DeploymentDeepCopy(deployment)
+	if err != nil {
+		return false, err
+	}
+	return dc.hasFailed(d)
+}
@@ -19,6 +19,9 @@ package deployment
 import (
 	"fmt"
 	"reflect"
+	"time"
+
+	"github.com/golang/glog"
 
 	"k8s.io/kubernetes/pkg/api/v1"
 	extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
@@ -138,13 +141,21 @@ func (dc *DeploymentController) syncRolloutStatus(allRSs []*extensions.ReplicaSe
 
 	// Do not update if there is nothing new to add.
 	if reflect.DeepEqual(d.Status, newStatus) {
-		// TODO: If there is no sign of progress at this point then there is a high chance that the
-		// deployment is stuck. We should resync this deployment at some point[1] in the future[2] and
-		// check if it has timed out. We definitely need this, otherwise we depend on the controller
-		// resync interval. See https://github.com/kubernetes/kubernetes/issues/34458.
+		// If there is no sign of progress at this point then there is a high chance that the
+		// deployment is stuck. We should resync this deployment at some point[1] in the future
+		// and check if it has timed out. We definitely need this, otherwise we depend on the
+		// controller resync interval. See https://github.com/kubernetes/kubernetes/issues/34458.
 		//
 		// [1] time.Now() + progressDeadlineSeconds - lastUpdateTime (of the Progressing condition).
-		// [2] Use dc.queue.AddAfter
+		if d.Spec.ProgressDeadlineSeconds != nil &&
+			!util.DeploymentComplete(d, &newStatus) &&
+			!util.DeploymentTimedOut(d, &newStatus) &&
+			currentCond != nil {
+
+			after := time.Now().Add(time.Duration(*d.Spec.ProgressDeadlineSeconds) * time.Second).Sub(currentCond.LastUpdateTime.Time)
+			glog.V(4).Infof("Queueing up deployment %q for a progress check after %ds", d.Name, int(after.Seconds()))
+			dc.enqueueAfter(d, after)
+		}
 		return nil
 	}
 
@@ -827,12 +827,14 @@ func IsRollingUpdate(deployment *extensions.Deployment) bool {
 // updatedReplicas and it doesn't violate minimum availability.
 func DeploymentComplete(deployment *extensions.Deployment, newStatus *extensions.DeploymentStatus) bool {
 	return newStatus.UpdatedReplicas == *(deployment.Spec.Replicas) &&
-		newStatus.AvailableReplicas >= *(deployment.Spec.Replicas)-MaxUnavailable(*deployment)
+		newStatus.AvailableReplicas >= *(deployment.Spec.Replicas)-MaxUnavailable(*deployment) &&
+		newStatus.ObservedGeneration >= deployment.Generation
 }
 
 // DeploymentProgressing reports progress for a deployment. Progress is estimated by comparing the
-// current with the new status of the deployment that the controller is observing. The following
-// algorithm is already used in the kubectl rolling updater to report lack of progress.
+// current with the new status of the deployment that the controller is observing. More specifically,
+// when new pods are scaled up or become available, or old pods are scaled down, then we consider the
+// deployment is progressing.
 func DeploymentProgressing(deployment *extensions.Deployment, newStatus *extensions.DeploymentStatus) bool {
 	oldStatus := deployment.Status
 
@@ -840,7 +842,9 @@ func DeploymentProgressing(deployment *extensions.Deployment, newStatus *extensi
 	oldStatusOldReplicas := oldStatus.Replicas - oldStatus.UpdatedReplicas
 	newStatusOldReplicas := newStatus.Replicas - newStatus.UpdatedReplicas
 
-	return (newStatus.UpdatedReplicas > oldStatus.UpdatedReplicas) || (newStatusOldReplicas < oldStatusOldReplicas)
+	return (newStatus.UpdatedReplicas > oldStatus.UpdatedReplicas) ||
+		(newStatusOldReplicas < oldStatusOldReplicas) ||
+		newStatus.AvailableReplicas > deployment.Status.AvailableReplicas
 }
 
 // used for unit testing
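The re-check delay used by enqueueAfter in the syncRolloutStatus hunk above follows the formula spelled out in the code comment: time.Now() + progressDeadlineSeconds - lastUpdateTime of the Progressing condition. Below is a minimal sketch of that arithmetic with illustrative values (the variable names are hypothetical, not from this PR).

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Illustrative values: spec.progressDeadlineSeconds and the Progressing
	// condition's lastUpdateTime (here, 4 minutes in the past).
	progressDeadlineSeconds := int32(600)
	lastUpdateTime := time.Now().Add(-4 * time.Minute)

	// after = time.Now() + progressDeadlineSeconds - lastUpdateTime
	after := time.Now().Add(time.Duration(progressDeadlineSeconds) * time.Second).Sub(lastUpdateTime)

	// Prints roughly 840, i.e. the 600s deadline plus the 240s that have
	// already elapsed since the condition was last updated.
	fmt.Printf("queue progress check after %ds\n", int(after.Seconds()))
}
```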