Share rc cache from the rc manager

Michail Kargakis
2017-01-02 17:35:12 +01:00
parent 75c5f4d6d3
commit e5b586b5b0
11 changed files with 241 additions and 272 deletions


@@ -35,7 +35,6 @@ import (
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/schema"
"k8s.io/kubernetes/pkg/util"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
@@ -43,27 +42,13 @@ import (
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
)
const (
// We'll attempt to recompute the required replicas of all replication controllers
// that have fulfilled their expectations at least this often. This recomputation
// happens based on contents in local pod storage.
// Full Resync shouldn't be needed at all in a healthy system. This is a protection
// against disappearing objects and missed watch notifications, which we believe should not
// happen at all.
// TODO: We should get rid of it completely in the fullness of time.
FullControllerResyncPeriod = 10 * time.Minute
// Realistic value of the burstReplica field for the replication manager based on
// performance requirements for kubernetes 1.0.
BurstReplicas = 500
// We must avoid counting pods until the pod store has synced. If it hasn't synced, to
// avoid a hot loop, we'll wait this long between checks.
PodStoreSyncedPollPeriod = 100 * time.Millisecond
// The number of times we retry updating a replication controller's status.
statusUpdateRetries = 1
)
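With the rc list/watch now backed by a shared informer supplied by the caller, the controller-local full-resync and pod-store polling constants appear to go away here; the resync interval is chosen wherever the informer is constructed instead. A minimal sketch of that, using the informer helper this diff relies on further down (the 10-minute interval is illustrative, not taken from this commit):

    // Sketch only: the resync interval now travels with the shared informer
    // rather than living as a package constant in the replication manager.
    rcInformer := informers.NewReplicationControllerInformer(kubeClient, 10*time.Minute)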
@@ -97,16 +82,14 @@ type ReplicationManager struct {
expectations *controller.UIDTrackingControllerExpectations
// A store of replication controllers, populated by the rcController
rcStore cache.StoreToReplicationControllerLister
// Watches changes to all replication controllers
rcController *cache.Controller
rcLister cache.StoreToReplicationControllerLister
// A store of pods, populated by the podController
podStore cache.StoreToPodLister
podLister cache.StoreToPodLister
// Watches changes to all pods
podController cache.ControllerInterface
// podStoreSynced returns true if the pod store has been synced at least once.
// podListerSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podStoreSynced func() bool
podListerSynced func() bool
lookupCache *controller.MatchingCache
@@ -118,27 +101,21 @@ type ReplicationManager struct {
garbageCollectorEnabled bool
}
// NewReplicationManager creates a replication manager
func NewReplicationManager(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
return newReplicationManager(
eventBroadcaster.NewRecorder(v1.EventSource{Component: "replication-controller"}),
podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
}
// newReplicationManager configures a replication manager with the specified event recorder
func newReplicationManager(eventRecorder record.EventRecorder, podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager {
// NewReplicationManager configures a replication manager with the specified event recorder
func NewReplicationManager(podInformer, rcInformer cache.SharedIndexInformer, kubeClient clientset.Interface, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager {
if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("replication_controller", kubeClient.Core().RESTClient().GetRateLimiter())
}
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
rm := &ReplicationManager{
kubeClient: kubeClient,
podControl: controller.RealPodControl{
KubeClient: kubeClient,
Recorder: eventRecorder,
Recorder: eventBroadcaster.NewRecorder(v1.EventSource{Component: "replication-controller"}),
},
burstReplicas: burstReplicas,
expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
@@ -146,29 +123,14 @@ func newReplicationManager(eventRecorder record.EventRecorder, podInformer cache
garbageCollectorEnabled: garbageCollectorEnabled,
}
rm.rcStore.Indexer, rm.rcController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
return rm.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).List(options)
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
return rm.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).Watch(options)
},
},
&v1.ReplicationController{},
// TODO: Can we have much longer period here?
FullControllerResyncPeriod,
cache.ResourceEventHandlerFuncs{
AddFunc: rm.enqueueController,
UpdateFunc: rm.updateRC,
// This will enter the sync loop and no-op, because the controller has been deleted from the store.
// Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
// way of achieving this is by performing a `stop` operation on the controller.
DeleteFunc: rm.enqueueController,
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
rcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: rm.enqueueController,
UpdateFunc: rm.updateRC,
// This will enter the sync loop and no-op, because the controller has been deleted from the store.
// Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
// way of achieving this is by performing a `stop` operation on the controller.
DeleteFunc: rm.enqueueController,
})
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: rm.addPod,
// This invokes the rc for every pod change, eg: host assignment. Though this might seem like overkill
@@ -177,31 +139,21 @@ func newReplicationManager(eventRecorder record.EventRecorder, podInformer cache
UpdateFunc: rm.updatePod,
DeleteFunc: rm.deletePod,
})
rm.podStore.Indexer = podInformer.GetIndexer()
rm.podController = podInformer.GetController()
rm.syncHandler = rm.syncReplicationController
rm.podStoreSynced = rm.podController.HasSynced
rm.rcLister.Indexer = rcInformer.GetIndexer()
rm.podLister.Indexer = podInformer.GetIndexer()
rm.podListerSynced = podInformer.HasSynced
rm.lookupCache = controller.NewMatchingCache(lookupCacheSize)
return rm
}
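This constructor change is the core of the commit: instead of building its own replication-controller list/watch with cache.NewIndexerInformer, the manager now only registers event handlers on informers handed in by the caller and keeps references to their shared indexers. A hedged sketch of how a caller might wire it up (the worker count, resync intervals, and lookup-cache size are illustrative, not values from this commit):

    func startReplicationManager(kubeClient clientset.Interface, stopCh <-chan struct{}) {
        // Both informers are created and run by the caller, so their caches can
        // be shared with any other controller that needs pods or RCs.
        podInformer := informers.NewPodInformer(kubeClient, 30*time.Second)
        rcInformer := informers.NewReplicationControllerInformer(kubeClient, 30*time.Second)

        rm := NewReplicationManager(
            podInformer, rcInformer, kubeClient,
            BurstReplicas, // 500, per the constant above
            4096,          // lookupCacheSize, illustrative
            false,         // garbageCollectorEnabled
        )

        go podInformer.Run(stopCh)
        go rcInformer.Run(stopCh)
        go rm.Run(5, stopCh)
    }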
// NewReplicationManagerFromClientForIntegration creates a new ReplicationManager that runs its own informer. It disables event recording for use in integration tests.
func NewReplicationManagerFromClientForIntegration(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
podInformer := informers.NewPodInformer(kubeClient, resyncPeriod())
garbageCollectorEnabled := false
rm := newReplicationManager(&record.FakeRecorder{}, podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
rm.internalPodInformer = podInformer
return rm
}
// NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer.
func NewReplicationManagerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
podInformer := informers.NewPodInformer(kubeClient, resyncPeriod())
garbageCollectorEnabled := false
rm := NewReplicationManager(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
rcInformer := informers.NewReplicationControllerInformer(kubeClient, resyncPeriod())
rm := NewReplicationManager(podInformer, rcInformer, kubeClient, burstReplicas, lookupCacheSize, false)
rm.internalPodInformer = podInformer
return rm
}
@@ -216,20 +168,23 @@ func (rm *ReplicationManager) SetEventRecorder(recorder record.EventRecorder) {
// Run begins watching and syncing.
func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
glog.Infof("Starting RC Manager")
go rm.rcController.Run(stopCh)
go rm.podController.Run(stopCh)
for i := 0; i < workers; i++ {
go wait.Until(rm.worker, time.Second, stopCh)
}
defer rm.queue.ShutDown()
glog.Infof("Starting RC Manager")
if rm.internalPodInformer != nil {
go rm.internalPodInformer.Run(stopCh)
}
if !cache.WaitForCacheSync(stopCh, rm.podListerSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.Until(rm.worker, time.Second, stopCh)
}
<-stopCh
glog.Infof("Shutting down RC Manager")
rm.queue.ShutDown()
}
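Run now follows the standard shared-informer startup sequence: defer the queue shutdown, start only the internally owned pod informer (shared informers are run by the caller), block until the cache has synced, and only then launch workers. Since cache.WaitForCacheSync is variadic, a controller that reads from several listers can gate on all of them at once; a sketch, assuming a hypothetical rcListerSynced field mirroring podListerSynced:

    // Illustrative only: wait for every cache the sync loop reads from.
    // rcListerSynced is a hypothetical field, not part of this commit.
    if !cache.WaitForCacheSync(stopCh, rm.podListerSynced, rm.rcListerSynced) {
        utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
        return
    }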
// getPodController returns the controller managing the given pod.
@@ -250,7 +205,7 @@ func (rm *ReplicationManager) getPodController(pod *v1.Pod) *v1.ReplicationContr
}
// if not cached or cached value is invalid, search all the rc to find the matching one, and update cache
controllers, err := rm.rcStore.GetPodControllers(pod)
controllers, err := rm.rcLister.GetPodControllers(pod)
if err != nil {
glog.V(4).Infof("No controllers found for pod %v, replication manager will avoid syncing", pod.Name)
return nil
@@ -276,7 +231,7 @@ func (rm *ReplicationManager) getPodController(pod *v1.Pod) *v1.ReplicationContr
// isCacheValid checks if the cache is valid
func (rm *ReplicationManager) isCacheValid(pod *v1.Pod, cachedRC *v1.ReplicationController) bool {
_, err := rm.rcStore.ReplicationControllers(cachedRC.Namespace).Get(cachedRC.Name)
_, err := rm.rcLister.ReplicationControllers(cachedRC.Namespace).Get(cachedRC.Name)
// rc has been deleted or updated, cache is invalid
if err != nil || !isControllerMatch(pod, cachedRC) {
return false
@@ -648,23 +603,15 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
glog.V(4).Infof("Finished syncing controller %q (%v)", key, time.Now().Sub(startTime))
}()
if !rm.podStoreSynced() {
// Sleep so we give the pod reflector goroutine a chance to run.
time.Sleep(PodStoreSyncedPollPeriod)
glog.Infof("Waiting for pods controller to sync, requeuing rc %v", key)
rm.queue.Add(key)
return nil
obj, exists, err := rm.rcLister.Indexer.GetByKey(key)
if err != nil {
return err
}
obj, exists, err := rm.rcStore.Indexer.GetByKey(key)
if !exists {
glog.Infof("Replication Controller has been deleted %v", key)
rm.expectations.DeleteExpectations(key)
return nil
}
if err != nil {
return err
}
rc := *obj.(*v1.ReplicationController)
trace.Step("ReplicationController restored")
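The sync path no longer sleeps and requeues while a private pod store warms up; it resolves the work-queue key directly against the shared rc cache, and the not-found branch is where the per-key expectations record is dropped. Note also the dereference-and-copy (rc := *obj.(*v1.ReplicationController)): the cached object belongs to the shared store, so the controller works on a copy rather than mutating cache state. The same lookup pattern, condensed with descriptive comments (an illustrative restatement, not a further change in this commit):

    obj, exists, err := rm.rcLister.Indexer.GetByKey(key)
    if err != nil {
        return err // cache lookup failed; let the workqueue retry
    }
    if !exists {
        // The RC was deleted: forget any pod create/delete expectations for it.
        rm.expectations.DeleteExpectations(key)
        return nil
    }
    // Copy out of the shared cache before touching any fields.
    rc := *obj.(*v1.ReplicationController)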
@@ -678,7 +625,7 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
if rm.garbageCollectorEnabled {
// list all pods to include the pods that don't match the rc's selector
// anymore but have the stale controller ref.
pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Everything())
pods, err := rm.podLister.Pods(rc.Namespace).List(labels.Everything())
if err != nil {
glog.Errorf("Error getting pods for rc %q: %v", key, err)
rm.queue.Add(key)
@@ -719,7 +666,7 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
return aggregate
}
} else {
pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelectorPreValidated())
pods, err := rm.podLister.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelectorPreValidated())
if err != nil {
glog.Errorf("Error getting pods for rc %q: %v", key, err)
rm.queue.Add(key)