diff --git a/pkg/controller/nodeipam/ipam/cidr_allocator.go b/pkg/controller/nodeipam/ipam/cidr_allocator.go
index 6261672bd61..0a69f0ded28 100644
--- a/pkg/controller/nodeipam/ipam/cidr_allocator.go
+++ b/pkg/controller/nodeipam/ipam/cidr_allocator.go
@@ -106,13 +106,6 @@ type CIDRAllocatorParams struct {
 	NodeCIDRMaskSizes []int
 }
 
-// CIDRs are reserved, then node resource is patched with them.
-// nodeReservedCIDRs holds the reservation info for a node.
-type nodeReservedCIDRs struct {
-	allocatedCIDRs []*net.IPNet
-	nodeName       string
-}
-
 // New creates a new CIDR range allocator.
 func New(ctx context.Context, kubeClient clientset.Interface, cloud cloudprovider.Interface, nodeInformer informers.NodeInformer, allocatorType CIDRAllocatorType, allocatorParams CIDRAllocatorParams) (CIDRAllocator, error) {
 	nodeList, err := listNodes(ctx, kubeClient)
diff --git a/pkg/controller/nodeipam/ipam/range_allocator.go b/pkg/controller/nodeipam/ipam/range_allocator.go
index ddfbd7aaaf1..f3a8b36db2c 100644
--- a/pkg/controller/nodeipam/ipam/range_allocator.go
+++ b/pkg/controller/nodeipam/ipam/range_allocator.go
@@ -20,16 +20,16 @@ import (
 	"context"
 	"fmt"
 	"net"
-	"sync"
+	"time"
 
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/klog/v2"
 	netutils "k8s.io/utils/net"
 
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/types"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
-	"k8s.io/apimachinery/pkg/util/sets"
 	informers "k8s.io/client-go/informers/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
@@ -37,6 +37,7 @@ import (
 	corelisters "k8s.io/client-go/listers/core/v1"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
+	"k8s.io/client-go/util/workqueue"
 	nodeutil "k8s.io/component-helpers/node/util"
 	"k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset"
 	controllerutil "k8s.io/kubernetes/pkg/controller/util/node"
@@ -52,14 +53,12 @@ type rangeAllocator struct {
 	nodeLister corelisters.NodeLister
 	// nodesSynced returns true if the node shared informer has been synced at least once.
 	nodesSynced cache.InformerSynced
-	// Channel that is used to pass updating Nodes and their reserved CIDRs to the background
-	// This increases a throughput of CIDR assignment by not blocking on long operations.
-	nodeCIDRUpdateChannel chan nodeReservedCIDRs
-	broadcaster           record.EventBroadcaster
-	recorder              record.EventRecorder
-	// Keep a set of nodes that are currently being processed to avoid races in CIDR allocation
-	lock              sync.Mutex
-	nodesInProcessing sets.String
+	broadcaster record.EventBroadcaster
+	recorder    record.EventRecorder
+
+	// queue is where incoming work is placed to de-dup and to allow "easy"
+	// rate limited requeues on errors
+	queue workqueue.RateLimitingInterface
 }
 
 // NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDRs for node (one from each of clusterCIDRs)
@@ -89,15 +88,14 @@ func NewCIDRRangeAllocator(ctx context.Context, client clientset.Interface, node
 	}
 
 	ra := &rangeAllocator{
-		client:                client,
-		clusterCIDRs:          allocatorParams.ClusterCIDRs,
-		cidrSets:              cidrSets,
-		nodeLister:            nodeInformer.Lister(),
-		nodesSynced:           nodeInformer.Informer().HasSynced,
-		nodeCIDRUpdateChannel: make(chan nodeReservedCIDRs, cidrUpdateQueueSize),
-		broadcaster:           eventBroadcaster,
-		recorder:              recorder,
-		nodesInProcessing:     sets.NewString(),
+		client:       client,
+		clusterCIDRs: allocatorParams.ClusterCIDRs,
+		cidrSets:     cidrSets,
+		nodeLister:   nodeInformer.Lister(),
+		nodesSynced:  nodeInformer.Informer().HasSynced,
+		broadcaster:  eventBroadcaster,
+		recorder:     recorder,
+		queue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cidrallocator_node"),
 	}
 
 	if allocatorParams.ServiceCIDR != nil {
@@ -130,37 +128,33 @@ func NewCIDRRangeAllocator(ctx context.Context, client clientset.Interface, node
 	}
 
 	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-		AddFunc: controllerutil.CreateAddNodeHandler(func(node *v1.Node) error {
-			return ra.AllocateOrOccupyCIDR(logger, node)
-		}),
-		UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
-			// If the PodCIDRs list is not empty we either:
-			// - already processed a Node that already had CIDRs after NC restarted
-			//   (cidr is marked as used),
-			// - already processed a Node successfully and allocated CIDRs for it
-			//   (cidr is marked as used),
-			// - already processed a Node but we did saw a "timeout" response and
-			//   request eventually got through in this case we haven't released
-			//   the allocated CIDRs (cidr is still marked as used).
-			// There's a possible error here:
-			// - NC sees a new Node and assigns CIDRs X,Y.. to it,
-			// - Update Node call fails with a timeout,
-			// - Node is updated by some other component, NC sees an update and
-			//   assigns CIDRs A,B.. to the Node,
-			// - Both CIDR X,Y.. and CIDR A,B.. are marked as used in the local cache,
-			//   even though Node sees only CIDR A,B..
-			// The problem here is that in in-memory cache we see CIDR X,Y.. as marked,
-			// which prevents it from being assigned to any new node. The cluster
-			// state is correct.
-			// Restart of NC fixes the issue.
-			if len(newNode.Spec.PodCIDRs) == 0 {
-				return ra.AllocateOrOccupyCIDR(logger, newNode)
+		AddFunc: func(obj interface{}) {
+			key, err := cache.MetaNamespaceKeyFunc(obj)
+			if err == nil {
+				ra.queue.Add(key)
 			}
-			return nil
-		}),
-		DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error {
-			return ra.ReleaseCIDR(logger, node)
-		}),
+		},
+		UpdateFunc: func(old, new interface{}) {
+			key, err := cache.MetaNamespaceKeyFunc(new)
+			if err == nil {
+				ra.queue.Add(key)
+			}
+		},
+		DeleteFunc: func(obj interface{}) {
+			// The informer cache no longer has the object, and since Node doesn't have a finalizer,
+			// we don't see the Update with DeletionTimestamp != 0.
+			// TODO: instead of executing the operation directly in the handler, build a small cache with key node.Name
+			// and value PodCIDRs, and call ReleaseCIDR in the reconcile loop so we can retry on `ReleaseCIDR` failures.
+			if err := ra.ReleaseCIDR(logger, obj.(*v1.Node)); err != nil {
+				utilruntime.HandleError(fmt.Errorf("error while processing CIDR Release: %w", err))
+			}
+			// IndexerInformer uses a delta queue, therefore for deletes we have to use this
+			// key function.
+			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+			if err == nil {
+				ra.queue.Add(key)
+			}
+		},
 	})
 
 	return ra, nil
@@ -176,6 +170,8 @@ func (r *rangeAllocator) Run(ctx context.Context) {
 	r.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: r.client.CoreV1().Events("")})
 	defer r.broadcaster.Shutdown()
 
+	defer r.queue.ShutDown()
+
 	logger.Info("Starting range CIDR allocator")
 	defer logger.Info("Shutting down range CIDR allocator")
 
@@ -184,50 +180,100 @@ func (r *rangeAllocator) Run(ctx context.Context) {
 	}
 
 	for i := 0; i < cidrUpdateWorkers; i++ {
-		go r.worker(ctx)
+		go wait.UntilWithContext(ctx, r.runWorker, time.Second)
 	}
 
 	<-ctx.Done()
 }
 
-func (r *rangeAllocator) worker(ctx context.Context) {
-	logger := klog.FromContext(ctx)
-	for {
-		select {
-		case workItem, ok := <-r.nodeCIDRUpdateChannel:
-			if !ok {
-				logger.Info("Channel nodeCIDRUpdateChannel was unexpectedly closed")
-				return
-			}
-			if err := r.updateCIDRsAllocation(logger, workItem); err != nil {
-				// Requeue the failed node for update again.
-				r.nodeCIDRUpdateChannel <- workItem
-			}
-		case <-ctx.Done():
-			return
-		}
+// runWorker is a long-running function that will continually call the
+// processNextNodeWorkItem function in order to read and process a message on the
+// queue.
+func (r *rangeAllocator) runWorker(ctx context.Context) {
+	for r.processNextNodeWorkItem(ctx) {
 	}
 }
 
-func (r *rangeAllocator) insertNodeToProcessing(nodeName string) bool {
-	r.lock.Lock()
-	defer r.lock.Unlock()
-	if r.nodesInProcessing.Has(nodeName) {
+// processNextNodeWorkItem will read a single work item off the queue and
+// attempt to process it by calling syncNode.
+func (r *rangeAllocator) processNextNodeWorkItem(ctx context.Context) bool {
+	obj, shutdown := r.queue.Get()
+	if shutdown {
 		return false
 	}
-	r.nodesInProcessing.Insert(nodeName)
+
+	// We wrap this block in a func so we can defer r.queue.Done.
+	err := func(logger klog.Logger, obj interface{}) error {
+		// We call Done here so the workqueue knows we have finished
+		// processing this item. We also must remember to call Forget if we
+		// do not want this work item being re-queued. For example, we do
+		// not call Forget if a transient error occurs; instead the item is
+		// put back on the queue and attempted again after a back-off
+		// period.
+		defer r.queue.Done(obj)
+		var key string
+		var ok bool
+		// We expect strings to come off the workqueue. These are of the
+		// form namespace/name (just the name for cluster-scoped Nodes). We
+		// do this as the delayed nature of the workqueue means the items in
+		// the informer cache may actually be more up to date than when the
+		// item was initially put onto the workqueue.
+		if key, ok = obj.(string); !ok {
+			// As the item in the workqueue is actually invalid, we call
+			// Forget here, else we'd go into a loop of attempting to
+			// process a work item that is invalid.
+			r.queue.Forget(obj)
+			utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
+			return nil
+		}
+		// Run syncNode, passing it the name of the
+		// Node to be synced.
+		if err := r.syncNode(logger, key); err != nil {
+			// Put the item back on the queue to handle any transient errors.
+			r.queue.AddRateLimited(key)
+			return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
+		}
+		// Finally, if no error occurs we Forget this item so it does not
+		// get queued again until another change happens.
+		r.queue.Forget(obj)
+		logger.Info("Successfully synced", "key", key)
+		return nil
+	}(klog.FromContext(ctx), obj)
+
+	if err != nil {
+		utilruntime.HandleError(err)
+		return true
+	}
+
 	return true
 }
 
-func (r *rangeAllocator) removeNodeFromProcessing(nodeName string) {
-	r.lock.Lock()
-	defer r.lock.Unlock()
-	r.nodesInProcessing.Delete(nodeName)
+func (r *rangeAllocator) syncNode(logger klog.Logger, key string) error {
+	startTime := time.Now()
+	defer func() {
+		logger.V(4).Info("Finished syncing Node request", "node", key, "elapsed", time.Since(startTime))
+	}()
+
+	node, err := r.nodeLister.Get(key)
+	if apierrors.IsNotFound(err) {
+		logger.V(3).Info("node has been deleted", "node", key)
+		// TODO: obtain the node object information to call ReleaseCIDR from here
+		// and retry if there is an error.
+		return nil
+	}
+	if err != nil {
+		return err
+	}
+	// Check the DeletionTimestamp to determine if object is under deletion.
+	if !node.DeletionTimestamp.IsZero() {
+		logger.V(3).Info("node is being deleted", "node", key)
+		return r.ReleaseCIDR(logger, node)
+	}
+	return r.AllocateOrOccupyCIDR(logger, node)
 }
 
 // marks node.PodCIDRs[...] as used in allocator's tracked cidrSet
 func (r *rangeAllocator) occupyCIDRs(node *v1.Node) error {
-	defer r.removeNodeFromProcessing(node.Name)
 	if len(node.Spec.PodCIDRs) == 0 {
 		return nil
 	}
@@ -257,34 +303,25 @@ func (r *rangeAllocator) AllocateOrOccupyCIDR(logger klog.Logger, node *v1.Node)
 	if node == nil {
 		return nil
 	}
-	if !r.insertNodeToProcessing(node.Name) {
-		logger.V(2).Info("Node is already in a process of CIDR assignment", "node", klog.KObj(node))
-		return nil
-	}
 
 	if len(node.Spec.PodCIDRs) > 0 {
 		return r.occupyCIDRs(node)
 	}
-	// allocate and queue the assignment
-	allocated := nodeReservedCIDRs{
-		nodeName:       node.Name,
-		allocatedCIDRs: make([]*net.IPNet, len(r.cidrSets)),
-	}
+
+	allocatedCIDRs := make([]*net.IPNet, len(r.cidrSets))
 
 	for idx := range r.cidrSets {
 		podCIDR, err := r.cidrSets[idx].AllocateNext()
 		if err != nil {
-			r.removeNodeFromProcessing(node.Name)
 			controllerutil.RecordNodeStatusChange(logger, r.recorder, node, "CIDRNotAvailable")
 			return fmt.Errorf("failed to allocate cidr from cluster cidr at idx:%v: %v", idx, err)
 		}
-		allocated.allocatedCIDRs[idx] = podCIDR
+		allocatedCIDRs[idx] = podCIDR
 	}
 
 	//queue the assignment
-	logger.V(4).Info("Putting node with CIDR into the work queue", "node", klog.KObj(node), "CIDRs", allocated.allocatedCIDRs)
-	r.nodeCIDRUpdateChannel <- allocated
-	return nil
+	logger.V(4).Info("Putting node with CIDR into the work queue", "node", klog.KObj(node), "CIDRs", allocatedCIDRs)
+	return r.updateCIDRsAllocation(logger, node.Name, allocatedCIDRs)
 }
 
 // ReleaseCIDR marks node.podCIDRs[...] as unused in our tracked cidrSets
@@ -336,21 +373,20 @@ func (r *rangeAllocator) filterOutServiceRange(logger klog.Logger, serviceCIDR *
 }
 
 // updateCIDRsAllocation assigns CIDR to Node and sends an update to the API server.
-func (r *rangeAllocator) updateCIDRsAllocation(logger klog.Logger, data nodeReservedCIDRs) error {
+func (r *rangeAllocator) updateCIDRsAllocation(logger klog.Logger, nodeName string, allocatedCIDRs []*net.IPNet) error {
 	var err error
 	var node *v1.Node
-	defer r.removeNodeFromProcessing(data.nodeName)
-	cidrsString := ipnetToStringList(data.allocatedCIDRs)
-	node, err = r.nodeLister.Get(data.nodeName)
+	cidrsString := ipnetToStringList(allocatedCIDRs)
+	node, err = r.nodeLister.Get(nodeName)
 	if err != nil {
-		logger.Error(err, "Failed while getting node for updating Node.Spec.PodCIDRs", "node", klog.KRef("", data.nodeName))
+		logger.Error(err, "Failed while getting node for updating Node.Spec.PodCIDRs", "node", klog.KRef("", nodeName))
 		return err
 	}
 
 	// if cidr list matches the proposed.
 	// then we possibly updated this node
 	// and just failed to ack the success.
-	if len(node.Spec.PodCIDRs) == len(data.allocatedCIDRs) {
+	if len(node.Spec.PodCIDRs) == len(allocatedCIDRs) {
 		match := true
 		for idx, cidr := range cidrsString {
 			if node.Spec.PodCIDRs[idx] != cidr {
@@ -359,7 +395,7 @@ func (r *rangeAllocator) updateCIDRsAllocation(logger klog.Logger, data nodeRese
 			}
 		}
 		if match {
-			logger.V(4).Info("Node already has allocated CIDR. It matches the proposed one", "node", klog.KObj(node), "CIDRs", data.allocatedCIDRs)
+			logger.V(4).Info("Node already has allocated CIDR. It matches the proposed one", "node", klog.KObj(node), "CIDRs", allocatedCIDRs)
 			return nil
 		}
 	}
@@ -367,7 +403,7 @@ func (r *rangeAllocator) updateCIDRsAllocation(logger klog.Logger, data nodeRese
 	// node has cidrs, release the reserved
 	if len(node.Spec.PodCIDRs) != 0 {
 		logger.Error(nil, "Node already has a CIDR allocated. Releasing the new one", "node", klog.KObj(node), "podCIDRs", node.Spec.PodCIDRs)
-		for idx, cidr := range data.allocatedCIDRs {
+		for idx, cidr := range allocatedCIDRs {
 			if releaseErr := r.cidrSets[idx].Release(cidr); releaseErr != nil {
 				logger.Error(releaseErr, "Error when releasing CIDR", "index", idx, "CIDR", cidr)
 			}
@@ -390,7 +426,7 @@ func (r *rangeAllocator) updateCIDRsAllocation(logger klog.Logger, data nodeRese
 	// NodeController restart will return all falsely allocated CIDRs to the pool.
 	if !apierrors.IsServerTimeout(err) {
 		logger.Error(err, "CIDR assignment for node failed. Releasing allocated CIDR", "node", klog.KObj(node))
-		for idx, cidr := range data.allocatedCIDRs {
+		for idx, cidr := range allocatedCIDRs {
 			if releaseErr := r.cidrSets[idx].Release(cidr); releaseErr != nil {
 				logger.Error(releaseErr, "Error releasing allocated CIDR for node", "node", klog.KObj(node))
 			}
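
For reviewers less familiar with the client-go workqueue pattern this change adopts, the following is a minimal, self-contained sketch of the same Get/Done/Forget/AddRateLimited loop. It is illustrative only and not part of the patch; the queue name "example" and the exampleSync helper are hypothetical stand-ins for the allocator's "cidrallocator_node" queue and syncNode.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

// exampleSync is a hypothetical stand-in for syncNode: look the object up by
// key and reconcile it; returning an error triggers a rate-limited retry.
func exampleSync(key string) error {
	fmt.Println("syncing", key)
	return nil
}

func main() {
	// A named rate-limiting queue de-duplicates keys and applies back-off on
	// retries, mirroring the "cidrallocator_node" queue added in this patch.
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "example")

	// Event handlers only enqueue keys; duplicate adds collapse into one item.
	queue.Add("node-a")
	queue.Add("node-a")

	go func() {
		for {
			obj, shutdown := queue.Get()
			if shutdown {
				return
			}
			func(obj interface{}) {
				// Done tells the queue that processing of this item finished.
				defer queue.Done(obj)
				key, ok := obj.(string)
				if !ok {
					queue.Forget(obj) // never retry malformed items
					return
				}
				if err := exampleSync(key); err != nil {
					queue.AddRateLimited(key) // retry later with back-off
					return
				}
				queue.Forget(key) // success: reset the item's rate limiter
			}(obj)
		}
	}()

	time.Sleep(100 * time.Millisecond)
	queue.ShutDown()
}

The diff above follows the same shape: the informer handlers only push node keys, processNextNodeWorkItem pops them, and syncNode decides between AllocateOrOccupyCIDR and ReleaseCIDR.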