Fix the delete flow for ClusterCIDR objects
Fixes the deletion of a ClusterCIDR object while a Node is still associated with it (i.e. has Pod CIDRs allocated from that ClusterCIDR). Currently the ClusterCIDR finalizer is never cleaned up, because no reconciliation happens after the associated Node is deleted. This commit fixes the issue by adding work items from all events to a worker queue and reconciling until the delete succeeds.
This commit is contained in:
parent
ad2a9f2f33
commit
a8f120b76c
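
Note: the fix leans on standard finalizer semantics. When an object carries a finalizer, a delete request only sets its deletionTimestamp; the API server removes the object once a controller strips the finalizer, which is why the controller must keep reconciling until that update succeeds. A minimal model of that contract (hypothetical stand-in types, not the real API; the finalizer name mirrors clusterCIDRFinalizer):

```go
package main

import "fmt"

// object is an illustrative stand-in for Kubernetes object metadata.
type object struct {
	deletionTimestamp string
	finalizers        []string
}

// deleteObject models the API server: with finalizers present, deletion only
// marks the object as terminating; removal happens once the list is empty.
func deleteObject(o *object) (gone bool) {
	if len(o.finalizers) > 0 {
		o.deletionTimestamp = "2023-01-01T00:00:00Z"
		return false // object lingers in Terminating
	}
	return true
}

func main() {
	o := &object{finalizers: []string{"networking.k8s.io/cluster-cidr-finalizer"}}
	fmt.Println(deleteObject(o)) // false: a controller must remove the finalizer
	o.finalizers = nil
	fmt.Println(deleteObject(o)) // true: the server can now drop the object
}
```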
@@ -168,8 +168,26 @@ func NewMultiCIDRRangeAllocator(
 	}
 
 	clusterCIDRInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-		AddFunc:    createClusterCIDRHandler(ra.reconcileCreate),
-		DeleteFunc: createClusterCIDRHandler(ra.reconcileDelete),
+		AddFunc: func(obj interface{}) {
+			key, err := cache.MetaNamespaceKeyFunc(obj)
+			if err == nil {
+				ra.queue.Add(key)
+			}
+		},
+		UpdateFunc: func(old, new interface{}) {
+			key, err := cache.MetaNamespaceKeyFunc(new)
+			if err == nil {
+				ra.queue.Add(key)
+			}
+		},
+		DeleteFunc: func(obj interface{}) {
+			// IndexerInformer uses a delta nodeQueue, therefore for deletes we have to use this
+			// key function.
+			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+			if err == nil {
+				ra.queue.Add(key)
+			}
+		},
 	})
 
 	if allocatorParams.ServiceCIDR != nil {
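
Note: the handlers above only enqueue keys; `ra.queue` itself is initialized outside this hunk. Presumably it is a client-go rate-limiting workqueue, along these lines (the queue name is a guess, not taken from the diff):

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// DefaultControllerRateLimiter combines per-item exponential backoff with
	// an overall token bucket, which is what gives AddRateLimited (used in
	// processNextWorkItem below) its increasing retry delays.
	queue := workqueue.NewNamedRateLimitingQueue(
		workqueue.DefaultControllerRateLimiter(),
		"multi_cidr_range_allocator",
	)
	queue.Add("default/example")
	fmt.Println(queue.Len()) // 1
}
```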
@@ -262,6 +280,7 @@ func (r *multiCIDRRangeAllocator) Run(stopCh <-chan struct{}) {
 			defer raWaitGroup.Done()
 			r.worker(stopCh)
 		}()
+		go wait.Until(r.runWorker, time.Second, stopCh)
 	}
 
 	raWaitGroup.Wait()
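
Note: `wait.Until` runs the function, sleeps for the period, and repeats until the stop channel closes, so a worker that returns early is restarted rather than lost. A minimal illustration:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})
	// The func is re-invoked roughly every second until stopCh is closed.
	go wait.Until(func() { fmt.Println("worker pass") }, time.Second, stopCh)
	time.Sleep(2500 * time.Millisecond)
	close(stopCh) // wait.Until returns shortly after this
	time.Sleep(100 * time.Millisecond)
}
```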
@@ -269,6 +288,69 @@ func (r *multiCIDRRangeAllocator) Run(stopCh <-chan struct{}) {
 	<-stopCh
 }
 
+// runWorker is a long-running function that will continually call the
+// processNextWorkItem function in order to read and process a message on the
+// workqueue.
+func (r *multiCIDRRangeAllocator) runWorker() {
+	for r.processNextWorkItem() {
+	}
+}
+
+// processNextWorkItem will read a single work item off the workqueue and
+// attempt to process it, by calling the syncHandler.
+func (r *multiCIDRRangeAllocator) processNextWorkItem() bool {
+	obj, shutdown := r.queue.Get()
+
+	if shutdown {
+		return false
+	}
+
+	// We wrap this block in a func so we can defer c.workqueue.Done.
+	err := func(obj interface{}) error {
+		// We call Done here so the queue knows we have finished
+		// processing this item. We also must remember to call Forget if we
+		// do not want this work item being re-queued. For example, we do
+		// not call Forget if a transient error occurs, instead the item is
+		// put back on the queue and attempted again after a back-off
+		// period.
+		defer r.queue.Done(obj)
+		var key string
+		var ok bool
+		// We expect strings to come off the workqueue. These are of the
+		// form namespace/name. We do this as the delayed nature of the
+		// workqueue means the items in the informer cache may actually be
+		// more up to date that when the item was initially put onto the
+		// workqueue.
+		if key, ok = obj.(string); !ok {
+			// As the item in the workqueue is actually invalid, we call
+			// Forget here else we'd go into a loop of attempting to
+			// process a work item that is invalid.
+			r.queue.Forget(obj)
+			utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
+			return nil
+		}
+		// Run the syncHandler, passing it the namespace/name string of the
+		// Foo resource to be synced.
+		if err := r.syncClusterCIDR(key); err != nil {
+			// Put the item back on the workqueue to handle any transient errors.
+			r.queue.AddRateLimited(key)
+			return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
+		}
+		// Finally, if no error occurs we Forget this item so it does not
+		// get queued again until another change happens.
+		r.queue.Forget(obj)
+		klog.Infof("Successfully synced '%s'", key)
+		return nil
+	}(obj)
+
+	if err != nil {
+		utilruntime.HandleError(err)
+		return true
+	}
+
+	return true
+}
+
 func (r *multiCIDRRangeAllocator) worker(stopChan <-chan struct{}) {
 	for {
 		select {
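
Note: `processNextWorkItem` hands each key to `r.syncClusterCIDR`, which this diff does not show. Given the commit message, it presumably resolves the key against the informer cache and dispatches on the deletion timestamp; a sketch under that assumption (the lister field and log lines are guesses):

```go
// Hypothetical shape of the sync handler; not part of this diff.
// Assumes apierrors = "k8s.io/apimachinery/pkg/api/errors".
func (r *multiCIDRRangeAllocator) syncClusterCIDR(key string) error {
	clusterCIDR, err := r.clusterCIDRLister.Get(key) // assumed lister field
	if apierrors.IsNotFound(err) {
		klog.V(3).Infof("ClusterCIDR %q no longer exists", key)
		return nil
	}
	if err != nil {
		return err
	}

	// A non-zero deletion timestamp means only the finalizer keeps the object
	// alive; keep running the release path until it succeeds.
	if !clusterCIDR.DeletionTimestamp.IsZero() {
		return r.reconcileDelete(clusterCIDR)
	}
	return r.reconcileCreate(clusterCIDR)
}
```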
@@ -301,16 +383,6 @@ func (r *multiCIDRRangeAllocator) worker(stopChan <-chan struct{}) {
 	}
 }
 
-// createClusterCIDRHandler creates clusterCIDR handler.
-func createClusterCIDRHandler(f func(ccc *networkingv1alpha1.ClusterCIDR) error) func(obj interface{}) {
-	return func(originalObj interface{}) {
-		ccc := originalObj.(*networkingv1alpha1.ClusterCIDR)
-		if err := f(ccc); err != nil {
-			utilruntime.HandleError(fmt.Errorf("error while processing ClusterCIDR Add/Delete: %w", err))
-		}
-	}
-}
-
 // needToAddFinalizer checks if a finalizer should be added to the object.
 func needToAddFinalizer(obj metav1.Object, finalizer string) bool {
 	return obj.GetDeletionTimestamp() == nil && !slice.ContainsString(obj.GetFinalizers(),
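
Note: removing `createClusterCIDRHandler` also retires a fragile bare type assertion. When an informer misses a delete event it delivers a `cache.DeletedFinalStateUnknown` tombstone instead of a `*ClusterCIDR`, which the old handler's `originalObj.(*networkingv1alpha1.ClusterCIDR)` could not cope with; the new `DeleteFunc` uses `cache.DeletionHandlingMetaNamespaceKeyFunc`, which unwraps tombstones while computing the key. The manual unwrapping it replaces looks roughly like this (illustrative helper, not in the diff):

```go
// clusterCIDRFromDeleteEvent is a hypothetical helper showing the tombstone
// handling that cache.DeletionHandlingMetaNamespaceKeyFunc performs internally.
func clusterCIDRFromDeleteEvent(obj interface{}) (*networkingv1alpha1.ClusterCIDR, bool) {
	if ccc, ok := obj.(*networkingv1alpha1.ClusterCIDR); ok {
		return ccc, true
	}
	tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
	if !ok {
		return nil, false
	}
	ccc, ok := tombstone.Obj.(*networkingv1alpha1.ClusterCIDR)
	return ccc, ok
}
```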
@@ -392,7 +464,7 @@ func (r *multiCIDRRangeAllocator) occupyCIDRs(node *v1.Node) error {
 		return nil
 	}
 
-	clusterCIDRList, err := r.orderedMatchingClusterCIDRs(node)
+	clusterCIDRList, err := r.orderedMatchingClusterCIDRs(node, true)
 	if err != nil {
 		return err
 	}
@@ -687,7 +759,7 @@ func defaultNodeSelector() ([]byte, error) {
 // Returns 1 CIDR if single stack.
 // Returns 2 CIDRs , 1 from each ip family if dual stack.
 func (r *multiCIDRRangeAllocator) prioritizedCIDRs(node *v1.Node) ([]*net.IPNet, *cidrset.ClusterCIDR, error) {
-	clusterCIDRList, err := r.orderedMatchingClusterCIDRs(node)
+	clusterCIDRList, err := r.orderedMatchingClusterCIDRs(node, true)
 	if err != nil {
 		return nil, nil, fmt.Errorf("unable to get a clusterCIDR for node %s: %w", node.Name, err)
 	}
@@ -782,7 +854,7 @@ func (r *multiCIDRRangeAllocator) cidrOverlapWithAllocatedList(cidr *net.IPNet)
 
 // allocatedClusterCIDR returns the ClusterCIDR from which the node CIDRs were allocated.
 func (r *multiCIDRRangeAllocator) allocatedClusterCIDR(node *v1.Node) (*cidrset.ClusterCIDR, error) {
-	clusterCIDRList, err := r.orderedMatchingClusterCIDRs(node)
+	clusterCIDRList, err := r.orderedMatchingClusterCIDRs(node, false)
 	if err != nil {
 		return nil, fmt.Errorf("unable to get a clusterCIDR for node %s: %w", node.Name, err)
 	}
@@ -802,7 +874,11 @@ func (r *multiCIDRRangeAllocator) allocatedClusterCIDR(node *v1.Node) (*cidrset.
 // P2: ClusterCIDR with a PerNodeMaskSize having fewer IPs has higher priority.
 // P3: ClusterCIDR having label with lower alphanumeric value has higher priority.
 // P4: ClusterCIDR with a cidrSet having a smaller IP address value has a higher priority.
-func (r *multiCIDRRangeAllocator) orderedMatchingClusterCIDRs(node *v1.Node) ([]*cidrset.ClusterCIDR, error) {
+//
+// orderedMatchingClusterCIDRs takes `occupy` as an argument, it determines whether the function
+// is called during an occupy or a release operation. For a release operation, a ClusterCIDR must
+// be added to the matching ClusterCIDRs list, irrespective of whether the ClusterCIDR is terminating.
+func (r *multiCIDRRangeAllocator) orderedMatchingClusterCIDRs(node *v1.Node, occupy bool) ([]*cidrset.ClusterCIDR, error) {
 	matchingCIDRs := make([]*cidrset.ClusterCIDR, 0)
 	pq := make(PriorityQueue, 0)
 
@@ -824,7 +900,8 @@ func (r *multiCIDRRangeAllocator) orderedMatchingClusterCIDRs(node *v1.Node) ([]
 		}
 
-		// Only push the CIDRsets which are not marked for termination.
-		if !clusterCIDR.Terminating {
+		// Always push the CIDRsets when marked for release.
+		if !occupy || !clusterCIDR.Terminating {
 			heap.Push(&pq, pqItem)
 		}
 	}
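
Note: spelled out, the reworked condition skips a CIDR set only when allocating from a terminating ClusterCIDR; releases always see every matching set. Enumerating the cases:

```go
package main

import "fmt"

func main() {
	// Enumerate the push condition from the hunk above: a CIDR set is skipped
	// only when occupying (allocating) AND the ClusterCIDR is terminating.
	for _, occupy := range []bool{true, false} {
		for _, terminating := range []bool{true, false} {
			fmt.Printf("occupy=%-5v terminating=%-5v push=%v\n",
				occupy, terminating, !occupy || !terminating)
		}
	}
}
```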
@@ -1090,19 +1167,22 @@ func (r *multiCIDRRangeAllocator) mapClusterCIDRSet(cidrMap map[string][]*cidrse
 	return nil
 }
 
-// reconcileDelete deletes the ClusterCIDR object and removes the finalizer.
+// reconcileDelete releases the assigned ClusterCIDR and removes the finalizer
+// if the deletion timestamp is set.
 func (r *multiCIDRRangeAllocator) reconcileDelete(clusterCIDR *networkingv1alpha1.ClusterCIDR) error {
 	r.lock.Lock()
 	defer r.lock.Unlock()
 
 	if slice.ContainsString(clusterCIDR.GetFinalizers(), clusterCIDRFinalizer, nil) {
 		klog.V(2).Infof("Releasing ClusterCIDR: %s", clusterCIDR.Name)
 		if err := r.deleteClusterCIDR(clusterCIDR); err != nil {
 			klog.V(2).Infof("Error while deleting ClusterCIDR: %+v", err)
 			return err
 		}
 		// Remove the finalizer as delete is successful.
 		cccCopy := clusterCIDR.DeepCopy()
 		cccCopy.ObjectMeta.Finalizers = slice.RemoveString(cccCopy.ObjectMeta.Finalizers, clusterCIDRFinalizer, nil)
-		if _, err := r.client.NetworkingV1alpha1().ClusterCIDRs().Update(context.TODO(), clusterCIDR, metav1.UpdateOptions{}); err != nil {
+		if _, err := r.client.NetworkingV1alpha1().ClusterCIDRs().Update(context.TODO(), cccCopy, metav1.UpdateOptions{}); err != nil {
 			klog.V(2).Infof("Error removing finalizer for ClusterCIDR %s: %v", clusterCIDR.Name, err)
 			return err
 		}
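
Note: this last hunk carries two separate fixes: the doc comment now describes what the function actually does, and `Update` is finally passed `cccCopy`, the deep copy whose finalizer list was trimmed. Before, the unmodified `clusterCIDR` was sent, so the API server never saw the finalizer removed and the object stayed in Terminating forever. A self-contained illustration of why mutating a copy but sending the original is a silent no-op (local stand-ins, not the real client):

```go
package main

import "fmt"

// obj is an illustrative stand-in for a Kubernetes object.
type obj struct{ finalizers []string }

func deepCopy(o obj) obj { return obj{finalizers: append([]string(nil), o.finalizers...)} }

func main() {
	original := obj{finalizers: []string{"cluster-cidr-finalizer"}}

	trimmed := deepCopy(original)
	trimmed.finalizers = nil

	// Pre-fix behavior: the server receives `original`, finalizer intact.
	fmt.Println(len(original.finalizers)) // 1 -> deletion never completes

	// Post-fix behavior: the server receives the trimmed copy.
	fmt.Println(len(trimmed.finalizers)) // 0 -> finalizer gone, deletion completes
}
```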