diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go
index f6fc1a03231..273508e75a1 100644
--- a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go
+++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go
@@ -86,16 +86,13 @@ type Controller struct {
 	eventRecorder    record.EventRecorder
 	nodeLister       corelisters.NodeLister
 	nodeListerSynced cache.InformerSynced
-	// services that need to be synced
-	queue workqueue.RateLimitingInterface
-
-	// nodeSyncLock ensures there is only one instance of triggerNodeSync getting executed at one time
-	// and protects internal states (needFullSync) of nodeSync
-	nodeSyncLock sync.Mutex
-	// nodeSyncCh triggers nodeSyncLoop to run
-	nodeSyncCh chan interface{}
-	// lastSyncedNodes is used when reconciling node state and keeps track of the last synced set of
-	// nodes. Access to this attribute by multiple go-routines is protected by nodeSyncLock
+	// services and nodes that need to be synced
+	serviceQueue workqueue.RateLimitingInterface
+	nodeQueue    workqueue.RateLimitingInterface
+	// lastSyncedNodes is used when reconciling node state and keeps track of
+	// the last synced set of nodes. This field is concurrently safe because the
+	// nodeQueue is serviced by only one go-routine, so node events are not
+	// processed concurrently.
 	lastSyncedNodes []*v1.Node
 }
 
@@ -128,10 +125,9 @@ func New(
 		eventRecorder:    recorder,
 		nodeLister:       nodeInformer.Lister(),
 		nodeListerSynced: nodeInformer.Informer().HasSynced,
-		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
-		// nodeSyncCh has a size 1 buffer. Only one pending sync signal would be cached.
-		nodeSyncCh:      make(chan interface{}, 1),
-		lastSyncedNodes: []*v1.Node{},
+		serviceQueue:     workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
+		nodeQueue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"),
+		lastSyncedNodes:  []*v1.Node{},
 	}
 
 	serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
@@ -162,7 +158,7 @@ func New(
 	nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
 		cache.ResourceEventHandlerFuncs{
 			AddFunc: func(cur interface{}) {
-				s.triggerNodeSync()
+				s.enqueueNode(cur)
 			},
 			UpdateFunc: func(old, cur interface{}) {
 				oldNode, ok := old.(*v1.Node)
@@ -179,13 +175,13 @@ func New(
 					return
 				}
 
-				s.triggerNodeSync()
+				s.enqueueNode(curNode)
 			},
 			DeleteFunc: func(old interface{}) {
-				s.triggerNodeSync()
+				s.enqueueNode(old)
 			},
 		},
-		time.Duration(0),
+		nodeSyncPeriod,
 	)
 
 	if err := s.init(); err != nil {
@@ -202,7 +198,17 @@ func (c *Controller) enqueueService(obj interface{}) {
 		runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err))
 		return
 	}
-	c.queue.Add(key)
+	c.serviceQueue.Add(key)
+}
+
+// obj could be a *v1.Node, or a DeletionFinalStateUnknown marker item.
+func (c *Controller) enqueueNode(obj interface{}) {
+	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+	if err != nil {
+		runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err))
+		return
+	}
+	c.nodeQueue.Add(key)
 }
 
 // Run starts a background goroutine that watches for changes to services that
@@ -217,7 +223,8 @@ func (c *Controller) enqueueService(obj interface{}) {
 // object.
 func (c *Controller) Run(ctx context.Context, workers int, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
 	defer runtime.HandleCrash()
-	defer c.queue.ShutDown()
+	defer c.serviceQueue.ShutDown()
+	defer c.nodeQueue.ShutDown()
 
 	// Start event processing pipeline.
 	c.eventBroadcaster.StartStructuredLogging(0)
@@ -234,65 +241,60 @@ func (c *Controller) Run(ctx context.Context, workers int, controllerManagerMetr
 	}
 
 	for i := 0; i < workers; i++ {
-		go wait.UntilWithContext(ctx, c.worker, time.Second)
+		go wait.UntilWithContext(ctx, c.serviceWorker, time.Second)
 	}
 
-	go c.nodeSyncLoop(ctx, workers)
-	go wait.Until(c.triggerNodeSync, nodeSyncPeriod, ctx.Done())
+	// Initialize one go-routine servicing node events. This ensures we only
+	// process one node at any given moment in time.
+	go wait.UntilWithContext(ctx, func(ctx context.Context) { c.nodeWorker(ctx, workers) }, time.Second)
 
 	<-ctx.Done()
 }
 
-// triggerNodeSync triggers a nodeSync asynchronously
-func (c *Controller) triggerNodeSync() {
-	c.nodeSyncLock.Lock()
-	defer c.nodeSyncLock.Unlock()
-	select {
-	case c.nodeSyncCh <- struct{}{}:
-		klog.V(4).Info("Triggering nodeSync")
-		return
-	default:
-		klog.V(4).Info("A pending nodeSync is already in queue")
-		return
+// serviceWorker runs a worker thread that just dequeues items, processes them, and marks them done.
+// It enforces that the syncHandler is never invoked concurrently with the same key.
+func (c *Controller) serviceWorker(ctx context.Context) {
+	for c.processNextServiceItem(ctx) {
 	}
 }
 
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
-func (c *Controller) worker(ctx context.Context) {
-	for c.processNextWorkItem(ctx) {
+func (c *Controller) nodeWorker(ctx context.Context, workers int) {
+	for c.processNextNodeItem(ctx, workers) {
 	}
 }
 
-// nodeSyncLoop takes nodeSync signal and triggers nodeSync
-func (c *Controller) nodeSyncLoop(ctx context.Context, workers int) {
-	klog.V(4).Info("nodeSyncLoop Started")
-	for {
-		select {
-		case <-c.nodeSyncCh:
-			klog.V(4).Info("nodeSync has been triggered")
-			c.nodeSyncInternal(ctx, workers)
-		case <-ctx.Done():
-			return
-		}
-	}
-}
-
-func (c *Controller) processNextWorkItem(ctx context.Context) bool {
-	key, quit := c.queue.Get()
+func (c *Controller) processNextNodeItem(ctx context.Context, workers int) bool {
+	key, quit := c.nodeQueue.Get()
 	if quit {
 		return false
 	}
-	defer c.queue.Done(key)
+	defer c.nodeQueue.Done(key)
+
+	for serviceToRetry := range c.syncNodes(ctx, workers) {
+		c.serviceQueue.Add(serviceToRetry)
+	}
+
+	c.nodeQueue.Forget(key)
+	return true
+}
+
+func (c *Controller) processNextServiceItem(ctx context.Context) bool {
+	key, quit := c.serviceQueue.Get()
+	if quit {
+		return false
+	}
+	defer c.serviceQueue.Done(key)
 
 	err := c.syncService(ctx, key.(string))
 	if err == nil {
-		c.queue.Forget(key)
+		c.serviceQueue.Forget(key)
 		return true
 	}
 
 	runtime.HandleError(fmt.Errorf("error processing service %v (will retry): %v", key, err))
-	c.queue.AddRateLimited(key)
+	c.serviceQueue.AddRateLimited(key)
 
 	return true
 }
 
@@ -675,13 +677,13 @@ func shouldSyncUpdatedNode(oldNode, newNode *v1.Node) bool {
 	return respectsPredicates(oldNode, allNodePredicates...) != respectsPredicates(newNode, allNodePredicates...)
 }
 
-// nodeSyncInternal handles updating the hosts pointed to by all load
+// syncNodes handles updating the hosts pointed to by all load
 // balancers whenever the set of nodes in the cluster changes.
-func (c *Controller) nodeSyncInternal(ctx context.Context, workers int) {
+func (c *Controller) syncNodes(ctx context.Context, workers int) sets.String {
 	startTime := time.Now()
 	defer func() {
 		latency := time.Since(startTime).Seconds()
-		klog.V(4).Infof("It took %v seconds to finish nodeSyncInternal", latency)
+		klog.V(4).Infof("It took %v seconds to finish syncNodes", latency)
 		nodeSyncLatency.Observe(latency)
 	}()
 
@@ -691,6 +693,7 @@ func (c *Controller) nodeSyncInternal(ctx context.Context, workers int) {
 	servicesToRetry := c.updateLoadBalancerHosts(ctx, servicesToUpdate, workers)
 	klog.V(2).Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes", numServices-len(servicesToRetry), numServices)
+	return servicesToRetry
 }
 
 // nodeSyncService syncs the nodes for one load balancer type service. The return value
diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go
index 34c694e9ce0..df0daf1fc0e 100644
--- a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go
+++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go
@@ -107,8 +107,8 @@ func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) {
 		eventRecorder:    recorder,
 		nodeLister:       newFakeNodeLister(nil),
 		nodeListerSynced: nodeInformer.Informer().HasSynced,
-		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
-		nodeSyncCh:       make(chan interface{}, 1),
+		serviceQueue:     workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
+		nodeQueue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"),
 		lastSyncedNodes:  []*v1.Node{},
 	}
 
@@ -905,7 +905,7 @@ func TestProcessServiceCreateOrUpdate(t *testing.T) {
 		cachedServiceTest.state = svc
 		controller.cache.set(keyExpected, cachedServiceTest)
 
-		keyGot, quit := controller.queue.Get()
+		keyGot, quit := controller.serviceQueue.Get()
 		if quit {
 			t.Fatalf("get no queue element")
 		}
@@ -3184,40 +3184,6 @@ func Test_shouldSyncUpdatedNode_compoundedPredicates(t *testing.T) {
 	}
 }
 
-func TestTriggerNodeSync(t *testing.T) {
-	controller, _, _ := newController()
-
-	tryReadFromChannel(t, controller.nodeSyncCh, false)
-	controller.triggerNodeSync()
-	tryReadFromChannel(t, controller.nodeSyncCh, true)
-	tryReadFromChannel(t, controller.nodeSyncCh, false)
-	tryReadFromChannel(t, controller.nodeSyncCh, false)
-	tryReadFromChannel(t, controller.nodeSyncCh, false)
-	controller.triggerNodeSync()
-	controller.triggerNodeSync()
-	controller.triggerNodeSync()
-	tryReadFromChannel(t, controller.nodeSyncCh, true)
-	tryReadFromChannel(t, controller.nodeSyncCh, false)
-	tryReadFromChannel(t, controller.nodeSyncCh, false)
-	tryReadFromChannel(t, controller.nodeSyncCh, false)
-}
-
-func tryReadFromChannel(t *testing.T, ch chan interface{}, expectValue bool) {
-	select {
-	case _, ok := <-ch:
-		if !ok {
-			t.Errorf("The channel is closed")
-		}
-		if !expectValue {
-			t.Errorf("Does not expect value from the channel, but got a value")
-		}
-	default:
-		if expectValue {
-			t.Errorf("Expect value from the channel, but got none")
-		}
-	}
-}
-
 type fakeNodeLister struct {
 	cache []*v1.Node
 	err   error
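
For reviewers who want to see the queueing pattern in isolation, here is a minimal, self-contained sketch of the approach this diff takes: two named rate-limited workqueues, a single goroutine draining the node queue so node events are never processed concurrently, and services that fail during a node sync handed back to the service queue for rate-limited retry. This is not the controller's code; demoController, its syncNodes/syncService stubs, and the hard-coded keys are illustrative stand-ins, and only the workqueue and wait calls are real client-go/apimachinery APIs.

// queue_pattern_sketch.go: an illustrative sketch only, not part of the cloud-provider package.
package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/workqueue"
)

type demoController struct {
	serviceQueue workqueue.RateLimitingInterface
	nodeQueue    workqueue.RateLimitingInterface
}

// syncNodes stands in for Controller.syncNodes: reconcile every load balancer
// against the current node set and return the service keys that need a retry.
func (c *demoController) syncNodes(ctx context.Context) sets.String {
	fmt.Println("resyncing load balancers for the current node set")
	return sets.NewString() // pretend every service succeeded
}

// syncService stands in for Controller.syncService.
func (c *demoController) syncService(ctx context.Context, key string) error {
	fmt.Println("syncing service", key)
	return nil
}

// nodeWorker drains the node queue. main starts exactly one of these, so node
// events are serialized; services that failed to update go back on serviceQueue.
func (c *demoController) nodeWorker(ctx context.Context) {
	for {
		key, quit := c.nodeQueue.Get()
		if quit {
			return
		}
		for serviceKey := range c.syncNodes(ctx) {
			c.serviceQueue.Add(serviceKey)
		}
		c.nodeQueue.Forget(key)
		c.nodeQueue.Done(key)
	}
}

// serviceWorker drains the service queue; several may run concurrently because
// the workqueue never hands the same key to two workers at the same time.
func (c *demoController) serviceWorker(ctx context.Context) {
	for {
		key, quit := c.serviceQueue.Get()
		if quit {
			return
		}
		if err := c.syncService(ctx, key.(string)); err != nil {
			c.serviceQueue.AddRateLimited(key) // retry with exponential backoff
		} else {
			c.serviceQueue.Forget(key)
		}
		c.serviceQueue.Done(key)
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	c := &demoController{
		serviceQueue: workqueue.NewNamedRateLimitingQueue(
			workqueue.NewItemExponentialFailureRateLimiter(5*time.Second, 300*time.Second), "service"),
		nodeQueue: workqueue.NewNamedRateLimitingQueue(
			workqueue.NewItemExponentialFailureRateLimiter(5*time.Second, 300*time.Second), "node"),
	}
	defer c.serviceQueue.ShutDown()
	defer c.nodeQueue.ShutDown()

	for i := 0; i < 3; i++ { // service keys may be processed in parallel
		go wait.UntilWithContext(ctx, c.serviceWorker, time.Second)
	}
	go wait.UntilWithContext(ctx, c.nodeWorker, time.Second) // exactly one node worker

	c.serviceQueue.Add("default/my-lb") // hypothetical keys, for illustration only
	c.nodeQueue.Add("node-1")

	<-ctx.Done()
}

The property mirrored from the diff is that all node-triggered reconciliation runs on a single queue consumer, which is what lets lastSyncedNodes be read and written without a lock.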