mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 11:21:47 +00:00
Service controller: use a workqueue for node updates
Services which fail to be successfully synced as a cause by a triggered node event are actually never retried. The commit before this one gave an example of when such services used to be retried before, but that was not really efficient nor fully correct. Moving to a workqueue for node events is a more modern approach to syncing nodes, and placing all service keys that have failed on the service workqueue, in case they do, fixes the re-sync problem Also, now that we are using a node workqueue and use one go-routine to service items from that queue, we don't need the `nodeSyncLock` anymore. So further clean that up from the controller.
This commit is contained in:
parent
72c1c6559e
commit
c44ab14e20
@ -86,16 +86,13 @@ type Controller struct {
|
||||
eventRecorder record.EventRecorder
|
||||
nodeLister corelisters.NodeLister
|
||||
nodeListerSynced cache.InformerSynced
|
||||
// services that need to be synced
|
||||
queue workqueue.RateLimitingInterface
|
||||
|
||||
// nodeSyncLock ensures there is only one instance of triggerNodeSync getting executed at one time
|
||||
// and protects internal states (needFullSync) of nodeSync
|
||||
nodeSyncLock sync.Mutex
|
||||
// nodeSyncCh triggers nodeSyncLoop to run
|
||||
nodeSyncCh chan interface{}
|
||||
// lastSyncedNodes is used when reconciling node state and keeps track of the last synced set of
|
||||
// nodes. Access to this attribute by multiple go-routines is protected by nodeSyncLock
|
||||
// services and nodes that need to be synced
|
||||
serviceQueue workqueue.RateLimitingInterface
|
||||
nodeQueue workqueue.RateLimitingInterface
|
||||
// lastSyncedNodes is used when reconciling node state and keeps track of
|
||||
// the last synced set of nodes. This field is concurrently safe because the
|
||||
// nodeQueue is serviced by only one go-routine, so node events are not
|
||||
// processed concurrently.
|
||||
lastSyncedNodes []*v1.Node
|
||||
}
|
||||
|
||||
@ -128,10 +125,9 @@ func New(
|
||||
eventRecorder: recorder,
|
||||
nodeLister: nodeInformer.Lister(),
|
||||
nodeListerSynced: nodeInformer.Informer().HasSynced,
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
|
||||
// nodeSyncCh has a size 1 buffer. Only one pending sync signal would be cached.
|
||||
nodeSyncCh: make(chan interface{}, 1),
|
||||
lastSyncedNodes: []*v1.Node{},
|
||||
serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
|
||||
nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"),
|
||||
lastSyncedNodes: []*v1.Node{},
|
||||
}
|
||||
|
||||
serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
|
||||
@ -162,7 +158,7 @@ func New(
|
||||
nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(cur interface{}) {
|
||||
s.triggerNodeSync()
|
||||
s.enqueueNode(cur)
|
||||
},
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
oldNode, ok := old.(*v1.Node)
|
||||
@ -179,13 +175,13 @@ func New(
|
||||
return
|
||||
}
|
||||
|
||||
s.triggerNodeSync()
|
||||
s.enqueueNode(curNode)
|
||||
},
|
||||
DeleteFunc: func(old interface{}) {
|
||||
s.triggerNodeSync()
|
||||
s.enqueueNode(old)
|
||||
},
|
||||
},
|
||||
time.Duration(0),
|
||||
nodeSyncPeriod,
|
||||
)
|
||||
|
||||
if err := s.init(); err != nil {
|
||||
@ -202,7 +198,17 @@ func (c *Controller) enqueueService(obj interface{}) {
|
||||
runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err))
|
||||
return
|
||||
}
|
||||
c.queue.Add(key)
|
||||
c.serviceQueue.Add(key)
|
||||
}
|
||||
|
||||
// obj could be an *v1.Service, or a DeletionFinalStateUnknown marker item.
|
||||
func (c *Controller) enqueueNode(obj interface{}) {
|
||||
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
|
||||
if err != nil {
|
||||
runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err))
|
||||
return
|
||||
}
|
||||
c.nodeQueue.Add(key)
|
||||
}
|
||||
|
||||
// Run starts a background goroutine that watches for changes to services that
|
||||
@ -217,7 +223,8 @@ func (c *Controller) enqueueService(obj interface{}) {
|
||||
// object.
|
||||
func (c *Controller) Run(ctx context.Context, workers int, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
|
||||
defer runtime.HandleCrash()
|
||||
defer c.queue.ShutDown()
|
||||
defer c.serviceQueue.ShutDown()
|
||||
defer c.nodeQueue.ShutDown()
|
||||
|
||||
// Start event processing pipeline.
|
||||
c.eventBroadcaster.StartStructuredLogging(0)
|
||||
@ -234,65 +241,60 @@ func (c *Controller) Run(ctx context.Context, workers int, controllerManagerMetr
|
||||
}
|
||||
|
||||
for i := 0; i < workers; i++ {
|
||||
go wait.UntilWithContext(ctx, c.worker, time.Second)
|
||||
go wait.UntilWithContext(ctx, c.serviceWorker, time.Second)
|
||||
}
|
||||
|
||||
go c.nodeSyncLoop(ctx, workers)
|
||||
go wait.Until(c.triggerNodeSync, nodeSyncPeriod, ctx.Done())
|
||||
// Initialize one go-routine servicing node events. This ensure we only
|
||||
// process one node at any given moment in time
|
||||
go wait.UntilWithContext(ctx, func(ctx context.Context) { c.nodeWorker(ctx, workers) }, time.Second)
|
||||
|
||||
<-ctx.Done()
|
||||
}
|
||||
|
||||
// triggerNodeSync triggers a nodeSync asynchronously
|
||||
func (c *Controller) triggerNodeSync() {
|
||||
c.nodeSyncLock.Lock()
|
||||
defer c.nodeSyncLock.Unlock()
|
||||
select {
|
||||
case c.nodeSyncCh <- struct{}{}:
|
||||
klog.V(4).Info("Triggering nodeSync")
|
||||
return
|
||||
default:
|
||||
klog.V(4).Info("A pending nodeSync is already in queue")
|
||||
return
|
||||
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
|
||||
// It enforces that the syncHandler is never invoked concurrently with the same key.
|
||||
func (c *Controller) serviceWorker(ctx context.Context) {
|
||||
for c.processNextServiceItem(ctx) {
|
||||
}
|
||||
}
|
||||
|
||||
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
|
||||
// It enforces that the syncHandler is never invoked concurrently with the same key.
|
||||
func (c *Controller) worker(ctx context.Context) {
|
||||
for c.processNextWorkItem(ctx) {
|
||||
func (c *Controller) nodeWorker(ctx context.Context, workers int) {
|
||||
for c.processNextNodeItem(ctx, workers) {
|
||||
}
|
||||
}
|
||||
|
||||
// nodeSyncLoop takes nodeSync signal and triggers nodeSync
|
||||
func (c *Controller) nodeSyncLoop(ctx context.Context, workers int) {
|
||||
klog.V(4).Info("nodeSyncLoop Started")
|
||||
for {
|
||||
select {
|
||||
case <-c.nodeSyncCh:
|
||||
klog.V(4).Info("nodeSync has been triggered")
|
||||
c.nodeSyncInternal(ctx, workers)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
|
||||
key, quit := c.queue.Get()
|
||||
func (c *Controller) processNextNodeItem(ctx context.Context, workers int) bool {
|
||||
key, quit := c.nodeQueue.Get()
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
defer c.queue.Done(key)
|
||||
defer c.nodeQueue.Done(key)
|
||||
|
||||
for serviceToRetry := range c.syncNodes(ctx, workers) {
|
||||
c.serviceQueue.Add(serviceToRetry)
|
||||
}
|
||||
|
||||
c.nodeQueue.Forget(key)
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *Controller) processNextServiceItem(ctx context.Context) bool {
|
||||
key, quit := c.serviceQueue.Get()
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
defer c.serviceQueue.Done(key)
|
||||
|
||||
err := c.syncService(ctx, key.(string))
|
||||
if err == nil {
|
||||
c.queue.Forget(key)
|
||||
c.serviceQueue.Forget(key)
|
||||
return true
|
||||
}
|
||||
|
||||
runtime.HandleError(fmt.Errorf("error processing service %v (will retry): %v", key, err))
|
||||
c.queue.AddRateLimited(key)
|
||||
c.serviceQueue.AddRateLimited(key)
|
||||
return true
|
||||
}
|
||||
|
||||
@ -675,13 +677,13 @@ func shouldSyncUpdatedNode(oldNode, newNode *v1.Node) bool {
|
||||
return respectsPredicates(oldNode, allNodePredicates...) != respectsPredicates(newNode, allNodePredicates...)
|
||||
}
|
||||
|
||||
// nodeSyncInternal handles updating the hosts pointed to by all load
|
||||
// syncNodes handles updating the hosts pointed to by all load
|
||||
// balancers whenever the set of nodes in the cluster changes.
|
||||
func (c *Controller) nodeSyncInternal(ctx context.Context, workers int) {
|
||||
func (c *Controller) syncNodes(ctx context.Context, workers int) sets.String {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
latency := time.Since(startTime).Seconds()
|
||||
klog.V(4).Infof("It took %v seconds to finish nodeSyncInternal", latency)
|
||||
klog.V(4).Infof("It took %v seconds to finish syncNodes", latency)
|
||||
nodeSyncLatency.Observe(latency)
|
||||
}()
|
||||
|
||||
@ -691,6 +693,7 @@ func (c *Controller) nodeSyncInternal(ctx context.Context, workers int) {
|
||||
servicesToRetry := c.updateLoadBalancerHosts(ctx, servicesToUpdate, workers)
|
||||
klog.V(2).Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes",
|
||||
numServices-len(servicesToRetry), numServices)
|
||||
return servicesToRetry
|
||||
}
|
||||
|
||||
// nodeSyncService syncs the nodes for one load balancer type service. The return value
|
||||
|
@ -107,8 +107,8 @@ func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) {
|
||||
eventRecorder: recorder,
|
||||
nodeLister: newFakeNodeLister(nil),
|
||||
nodeListerSynced: nodeInformer.Informer().HasSynced,
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
|
||||
nodeSyncCh: make(chan interface{}, 1),
|
||||
serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
|
||||
nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"),
|
||||
lastSyncedNodes: []*v1.Node{},
|
||||
}
|
||||
|
||||
@ -905,7 +905,7 @@ func TestProcessServiceCreateOrUpdate(t *testing.T) {
|
||||
cachedServiceTest.state = svc
|
||||
controller.cache.set(keyExpected, cachedServiceTest)
|
||||
|
||||
keyGot, quit := controller.queue.Get()
|
||||
keyGot, quit := controller.serviceQueue.Get()
|
||||
if quit {
|
||||
t.Fatalf("get no queue element")
|
||||
}
|
||||
@ -3184,40 +3184,6 @@ func Test_shouldSyncUpdatedNode_compoundedPredicates(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestTriggerNodeSync(t *testing.T) {
|
||||
controller, _, _ := newController()
|
||||
|
||||
tryReadFromChannel(t, controller.nodeSyncCh, false)
|
||||
controller.triggerNodeSync()
|
||||
tryReadFromChannel(t, controller.nodeSyncCh, true)
|
||||
tryReadFromChannel(t, controller.nodeSyncCh, false)
|
||||
tryReadFromChannel(t, controller.nodeSyncCh, false)
|
||||
tryReadFromChannel(t, controller.nodeSyncCh, false)
|
||||
controller.triggerNodeSync()
|
||||
controller.triggerNodeSync()
|
||||
controller.triggerNodeSync()
|
||||
tryReadFromChannel(t, controller.nodeSyncCh, true)
|
||||
tryReadFromChannel(t, controller.nodeSyncCh, false)
|
||||
tryReadFromChannel(t, controller.nodeSyncCh, false)
|
||||
tryReadFromChannel(t, controller.nodeSyncCh, false)
|
||||
}
|
||||
|
||||
func tryReadFromChannel(t *testing.T, ch chan interface{}, expectValue bool) {
|
||||
select {
|
||||
case _, ok := <-ch:
|
||||
if !ok {
|
||||
t.Errorf("The channel is closed")
|
||||
}
|
||||
if !expectValue {
|
||||
t.Errorf("Does not expect value from the channel, but got a value")
|
||||
}
|
||||
default:
|
||||
if expectValue {
|
||||
t.Errorf("Expect value from the channel, but got none")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type fakeNodeLister struct {
|
||||
cache []*v1.Node
|
||||
err error
|
||||
|
Loading…
Reference in New Issue
Block a user