mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-30 14:46:16 +00:00
convert deployment controller to shared informers
This commit is contained in:
@@ -36,13 +36,12 @@ import (
|
||||
"k8s.io/kubernetes/pkg/client/record"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/controller/deployment/util"
|
||||
"k8s.io/kubernetes/pkg/controller/informers"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/util/metrics"
|
||||
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
"k8s.io/kubernetes/pkg/util/workqueue"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -67,34 +66,28 @@ type DeploymentController struct {
|
||||
syncHandler func(dKey string) error
|
||||
|
||||
// A store of deployments, populated by the dController
|
||||
dLister cache.StoreToDeploymentLister
|
||||
// Watches changes to all deployments
|
||||
dController *cache.Controller
|
||||
dLister *cache.StoreToDeploymentLister
|
||||
// A store of ReplicaSets, populated by the rsController
|
||||
rsLister cache.StoreToReplicaSetLister
|
||||
// Watches changes to all ReplicaSets
|
||||
rsController *cache.Controller
|
||||
rsLister *cache.StoreToReplicaSetLister
|
||||
// A store of pods, populated by the podController
|
||||
podLister cache.StoreToPodLister
|
||||
// Watches changes to all pods
|
||||
podController *cache.Controller
|
||||
podLister *cache.StoreToPodLister
|
||||
|
||||
// dListerSynced returns true if the Deployment store has been synced at least once.
|
||||
// Added as a member to the struct to allow injection for testing.
|
||||
dListerSynced func() bool
|
||||
dListerSynced cache.InformerSynced
|
||||
// rsListerSynced returns true if the ReplicaSet store has been synced at least once.
|
||||
// Added as a member to the struct to allow injection for testing.
|
||||
rsListerSynced func() bool
|
||||
rsListerSynced cache.InformerSynced
|
||||
// podListerSynced returns true if the pod store has been synced at least once.
|
||||
// Added as a member to the struct to allow injection for testing.
|
||||
podListerSynced func() bool
|
||||
podListerSynced cache.InformerSynced
|
||||
|
||||
// Deployments that need to be synced
|
||||
queue workqueue.RateLimitingInterface
|
||||
}
|
||||
|
||||
// NewDeploymentController creates a new DeploymentController.
|
||||
func NewDeploymentController(client clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *DeploymentController {
|
||||
func NewDeploymentController(dInformer informers.DeploymentInformer, rsInformer informers.ReplicaSetInformer, podInformer informers.PodInformer, client clientset.Interface) *DeploymentController {
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
eventBroadcaster.StartLogging(glog.Infof)
|
||||
// TODO: remove the wrapper when every clients have moved to use the clientset.
|
||||
@@ -109,85 +102,41 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
|
||||
}
|
||||
|
||||
dc.dLister.Indexer, dc.dController = cache.NewIndexerInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||
return dc.client.Extensions().Deployments(api.NamespaceAll).List(options)
|
||||
},
|
||||
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||
return dc.client.Extensions().Deployments(api.NamespaceAll).Watch(options)
|
||||
},
|
||||
},
|
||||
&extensions.Deployment{},
|
||||
FullDeploymentResyncPeriod,
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: dc.addDeploymentNotification,
|
||||
UpdateFunc: dc.updateDeploymentNotification,
|
||||
// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
|
||||
DeleteFunc: dc.deleteDeploymentNotification,
|
||||
},
|
||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
)
|
||||
|
||||
dc.rsLister.Indexer, dc.rsController = cache.NewIndexerInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||
return dc.client.Extensions().ReplicaSets(api.NamespaceAll).List(options)
|
||||
},
|
||||
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||
return dc.client.Extensions().ReplicaSets(api.NamespaceAll).Watch(options)
|
||||
},
|
||||
},
|
||||
&extensions.ReplicaSet{},
|
||||
resyncPeriod(),
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: dc.addReplicaSet,
|
||||
UpdateFunc: dc.updateReplicaSet,
|
||||
DeleteFunc: dc.deleteReplicaSet,
|
||||
},
|
||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
)
|
||||
|
||||
dc.podLister.Indexer, dc.podController = cache.NewIndexerInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||
return dc.client.Core().Pods(api.NamespaceAll).List(options)
|
||||
},
|
||||
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||
return dc.client.Core().Pods(api.NamespaceAll).Watch(options)
|
||||
},
|
||||
},
|
||||
&api.Pod{},
|
||||
resyncPeriod(),
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: dc.addPod,
|
||||
UpdateFunc: dc.updatePod,
|
||||
DeleteFunc: dc.deletePod,
|
||||
},
|
||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
)
|
||||
dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: dc.addDeploymentNotification,
|
||||
UpdateFunc: dc.updateDeploymentNotification,
|
||||
// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
|
||||
DeleteFunc: dc.deleteDeploymentNotification,
|
||||
})
|
||||
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: dc.addReplicaSet,
|
||||
UpdateFunc: dc.updateReplicaSet,
|
||||
DeleteFunc: dc.deleteReplicaSet,
|
||||
})
|
||||
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: dc.addPod,
|
||||
UpdateFunc: dc.updatePod,
|
||||
DeleteFunc: dc.deletePod,
|
||||
})
|
||||
|
||||
dc.syncHandler = dc.syncDeployment
|
||||
dc.dListerSynced = dc.dController.HasSynced
|
||||
dc.rsListerSynced = dc.rsController.HasSynced
|
||||
dc.podListerSynced = dc.podController.HasSynced
|
||||
dc.dLister = dInformer.Lister()
|
||||
dc.rsLister = rsInformer.Lister()
|
||||
dc.podLister = podInformer.Lister()
|
||||
dc.dListerSynced = dInformer.Informer().HasSynced
|
||||
dc.rsListerSynced = dInformer.Informer().HasSynced
|
||||
dc.podListerSynced = dInformer.Informer().HasSynced
|
||||
return dc
|
||||
}
|
||||
|
||||
// Run begins watching and syncing.
|
||||
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer dc.queue.ShutDown()
|
||||
|
||||
go dc.dController.Run(stopCh)
|
||||
go dc.rsController.Run(stopCh)
|
||||
go dc.podController.Run(stopCh)
|
||||
glog.Infof("Starting deployment controller")
|
||||
|
||||
// Wait for the rc and dc stores to sync before starting any work in this controller.
|
||||
ready := make(chan struct{})
|
||||
go dc.waitForSyncedListers(ready, stopCh)
|
||||
select {
|
||||
case <-ready:
|
||||
case <-stopCh:
|
||||
if !cache.WaitForCacheSync(stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -197,21 +146,6 @@ func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
|
||||
|
||||
<-stopCh
|
||||
glog.Infof("Shutting down deployment controller")
|
||||
dc.queue.ShutDown()
|
||||
}
|
||||
|
||||
func (dc *DeploymentController) waitForSyncedListers(ready chan<- struct{}, stopCh <-chan struct{}) {
|
||||
defer utilruntime.HandleCrash()
|
||||
|
||||
for !dc.dListerSynced() || !dc.rsListerSynced() || !dc.podListerSynced() {
|
||||
select {
|
||||
case <-time.After(StoreSyncedPollPeriod):
|
||||
case <-stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
close(ready)
|
||||
}
|
||||
|
||||
func (dc *DeploymentController) addDeploymentNotification(obj interface{}) {
|
||||
|
Reference in New Issue
Block a user