Allow a replication manager to be created that does not record events

derekwaynecarr 2016-05-10 15:58:49 -04:00
parent a598ae2ab4
commit ff4a5e2068


@@ -102,11 +102,18 @@ type ReplicationManager struct {
 	queue *workqueue.Type
 }
 
+// NewReplicationManager creates a replication manager
 func NewReplicationManager(podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartLogging(glog.Infof)
 	eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
+	return newReplicationManagerInternal(
+		eventBroadcaster.NewRecorder(api.EventSource{Component: "replication-controller"}),
+		podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize)
+}
+
+// newReplicationManagerInternal configures a replication manager with the specified event recorder
+func newReplicationManagerInternal(eventRecorder record.EventRecorder, podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
 	if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
 		metrics.RegisterMetricAndTrackRateLimiterUsage("replication_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
 	}
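The change is a straightforward constructor-injection refactor: newReplicationManagerInternal accepts any record.EventRecorder, so the public constructor wires in a broadcaster-backed recorder while callers that must stay silent pass a fake. A minimal, self-contained sketch of the same pattern follows; the names Recorder, logRecorder, nopRecorder, and the manager type are illustrative, not from the Kubernetes codebase.

package main

import "fmt"

// Recorder abstracts event emission, mirroring the record.EventRecorder
// dependency that newReplicationManagerInternal now receives.
type Recorder interface {
	Eventf(reason, format string, args ...interface{})
}

// logRecorder stands in for a broadcaster-backed recorder that publishes events.
type logRecorder struct{}

func (logRecorder) Eventf(reason, format string, args ...interface{}) {
	fmt.Printf("%s: %s\n", reason, fmt.Sprintf(format, args...))
}

// nopRecorder drops everything, playing the role of record.FakeRecorder.
type nopRecorder struct{}

func (nopRecorder) Eventf(string, string, ...interface{}) {}

// manager depends only on the interface, so the caller picks the sink.
type manager struct{ rec Recorder }

func newManagerInternal(rec Recorder) *manager { return &manager{rec: rec} }

// NewManager is the production constructor: it injects the real recorder.
func NewManager() *manager { return newManagerInternal(logRecorder{}) }

// NewManagerForIntegration silences events, as the commit's new constructor does.
func NewManagerForIntegration() *manager { return newManagerInternal(nopRecorder{}) }

func main() {
	NewManager().rec.Eventf("SuccessfulCreate", "created pod %q", "frontend-x7k2p")
	NewManagerForIntegration().rec.Eventf("SuccessfulCreate", "this event is dropped")
}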
@@ -115,7 +122,7 @@ func NewReplicationManager(podInformer framework.SharedIndexInformer, kubeClient
 		kubeClient: kubeClient,
 		podControl: controller.RealPodControl{
 			KubeClient: kubeClient,
-			Recorder:   eventBroadcaster.NewRecorder(api.EventSource{Component: "replication-controller"}),
+			Recorder:   eventRecorder,
 		},
 		burstReplicas: burstReplicas,
 		expectations:  controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
@@ -195,7 +202,14 @@ func NewReplicationManager(podInformer framework.SharedIndexInformer, kubeClient
 	rm.podStoreSynced = rm.podController.HasSynced
 	rm.lookupCache = controller.NewMatchingCache(lookupCacheSize)
 	return rm
+}
+
+// NewReplicationManagerFromClientForIntegration creates a new ReplicationManager that runs its own informer. It disables event recording for use in integration tests.
+func NewReplicationManagerFromClientForIntegration(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
+	podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod())
+	rm := newReplicationManagerInternal(&record.FakeRecorder{}, podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize)
+	rm.internalPodInformer = podInformer
+	return rm
 }
 
 // NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer.
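A hypothetical call site from an integration test might look like the sketch below, assuming imports of time and k8s.io/kubernetes/pkg/controller/replication. Only the NewReplicationManagerFromClientForIntegration call comes from this commit; mustNewTestClient is a placeholder for whatever builds a clientset.Interface against the test API server, and the worker count and lookup cache size are arbitrary.

// startReplicationManagerForTest is a hypothetical integration-test helper.
func startReplicationManagerForTest(stopCh <-chan struct{}) *replication.ReplicationManager {
	kubeClient := mustNewTestClient() // placeholder: clientset.Interface for the test server
	resync := func() time.Duration { return 10 * time.Minute }

	// No Event objects reach the API server: the manager is built around
	// record.FakeRecorder instead of a broadcaster-backed recorder.
	rm := replication.NewReplicationManagerFromClientForIntegration(kubeClient, resync, replication.BurstReplicas, 4096)
	go rm.Run(5, stopCh) // 5 sync workers
	return rm
}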
@@ -413,18 +427,23 @@ func (rm *ReplicationManager) enqueueController(obj interface{}) {
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
 func (rm *ReplicationManager) worker() {
+	workFunc := func() bool {
+		key, quit := rm.queue.Get()
+		if quit {
+			return true
+		}
+		defer rm.queue.Done(key)
+		err := rm.syncHandler(key.(string))
+		if err != nil {
+			glog.Errorf("Error syncing replication controller: %v", err)
+		}
+		return false
+	}
 	for {
-		func() {
-			key, quit := rm.queue.Get()
-			if quit {
-				return
-			}
-			defer rm.queue.Done(key)
-			err := rm.syncHandler(key.(string))
-			if err != nil {
-				glog.Errorf("Error syncing replication controller: %v", err)
-			}
-		}()
+		if quit := workFunc(); quit {
+			glog.Infof("replication controller worker shutting down")
+			return
+		}
 	}
 }
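The worker rewrite also fixes a shutdown defect visible in the old code: the return inside the anonymous function only exited that function, so once the queue was shut down the outer for loop would keep calling Get() forever. Hoisting the body into workFunc, which reports whether to quit, lets the worker actually terminate and log that it is doing so. Below is a self-contained toy sketch of the same pattern; the queue type is a simplified stand-in for workqueue.Type, not the real implementation.

package main

import (
	"fmt"
	"sync"
)

// queue is a toy stand-in for workqueue.Type: Get blocks until an item is
// available, or reports quit=true once the queue is shut down and drained.
type queue struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items []string
	down  bool
}

func newQueue() *queue {
	q := &queue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

func (q *queue) Add(item string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, item)
	q.cond.Signal()
}

func (q *queue) Get() (string, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 && !q.down {
		q.cond.Wait()
	}
	if len(q.items) == 0 {
		return "", true // shut down and drained: tell the worker to quit
	}
	item := q.items[0]
	q.items = q.items[1:]
	return item, false
}

func (q *queue) ShutDown() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.down = true
	q.cond.Broadcast()
}

// worker mirrors the refactor: workFunc reports quit, so the outer loop can
// actually stop instead of spinning on a closed queue.
func worker(q *queue, wg *sync.WaitGroup) {
	defer wg.Done()
	workFunc := func() bool {
		key, quit := q.Get()
		if quit {
			return true
		}
		fmt.Println("synced", key)
		return false
	}
	for {
		if quit := workFunc(); quit {
			fmt.Println("worker shutting down")
			return
		}
	}
}

func main() {
	q := newQueue()
	var wg sync.WaitGroup
	wg.Add(1)
	go worker(q, &wg)
	q.Add("default/frontend")
	q.Add("default/backend")
	q.ShutDown()
	wg.Wait()
}

Running the sketch prints the two synced keys followed by "worker shutting down", confirming the loop exits once the queue is drained and closed.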