Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-08-03 17:30:00 +00:00
Merge pull request #34349 from deads2k/controller-07-d-contorller

Automatic merge from submit-queue

convert deployment controller to shared informers

Converts the deployment controller to shared informers. @kargakis I think you've been in here. Pretty straightforward swap.

Fixes #27687

Commit db0529fc7c
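For orientation before the hunks below: the shape of the change is easiest to see from the caller's side. One shared informer factory is built once, its typed informers are handed to the controller constructor, and the controller only registers event handlers and listers on them instead of running its own list/watch loops. The following sketch restates that wiring using identifiers that appear in this diff; the helper name startDeploymentController, the package clause, and the clientset import path are assumptions for illustration, not part of the commit.

    package app // hypothetical package; the real call site is the controller manager's StartControllers

    import (
        "k8s.io/kubernetes/pkg/controller"
        "k8s.io/kubernetes/pkg/controller/deployment"
        "k8s.io/kubernetes/pkg/controller/informers"

        // Import path assumed for this vintage of the tree; it is not shown in this diff.
        clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
    )

    // startDeploymentController is a hypothetical helper showing the post-PR wiring.
    func startDeploymentController(client clientset.Interface, resync controller.ResyncPeriodFunc, workers int, stopCh <-chan struct{}) {
        // One factory is shared by every controller; each resource is listed and
        // watched once no matter how many controllers consume it.
        sharedInformers := informers.NewSharedInformerFactory(client, resync())

        // The controller receives typed informers instead of building its own reflectors.
        dc := deployment.NewDeploymentController(
            sharedInformers.Deployments(),
            sharedInformers.ReplicaSets(),
            sharedInformers.Pods(),
            client,
        )

        // The caller owns the informers' lifecycle: start them, then run the controller.
        sharedInformers.Start(stopCh)
        go dc.Run(workers, stopCh)
    }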
@@ -415,7 +415,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl
 
     if containsResource(resources, "deployments") {
         glog.Infof("Starting deployment controller")
-        go deployment.NewDeploymentController(client("deployment-controller"), ResyncPeriod(s)).
+        go deployment.NewDeploymentController(sharedInformers.Deployments(), sharedInformers.ReplicaSets(), sharedInformers.Pods(), client("deployment-controller")).
             Run(int(s.ConcurrentDeploymentSyncs), wait.NeverStop)
         time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
     }
@@ -36,13 +36,12 @@ import (
     "k8s.io/kubernetes/pkg/client/record"
     "k8s.io/kubernetes/pkg/controller"
     "k8s.io/kubernetes/pkg/controller/deployment/util"
+    "k8s.io/kubernetes/pkg/controller/informers"
     "k8s.io/kubernetes/pkg/labels"
-    "k8s.io/kubernetes/pkg/runtime"
     "k8s.io/kubernetes/pkg/util/metrics"
     utilruntime "k8s.io/kubernetes/pkg/util/runtime"
     "k8s.io/kubernetes/pkg/util/wait"
     "k8s.io/kubernetes/pkg/util/workqueue"
-    "k8s.io/kubernetes/pkg/watch"
 )
 
 const (
@@ -67,34 +66,28 @@ type DeploymentController struct {
     syncHandler func(dKey string) error
 
     // A store of deployments, populated by the dController
-    dLister cache.StoreToDeploymentLister
-    // Watches changes to all deployments
-    dController *cache.Controller
+    dLister *cache.StoreToDeploymentLister
     // A store of ReplicaSets, populated by the rsController
-    rsLister cache.StoreToReplicaSetLister
-    // Watches changes to all ReplicaSets
-    rsController *cache.Controller
+    rsLister *cache.StoreToReplicaSetLister
     // A store of pods, populated by the podController
-    podLister cache.StoreToPodLister
-    // Watches changes to all pods
-    podController *cache.Controller
+    podLister *cache.StoreToPodLister
 
     // dListerSynced returns true if the Deployment store has been synced at least once.
     // Added as a member to the struct to allow injection for testing.
-    dListerSynced func() bool
+    dListerSynced cache.InformerSynced
     // rsListerSynced returns true if the ReplicaSet store has been synced at least once.
     // Added as a member to the struct to allow injection for testing.
-    rsListerSynced func() bool
+    rsListerSynced cache.InformerSynced
     // podListerSynced returns true if the pod store has been synced at least once.
     // Added as a member to the struct to allow injection for testing.
-    podListerSynced func() bool
+    podListerSynced cache.InformerSynced
 
     // Deployments that need to be synced
     queue workqueue.RateLimitingInterface
 }
 
 // NewDeploymentController creates a new DeploymentController.
-func NewDeploymentController(client clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *DeploymentController {
+func NewDeploymentController(dInformer informers.DeploymentInformer, rsInformer informers.ReplicaSetInformer, podInformer informers.PodInformer, client clientset.Interface) *DeploymentController {
     eventBroadcaster := record.NewBroadcaster()
     eventBroadcaster.StartLogging(glog.Infof)
     // TODO: remove the wrapper when every clients have moved to use the clientset.
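One detail worth calling out in the struct changes above: the sync flags keep their func-valued behavior, because cache.InformerSynced is, as far as I can tell at this point in the tree, just a named function type. That is why the tests further down can still overwrite the fields with a plain closure. A minimal sketch of that assumption:

    // Assumed definition, paraphrased from pkg/client/cache:
    type InformerSynced func() bool

    // Any func() bool still satisfies it, so a test stub keeps working:
    var alwaysReady = func() bool { return true }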
@@ -109,85 +102,41 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
         queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
     }
 
-    dc.dLister.Indexer, dc.dController = cache.NewIndexerInformer(
-        &cache.ListWatch{
-            ListFunc: func(options api.ListOptions) (runtime.Object, error) {
-                return dc.client.Extensions().Deployments(api.NamespaceAll).List(options)
-            },
-            WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-                return dc.client.Extensions().Deployments(api.NamespaceAll).Watch(options)
-            },
-        },
-        &extensions.Deployment{},
-        FullDeploymentResyncPeriod,
-        cache.ResourceEventHandlerFuncs{
-            AddFunc:    dc.addDeploymentNotification,
-            UpdateFunc: dc.updateDeploymentNotification,
-            // This will enter the sync loop and no-op, because the deployment has been deleted from the store.
-            DeleteFunc: dc.deleteDeploymentNotification,
-        },
-        cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
-    )
-
-    dc.rsLister.Indexer, dc.rsController = cache.NewIndexerInformer(
-        &cache.ListWatch{
-            ListFunc: func(options api.ListOptions) (runtime.Object, error) {
-                return dc.client.Extensions().ReplicaSets(api.NamespaceAll).List(options)
-            },
-            WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-                return dc.client.Extensions().ReplicaSets(api.NamespaceAll).Watch(options)
-            },
-        },
-        &extensions.ReplicaSet{},
-        resyncPeriod(),
-        cache.ResourceEventHandlerFuncs{
-            AddFunc:    dc.addReplicaSet,
-            UpdateFunc: dc.updateReplicaSet,
-            DeleteFunc: dc.deleteReplicaSet,
-        },
-        cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
-    )
-
-    dc.podLister.Indexer, dc.podController = cache.NewIndexerInformer(
-        &cache.ListWatch{
-            ListFunc: func(options api.ListOptions) (runtime.Object, error) {
-                return dc.client.Core().Pods(api.NamespaceAll).List(options)
-            },
-            WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-                return dc.client.Core().Pods(api.NamespaceAll).Watch(options)
-            },
-        },
-        &api.Pod{},
-        resyncPeriod(),
-        cache.ResourceEventHandlerFuncs{
-            AddFunc:    dc.addPod,
-            UpdateFunc: dc.updatePod,
-            DeleteFunc: dc.deletePod,
-        },
-        cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
-    )
+    dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+        AddFunc:    dc.addDeploymentNotification,
+        UpdateFunc: dc.updateDeploymentNotification,
+        // This will enter the sync loop and no-op, because the deployment has been deleted from the store.
+        DeleteFunc: dc.deleteDeploymentNotification,
+    })
+    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+        AddFunc:    dc.addReplicaSet,
+        UpdateFunc: dc.updateReplicaSet,
+        DeleteFunc: dc.deleteReplicaSet,
+    })
+    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+        AddFunc:    dc.addPod,
+        UpdateFunc: dc.updatePod,
+        DeleteFunc: dc.deletePod,
+    })
 
     dc.syncHandler = dc.syncDeployment
-    dc.dListerSynced = dc.dController.HasSynced
-    dc.rsListerSynced = dc.rsController.HasSynced
-    dc.podListerSynced = dc.podController.HasSynced
+    dc.dLister = dInformer.Lister()
+    dc.rsLister = rsInformer.Lister()
+    dc.podLister = podInformer.Lister()
+    dc.dListerSynced = dInformer.Informer().HasSynced
+    dc.rsListerSynced = dInformer.Informer().HasSynced
+    dc.podListerSynced = dInformer.Informer().HasSynced
     return dc
 }
 
 // Run begins watching and syncing.
 func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
     defer utilruntime.HandleCrash()
+    defer dc.queue.ShutDown()
 
-    go dc.dController.Run(stopCh)
-    go dc.rsController.Run(stopCh)
-    go dc.podController.Run(stopCh)
+    glog.Infof("Starting deployment controller")
 
-    // Wait for the rc and dc stores to sync before starting any work in this controller.
-    ready := make(chan struct{})
-    go dc.waitForSyncedListers(ready, stopCh)
-    select {
-    case <-ready:
-    case <-stopCh:
+    if !cache.WaitForCacheSync(stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
         return
     }
 
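The constructor changes above are where the sharing actually happens: the controller no longer owns a reflector per resource, it only hangs handlers on informers that the factory runs once for everyone. Any second consumer can register against the same informer; a purely illustrative sketch (otherController is hypothetical):

    // Both registrations are served by the single pod reflector the factory runs.
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addPod,
        UpdateFunc: dc.updatePod,
        DeleteFunc: dc.deletePod,
    })
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: otherController.addPod, // hypothetical second consumer of the same informer
    })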
@@ -197,21 +146,6 @@ func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
 
     <-stopCh
     glog.Infof("Shutting down deployment controller")
-    dc.queue.ShutDown()
-}
-
-func (dc *DeploymentController) waitForSyncedListers(ready chan<- struct{}, stopCh <-chan struct{}) {
-    defer utilruntime.HandleCrash()
-
-    for !dc.dListerSynced() || !dc.rsListerSynced() || !dc.podListerSynced() {
-        select {
-        case <-time.After(StoreSyncedPollPeriod):
-        case <-stopCh:
-            return
-        }
-    }
-
-    close(ready)
 }
 
 func (dc *DeploymentController) addDeploymentNotification(obj interface{}) {
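With the per-controller reflectors and the hand-rolled waitForSyncedListers polling gone, startup reduces to the gate shown above: block in cache.WaitForCacheSync until every shared cache reports synced, return early if the stop channel closes first, and only then start doing work. A generic sketch of that pattern under assumed names (processNextWorkItem is hypothetical, and the worker-launch lines are the conventional idiom, not a quote from the unchanged parts of this file):

    // Assumed imports: time, pkg/client/cache, pkg/util/wait.
    func runWorkers(stopCh <-chan struct{}, workers int, synced ...cache.InformerSynced) {
        // WaitForCacheSync polls the InformerSynced funcs and returns false if stopCh closes first.
        if !cache.WaitForCacheSync(stopCh, synced...) {
            return
        }
        for i := 0; i < workers; i++ {
            go wait.Until(processNextWorkItem, time.Second, stopCh)
        }
        <-stopCh
    }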
@@ -28,6 +28,7 @@ import (
     "k8s.io/kubernetes/pkg/client/record"
     "k8s.io/kubernetes/pkg/client/testing/core"
     "k8s.io/kubernetes/pkg/controller"
+    "k8s.io/kubernetes/pkg/controller/informers"
     "k8s.io/kubernetes/pkg/runtime"
     "k8s.io/kubernetes/pkg/util/intstr"
     "k8s.io/kubernetes/pkg/util/uuid"
@@ -168,8 +169,10 @@ func newFixture(t *testing.T) *fixture {
 
 func (f *fixture) run(deploymentName string) {
     f.client = fake.NewSimpleClientset(f.objects...)
-    c := NewDeploymentController(f.client, controller.NoResyncPeriodFunc)
+    informers := informers.NewSharedInformerFactory(f.client, controller.NoResyncPeriodFunc())
+    c := NewDeploymentController(informers.Deployments(), informers.ReplicaSets(), informers.Pods(), f.client)
     c.eventRecorder = &record.FakeRecorder{}
+    c.dListerSynced = alwaysReady
     c.rsListerSynced = alwaysReady
     c.podListerSynced = alwaysReady
     for _, d := range f.dLister {
@@ -181,6 +184,9 @@ func (f *fixture) run(deploymentName string) {
     for _, pod := range f.podLister {
         c.podLister.Indexer.Add(pod)
     }
+    stopCh := make(chan struct{})
+    defer close(stopCh)
+    informers.Start(stopCh)
 
     err := c.syncDeployment(deploymentName)
     if err != nil {
@@ -188,13 +194,25 @@ func (f *fixture) run(deploymentName string) {
     }
 
     actions := f.client.Actions()
+    informerActions := 0
     for i, action := range actions {
-        if len(f.actions) < i+1 {
+        if len(action.GetNamespace()) == 0 &&
+            (action.Matches("list", "pods") ||
+                action.Matches("list", "replicasets") ||
+                action.Matches("list", "deployments") ||
+                action.Matches("watch", "pods") ||
+                action.Matches("watch", "replicasets") ||
+                action.Matches("watch", "deployments")) {
+            informerActions++
+            continue
+        }
+
+        if len(f.actions)+informerActions < i+1 {
             f.t.Errorf("%d unexpected actions: %+v", len(actions)-len(f.actions), actions[i:])
             break
         }
 
-        expectedAction := f.actions[i]
+        expectedAction := f.actions[i-informerActions]
         if !expectedAction.Matches(action.GetVerb(), action.GetResource().Resource) {
             f.t.Errorf("Expected\n\t%#v\ngot\n\t%#v", expectedAction, action)
             continue
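Because the fixture now starts real shared informers against the fake client, the recorded action list contains the informers' own cluster-scoped list/watch calls in addition to whatever the controller did; the informerActions counter above exists only to skip those entries when comparing against the expected actions. The same bookkeeping can be expressed as a standalone filter, roughly as follows (helper name and placement are hypothetical; core.Action is the action type from pkg/client/testing/core imported above):

    // filterInformerActions drops the list/watch calls issued by the shared
    // informers, leaving only the actions the controller itself performed.
    func filterInformerActions(actions []core.Action) []core.Action {
        ret := []core.Action{}
        for _, action := range actions {
            if len(action.GetNamespace()) == 0 &&
                (action.Matches("list", "pods") ||
                    action.Matches("list", "deployments") ||
                    action.Matches("list", "replicasets") ||
                    action.Matches("watch", "pods") ||
                    action.Matches("watch", "deployments") ||
                    action.Matches("watch", "replicasets")) {
                continue
            }
            ret = append(ret, action)
        }
        return ret
    }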
@@ -236,12 +254,17 @@ func TestSyncDeploymentDontDoAnythingDuringDeletion(t *testing.T) {
 // issue: https://github.com/kubernetes/kubernetes/issues/23218
 func TestDeploymentController_dontSyncDeploymentsWithEmptyPodSelector(t *testing.T) {
     fake := &fake.Clientset{}
-    controller := NewDeploymentController(fake, controller.NoResyncPeriodFunc)
+    informers := informers.NewSharedInformerFactory(fake, controller.NoResyncPeriodFunc())
+    controller := NewDeploymentController(informers.Deployments(), informers.ReplicaSets(), informers.Pods(), fake)
     controller.eventRecorder = &record.FakeRecorder{}
+    controller.dListerSynced = alwaysReady
     controller.rsListerSynced = alwaysReady
     controller.podListerSynced = alwaysReady
 
+    stopCh := make(chan struct{})
+    defer close(stopCh)
+    informers.Start(stopCh)
+
     d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"})
     empty := unversioned.LabelSelector{}
     d.Spec.Selector = &empty
@@ -26,6 +26,7 @@ import (
     "k8s.io/kubernetes/pkg/client/record"
     "k8s.io/kubernetes/pkg/controller"
     deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
+    "k8s.io/kubernetes/pkg/controller/informers"
     "k8s.io/kubernetes/pkg/util/intstr"
 )
 
@@ -346,15 +347,21 @@ func TestDeploymentController_cleanupDeployment(t *testing.T) {
     for i := range tests {
         test := tests[i]
         fake := &fake.Clientset{}
-        controller := NewDeploymentController(fake, controller.NoResyncPeriodFunc)
+        informers := informers.NewSharedInformerFactory(fake, controller.NoResyncPeriodFunc())
+        controller := NewDeploymentController(informers.Deployments(), informers.ReplicaSets(), informers.Pods(), fake)
 
         controller.eventRecorder = &record.FakeRecorder{}
+        controller.dListerSynced = alwaysReady
         controller.rsListerSynced = alwaysReady
         controller.podListerSynced = alwaysReady
         for _, rs := range test.oldRSs {
             controller.rsLister.Indexer.Add(rs)
         }
 
+        stopCh := make(chan struct{})
+        defer close(stopCh)
+        informers.Start(stopCh)
+
         d := newDeployment("foo", 1, &test.revisionHistoryLimit, nil, nil, map[string]string{"foo": "bar"})
         controller.cleanupDeployment(test.oldRSs, d)
 
@@ -18,6 +18,7 @@ package informers
 
 import (
     "reflect"
+    "time"
 
     "k8s.io/kubernetes/pkg/api"
     "k8s.io/kubernetes/pkg/apis/extensions"
@@ -98,7 +99,9 @@ func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
             },
         },
         &extensions.Deployment{},
-        f.defaultResync,
+        // TODO remove this. It is hardcoded so that "Waiting for the second deployment to clear overlapping annotation" in
+        // "overlapping deployment should not fight with each other" will work since it requires a full resync to work properly.
+        30*time.Second,
         cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
     )
     f.informers[informerType] = informer