Merge pull request #115529 from aojea/ipam_reconile

replace nodeipam custom logic with a workqueue

The multiCIDRRangeAllocator previously tracked in-flight nodes and retries by hand (nodesInProcessing, retryParams, a custom exponential backoff). This change moves Node and ClusterCIDR processing onto two rate-limited workqueues: event handlers only enqueue keys, and workers reconcile them, retrying failures with backoff, as sketched below.
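For reference, a minimal, self-contained sketch of the client-go workqueue pattern this commit adopts. The syncNode body, the queue name "example", and the timings here are illustrative placeholders, not code from this PR:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/workqueue"
)

// syncNode stands in for the real reconcile step (hypothetical placeholder).
func syncNode(key string) error {
	fmt.Println("syncing", key)
	return nil
}

// processNextItem mirrors the processNext*WorkItem shape in the diff:
// Get -> sync -> AddRateLimited on error, Forget on success -> Done.
func processNextItem(queue workqueue.RateLimitingInterface) bool {
	obj, shutdown := queue.Get()
	if shutdown {
		return false
	}
	defer queue.Done(obj)

	key, ok := obj.(string)
	if !ok {
		// Invalid item: Forget it, or it would be retried forever.
		queue.Forget(obj)
		return true
	}
	if err := syncNode(key); err != nil {
		// Transient failure: requeue with per-item exponential backoff.
		queue.AddRateLimited(key)
		return true
	}
	// Success: reset this key's backoff counter.
	queue.Forget(obj)
	return true
}

func main() {
	queue := workqueue.NewNamedRateLimitingQueue(
		workqueue.DefaultControllerRateLimiter(), "example")

	// Producer side: a real controller enqueues cache keys from informer
	// event handlers instead of doing the work inline.
	queue.Add("node-a")

	stopCh := make(chan struct{})
	go wait.Until(func() {
		for processNextItem(queue) {
		}
	}, time.Second, stopCh)

	time.Sleep(time.Second) // let the worker drain the queue
	queue.ShutDown()        // unblocks Get so the worker loop exits
	close(stopCh)
}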
commit fc002b2f07
Author: Kubernetes Prow Robot
Date: 2023-02-07 03:33:00 -08:00 (committed by GitHub)


@@ -22,7 +22,6 @@ import (
 	"errors"
 	"fmt"
 	"math"
-	"math/rand"
 	"net"
 	"sync"
 	"time"
@@ -73,11 +72,6 @@ type multiCIDRNodeReservedCIDRs struct {
 	clusterCIDR *cidrset.ClusterCIDR
 }
 
-// multiCIDRNodeProcessingInfo tracks information related to current nodes in processing
-type multiCIDRNodeProcessingInfo struct {
-	retries int
-}
-
 type multiCIDRRangeAllocator struct {
 	client clientset.Interface
 	// nodeLister is able to list/get nodes and is populated by the shared informer passed to controller.
@@ -93,14 +87,13 @@ type multiCIDRRangeAllocator struct {
 	nodeCIDRUpdateChannel chan multiCIDRNodeReservedCIDRs
 	broadcaster           record.EventBroadcaster
 	recorder              record.EventRecorder
-	// queue is where incoming work is placed to de-dup and to allow "easy"
+	// queues are where incoming work is placed to de-dup and to allow "easy"
 	// rate limited requeues on errors
-	queue workqueue.RateLimitingInterface
+	cidrQueue workqueue.RateLimitingInterface
+	nodeQueue workqueue.RateLimitingInterface
 
-	// lock guards nodesInProcessing and cidrMap to avoid races in CIDR allocation.
+	// lock guards cidrMap to avoid races in CIDR allocation.
 	lock *sync.Mutex
-	// nodesInProcessing is a set of nodes that are currently being processed.
-	nodesInProcessing map[string]*multiCIDRNodeProcessingInfo
 	// cidrMap maps ClusterCIDR labels to internal ClusterCIDR objects.
 	cidrMap map[string][]*cidrset.ClusterCIDR
 }
@@ -135,9 +128,9 @@ func NewMultiCIDRRangeAllocator(
 		nodeCIDRUpdateChannel: make(chan multiCIDRNodeReservedCIDRs, cidrUpdateQueueSize),
 		broadcaster:           eventBroadcaster,
 		recorder:              recorder,
-		queue:                 workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "multi_cidr_range_allocator"),
+		cidrQueue:             workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "multi_cidr_range_allocator_cidr"),
+		nodeQueue:             workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "multi_cidr_range_allocator_node"),
 		lock:                  &sync.Mutex{},
-		nodesInProcessing:     map[string]*multiCIDRNodeProcessingInfo{},
 		cidrMap:               make(map[string][]*cidrset.ClusterCIDR, 0),
 	}
@@ -171,13 +164,13 @@ func NewMultiCIDRRangeAllocator(
 		AddFunc: func(obj interface{}) {
 			key, err := cache.MetaNamespaceKeyFunc(obj)
 			if err == nil {
-				ra.queue.Add(key)
+				ra.cidrQueue.Add(key)
 			}
 		},
 		UpdateFunc: func(old, new interface{}) {
 			key, err := cache.MetaNamespaceKeyFunc(new)
 			if err == nil {
-				ra.queue.Add(key)
+				ra.cidrQueue.Add(key)
 			}
 		},
 		DeleteFunc: func(obj interface{}) {
@@ -185,7 +178,7 @@ func NewMultiCIDRRangeAllocator(
 			// key function.
 			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
 			if err == nil {
-				ra.queue.Add(key)
+				ra.cidrQueue.Add(key)
 			}
 		},
 	})
@@ -221,33 +214,31 @@ func NewMultiCIDRRangeAllocator(
 	}
 
 	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-		AddFunc: controllerutil.CreateAddNodeHandler(ra.AllocateOrOccupyCIDR),
-		UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
-			// If the PodCIDRs list is not empty we either:
-			// - already processed a Node that already had CIDRs after NC restarted
-			//   (cidr is marked as used),
-			// - already processed a Node successfully and allocated CIDRs for it
-			//   (cidr is marked as used),
-			// - already processed a Node but we saw a "timeout" response and
-			//   request eventually got through in this case we haven't released
-			//   the allocated CIDRs (cidr is still marked as used).
-			// There's a possible error here:
-			// - NC sees a new Node and assigns CIDRs X,Y.. to it,
-			// - Update Node call fails with a timeout,
-			// - Node is updated by some other component, NC sees an update and
-			//   assigns CIDRs A,B.. to the Node,
-			// - Both CIDR X,Y.. and CIDR A,B.. are marked as used in the local cache,
-			//   even though Node sees only CIDR A,B..
-			// The problem here is that in in-memory cache we see CIDR X,Y.. as marked,
-			// which prevents it from being assigned to any new node. The cluster
-			// state is correct.
-			// Restart of NC fixes the issue.
-			if len(newNode.Spec.PodCIDRs) == 0 {
-				return ra.AllocateOrOccupyCIDR(newNode)
-			}
-			return nil
-		}),
-		DeleteFunc: controllerutil.CreateDeleteNodeHandler(ra.ReleaseCIDR),
+		AddFunc: func(obj interface{}) {
+			key, err := cache.MetaNamespaceKeyFunc(obj)
+			if err == nil {
+				ra.nodeQueue.Add(key)
+			}
+		},
+		UpdateFunc: func(old, new interface{}) {
+			key, err := cache.MetaNamespaceKeyFunc(new)
+			if err == nil {
+				ra.nodeQueue.Add(key)
+			}
+		},
+		DeleteFunc: func(obj interface{}) {
+			// The informer cache no longer has the object, and since Node doesn't have a finalizer,
+			// we don't see the Update with DeletionTimestamp != 0.
+			// TODO: instead of executing the operation directly in the handler, build a small cache
+			// keyed by node.Name with the node's PodCIDRs as value, and call ReleaseCIDR from the
+			// reconcile loop so failures can be retried.
+			ra.ReleaseCIDR(obj.(*v1.Node))
+			// IndexerInformer uses a delta queue, therefore for deletes we have to use this
+			// key function.
+			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+			if err == nil {
+				ra.nodeQueue.Add(key)
+			}
+		},
 	})
 
 	return ra, nil
@@ -262,7 +253,8 @@ func (r *multiCIDRRangeAllocator) Run(stopCh <-chan struct{}) {
 	r.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: r.client.CoreV1().Events("")})
 	defer r.broadcaster.Shutdown()
 
-	defer r.queue.ShutDown()
+	defer r.cidrQueue.ShutDown()
+	defer r.nodeQueue.ShutDown()
 
 	klog.Infof("Starting Multi CIDR Range allocator")
 	defer klog.Infof("Shutting down Multi CIDR Range allocator")
@@ -271,74 +263,64 @@ func (r *multiCIDRRangeAllocator) Run(stopCh <-chan struct{}) {
 		return
 	}
 
-	// raWaitGroup is used to wait for the RangeAllocator to finish the goroutines.
-	var raWaitGroup sync.WaitGroup
-
 	for i := 0; i < cidrUpdateWorkers; i++ {
-		raWaitGroup.Add(1)
-		go func() {
-			defer raWaitGroup.Done()
-			r.worker(stopCh)
-		}()
-		go wait.Until(r.runWorker, time.Second, stopCh)
+		go wait.Until(r.runCIDRWorker, time.Second, stopCh)
+		go wait.Until(r.runNodeWorker, time.Second, stopCh)
 	}
-	raWaitGroup.Wait()
 
 	<-stopCh
 }
 
 // runWorker is a long-running function that will continually call the
 // processNextWorkItem function in order to read and process a message on the
-// workqueue.
-func (r *multiCIDRRangeAllocator) runWorker() {
-	for r.processNextWorkItem() {
+// cidrQueue.
+func (r *multiCIDRRangeAllocator) runCIDRWorker() {
+	for r.processNextCIDRWorkItem() {
 	}
 }
 
-// processNextWorkItem will read a single work item off the workqueue and
+// processNextWorkItem will read a single work item off the cidrQueue and
 // attempt to process it, by calling the syncHandler.
-func (r *multiCIDRRangeAllocator) processNextWorkItem() bool {
-	obj, shutdown := r.queue.Get()
+func (r *multiCIDRRangeAllocator) processNextCIDRWorkItem() bool {
+	obj, shutdown := r.cidrQueue.Get()
 	if shutdown {
 		return false
 	}
 
-	// We wrap this block in a func so we can defer r.queue.Done.
+	// We wrap this block in a func so we can defer r.cidrQueue.Done.
 	err := func(obj interface{}) error {
-		// We call Done here so the queue knows we have finished
+		// We call Done here so the cidrQueue knows we have finished
 		// processing this item. We also must remember to call Forget if we
 		// do not want this work item being re-queued. For example, we do
 		// not call Forget if a transient error occurs, instead the item is
-		// put back on the queue and attempted again after a back-off
+		// put back on the cidrQueue and attempted again after a back-off
 		// period.
-		defer r.queue.Done(obj)
+		defer r.cidrQueue.Done(obj)
 		var key string
 		var ok bool
-		// We expect strings to come off the workqueue. These are of the
+		// We expect strings to come off the cidrQueue. These are of the
 		// form namespace/name. We do this as the delayed nature of the
-		// workqueue means the items in the informer cache may actually be
+		// cidrQueue means the items in the informer cache may actually be
 		// more up to date than when the item was initially put onto the
-		// workqueue.
+		// cidrQueue.
 		if key, ok = obj.(string); !ok {
-			// As the item in the workqueue is actually invalid, we call
+			// As the item in the cidrQueue is actually invalid, we call
 			// Forget here else we'd go into a loop of attempting to
 			// process a work item that is invalid.
-			r.queue.Forget(obj)
-			utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
+			r.cidrQueue.Forget(obj)
+			utilruntime.HandleError(fmt.Errorf("expected string in cidrQueue but got %#v", obj))
 			return nil
 		}
 		// Run the syncHandler, passing it the namespace/name string of the
 		// ClusterCIDR resource to be synced.
 		if err := r.syncClusterCIDR(key); err != nil {
-			// Put the item back on the workqueue to handle any transient errors.
-			r.queue.AddRateLimited(key)
+			// Put the item back on the cidrQueue to handle any transient errors.
+			r.cidrQueue.AddRateLimited(key)
 			return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
 		}
 		// Finally, if no error occurs we Forget this item so it does not
 		// get queued again until another change happens.
-		r.queue.Forget(obj)
+		r.cidrQueue.Forget(obj)
 		klog.Infof("Successfully synced '%s'", key)
 		return nil
 	}(obj)
@@ -351,38 +333,89 @@ func (r *multiCIDRRangeAllocator) processNextWorkItem() bool {
 	return true
 }
 
-func (r *multiCIDRRangeAllocator) worker(stopChan <-chan struct{}) {
-	for {
-		select {
-		case workItem, ok := <-r.nodeCIDRUpdateChannel:
-			if !ok {
-				klog.Error("Channel nodeCIDRUpdateChannel was unexpectedly closed")
-				return
-			}
-			r.lock.Lock()
-			if err := r.updateCIDRsAllocation(workItem); err == nil {
-				klog.V(3).Infof("Updated CIDR for %q", workItem.nodeName)
-			} else {
-				klog.Errorf("Error updating CIDR for %q: %v", workItem.nodeName, err)
-				if canRetry, timeout := r.retryParams(workItem.nodeName); canRetry {
-					klog.V(2).Infof("Retrying update for %q after %v", workItem.nodeName, timeout)
-					time.AfterFunc(timeout, func() {
-						// Requeue the failed node for update again.
-						r.nodeCIDRUpdateChannel <- workItem
-					})
-					continue
-				}
-				klog.Errorf("Exceeded retry count for %q, dropping from queue", workItem.nodeName)
-			}
-			r.removeNodeFromProcessing(workItem.nodeName)
-			r.lock.Unlock()
-		case <-stopChan:
-			klog.Infof("MultiCIDRRangeAllocator worker is stopping.")
-			return
-		}
-	}
-}
+func (r *multiCIDRRangeAllocator) runNodeWorker() {
+	for r.processNextNodeWorkItem() {
+	}
+}
+
+// processNextNodeWorkItem will read a single work item off the nodeQueue and
+// attempt to process it, by calling the syncHandler.
+func (r *multiCIDRRangeAllocator) processNextNodeWorkItem() bool {
+	obj, shutdown := r.nodeQueue.Get()
+	if shutdown {
+		return false
+	}
+
+	// We wrap this block in a func so we can defer r.nodeQueue.Done.
+	err := func(obj interface{}) error {
+		// We call Done here so the nodeQueue knows we have finished
+		// processing this item. We also must remember to call Forget if we
+		// do not want this work item being re-queued. For example, we do
+		// not call Forget if a transient error occurs, instead the item is
+		// put back on the nodeQueue and attempted again after a back-off
+		// period.
+		defer r.nodeQueue.Done(obj)
+		var key string
+		var ok bool
+		// We expect strings to come off the nodeQueue. These are of the
+		// form namespace/name. We do this as the delayed nature of the
+		// nodeQueue means the items in the informer cache may actually be
+		// more up to date than when the item was initially put onto the
+		// nodeQueue.
+		if key, ok = obj.(string); !ok {
+			// As the item in the nodeQueue is actually invalid, we call
+			// Forget here else we'd go into a loop of attempting to
+			// process a work item that is invalid.
+			r.nodeQueue.Forget(obj)
+			utilruntime.HandleError(fmt.Errorf("expected string in nodeQueue but got %#v", obj))
+			return nil
+		}
+		// Run the syncHandler, passing it the namespace/name string of the
+		// Node resource to be synced.
+		if err := r.syncNode(key); err != nil {
+			// Put the item back on the nodeQueue to handle any transient errors.
+			r.nodeQueue.AddRateLimited(key)
+			return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
+		}
+		// Finally, if no error occurs we Forget this item so it does not
+		// get queued again until another change happens.
+		r.nodeQueue.Forget(obj)
+		klog.Infof("Successfully synced '%s'", key)
+		return nil
+	}(obj)
+
+	if err != nil {
+		utilruntime.HandleError(err)
+		return true
+	}
+
+	return true
+}
+
+func (r *multiCIDRRangeAllocator) syncNode(key string) error {
+	startTime := time.Now()
+	defer func() {
+		klog.V(4).Infof("Finished syncing Node request %q (%v)", key, time.Since(startTime))
+	}()
+
+	node, err := r.nodeLister.Get(key)
+	if apierrors.IsNotFound(err) {
+		klog.V(3).Infof("node has been deleted: %v", key)
+		// TODO: obtain the node object information to call ReleaseCIDR from here
+		// and retry if there is an error.
+		return nil
+	}
+	if err != nil {
+		return err
+	}
+	// Check the DeletionTimestamp to determine if the object is under deletion.
+	if !node.DeletionTimestamp.IsZero() {
+		klog.V(3).Infof("node is being deleted: %v", key)
+		return r.ReleaseCIDR(node)
+	}
+	return r.AllocateOrOccupyCIDR(node)
+}
 
 // needToAddFinalizer checks if a finalizer should be added to the object.
 func needToAddFinalizer(obj metav1.Object, finalizer string) bool {
 	return obj.GetDeletionTimestamp() == nil && !slice.ContainsString(obj.GetFinalizers(),

@@ -412,49 +445,6 @@ func (r *multiCIDRRangeAllocator) syncClusterCIDR(key string) error {
 	return r.reconcileCreate(clusterCIDR)
 }
 
-func (r *multiCIDRRangeAllocator) insertNodeToProcessing(nodeName string) bool {
-	if _, found := r.nodesInProcessing[nodeName]; found {
-		return false
-	}
-	r.nodesInProcessing[nodeName] = &multiCIDRNodeProcessingInfo{}
-	return true
-}
-
-func (r *multiCIDRRangeAllocator) removeNodeFromProcessing(nodeName string) {
-	klog.Infof("Removing node %q from processing", nodeName)
-	delete(r.nodesInProcessing, nodeName)
-}
-
-func (r *multiCIDRRangeAllocator) retryParams(nodeName string) (bool, time.Duration) {
-	r.lock.Lock()
-	defer r.lock.Unlock()
-
-	entry, ok := r.nodesInProcessing[nodeName]
-	if !ok {
-		klog.Errorf("Cannot get retryParams for %q as entry does not exist", nodeName)
-		return false, 0
-	}
-
-	count := entry.retries + 1
-	if count > updateMaxRetries {
-		return false, 0
-	}
-	r.nodesInProcessing[nodeName].retries = count
-
-	return true, multiCIDRNodeUpdateRetryTimeout(count)
-}
-
-func multiCIDRNodeUpdateRetryTimeout(count int) time.Duration {
-	timeout := updateRetryTimeout
-	for i := 0; i < count && timeout < maxUpdateRetryTimeout; i++ {
-		timeout *= 2
-	}
-	if timeout > maxUpdateRetryTimeout {
-		timeout = maxUpdateRetryTimeout
-	}
-	return time.Duration(timeout.Nanoseconds()/2 + rand.Int63n(timeout.Nanoseconds()))
-}
 
 // occupyCIDRs marks node.PodCIDRs[...] as used in allocator's tracked cidrSet.
 func (r *multiCIDRRangeAllocator) occupyCIDRs(node *v1.Node) error {

@@ -498,7 +488,6 @@ func (r *multiCIDRRangeAllocator) occupyCIDRs(node *v1.Node) error {
 		return fmt.Errorf("could not occupy cidrs: %v, No matching ClusterCIDRs found", node.Spec.PodCIDRs)
 	}(node)
 
-	r.removeNodeFromProcessing(node.Name)
 	return err
 }
@@ -558,24 +547,17 @@ func (r *multiCIDRRangeAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
 		return nil
 	}
 
-	if !r.insertNodeToProcessing(node.Name) {
-		klog.Infof("Node %v is already in a process of CIDR assignment.", node.Name)
-		return nil
-	}
-
 	if len(node.Spec.PodCIDRs) > 0 {
 		return r.occupyCIDRs(node)
 	}
 
 	cidrs, clusterCIDR, err := r.prioritizedCIDRs(node)
 	if err != nil {
-		r.removeNodeFromProcessing(node.Name)
 		controllerutil.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
 		return fmt.Errorf("failed to get cidrs for node %s", node.Name)
 	}
 
 	if len(cidrs) == 0 {
-		r.removeNodeFromProcessing(node.Name)
 		controllerutil.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
 		return fmt.Errorf("no cidrSets with matching labels found for node %s", node.Name)
 	}
@@ -725,7 +707,6 @@ func (r *multiCIDRRangeAllocator) updateCIDRsAllocation(data multiCIDRNodeReserv
 		return err
 	}(data)
 
-	r.removeNodeFromProcessing(data.nodeName)
 	return err
 }
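With retryParams and multiCIDRNodeUpdateRetryTimeout removed, retry pacing now comes entirely from the queues' rate limiter. A hedged sketch of what workqueue.DefaultControllerRateLimiter amounted to in the client-go of this era: per-item exponential backoff combined with an overall token bucket. The constants are client-go defaults, not values introduced by this PR:

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Roughly equivalent to workqueue.DefaultControllerRateLimiter():
	// per-item exponential backoff from 5ms up to 1000s, capped overall
	// by a 10 qps / 100 burst token bucket.
	limiter := workqueue.NewMaxOfRateLimiter(
		workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)

	// Each AddRateLimited for the same key doubles its delay...
	for i := 0; i < 5; i++ {
		fmt.Println(limiter.When("node-a")) // 5ms, 10ms, 20ms, 40ms, 80ms
	}
	// ...and Forget (called on a successful sync) resets the backoff.
	limiter.Forget("node-a")
	fmt.Println(limiter.When("node-a")) // back to 5ms
}

This is what replaces the removed custom backoff: a node that repeatedly fails to sync is retried with growing delays instead of being dropped after updateMaxRetries attempts.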