diff --git a/pkg/controller/nodeipam/ipam/multi_cidr_range_allocator.go b/pkg/controller/nodeipam/ipam/multi_cidr_range_allocator.go
index c3958930408..3acae69f3db 100644
--- a/pkg/controller/nodeipam/ipam/multi_cidr_range_allocator.go
+++ b/pkg/controller/nodeipam/ipam/multi_cidr_range_allocator.go
@@ -22,7 +22,6 @@ import (
 	"errors"
 	"fmt"
 	"math"
-	"math/rand"
 	"net"
 	"sync"
 	"time"
@@ -73,11 +72,6 @@ type multiCIDRNodeReservedCIDRs struct {
 	clusterCIDR *cidrset.ClusterCIDR
 }
 
-// multiCIDRNodeProcessingInfo tracks information related to current nodes in processing
-type multiCIDRNodeProcessingInfo struct {
-	retries int
-}
-
 type multiCIDRRangeAllocator struct {
 	client clientset.Interface
 	// nodeLister is able to list/get nodes and is populated by the shared informer passed to controller.
@@ -93,14 +87,13 @@ type multiCIDRRangeAllocator struct {
 	nodeCIDRUpdateChannel chan multiCIDRNodeReservedCIDRs
 	broadcaster           record.EventBroadcaster
 	recorder              record.EventRecorder
-	// queue is where incoming work is placed to de-dup and to allow "easy"
+	// queues are where incoming work is placed to de-dup and to allow "easy"
 	// rate limited requeues on errors
-	queue workqueue.RateLimitingInterface
+	cidrQueue workqueue.RateLimitingInterface
+	nodeQueue workqueue.RateLimitingInterface
 
-	// lock guards nodesInProcessing and cidrMap to avoid races in CIDR allocation.
+	// lock guards cidrMap to avoid races in CIDR allocation.
 	lock *sync.Mutex
-	// nodesInProcessing is a set of nodes that are currently being processed.
-	nodesInProcessing map[string]*multiCIDRNodeProcessingInfo
 	// cidrMap maps ClusterCIDR labels to internal ClusterCIDR objects.
 	cidrMap map[string][]*cidrset.ClusterCIDR
 }
@@ -135,9 +128,9 @@ func NewMultiCIDRRangeAllocator(
 		nodeCIDRUpdateChannel: make(chan multiCIDRNodeReservedCIDRs, cidrUpdateQueueSize),
 		broadcaster:           eventBroadcaster,
 		recorder:              recorder,
-		queue:                 workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "multi_cidr_range_allocator"),
+		cidrQueue:             workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "multi_cidr_range_allocator_cidr"),
+		nodeQueue:             workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "multi_cidr_range_allocator_node"),
 		lock:                  &sync.Mutex{},
-		nodesInProcessing:     map[string]*multiCIDRNodeProcessingInfo{},
 		cidrMap:               make(map[string][]*cidrset.ClusterCIDR, 0),
 	}
 
@@ -171,13 +164,13 @@ func NewMultiCIDRRangeAllocator(
 		AddFunc: func(obj interface{}) {
 			key, err := cache.MetaNamespaceKeyFunc(obj)
 			if err == nil {
-				ra.queue.Add(key)
+				ra.cidrQueue.Add(key)
 			}
 		},
 		UpdateFunc: func(old, new interface{}) {
 			key, err := cache.MetaNamespaceKeyFunc(new)
 			if err == nil {
-				ra.queue.Add(key)
+				ra.cidrQueue.Add(key)
 			}
 		},
 		DeleteFunc: func(obj interface{}) {
@@ -185,7 +178,7 @@ func NewMultiCIDRRangeAllocator(
 			// key function.
 			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
 			if err == nil {
-				ra.queue.Add(key)
+				ra.cidrQueue.Add(key)
 			}
 		},
 	})
@@ -221,33 +214,31 @@ func NewMultiCIDRRangeAllocator(
 	}
 
 	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-		AddFunc: controllerutil.CreateAddNodeHandler(ra.AllocateOrOccupyCIDR),
-		UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
-			// If the PodCIDRs list is not empty we either:
-			// - already processed a Node that already had CIDRs after NC restarted
-			//   (cidr is marked as used),
-			// - already processed a Node successfully and allocated CIDRs for it
-			//   (cidr is marked as used),
-			// - already processed a Node but we saw a "timeout" response and
-			//   request eventually got through in this case we haven't released
-			//   the allocated CIDRs (cidr is still marked as used).
-			// There's a possible error here:
-			// - NC sees a new Node and assigns CIDRs X,Y.. to it,
-			// - Update Node call fails with a timeout,
-			// - Node is updated by some other component, NC sees an update and
-			//   assigns CIDRs A,B.. to the Node,
-			// - Both CIDR X,Y.. and CIDR A,B.. are marked as used in the local cache,
-			//   even though Node sees only CIDR A,B..
-			// The problem here is that in in-memory cache we see CIDR X,Y.. as marked,
-			// which prevents it from being assigned to any new node. The cluster
-			// state is correct.
-			// Restart of NC fixes the issue.
-			if len(newNode.Spec.PodCIDRs) == 0 {
-				return ra.AllocateOrOccupyCIDR(newNode)
+		AddFunc: func(obj interface{}) {
+			key, err := cache.MetaNamespaceKeyFunc(obj)
+			if err == nil {
+				ra.nodeQueue.Add(key)
 			}
-			return nil
-		}),
-		DeleteFunc: controllerutil.CreateDeleteNodeHandler(ra.ReleaseCIDR),
+		},
+		UpdateFunc: func(old, new interface{}) {
+			key, err := cache.MetaNamespaceKeyFunc(new)
+			if err == nil {
+				ra.nodeQueue.Add(key)
+			}
+		},
+		DeleteFunc: func(obj interface{}) {
+			// The informer cache no longer has the object, and since Node doesn't have a finalizer,
+			// we don't see the Update with DeletionTimestamp != 0.
+			// TODO: instead of executing the operation directly in the handler, build a small cache with key node.Name
+			// and value PodCIDRs, and call `ReleaseCIDR` from the reconcile loop so we can retry on `ReleaseCIDR` failures.
+			ra.ReleaseCIDR(obj.(*v1.Node))
+			// IndexerInformer uses a delta queue, therefore for deletes we have to use this
+			// key function.
+			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+			if err == nil {
+				ra.nodeQueue.Add(key)
+			}
+		},
 	})
 
 	return ra, nil
@@ -262,7 +253,8 @@ func (r *multiCIDRRangeAllocator) Run(stopCh <-chan struct{}) {
 	r.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: r.client.CoreV1().Events("")})
 	defer r.broadcaster.Shutdown()
 
-	defer r.queue.ShutDown()
+	defer r.cidrQueue.ShutDown()
+	defer r.nodeQueue.ShutDown()
 
 	klog.Infof("Starting Multi CIDR Range allocator")
 	defer klog.Infof("Shutting down Multi CIDR Range allocator")
@@ -271,74 +263,64 @@ func (r *multiCIDRRangeAllocator) Run(stopCh <-chan struct{}) {
 		return
 	}
 
-	// raWaitGroup is used to wait for the RangeAllocator to finish the goroutines.
-	var raWaitGroup sync.WaitGroup
-
 	for i := 0; i < cidrUpdateWorkers; i++ {
-		raWaitGroup.Add(1)
-		go func() {
-			defer raWaitGroup.Done()
-			r.worker(stopCh)
-		}()
-		go wait.Until(r.runWorker, time.Second, stopCh)
+		go wait.Until(r.runCIDRWorker, time.Second, stopCh)
+		go wait.Until(r.runNodeWorker, time.Second, stopCh)
 	}
-	raWaitGroup.Wait()
-
 	<-stopCh
 }
 
 // runWorker is a long-running function that will continually call the
 // processNextWorkItem function in order to read and process a message on the
-// workqueue.
-func (r *multiCIDRRangeAllocator) runWorker() {
-	for r.processNextWorkItem() {
+// cidrQueue.
+func (r *multiCIDRRangeAllocator) runCIDRWorker() {
+	for r.processNextCIDRWorkItem() {
 	}
 }
 
-// processNextWorkItem will read a single work item off the workqueue and
+// processNextCIDRWorkItem will read a single work item off the cidrQueue and
 // attempt to process it, by calling the syncHandler.
-func (r *multiCIDRRangeAllocator) processNextWorkItem() bool {
-	obj, shutdown := r.queue.Get()
-
+func (r *multiCIDRRangeAllocator) processNextCIDRWorkItem() bool {
+	obj, shutdown := r.cidrQueue.Get()
 	if shutdown {
 		return false
 	}
 
-	// We wrap this block in a func so we can defer c.workqueue.Done.
+	// We wrap this block in a func so we can defer r.cidrQueue.Done.
	err := func(obj interface{}) error {
-		// We call Done here so the queue knows we have finished
+		// We call Done here so the cidrQueue knows we have finished
 		// processing this item. We also must remember to call Forget if we
 		// do not want this work item being re-queued. For example, we do
 		// not call Forget if a transient error occurs, instead the item is
-		// put back on the queue and attempted again after a back-off
+		// put back on the cidrQueue and attempted again after a back-off
 		// period.
-		defer r.queue.Done(obj)
+		defer r.cidrQueue.Done(obj)
 		var key string
 		var ok bool
-		// We expect strings to come off the workqueue. These are of the
+		// We expect strings to come off the cidrQueue. These are of the
 		// form namespace/name. We do this as the delayed nature of the
-		// workqueue means the items in the informer cache may actually be
+		// cidrQueue means the items in the informer cache may actually be
 		// more up to date that when the item was initially put onto the
-		// workqueue.
+		// cidrQueue.
 		if key, ok = obj.(string); !ok {
-			// As the item in the workqueue is actually invalid, we call
+			// As the item in the cidrQueue is actually invalid, we call
 			// Forget here else we'd go into a loop of attempting to
 			// process a work item that is invalid.
-			r.queue.Forget(obj)
-			utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
+			r.cidrQueue.Forget(obj)
+			utilruntime.HandleError(fmt.Errorf("expected string in cidrQueue but got %#v", obj))
 			return nil
 		}
 		// Run the syncHandler, passing it the namespace/name string of the
 		// Foo resource to be synced.
 		if err := r.syncClusterCIDR(key); err != nil {
-			// Put the item back on the workqueue to handle any transient errors.
-			r.queue.AddRateLimited(key)
+			// Put the item back on the cidrQueue to handle any transient errors.
+			r.cidrQueue.AddRateLimited(key)
 			return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
 		}
 		// Finally, if no error occurs we Forget this item so it does not
-		// get queued again until another change happens.
-		r.queue.Forget(obj)
+		// get queued again until another change happens.
+		r.cidrQueue.Forget(obj)
 		klog.Infof("Successfully synced '%s'", key)
 		return nil
 	}(obj)
@@ -351,38 +333,89 @@ func (r *multiCIDRRangeAllocator) processNextWorkItem() bool {
 	return true
 }
 
-func (r *multiCIDRRangeAllocator) worker(stopChan <-chan struct{}) {
-	for {
-		select {
-		case workItem, ok := <-r.nodeCIDRUpdateChannel:
-			if !ok {
-				klog.Error("Channel nodeCIDRUpdateChannel was unexpectedly closed")
-				return
-			}
-			r.lock.Lock()
-			if err := r.updateCIDRsAllocation(workItem); err == nil {
-				klog.V(3).Infof("Updated CIDR for %q", workItem.nodeName)
-			} else {
-				klog.Errorf("Error updating CIDR for %q: %v", workItem.nodeName, err)
-				if canRetry, timeout := r.retryParams(workItem.nodeName); canRetry {
-					klog.V(2).Infof("Retrying update for %q after %v", workItem.nodeName, timeout)
-					time.AfterFunc(timeout, func() {
-						// Requeue the failed node for update again.
-						r.nodeCIDRUpdateChannel <- workItem
-					})
-					continue
-				}
-				klog.Errorf("Exceeded retry count for %q, dropping from queue", workItem.nodeName)
-			}
-			r.removeNodeFromProcessing(workItem.nodeName)
-			r.lock.Unlock()
-		case <-stopChan:
-			klog.Infof("MultiCIDRRangeAllocator worker is stopping.")
-			return
-		}
+func (r *multiCIDRRangeAllocator) runNodeWorker() {
+	for r.processNextNodeWorkItem() {
 	}
 }
 
+// processNextNodeWorkItem will read a single work item off the nodeQueue and
+// attempt to process it, by calling the syncHandler.
+func (r *multiCIDRRangeAllocator) processNextNodeWorkItem() bool {
+	obj, shutdown := r.nodeQueue.Get()
+	if shutdown {
+		return false
+	}
+
+	// We wrap this block in a func so we can defer r.nodeQueue.Done.
+	err := func(obj interface{}) error {
+		// We call Done here so the nodeQueue knows we have finished
+		// processing this item. We also must remember to call Forget if we
+		// do not want this work item being re-queued. For example, we do
+		// not call Forget if a transient error occurs, instead the item is
+		// put back on the nodeQueue and attempted again after a back-off
+		// period.
+		defer r.nodeQueue.Done(obj)
+		var key string
+		var ok bool
+		// We expect strings to come off the nodeQueue. These are of the
+		// form namespace/name. We do this as the delayed nature of the
+		// nodeQueue means the items in the informer cache may actually be
+		// more up to date than when the item was initially put onto the
+		// nodeQueue.
+		if key, ok = obj.(string); !ok {
+			// As the item in the nodeQueue is actually invalid, we call
+			// Forget here else we'd go into a loop of attempting to
+			// process a work item that is invalid.
+			r.nodeQueue.Forget(obj)
+			utilruntime.HandleError(fmt.Errorf("expected string in nodeQueue but got %#v", obj))
+			return nil
+		}
+		// Run the syncHandler, passing it the namespace/name string of the
+		// Node resource to be synced.
+		if err := r.syncNode(key); err != nil {
+			// Put the item back on the nodeQueue to handle any transient errors.
+			r.nodeQueue.AddRateLimited(key)
+			return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
+		}
+		// Finally, if no error occurs we Forget this item so it does not
+		// get queued again until another change happens.
+		r.nodeQueue.Forget(obj)
+		klog.Infof("Successfully synced '%s'", key)
+		return nil
+	}(obj)
+
+	if err != nil {
+		utilruntime.HandleError(err)
+		return true
+	}
+
+	return true
+}
+
+func (r *multiCIDRRangeAllocator) syncNode(key string) error {
+	startTime := time.Now()
+	defer func() {
+		klog.V(4).Infof("Finished syncing Node request %q (%v)", key, time.Since(startTime))
+	}()
+
+	node, err := r.nodeLister.Get(key)
+	if apierrors.IsNotFound(err) {
+		klog.V(3).Infof("node has been deleted: %v", key)
+		// TODO: obtain the node object information to call ReleaseCIDR from here
+		// and retry if there is an error.
+		return nil
+	}
+	if err != nil {
+		return err
+	}
+	// Check the DeletionTimestamp to determine if object is under deletion.
+	if !node.DeletionTimestamp.IsZero() {
+		klog.V(3).Infof("node is being deleted: %v", key)
+		return r.ReleaseCIDR(node)
+	}
+	return r.AllocateOrOccupyCIDR(node)
+}
+
 // needToAddFinalizer checks if a finalizer should be added to the object.
 func needToAddFinalizer(obj metav1.Object, finalizer string) bool {
 	return obj.GetDeletionTimestamp() == nil && !slice.ContainsString(obj.GetFinalizers(),
@@ -412,49 +445,6 @@ func (r *multiCIDRRangeAllocator) syncClusterCIDR(key string) error {
 	return r.reconcileCreate(clusterCIDR)
 }
 
-func (r *multiCIDRRangeAllocator) insertNodeToProcessing(nodeName string) bool {
-	if _, found := r.nodesInProcessing[nodeName]; found {
-		return false
-	}
-	r.nodesInProcessing[nodeName] = &multiCIDRNodeProcessingInfo{}
-	return true
-}
-
-func (r *multiCIDRRangeAllocator) removeNodeFromProcessing(nodeName string) {
-	klog.Infof("Removing node %q from processing", nodeName)
-	delete(r.nodesInProcessing, nodeName)
-}
-
-func (r *multiCIDRRangeAllocator) retryParams(nodeName string) (bool, time.Duration) {
-	r.lock.Lock()
-	defer r.lock.Unlock()
-
-	entry, ok := r.nodesInProcessing[nodeName]
-	if !ok {
-		klog.Errorf("Cannot get retryParams for %q as entry does not exist", nodeName)
-		return false, 0
-	}
-
-	count := entry.retries + 1
-	if count > updateMaxRetries {
-		return false, 0
-	}
-	r.nodesInProcessing[nodeName].retries = count
-
-	return true, multiCIDRNodeUpdateRetryTimeout(count)
-}
-
-func multiCIDRNodeUpdateRetryTimeout(count int) time.Duration {
-	timeout := updateRetryTimeout
-	for i := 0; i < count && timeout < maxUpdateRetryTimeout; i++ {
-		timeout *= 2
-	}
-	if timeout > maxUpdateRetryTimeout {
-		timeout = maxUpdateRetryTimeout
-	}
-	return time.Duration(timeout.Nanoseconds()/2 + rand.Int63n(timeout.Nanoseconds()))
-}
-
 // occupyCIDRs marks node.PodCIDRs[...] as used in allocator's tracked cidrSet.
 func (r *multiCIDRRangeAllocator) occupyCIDRs(node *v1.Node) error {
@@ -498,7 +488,6 @@ func (r *multiCIDRRangeAllocator) occupyCIDRs(node *v1.Node) error {
 		return fmt.Errorf("could not occupy cidrs: %v, No matching ClusterCIDRs found", node.Spec.PodCIDRs)
 	}(node)
 
-	r.removeNodeFromProcessing(node.Name)
 	return err
 }
 
@@ -558,24 +547,17 @@ func (r *multiCIDRRangeAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
 		return nil
 	}
 
-	if !r.insertNodeToProcessing(node.Name) {
-		klog.Infof("Node %v is already in a process of CIDR assignment.", node.Name)
-		return nil
-	}
-
 	if len(node.Spec.PodCIDRs) > 0 {
 		return r.occupyCIDRs(node)
 	}
 
 	cidrs, clusterCIDR, err := r.prioritizedCIDRs(node)
 	if err != nil {
-		r.removeNodeFromProcessing(node.Name)
 		controllerutil.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
 		return fmt.Errorf("failed to get cidrs for node %s", node.Name)
 	}
 
 	if len(cidrs) == 0 {
-		r.removeNodeFromProcessing(node.Name)
 		controllerutil.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
 		return fmt.Errorf("no cidrSets with matching labels found for node %s", node.Name)
 	}
@@ -725,7 +707,6 @@ func (r *multiCIDRRangeAllocator) updateCIDRsAllocation(data multiCIDRNodeReserv
 		return err
 	}(data)
 
-	r.removeNodeFromProcessing(data.nodeName)
 	return err
}
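
Note (illustrative, not part of the patch): with nodesInProcessing and the hand-rolled multiCIDRNodeUpdateRetryTimeout backoff removed, retry pacing for failed syncs now comes from the rate limiter attached to each workqueue, driven by AddRateLimited on error and Forget on success. Both queues above use workqueue.DefaultControllerRateLimiter(). The sketch below spells out an equivalent limiter explicitly; the standalone main and variable names are assumptions made for the example, while the delay and QPS values mirror client-go's documented defaults.

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Spelled-out equivalent of workqueue.DefaultControllerRateLimiter():
	// per-item exponential backoff (5ms doubling, capped at 1000s) combined with
	// an overall 10 QPS / burst 100 token bucket shared by all items.
	limiter := workqueue.NewMaxOfRateLimiter(
		workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)
	nodeQueue := workqueue.NewNamedRateLimitingQueue(limiter, "multi_cidr_range_allocator_node")
	defer nodeQueue.ShutDown()

	// A failed sync re-adds the key with AddRateLimited; the per-item delay grows
	// with each recorded failure and is reset once a successful sync calls Forget.
	nodeQueue.AddRateLimited("node-a")
	fmt.Println("requeues recorded for node-a:", nodeQueue.NumRequeues("node-a"))
}
```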
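Also illustrative: the TODO in the node informer's DeleteFunc (retry ReleaseCIDR from the reconcile loop rather than calling it in the handler) could be served by a small cache keyed by node.Name that holds the PodCIDRs observed at deletion time. The type and method names below (deletedNodeCIDRs, remember, lookup, forget) are hypothetical and exist only for this sketch; nothing like them is introduced by the patch.

```go
package main

import (
	"fmt"
	"sync"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// deletedNodeCIDRs remembers the PodCIDRs of nodes that were deleted but whose
// CIDRs have not been released yet (hypothetical helper, not in the patch).
type deletedNodeCIDRs struct {
	mu    sync.Mutex
	cidrs map[string][]string // node.Name -> Spec.PodCIDRs captured at deletion time
}

func newDeletedNodeCIDRs() *deletedNodeCIDRs {
	return &deletedNodeCIDRs{cidrs: map[string][]string{}}
}

// remember would be called from the informer's DeleteFunc, just before the node
// key is added to the nodeQueue.
func (d *deletedNodeCIDRs) remember(node *v1.Node) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.cidrs[node.Name] = node.Spec.PodCIDRs
}

// lookup would be called from syncNode when the lister reports NotFound; the
// caller releases the returned CIDRs and calls forget only on success, so a
// failed release stays cached and is retried via the nodeQueue's AddRateLimited path.
func (d *deletedNodeCIDRs) lookup(name string) ([]string, bool) {
	d.mu.Lock()
	defer d.mu.Unlock()
	c, ok := d.cidrs[name]
	return c, ok
}

func (d *deletedNodeCIDRs) forget(name string) {
	d.mu.Lock()
	defer d.mu.Unlock()
	delete(d.cidrs, name)
}

func main() {
	cache := newDeletedNodeCIDRs()
	cache.remember(&v1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: "node-a"},
		Spec:       v1.NodeSpec{PodCIDRs: []string{"10.0.0.0/24"}},
	})
	if cidrs, ok := cache.lookup("node-a"); ok {
		fmt.Println("would release:", cidrs) // release succeeded -> drop the entry
		cache.forget("node-a")
	}
}
```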