From a8f120b76c392d74acf51546fb617a45cd77f05d Mon Sep 17 00:00:00 2001 From: Sarvesh Rangnekar Date: Sun, 8 Jan 2023 02:48:17 +0000 Subject: [PATCH 1/2] Fix the delete flow for ClusterCIDR objects Fixes the deletion of ClusterCIDR object, when a Node is associated(has Pod CIDRs allocated from this ClusterCIDR) with it. Currently the ClusterCIDR finalizer is never cleaned up as there is no reconciliation happening after the associated Node has been deleted. This commit fixes the issue by adding workitems from all events to a worker queue and reconcile until the delete is successful. --- .../ipam/multi_cidr_range_allocator.go | 118 +++++++++++++++--- 1 file changed, 99 insertions(+), 19 deletions(-) diff --git a/pkg/controller/nodeipam/ipam/multi_cidr_range_allocator.go b/pkg/controller/nodeipam/ipam/multi_cidr_range_allocator.go index e37f739ea69..7445cb71882 100644 --- a/pkg/controller/nodeipam/ipam/multi_cidr_range_allocator.go +++ b/pkg/controller/nodeipam/ipam/multi_cidr_range_allocator.go @@ -168,8 +168,26 @@ func NewMultiCIDRRangeAllocator( } clusterCIDRInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: createClusterCIDRHandler(ra.reconcileCreate), - DeleteFunc: createClusterCIDRHandler(ra.reconcileDelete), + AddFunc: func(obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err == nil { + ra.queue.Add(key) + } + }, + UpdateFunc: func(old, new interface{}) { + key, err := cache.MetaNamespaceKeyFunc(new) + if err == nil { + ra.queue.Add(key) + } + }, + DeleteFunc: func(obj interface{}) { + // IndexerInformer uses a delta nodeQueue, therefore for deletes we have to use this + // key function. + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err == nil { + ra.queue.Add(key) + } + }, }) if allocatorParams.ServiceCIDR != nil { @@ -262,6 +280,7 @@ func (r *multiCIDRRangeAllocator) Run(stopCh <-chan struct{}) { defer raWaitGroup.Done() r.worker(stopCh) }() + go wait.Until(r.runWorker, time.Second, stopCh) } raWaitGroup.Wait() @@ -269,6 +288,69 @@ func (r *multiCIDRRangeAllocator) Run(stopCh <-chan struct{}) { <-stopCh } +// runWorker is a long-running function that will continually call the +// processNextWorkItem function in order to read and process a message on the +// workqueue. +func (r *multiCIDRRangeAllocator) runWorker() { + for r.processNextWorkItem() { + } +} + +// processNextWorkItem will read a single work item off the workqueue and +// attempt to process it, by calling the syncHandler. +func (r *multiCIDRRangeAllocator) processNextWorkItem() bool { + obj, shutdown := r.queue.Get() + + if shutdown { + return false + } + + // We wrap this block in a func so we can defer c.workqueue.Done. + err := func(obj interface{}) error { + // We call Done here so the queue knows we have finished + // processing this item. We also must remember to call Forget if we + // do not want this work item being re-queued. For example, we do + // not call Forget if a transient error occurs, instead the item is + // put back on the queue and attempted again after a back-off + // period. + defer r.queue.Done(obj) + var key string + var ok bool + // We expect strings to come off the workqueue. These are of the + // form namespace/name. We do this as the delayed nature of the + // workqueue means the items in the informer cache may actually be + // more up to date that when the item was initially put onto the + // workqueue. 
+ if key, ok = obj.(string); !ok { + // As the item in the workqueue is actually invalid, we call + // Forget here else we'd go into a loop of attempting to + // process a work item that is invalid. + r.queue.Forget(obj) + utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) + return nil + } + // Run the syncHandler, passing it the namespace/name string of the + // Foo resource to be synced. + if err := r.syncClusterCIDR(key); err != nil { + // Put the item back on the workqueue to handle any transient errors. + r.queue.AddRateLimited(key) + return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) + } + // Finally, if no error occurs we Forget this item so it does not + // get queued again until another change happens. + r.queue.Forget(obj) + klog.Infof("Successfully synced '%s'", key) + return nil + }(obj) + + if err != nil { + utilruntime.HandleError(err) + return true + } + + return true +} + func (r *multiCIDRRangeAllocator) worker(stopChan <-chan struct{}) { for { select { @@ -301,16 +383,6 @@ func (r *multiCIDRRangeAllocator) worker(stopChan <-chan struct{}) { } } -// createClusterCIDRHandler creates clusterCIDR handler. -func createClusterCIDRHandler(f func(ccc *networkingv1alpha1.ClusterCIDR) error) func(obj interface{}) { - return func(originalObj interface{}) { - ccc := originalObj.(*networkingv1alpha1.ClusterCIDR) - if err := f(ccc); err != nil { - utilruntime.HandleError(fmt.Errorf("error while processing ClusterCIDR Add/Delete: %w", err)) - } - } -} - // needToAddFinalizer checks if a finalizer should be added to the object. func needToAddFinalizer(obj metav1.Object, finalizer string) bool { return obj.GetDeletionTimestamp() == nil && !slice.ContainsString(obj.GetFinalizers(), @@ -392,7 +464,7 @@ func (r *multiCIDRRangeAllocator) occupyCIDRs(node *v1.Node) error { return nil } - clusterCIDRList, err := r.orderedMatchingClusterCIDRs(node) + clusterCIDRList, err := r.orderedMatchingClusterCIDRs(node, true) if err != nil { return err } @@ -687,7 +759,7 @@ func defaultNodeSelector() ([]byte, error) { // Returns 1 CIDR if single stack. // Returns 2 CIDRs , 1 from each ip family if dual stack. func (r *multiCIDRRangeAllocator) prioritizedCIDRs(node *v1.Node) ([]*net.IPNet, *cidrset.ClusterCIDR, error) { - clusterCIDRList, err := r.orderedMatchingClusterCIDRs(node) + clusterCIDRList, err := r.orderedMatchingClusterCIDRs(node, true) if err != nil { return nil, nil, fmt.Errorf("unable to get a clusterCIDR for node %s: %w", node.Name, err) } @@ -782,7 +854,7 @@ func (r *multiCIDRRangeAllocator) cidrOverlapWithAllocatedList(cidr *net.IPNet) // allocatedClusterCIDR returns the ClusterCIDR from which the node CIDRs were allocated. func (r *multiCIDRRangeAllocator) allocatedClusterCIDR(node *v1.Node) (*cidrset.ClusterCIDR, error) { - clusterCIDRList, err := r.orderedMatchingClusterCIDRs(node) + clusterCIDRList, err := r.orderedMatchingClusterCIDRs(node, false) if err != nil { return nil, fmt.Errorf("unable to get a clusterCIDR for node %s: %w", node.Name, err) } @@ -802,7 +874,11 @@ func (r *multiCIDRRangeAllocator) allocatedClusterCIDR(node *v1.Node) (*cidrset. // P2: ClusterCIDR with a PerNodeMaskSize having fewer IPs has higher priority. // P3: ClusterCIDR having label with lower alphanumeric value has higher priority. // P4: ClusterCIDR with a cidrSet having a smaller IP address value has a higher priority. 
-func (r *multiCIDRRangeAllocator) orderedMatchingClusterCIDRs(node *v1.Node) ([]*cidrset.ClusterCIDR, error) { +// +// orderedMatchingClusterCIDRs takes `occupy` as an argument, it determines whether the function +// is called during an occupy or a release operation. For a release operation, a ClusterCIDR must +// be added to the matching ClusterCIDRs list, irrespective of whether the ClusterCIDR is terminating. +func (r *multiCIDRRangeAllocator) orderedMatchingClusterCIDRs(node *v1.Node, occupy bool) ([]*cidrset.ClusterCIDR, error) { matchingCIDRs := make([]*cidrset.ClusterCIDR, 0) pq := make(PriorityQueue, 0) @@ -824,7 +900,8 @@ func (r *multiCIDRRangeAllocator) orderedMatchingClusterCIDRs(node *v1.Node) ([] } // Only push the CIDRsets which are not marked for termination. - if !clusterCIDR.Terminating { + // Always push the CIDRsets when marked for release. + if !occupy || !clusterCIDR.Terminating { heap.Push(&pq, pqItem) } } @@ -1090,19 +1167,22 @@ func (r *multiCIDRRangeAllocator) mapClusterCIDRSet(cidrMap map[string][]*cidrse return nil } -// reconcileDelete deletes the ClusterCIDR object and removes the finalizer. +// reconcileDelete releases the assigned ClusterCIDR and removes the finalizer +// if the deletion timestamp is set. func (r *multiCIDRRangeAllocator) reconcileDelete(clusterCIDR *networkingv1alpha1.ClusterCIDR) error { r.lock.Lock() defer r.lock.Unlock() if slice.ContainsString(clusterCIDR.GetFinalizers(), clusterCIDRFinalizer, nil) { + klog.V(2).Infof("Releasing ClusterCIDR: %s", clusterCIDR.Name) if err := r.deleteClusterCIDR(clusterCIDR); err != nil { + klog.V(2).Infof("Error while deleting ClusterCIDR: %+v", err) return err } // Remove the finalizer as delete is successful. cccCopy := clusterCIDR.DeepCopy() cccCopy.ObjectMeta.Finalizers = slice.RemoveString(cccCopy.ObjectMeta.Finalizers, clusterCIDRFinalizer, nil) - if _, err := r.client.NetworkingV1alpha1().ClusterCIDRs().Update(context.TODO(), clusterCIDR, metav1.UpdateOptions{}); err != nil { + if _, err := r.client.NetworkingV1alpha1().ClusterCIDRs().Update(context.TODO(), cccCopy, metav1.UpdateOptions{}); err != nil { klog.V(2).Infof("Error removing finalizer for ClusterCIDR %s: %v", clusterCIDR.Name, err) return err } From 203b91c486e40f825ba09e866a82c784bce6b9b5 Mon Sep 17 00:00:00 2001 From: Sarvesh Rangnekar Date: Fri, 30 Dec 2022 03:50:59 +0000 Subject: [PATCH 2/2] Add integration tests for MultiCIDRRangeAllocator Adds integration tests for the following scenarios with MultiCIDRRangeAllocator enabled: - ClusterCIDR is released when an associated node is deleted. - ClusterCIDR delete when a node is associated, validate the finalizer behavior, make sure that deleted ClusterCIDR is cleaned up after the associated node is deleted. - ClusterCIDR marked as terminating due to deletion must not be used for allocating Pod CIDRs to new nodes. - Tie break behavior when multiple ClusterCIDRs are eligible to allocate Pod CIDRs to a node. 
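
The finalizer and cleanup checks above boil down to polling the API server
until it reports NotFound. A condensed sketch of that pattern, as used by
the tests in this patch (clientSet and clusterCIDR are the test fixtures;
wait and apierrors come from k8s.io/apimachinery):

    // Poll until the controller has released the CIDRs, removed the
    // ClusterCIDR finalizer, and the object is gone from the API server.
    if err := wait.PollImmediate(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
        _, err := clientSet.NetworkingV1alpha1().ClusterCIDRs().Get(context.TODO(), clusterCIDR.Name, metav1.GetOptions{})
        if err != nil && !apierrors.IsNotFound(err) {
            return false, err // unexpected error, stop polling
        }
        return apierrors.IsNotFound(err), nil // done once the ClusterCIDR is deleted
    }); err != nil {
        t.Fatalf("failed while waiting for ClusterCIDR %q to be deleted: %v", clusterCIDR.Name, err)
    }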
--- test/integration/clustercidr/ipam_test.go | 435 +++++++++++++++++++++- 1 file changed, 415 insertions(+), 20 deletions(-) diff --git a/test/integration/clustercidr/ipam_test.go b/test/integration/clustercidr/ipam_test.go index 167370e157c..3814d3ccbef 100644 --- a/test/integration/clustercidr/ipam_test.go +++ b/test/integration/clustercidr/ipam_test.go @@ -25,6 +25,7 @@ import ( v1 "k8s.io/api/core/v1" networkingv1alpha1 "k8s.io/api/networking/v1alpha1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" @@ -40,7 +41,7 @@ import ( netutils "k8s.io/utils/net" ) -func TestIPAMMultiCIDRRangeAllocatorType(t *testing.T) { +func TestIPAMMultiCIDRRangeAllocatorCIDRAllocate(t *testing.T) { // set the feature gate to enable MultiCIDRRangeAllocator defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MultiCIDRRangeAllocator, true)() @@ -85,19 +86,19 @@ func TestIPAMMultiCIDRRangeAllocatorType(t *testing.T) { }, { name: "Single stack IPv4 Pod CIDR assigned to a node", - clusterCIDR: makeClusterCIDR("ipv4-cc", "10.0.0.0/16", "", nodeSelector(map[string][]string{"ipv4": {"true"}, "singlestack": {"true"}})), + clusterCIDR: makeClusterCIDR("ipv4-cc", "10.0.0.0/16", "", 8, nodeSelector(map[string][]string{"ipv4": {"true"}, "singlestack": {"true"}})), node: makeNode("ipv4-node", map[string]string{"ipv4": "true", "singlestack": "true"}), expectedPodCIDRs: []string{"10.0.0.0/24"}, }, { name: "Single stack IPv6 Pod CIDR assigned to a node", - clusterCIDR: makeClusterCIDR("ipv6-cc", "", "fd00:20:100::/112", nodeSelector(map[string][]string{"ipv6": {"true"}})), + clusterCIDR: makeClusterCIDR("ipv6-cc", "", "fd00:20:100::/112", 8, nodeSelector(map[string][]string{"ipv6": {"true"}})), node: makeNode("ipv6-node", map[string]string{"ipv6": "true"}), expectedPodCIDRs: []string{"fd00:20:100::/120"}, }, { name: "DualStack Pod CIDRs assigned to a node", - clusterCIDR: makeClusterCIDR("dualstack-cc", "192.168.0.0/16", "fd00:30:100::/112", nodeSelector(map[string][]string{"ipv4": {"true"}, "ipv6": {"true"}})), + clusterCIDR: makeClusterCIDR("dualstack-cc", "192.168.0.0/16", "fd00:30:100::/112", 8, nodeSelector(map[string][]string{"ipv4": {"true"}, "ipv6": {"true"}})), node: makeNode("dualstack-node", map[string]string{"ipv4": "true", "ipv6": "true"}), expectedPodCIDRs: []string{"192.168.0.0/24", "fd00:30:100::/120"}, }, @@ -110,17 +111,6 @@ func TestIPAMMultiCIDRRangeAllocatorType(t *testing.T) { if _, err := clientSet.NetworkingV1alpha1().ClusterCIDRs().Create(context.TODO(), test.clusterCIDR, metav1.CreateOptions{}); err != nil { t.Fatal(err) } - - // Wait for the ClusterCIDR to be created - if err := wait.PollImmediate(time.Second, 5*time.Second, func() (bool, error) { - cc, err := clientSet.NetworkingV1alpha1().ClusterCIDRs().Get(context.TODO(), test.clusterCIDR.Name, metav1.GetOptions{}) - if err != nil { - return false, err - } - return cc != nil, nil - }); err != nil { - t.Fatalf("failed while waiting for ClusterCIDR %q to be created: %v", test.clusterCIDR.Name, err) - } } // Sleep for one second to make sure the controller process the new created ClusterCIDR. 
@@ -138,6 +128,411 @@ func TestIPAMMultiCIDRRangeAllocatorType(t *testing.T) { } } +func TestIPAMMultiCIDRRangeAllocatorCIDRRelease(t *testing.T) { + // set the feature gate to enable MultiCIDRRangeAllocator + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MultiCIDRRangeAllocator, true)() + + _, kubeConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. + opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition"} + opts.APIEnablement.RuntimeConfig.Set("networking.k8s.io/v1alpha1=true") + }, + }) + defer tearDownFn() + + clientSet := clientset.NewForConfigOrDie(kubeConfig) + sharedInformer := informers.NewSharedInformerFactory(clientSet, 1*time.Hour) + + ipamController := booststrapMultiCIDRRangeAllocator(t, clientSet, sharedInformer) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go ipamController.Run(ctx.Done()) + sharedInformer.Start(ctx.Done()) + + t.Run("Pod CIDR release after node delete", func(t *testing.T) { + // Create the test ClusterCIDR. + clusterCIDR := makeClusterCIDR("dualstack-cc", "192.168.0.0/23", "fd00:30:100::/119", 8, nodeSelector(map[string][]string{"ipv4": {"true"}, "ipv6": {"true"}})) + if _, err := clientSet.NetworkingV1alpha1().ClusterCIDRs().Create(context.TODO(), clusterCIDR, metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } + + // Sleep for one second to make sure the controller process the new created ClusterCIDR. + time.Sleep(1 * time.Second) + + // Create 1st node and validate that Pod CIDRs are correctly assigned. + node1 := makeNode("dualstack-node", map[string]string{"ipv4": "true", "ipv6": "true"}) + expectedPodCIDRs1 := []string{"192.168.0.0/24", "fd00:30:100::/120"} + if _, err := clientSet.CoreV1().Nodes().Create(context.TODO(), node1, metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } + if gotPodCIDRs, err := nodePodCIDRs(clientSet, node1.Name); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(expectedPodCIDRs1, gotPodCIDRs) { + t.Errorf("unexpected result, expected Pod CIDRs %v but got %v", expectedPodCIDRs1, gotPodCIDRs) + } + + // Create 2nd node and validate that Pod CIDRs are correctly assigned. + node2 := makeNode("dualstack-node-2", map[string]string{"ipv4": "true", "ipv6": "true"}) + expectedPodCIDRs2 := []string{"192.168.1.0/24", "fd00:30:100::100/120"} + if _, err := clientSet.CoreV1().Nodes().Create(context.TODO(), node2, metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } + if gotPodCIDRs, err := nodePodCIDRs(clientSet, node2.Name); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(expectedPodCIDRs2, gotPodCIDRs) { + t.Errorf("unexpected result, expected Pod CIDRs %v but got %v", expectedPodCIDRs2, gotPodCIDRs) + } + + // Delete the 1st node, to validate that the PodCIDRs are released. + if err := clientSet.CoreV1().Nodes().Delete(context.TODO(), node1.Name, metav1.DeleteOptions{}); err != nil { + t.Fatal(err) + } + + // Create 3rd node, validate that it has Pod CIDRs assigned from the released CIDR. 
+ node3 := makeNode("dualstack-node-3", map[string]string{"ipv4": "true", "ipv6": "true"}) + expectedPodCIDRs3 := []string{"192.168.0.0/24", "fd00:30:100::/120"} + if _, err := clientSet.CoreV1().Nodes().Create(context.TODO(), node3, metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } + if gotPodCIDRs, err := nodePodCIDRs(clientSet, node3.Name); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(expectedPodCIDRs3, gotPodCIDRs) { + t.Errorf("unexpected result, expected Pod CIDRs %v but got %v", expectedPodCIDRs3, gotPodCIDRs) + } + }) +} + +func TestIPAMMultiCIDRRangeAllocatorClusterCIDRDelete(t *testing.T) { + // set the feature gate to enable MultiCIDRRangeAllocator. + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MultiCIDRRangeAllocator, true)() + + _, kubeConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. + opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition"} + opts.APIEnablement.RuntimeConfig.Set("networking.k8s.io/v1alpha1=true") + }, + }) + defer tearDownFn() + + clientSet := clientset.NewForConfigOrDie(kubeConfig) + sharedInformer := informers.NewSharedInformerFactory(clientSet, 1*time.Hour) + + ipamController := booststrapMultiCIDRRangeAllocator(t, clientSet, sharedInformer) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go ipamController.Run(ctx.Done()) + sharedInformer.Start(ctx.Done()) + + t.Run("delete cc with node associated", func(t *testing.T) { + + // Create a ClusterCIDR. + clusterCIDR := makeClusterCIDR("dualstack-cc-del", "192.168.0.0/23", "fd00:30:100::/119", 8, nodeSelector(map[string][]string{"ipv4": {"true"}, "ipv6": {"true"}})) + if _, err := clientSet.NetworkingV1alpha1().ClusterCIDRs().Create(context.TODO(), clusterCIDR, metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } + + // Sleep for one second to make sure the controller processes the newly created ClusterCIDR. + time.Sleep(1 * time.Second) + + // Create a node, which gets pod CIDR from the clusterCIDR created above. + node := makeNode("dualstack-node", map[string]string{"ipv4": "true", "ipv6": "true"}) + expectedPodCIDRs := []string{"192.168.0.0/24", "fd00:30:100::/120"} + if _, err := clientSet.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } + if gotPodCIDRs, err := nodePodCIDRs(clientSet, node.Name); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(expectedPodCIDRs, gotPodCIDRs) { + t.Errorf("unexpected result, expected Pod CIDRs %v but got %v", expectedPodCIDRs, gotPodCIDRs) + } + + // Delete the ClusterCIDR + if err := clientSet.NetworkingV1alpha1().ClusterCIDRs().Delete(context.TODO(), clusterCIDR.Name, metav1.DeleteOptions{}); err != nil { + t.Fatal(err) + } + + // Sleep for five seconds to make sure the ClusterCIDR exists with a deletion timestamp after marked for deletion. + time.Sleep(5 * time.Second) + + // Make sure that the ClusterCIDR is not deleted, as there is a node associated with it. 
+ cc, err := clientSet.NetworkingV1alpha1().ClusterCIDRs().Get(context.TODO(), clusterCIDR.Name, metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + if cc == nil { + t.Fatalf("expected Cluster CIDR got nil") + } + if cc.DeletionTimestamp.IsZero() { + t.Fatalf("expected Cluster CIDR to have set a deletion timestamp ") + } + + //Delete the node. + if err := clientSet.CoreV1().Nodes().Delete(context.TODO(), node.Name, metav1.DeleteOptions{}); err != nil { + t.Fatal(err) + } + + // Poll to make sure that the Node is deleted. + if err := wait.PollImmediate(time.Second, wait.ForeverTestTimeout, func() (bool, error) { + _, err := clientSet.CoreV1().Nodes().Get(context.TODO(), node.Name, metav1.GetOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return false, err + } + return apierrors.IsNotFound(err), nil + }); err != nil { + t.Fatalf("failed while waiting for Node %q to be deleted: %v", node.Name, err) + } + + // Poll to make sure that the ClusterCIDR is now deleted, as there is no node associated with it. + if err := wait.PollImmediate(time.Second, wait.ForeverTestTimeout, func() (bool, error) { + _, err := clientSet.NetworkingV1alpha1().ClusterCIDRs().Get(context.TODO(), clusterCIDR.Name, metav1.GetOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return false, err + } + return apierrors.IsNotFound(err), nil + }); err != nil { + t.Fatalf("failed while waiting for ClusterCIDR %q to be deleted: %v", clusterCIDR.Name, err) + } + }) +} + +func TestIPAMMultiCIDRRangeAllocatorClusterCIDRTerminate(t *testing.T) { + // set the feature gate to enable MultiCIDRRangeAllocator. + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MultiCIDRRangeAllocator, true)() + + _, kubeConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. + opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition"} + opts.APIEnablement.RuntimeConfig.Set("networking.k8s.io/v1alpha1=true") + }, + }) + defer tearDownFn() + + clientSet := clientset.NewForConfigOrDie(kubeConfig) + sharedInformer := informers.NewSharedInformerFactory(clientSet, 1*time.Hour) + + ipamController := booststrapMultiCIDRRangeAllocator(t, clientSet, sharedInformer) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go ipamController.Run(ctx.Done()) + sharedInformer.Start(ctx.Done()) + + t.Run("Pod CIDRS must not be allocated from a terminating CC", func(t *testing.T) { + + // Create a ClusterCIDR which is the best match based on number of matching labels. + clusterCIDR := makeClusterCIDR("dualstack-cc-del", "192.168.0.0/23", "fd00:30:100::/119", 8, nodeSelector(map[string][]string{"ipv4": {"true"}, "ipv6": {"true"}})) + if _, err := clientSet.NetworkingV1alpha1().ClusterCIDRs().Create(context.TODO(), clusterCIDR, metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } + + // Create a ClusterCIDR which has fewer matching labels than the previous ClusterCIDR. 
+ clusterCIDR2 := makeClusterCIDR("few-label-match-cc-del", "10.1.0.0/23", "fd12:30:100::/119", 8, nodeSelector(map[string][]string{"ipv4": {"true"}})) + if _, err := clientSet.NetworkingV1alpha1().ClusterCIDRs().Create(context.TODO(), clusterCIDR2, metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } + + // Sleep for one second to make sure the controller processes the newly created ClusterCIDR. + time.Sleep(1 * time.Second) + + // Create a node, which gets pod CIDR from the clusterCIDR created above. + node := makeNode("dualstack-node", map[string]string{"ipv4": "true", "ipv6": "true"}) + expectedPodCIDRs := []string{"192.168.0.0/24", "fd00:30:100::/120"} + if _, err := clientSet.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } + if gotPodCIDRs, err := nodePodCIDRs(clientSet, node.Name); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(expectedPodCIDRs, gotPodCIDRs) { + t.Errorf("unexpected result, expected Pod CIDRs %v but got %v", expectedPodCIDRs, gotPodCIDRs) + } + + // Delete the ClusterCIDR + if err := clientSet.NetworkingV1alpha1().ClusterCIDRs().Delete(context.TODO(), clusterCIDR.Name, metav1.DeleteOptions{}); err != nil { + t.Fatal(err) + } + + // Make sure that the ClusterCIDR is not deleted, as there is a node associated with it. + cc, err := clientSet.NetworkingV1alpha1().ClusterCIDRs().Get(context.TODO(), clusterCIDR.Name, metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + if cc == nil { + t.Fatalf("expected Cluster CIDR got nil") + } + if cc.DeletionTimestamp.IsZero() { + t.Fatalf("expected Cluster CIDR to have set a deletion timestamp ") + } + + // Create a node, which should get Pod CIDRs from the ClusterCIDR with fewer matching label Count, + // as the best match ClusterCIDR is marked as terminating. + node2 := makeNode("dualstack-node-2", map[string]string{"ipv4": "true", "ipv6": "true"}) + expectedPodCIDRs2 := []string{"10.1.0.0/24", "fd12:30:100::/120"} + if _, err := clientSet.CoreV1().Nodes().Create(context.TODO(), node2, metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } + if gotPodCIDRs2, err := nodePodCIDRs(clientSet, node2.Name); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(expectedPodCIDRs2, gotPodCIDRs2) { + t.Errorf("unexpected result, expected Pod CIDRs %v but got %v", expectedPodCIDRs2, gotPodCIDRs2) + } + }) +} + +func TestIPAMMultiCIDRRangeAllocatorClusterCIDRTieBreak(t *testing.T) { + // set the feature gate to enable MultiCIDRRangeAllocator + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MultiCIDRRangeAllocator, true)() + + _, kubeConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. 
+ opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition"} + opts.APIEnablement.RuntimeConfig.Set("networking.k8s.io/v1alpha1=true") + }, + }) + defer tearDownFn() + + clientSet := clientset.NewForConfigOrDie(kubeConfig) + sharedInformer := informers.NewSharedInformerFactory(clientSet, 1*time.Hour) + + ipamController := booststrapMultiCIDRRangeAllocator(t, clientSet, sharedInformer) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go ipamController.Run(ctx.Done()) + sharedInformer.Start(ctx.Done()) + + tests := []struct { + name string + clusterCIDRs []*networkingv1alpha1.ClusterCIDR + node *v1.Node + expectedPodCIDRs []string + }{ + { + name: "ClusterCIDR with highest matching labels", + clusterCIDRs: []*networkingv1alpha1.ClusterCIDR{ + makeClusterCIDR("single-label-match-cc", "192.168.0.0/23", "fd00:30:100::/119", 8, nodeSelector(map[string][]string{"match": {"single"}})), + makeClusterCIDR("double-label-match-cc", "10.0.0.0/23", "fd12:30:200::/119", 8, nodeSelector(map[string][]string{"ipv4": {"true"}, "ipv6": {"true"}})), + }, + node: makeNode("dualstack-node", map[string]string{"ipv4": "true", "ipv6": "true", "match": "single"}), + expectedPodCIDRs: []string{"10.0.0.0/24", "fd12:30:200::/120"}, + }, + { + name: "ClusterCIDR with fewer allocatable Pod CIDRs", + clusterCIDRs: []*networkingv1alpha1.ClusterCIDR{ + makeClusterCIDR("single-label-match-cc", "192.168.0.0/23", "fd00:30:100::/119", 8, nodeSelector(map[string][]string{"match": {"single"}})), + makeClusterCIDR("double-label-match-cc", "10.0.0.0/20", "fd12:30:200::/116", 8, nodeSelector(map[string][]string{"ipv4": {"true"}, "ipv6": {"true"}})), + makeClusterCIDR("few-alloc-cc", "172.16.0.0/23", "fd34:30:100::/119", 8, nodeSelector(map[string][]string{"ipv4": {"true"}, "ipv6": {"true"}})), + }, + node: makeNode("dualstack-node", map[string]string{"ipv4": "true", "ipv6": "true", "match": "single"}), + expectedPodCIDRs: []string{"172.16.0.0/24", "fd34:30:100::/120"}, + }, + { + name: "ClusterCIDR with lower perNodeHostBits", + clusterCIDRs: []*networkingv1alpha1.ClusterCIDR{ + makeClusterCIDR("single-label-match-cc", "192.168.0.0/23", "fd00:30:100::/119", 8, nodeSelector(map[string][]string{"match": {"single"}})), + makeClusterCIDR("double-label-match-cc", "10.0.0.0/20", "fd12:30:200::/116", 8, nodeSelector(map[string][]string{"ipv4": {"true"}, "ipv6": {"true"}})), + makeClusterCIDR("few-alloc-cc", "172.16.0.0/23", "fd34:30:100::/119", 8, nodeSelector(map[string][]string{"ipv4": {"true"}, "ipv6": {"true"}})), + makeClusterCIDR("low-pernodehostbits-cc", "172.31.0.0/24", "fd35:30:100::/120", 7, nodeSelector(map[string][]string{"ipv4": {"true"}, "ipv6": {"true"}})), + }, + node: makeNode("dualstack-node", map[string]string{"ipv4": "true", "ipv6": "true", "match": "single"}), + expectedPodCIDRs: []string{"172.31.0.0/25", "fd35:30:100::/121"}, + }, + { + name: "ClusterCIDR with label having lower alphanumeric value", + clusterCIDRs: []*networkingv1alpha1.ClusterCIDR{ + makeClusterCIDR("single-label-match-cc", "192.168.0.0/23", "fd00:30:100::/119", 8, nodeSelector(map[string][]string{"match": {"single"}})), + makeClusterCIDR("double-label-match-cc", "10.0.0.0/20", "fd12:30:200::/116", 8, nodeSelector(map[string][]string{"ipv4": {"true"}, "ipv6": {"true"}})), + makeClusterCIDR("few-alloc-cc", "172.16.0.0/23", "fd34:30:100::/119", 8, nodeSelector(map[string][]string{"ipv4": {"true"}, "ipv6": {"true"}})), + makeClusterCIDR("low-pernodehostbits-cc", "172.31.0.0/24", 
"fd35:30:100::/120", 7, nodeSelector(map[string][]string{"ipv4": {"true"}, "ipv6": {"true"}})), + makeClusterCIDR("low-alpha-cc", "192.169.0.0/24", "fd12:40:100::/120", 7, nodeSelector(map[string][]string{"apv4": {"true"}, "bpv6": {"true"}})), + }, + node: makeNode("dualstack-node", map[string]string{"apv4": "true", "bpv6": "true", "ipv4": "true", "ipv6": "true", "match": "single"}), + expectedPodCIDRs: []string{"192.169.0.0/25", "fd12:40:100::/121"}, + }, + { + name: "ClusterCIDR with alphanumerically smaller IP address", + clusterCIDRs: []*networkingv1alpha1.ClusterCIDR{ + makeClusterCIDR("single-label-match-cc", "192.168.0.0/23", "fd00:30:100::/119", 8, nodeSelector(map[string][]string{"match": {"single"}})), + makeClusterCIDR("double-label-match-cc", "10.0.0.0/20", "fd12:30:200::/116", 8, nodeSelector(map[string][]string{"ipv4": {"true"}, "ipv6": {"true"}})), + makeClusterCIDR("few-alloc-cc", "172.16.0.0/23", "fd34:30:100::/119", 8, nodeSelector(map[string][]string{"ipv4": {"true"}, "ipv6": {"true"}})), + makeClusterCIDR("low-pernodehostbits-cc", "172.31.0.0/24", "fd35:30:100::/120", 7, nodeSelector(map[string][]string{"ipv4": {"true"}, "ipv6": {"true"}})), + makeClusterCIDR("low-alpha-cc", "192.169.0.0/24", "fd12:40:100::/120", 7, nodeSelector(map[string][]string{"apv4": {"true"}, "bpv6": {"true"}})), + makeClusterCIDR("low-ip-cc", "10.1.0.0/24", "fd00:10:100::/120", 7, nodeSelector(map[string][]string{"apv4": {"true"}, "bpv6": {"true"}})), + }, + node: makeNode("dualstack-node", map[string]string{"apv4": "true", "bpv6": "true", "ipv4": "true", "ipv6": "true", "match": "single"}), + expectedPodCIDRs: []string{"10.1.0.0/25", "fd00:10:100::/121"}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + for _, clusterCIDR := range test.clusterCIDRs { + // Create the test ClusterCIDR + if _, err := clientSet.NetworkingV1alpha1().ClusterCIDRs().Create(context.TODO(), clusterCIDR, metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } + } + + // Sleep for one second to make sure the controller process the new created ClusterCIDR. + time.Sleep(1 * time.Second) + + // Create a node and validate that Pod CIDRs are correctly assigned. + if _, err := clientSet.CoreV1().Nodes().Create(context.TODO(), test.node, metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } + if gotPodCIDRs, err := nodePodCIDRs(clientSet, test.node.Name); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(test.expectedPodCIDRs, gotPodCIDRs) { + t.Errorf("unexpected result, expected Pod CIDRs %v but got %v", test.expectedPodCIDRs, gotPodCIDRs) + } + + // Delete the node. + if err := clientSet.CoreV1().Nodes().Delete(context.TODO(), test.node.Name, metav1.DeleteOptions{}); err != nil { + t.Fatal(err) + } + + // Wait till the Node is deleted. + if err := wait.PollImmediate(time.Second, wait.ForeverTestTimeout, func() (bool, error) { + _, err := clientSet.CoreV1().Nodes().Get(context.TODO(), test.node.Name, metav1.GetOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return false, err + } + return apierrors.IsNotFound(err), nil + }); err != nil { + t.Fatalf("failed while waiting for Node %q to be deleted: %v", test.node.Name, err) + } + + // Delete the Cluster CIDRs. + for _, clusterCIDR := range test.clusterCIDRs { + // Delete the test ClusterCIDR. + if err := clientSet.NetworkingV1alpha1().ClusterCIDRs().Delete(context.TODO(), clusterCIDR.Name, metav1.DeleteOptions{}); err != nil { + t.Fatal(err) + } + + // Wait till the ClusterCIDR is deleted. 
+ if err := wait.PollImmediate(time.Second, wait.ForeverTestTimeout, func() (bool, error) { + _, err := clientSet.NetworkingV1alpha1().ClusterCIDRs().Get(context.TODO(), clusterCIDR.Name, metav1.GetOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return false, err + } + return apierrors.IsNotFound(err), nil + }); err != nil { + t.Fatalf("failed while waiting for ClusterCIDR %q to be deleted: %v", clusterCIDR.Name, err) + } + } + }) + } +} + func booststrapMultiCIDRRangeAllocator(t *testing.T, clientSet clientset.Interface, sharedInformer informers.SharedInformerFactory, @@ -151,9 +546,9 @@ func booststrapMultiCIDRRangeAllocator(t *testing.T, clusterCIDRs := []*net.IPNet{clusterCIDRv4, clusterCIDRv6} nodeMaskCIDRs := []int{24, 120} - // set the current state of the informer, we can preseed nodes and ClusterCIDRs so we + // set the current state of the informer, we can pre-seed nodes and ClusterCIDRs, so that we // can simulate the bootstrap - initialCC := makeClusterCIDR("initial-cc", "10.2.0.0/16", "fd00:20:96::/112", nodeSelector(map[string][]string{"bootstrap": {"true"}})) + initialCC := makeClusterCIDR("initial-cc", "10.2.0.0/16", "fd00:20:96::/112", 8, nodeSelector(map[string][]string{"bootstrap": {"true"}})) if _, err := clientSet.NetworkingV1alpha1().ClusterCIDRs().Create(context.TODO(), initialCC, metav1.CreateOptions{}); err != nil { t.Fatal(err) } @@ -201,13 +596,13 @@ func makeNode(name string, labels map[string]string) *v1.Node { } } -func makeClusterCIDR(name, ipv4CIDR, ipv6CIDR string, nodeSelector *v1.NodeSelector) *networkingv1alpha1.ClusterCIDR { +func makeClusterCIDR(name, ipv4CIDR, ipv6CIDR string, perNodeHostBits int32, nodeSelector *v1.NodeSelector) *networkingv1alpha1.ClusterCIDR { return &networkingv1alpha1.ClusterCIDR{ ObjectMeta: metav1.ObjectMeta{ Name: name, }, Spec: networkingv1alpha1.ClusterCIDRSpec{ - PerNodeHostBits: 8, + PerNodeHostBits: perNodeHostBits, IPv4: ipv4CIDR, IPv6: ipv6CIDR, NodeSelector: nodeSelector, @@ -236,7 +631,7 @@ func nodeSelector(labels map[string][]string) *v1.NodeSelector { func nodePodCIDRs(c clientset.Interface, name string) ([]string, error) { var node *v1.Node - nodePollErr := wait.PollImmediate(time.Second, 5*time.Second, func() (bool, error) { + nodePollErr := wait.PollImmediate(time.Second, wait.ForeverTestTimeout, func() (bool, error) { var err error node, err = c.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{}) if err != nil {