Merge pull request #28684 from kargakis/deployment-controller-updates
Automatic merge from submit-queue

Deployment controller updates

@kubernetes/deployment @deads2k PTAL
This commit is contained in: commit 39bfa168cd
@@ -34,6 +34,7 @@ import (
	unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
	"k8s.io/kubernetes/pkg/client/record"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/deployment/util"
	"k8s.io/kubernetes/pkg/controller/framework"
	"k8s.io/kubernetes/pkg/runtime"
	"k8s.io/kubernetes/pkg/util/metrics"
@@ -51,6 +52,8 @@ const (
	// We must avoid creating new replica set / counting pods until the replica set / pods store has synced.
	// If it hasn't synced, to avoid a hot loop, we'll wait this long between checks.
	StoreSyncedPollPeriod = 100 * time.Millisecond
	// MaxRetries is the number of times a deployment will be retried before it is dropped out of the queue.
	MaxRetries = 5
)

// DeploymentController is responsible for synchronizing Deployment objects stored
@@ -70,19 +73,23 @@ type DeploymentController struct {
	rsStore cache.StoreToReplicaSetLister
	// Watches changes to all ReplicaSets
	rsController *framework.Controller
	// rsStoreSynced returns true if the ReplicaSet store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	rsStoreSynced func() bool
	// A store of pods, populated by the podController
	podStore cache.StoreToPodLister
	// Watches changes to all pods
	podController *framework.Controller

	// dStoreSynced returns true if the Deployment store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	dStoreSynced func() bool
	// rsStoreSynced returns true if the ReplicaSet store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	rsStoreSynced func() bool
	// podStoreSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podStoreSynced func() bool

	// Deployments that need to be synced
	queue *workqueue.Type
	queue workqueue.RateLimitingInterface
}

// NewDeploymentController creates a new DeploymentController.
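The *StoreSynced fields are declared as funcs rather than plain booleans so that tests can stub them out and run syncDeployment without real informers. A minimal sketch of such a fixture, assuming the constructor's truncated resync argument accepts the usual controller.NoResyncPeriodFunc; the fixture name and shape are illustrative, not the actual test code from this tree:

func newTestController(client clientset.Interface) *DeploymentController {
	dc := NewDeploymentController(client, controller.NoResyncPeriodFunc)
	// Stub the cache-sync checks so a unit test never blocks waiting on caches.
	alwaysReady := func() bool { return true }
	dc.dStoreSynced = alwaysReady
	dc.rsStoreSynced = alwaysReady
	dc.podStoreSynced = alwaysReady
	return dc
}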
@@ -98,7 +105,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
	dc := &DeploymentController{
		client:        client,
		eventRecorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "deployment-controller"}),
		queue:         workqueue.New(),
		queue:         workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
	}

	dc.dStore.Store, dc.dController = framework.NewInformer(
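The rate-limiting queue that replaces workqueue.New() above adds the three operations this diff relies on later: AddRateLimited (requeue with backoff), NumRequeues (how many requeues since the last Forget), and Forget (clear the item's backoff history). A minimal, self-contained sketch of that pattern, using the same k8s.io/kubernetes/pkg/util/workqueue package that appears in the hunk; the key string and the simulated sync error are illustrative only:

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/util/workqueue"
)

func main() {
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	queue.Add("default/nginx") // enqueue a "<namespace>/<name>" key

	key, shutdown := queue.Get()
	if shutdown {
		return
	}
	defer queue.Done(key)

	// Pretend the sync failed so the retry path is exercised.
	syncOnce := func(k string) error { return fmt.Errorf("simulated sync failure for %s", k) }
	if err := syncOnce(key.(string)); err != nil && queue.NumRequeues(key) < 5 {
		queue.AddRateLimited(key) // schedule a retry with exponential backoff
		return
	}
	queue.Forget(key) // success, or giving up: drop the item's backoff history
}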
@@ -158,6 +165,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
	)

	dc.syncHandler = dc.syncDeployment
	dc.dStoreSynced = dc.dController.HasSynced
	dc.rsStoreSynced = dc.rsController.HasSynced
	dc.podStoreSynced = dc.podController.HasSynced
	return dc
@@ -166,17 +174,43 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
// Run begins watching and syncing.
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	go dc.dController.Run(stopCh)
	go dc.rsController.Run(stopCh)
	go dc.podController.Run(stopCh)

	// Wait for the rc and dc stores to sync before starting any work in this controller.
	ready := make(chan struct{})
	go dc.waitForSyncedStores(ready, stopCh)
	select {
	case <-ready:
	case <-stopCh:
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(dc.worker, time.Second, stopCh)
	}

	<-stopCh
	glog.Infof("Shutting down deployment controller")
	dc.queue.ShutDown()
}

func (dc *DeploymentController) waitForSyncedStores(ready chan<- struct{}, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	for !dc.dStoreSynced() || !dc.rsStoreSynced() || !dc.podStoreSynced() {
		select {
		case <-time.After(StoreSyncedPollPeriod):
		case <-stopCh:
			return
		}
	}

	close(ready)
}

func (dc *DeploymentController) addDeploymentNotification(obj interface{}) {
	d := obj.(*extensions.Deployment)
	glog.V(4).Infof("Adding deployment %s", d.Name)
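waitForSyncedStores above is a plain poll-until-synced loop: either every HasSynced func reports true and the ready channel is closed, or stopCh fires first and Run bails out. The same idea expressed as a small standalone helper, carrying no controller state; only the standard time package is needed, and all names here are illustrative:

package deployment

import "time"

// waitForCacheSync polls each hasSynced func until all report true, or returns
// false as soon as stopCh is closed.
func waitForCacheSync(pollPeriod time.Duration, stopCh <-chan struct{}, hasSynced ...func() bool) bool {
	for {
		allSynced := true
		for _, hasSyncedFn := range hasSynced {
			if !hasSyncedFn() {
				allSynced = false
				break
			}
		}
		if allSynced {
			return true
		}
		select {
		case <-time.After(pollPeriod):
			// keep polling
		case <-stopCh:
			return false
		}
	}
}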
@@ -382,19 +416,40 @@ func (dc *DeploymentController) enqueueDeployment(deployment *extensions.Deploym
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (dc *DeploymentController) worker() {
	for {
		func() {
			key, quit := dc.queue.Get()
			if quit {
				return
			}
			defer dc.queue.Done(key)
			err := dc.syncHandler(key.(string))
			if err != nil {
				glog.Errorf("Error syncing deployment %v: %v", key, err)
			}
		}()
	work := func() bool {
		key, quit := dc.queue.Get()
		if quit {
			return true
		}
		defer dc.queue.Done(key)

		err := dc.syncHandler(key.(string))
		dc.handleErr(err, key)

		return false
	}

	for {
		if quit := work(); quit {
			return
		}
	}
}

func (dc *DeploymentController) handleErr(err error, key interface{}) {
	if err == nil {
		dc.queue.Forget(key)
		return
	}

	if dc.queue.NumRequeues(key) < MaxRetries {
		glog.V(2).Infof("Error syncing deployment %v: %v", key, err)
		dc.queue.AddRateLimited(key)
		return
	}

	utilruntime.HandleError(err)
	dc.queue.Forget(key)
}

// syncDeployment will sync the deployment with the given key.
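The worker's key.(string) assertion works because enqueueDeployment (the enclosing function named in this hunk's header, not itself shown here) only ever puts string keys on the queue. A sketch of the conventional shape of that function, assuming the client cache package's MetaNamespaceKeyFunc helper; the real code in this tree may use a different key helper, and this body is illustrative rather than quoted from the commit:

func (dc *DeploymentController) enqueueDeployment(deployment *extensions.Deployment) {
	// Build the canonical "<namespace>/<name>" key; worker later asserts it
	// back to a string before calling syncHandler.
	key, err := cache.MetaNamespaceKeyFunc(deployment)
	if err != nil {
		glog.Errorf("couldn't get key for deployment %#v: %v", deployment, err)
		return
	}
	dc.queue.Add(key)
}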
@@ -405,18 +460,9 @@ func (dc *DeploymentController) syncDeployment(key string) error {
		glog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Now().Sub(startTime))
	}()

	if !dc.rsStoreSynced() || !dc.podStoreSynced() {
		// Sleep so we give the replica set / pod reflector goroutine a chance to run.
		time.Sleep(StoreSyncedPollPeriod)
		glog.Infof("Waiting for replica set / pod controller to sync, requeuing deployment %s", key)
		dc.queue.Add(key)
		return nil
	}

	obj, exists, err := dc.dStore.Store.GetByKey(key)
	if err != nil {
		glog.Infof("Unable to retrieve deployment %v from store: %v", key, err)
		dc.queue.Add(key)
		return err
	}
	if !exists {
@@ -424,13 +470,20 @@ func (dc *DeploymentController) syncDeployment(key string) error {
		return nil
	}

	d := obj.(*extensions.Deployment)
	deployment := obj.(*extensions.Deployment)
	everything := unversioned.LabelSelector{}
	if reflect.DeepEqual(d.Spec.Selector, &everything) {
		dc.eventRecorder.Eventf(d, api.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
	if reflect.DeepEqual(deployment.Spec.Selector, &everything) {
		dc.eventRecorder.Eventf(deployment, api.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
		return nil
	}

	// Deep-copy otherwise we are mutating our cache.
	// TODO: Deep-copy only when needed.
	d, err := util.DeploymentDeepCopy(deployment)
	if err != nil {
		return err
	}

	if d.Spec.Paused {
		return dc.sync(d)
	}
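The "Deep-copy otherwise we are mutating our cache" comment is the crux of the hunk above: deployment comes straight out of the shared informer store, so any write to it would be visible to every other reader of that cache. A short illustration of the rule, with the mutated field chosen arbitrarily and not taken from this commit:

	// Wrong: writes through to the object still held by dc.dStore.
	// deployment.Status.ObservedGeneration = deployment.Generation

	// Right: mutate a private copy and leave the cached object untouched.
	d, err := util.DeploymentDeepCopy(deployment)
	if err != nil {
		return err
	}
	d.Status.ObservedGeneration = d.Generation // illustrative write on the copy only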
@@ -273,7 +273,6 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme
	deploymentutil.SetNewReplicaSetAnnotations(deployment, &newRS, newRevision, false)
	createdRS, err := dc.client.Extensions().ReplicaSets(namespace).Create(&newRS)
	if err != nil {
		dc.enqueueDeployment(deployment)
		return nil, fmt.Errorf("error creating replica set %v: %v", deployment.Name, err)
	}
	if newReplicasCount > 0 {
@@ -415,9 +414,6 @@ func (dc *DeploymentController) scaleReplicaSet(rs *extensions.ReplicaSet, newSc
	rs, err := dc.client.Extensions().ReplicaSets(rs.ObjectMeta.Namespace).Update(rs)
	if err == nil {
		dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale)
	} else {
		glog.Warningf("Cannot update replica set %q: %v", rs.Name, err)
		dc.enqueueDeployment(deployment)
	}
	return rs, err
}
@@ -753,3 +753,15 @@ func ResolveFenceposts(maxSurge, maxUnavailable *intstrutil.IntOrString, desired

	return int32(surge), int32(unavailable), nil
}

func DeploymentDeepCopy(deployment *extensions.Deployment) (*extensions.Deployment, error) {
	objCopy, err := api.Scheme.DeepCopy(deployment)
	if err != nil {
		return nil, err
	}
	copied, ok := objCopy.(*extensions.Deployment)
	if !ok {
		return nil, fmt.Errorf("expected Deployment, got %#v", objCopy)
	}
	return copied, nil
}
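A quick way to convince yourself that DeploymentDeepCopy returns a fully independent object is a test that mutates the copy and checks the original. A sketch of such a test, assuming it would sit next to the util package's existing tests; the test name and the fields touched are illustrative, not part of this commit:

func TestDeploymentDeepCopy(t *testing.T) {
	original := &extensions.Deployment{
		ObjectMeta: api.ObjectMeta{Name: "nginx", Namespace: "default"},
	}
	copied, err := DeploymentDeepCopy(original)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	copied.Name = "nginx-copy"
	if original.Name != "nginx" {
		t.Errorf("mutating the copy changed the original: %q", original.Name)
	}
}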