From 584d7103e25bf0a18eadecc7e35c92304e4bfd3a Mon Sep 17 00:00:00 2001 From: "Khaled Henidak(Kal)" Date: Tue, 11 Jun 2019 16:04:50 +0000 Subject: [PATCH] node ipam controller for ipv6 dualstack --- cmd/cloud-controller-manager/app/BUILD | 2 + cmd/kube-controller-manager/app/core.go | 52 ++- pkg/controller/nodeipam/BUILD | 2 + pkg/controller/nodeipam/ipam/BUILD | 3 + .../nodeipam/ipam/cidr_allocator.go | 9 +- .../nodeipam/ipam/range_allocator.go | 259 ++++++----- .../nodeipam/ipam/range_allocator_test.go | 403 +++++++++++------- .../nodeipam/node_ipam_controller.go | 43 +- .../nodeipam/node_ipam_controller_test.go | 12 +- pkg/util/node/node.go | 21 + test/integration/ipamperf/ipam_test.go | 2 +- 11 files changed, 518 insertions(+), 290 deletions(-) diff --git a/cmd/cloud-controller-manager/app/BUILD b/cmd/cloud-controller-manager/app/BUILD index 4d215ba845a..35df170b985 100644 --- a/cmd/cloud-controller-manager/app/BUILD +++ b/cmd/cloud-controller-manager/app/BUILD @@ -15,6 +15,7 @@ go_library( "//pkg/controller/cloud:go_default_library", "//pkg/controller/route:go_default_library", "//pkg/controller/service:go_default_library", + "//pkg/features:go_default_library", "//pkg/util/configz:go_default_library", "//pkg/util/flag:go_default_library", "//pkg/version:go_default_library", @@ -24,6 +25,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/healthz:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/term:go_default_library", "//staging/src/k8s.io/client-go/tools/leaderelection:go_default_library", "//staging/src/k8s.io/client-go/tools/leaderelection/resourcelock:go_default_library", diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 81200ce4d95..78478cfc702 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -58,9 +58,11 @@ import ( "k8s.io/kubernetes/pkg/controller/volume/pvcprotection" "k8s.io/kubernetes/pkg/controller/volume/pvprotection" "k8s.io/kubernetes/pkg/features" + kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/quota/v1/generic" quotainstall "k8s.io/kubernetes/pkg/quota/v1/install" "k8s.io/kubernetes/pkg/util/metrics" + netutils "k8s.io/utils/net" ) func startServiceController(ctx ControllerContext) (http.Handler, bool, error) { @@ -79,23 +81,36 @@ func startServiceController(ctx ControllerContext) (http.Handler, bool, error) { go serviceController.Run(ctx.Stop, int(ctx.ComponentConfig.ServiceController.ConcurrentServiceSyncs)) return nil, true, nil } - func startNodeIpamController(ctx ControllerContext) (http.Handler, bool, error) { - var clusterCIDR *net.IPNet var serviceCIDR *net.IPNet + // should we start nodeIPAM if !ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs { return nil, false, nil } - var err error - if len(strings.TrimSpace(ctx.ComponentConfig.KubeCloudShared.ClusterCIDR)) != 0 { - _, clusterCIDR, err = net.ParseCIDR(ctx.ComponentConfig.KubeCloudShared.ClusterCIDR) - if err != nil { - klog.Warningf("Unsuccessful parsing of cluster CIDR %v: %v", ctx.ComponentConfig.KubeCloudShared.ClusterCIDR, err) - } + // failure: bad cidrs in config + clusterCIDRs, dualStack, err := processCIDRs(ctx.ComponentConfig.KubeCloudShared.ClusterCIDR) + if err != nil { + return nil, false, err } + // failure: more than one cidr and dual 
stack is not enabled
+	if len(clusterCIDRs) > 1 && !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.IPv6DualStack) {
+		return nil, false, fmt.Errorf("len of ClusterCIDRs==%v and dualstack feature is not enabled", len(clusterCIDRs))
+	}
+
+	// failure: more than one cidr but they are not configured as dual stack
+	if len(clusterCIDRs) > 1 && !dualStack {
+		return nil, false, fmt.Errorf("len of ClusterCIDRs==%v and they are not configured as dual stack (at least one from each IPFamily)", len(clusterCIDRs))
+	}
+
+	// failure: more than two cidrs is not allowed even with dual stack
+	if len(clusterCIDRs) > 2 {
+		return nil, false, fmt.Errorf("len of clusterCIDRs is:%v, more than max allowed of 2", len(clusterCIDRs))
+	}
+
+	// service cidr processing
 	if len(strings.TrimSpace(ctx.ComponentConfig.NodeIPAMController.ServiceCIDR)) != 0 {
 		_, serviceCIDR, err = net.ParseCIDR(ctx.ComponentConfig.NodeIPAMController.ServiceCIDR)
 		if err != nil {
@@ -107,7 +122,7 @@ func startNodeIpamController(ctx ControllerContext) (http.Handler, bool, error)
 		ctx.InformerFactory.Core().V1().Nodes(),
 		ctx.Cloud,
 		ctx.ClientBuilder.ClientOrDie("node-controller"),
-		clusterCIDR,
+		clusterCIDRs,
 		serviceCIDR,
 		int(ctx.ComponentConfig.NodeIPAMController.NodeCIDRMaskSize),
 		ipam.CIDRAllocatorType(ctx.ComponentConfig.KubeCloudShared.CIDRAllocatorType),
@@ -448,3 +463,22 @@ func startTTLAfterFinishedController(ctx ControllerContext) (http.Handler, bool,
 	).Run(int(ctx.ComponentConfig.TTLAfterFinishedController.ConcurrentTTLSyncs), ctx.Stop)
 	return nil, true, nil
 }
+
+// processCIDRs is a helper function that works on a comma-separated list of cidrs and returns
+// a list of typed cidrs
+// a flag if cidrs represents a dual stack
+// error if failed to parse any of the cidrs
+func processCIDRs(cidrsList string) ([]*net.IPNet, bool, error) {
+	cidrsSplit := strings.Split(strings.TrimSpace(cidrsList), ",")
+
+	cidrs, err := netutils.ParseCIDRs(cidrsSplit)
+	if err != nil {
+		return nil, false, err
+	}
+
+	// if cidrs has an error then the previous call will fail
+	// safe to ignore error checking on next call
+	dualstack, _ := netutils.IsDualStackCIDRs(cidrs)
+
+	return cidrs, dualstack, nil
+}
diff --git a/pkg/controller/nodeipam/BUILD b/pkg/controller/nodeipam/BUILD
index bedd8fa7763..a12c01cabe0 100644
--- a/pkg/controller/nodeipam/BUILD
+++ b/pkg/controller/nodeipam/BUILD
@@ -33,9 +33,11 @@ go_library(
         "//pkg/controller:go_default_library",
         "//pkg/controller/nodeipam/ipam:go_default_library",
         "//pkg/controller/nodeipam/ipam/sync:go_default_library",
+        "//pkg/features:go_default_library",
         "//pkg/util/metrics:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
+        "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
diff --git a/pkg/controller/nodeipam/ipam/BUILD b/pkg/controller/nodeipam/ipam/BUILD
index b50ce534ddb..3dfdecffe77 100644
--- a/pkg/controller/nodeipam/ipam/BUILD
+++ b/pkg/controller/nodeipam/ipam/BUILD
@@ -46,6 +46,7 @@ go_library(
         "//pkg/controller/nodeipam/ipam/cidrset:go_default_library",
         "//pkg/controller/nodeipam/ipam/sync:go_default_library",
         "//pkg/controller/util/node:go_default_library",
+        "//pkg/features:go_default_library",
         "//pkg/scheduler/api:go_default_library",
"//pkg/util/node:go_default_library", "//pkg/util/taints:go_default_library", @@ -58,6 +59,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", @@ -69,6 +71,7 @@ go_library( "//staging/src/k8s.io/legacy-cloud-providers/gce:go_default_library", "//staging/src/k8s.io/metrics/pkg/client/clientset/versioned/scheme:go_default_library", "//vendor/k8s.io/klog:go_default_library", + "//vendor/k8s.io/utils/net:go_default_library", ], ) diff --git a/pkg/controller/nodeipam/ipam/cidr_allocator.go b/pkg/controller/nodeipam/ipam/cidr_allocator.go index d667d11eec0..3e7b8214eaa 100644 --- a/pkg/controller/nodeipam/ipam/cidr_allocator.go +++ b/pkg/controller/nodeipam/ipam/cidr_allocator.go @@ -33,11 +33,6 @@ import ( cloudprovider "k8s.io/cloud-provider" ) -type nodeAndCIDR struct { - cidr *net.IPNet - nodeName string -} - // CIDRAllocatorType is the type of the allocator to use. type CIDRAllocatorType string @@ -94,7 +89,7 @@ type CIDRAllocator interface { } // New creates a new CIDR range allocator. -func New(kubeClient clientset.Interface, cloud cloudprovider.Interface, nodeInformer informers.NodeInformer, allocatorType CIDRAllocatorType, clusterCIDR, serviceCIDR *net.IPNet, nodeCIDRMaskSize int) (CIDRAllocator, error) { +func New(kubeClient clientset.Interface, cloud cloudprovider.Interface, nodeInformer informers.NodeInformer, allocatorType CIDRAllocatorType, clusterCIDRs []*net.IPNet, serviceCIDR *net.IPNet, nodeCIDRMaskSize int) (CIDRAllocator, error) { nodeList, err := listNodes(kubeClient) if err != nil { return nil, err @@ -102,7 +97,7 @@ func New(kubeClient clientset.Interface, cloud cloudprovider.Interface, nodeInfo switch allocatorType { case RangeAllocatorType: - return NewCIDRRangeAllocator(kubeClient, nodeInformer, clusterCIDR, serviceCIDR, nodeCIDRMaskSize, nodeList) + return NewCIDRRangeAllocator(kubeClient, nodeInformer, clusterCIDRs, serviceCIDR, nodeCIDRMaskSize, nodeList) case CloudAllocatorType: return NewCloudCIDRAllocator(kubeClient, cloud, nodeInformer) default: diff --git a/pkg/controller/nodeipam/ipam/range_allocator.go b/pkg/controller/nodeipam/ipam/range_allocator.go index 9da52e63aa7..bfe80ca2440 100644 --- a/pkg/controller/nodeipam/ipam/range_allocator.go +++ b/pkg/controller/nodeipam/ipam/range_allocator.go @@ -41,33 +41,38 @@ import ( utilnode "k8s.io/kubernetes/pkg/util/node" ) -type rangeAllocator struct { - client clientset.Interface - cidrs *cidrset.CidrSet - clusterCIDR *net.IPNet - maxCIDRs int +// cidrs are reserved, then node resource is patched with them +// this type holds the reservation info for a node +type nodeReservedCIDRs struct { + allocatedCIDRs []*net.IPNet + nodeName string +} - // nodeLister is able to list/get nodes and is populated by the shared informer passed to - // NewCloudCIDRAllocator. 
+type rangeAllocator struct { + client clientset.Interface + // cluster cidrs as passed in during controller creation + clusterCIDRs []*net.IPNet + // for each entry in clusterCIDRs we maintain a list of what is used and what is not + cidrSets []*cidrset.CidrSet + // nodeLister is able to list/get nodes and is populated by the shared informer passed to controller nodeLister corelisters.NodeLister // nodesSynced returns true if the node shared informer has been synced at least once. nodesSynced cache.InformerSynced - - // Channel that is used to pass updating Nodes with assigned CIDRs to the background + // Channel that is used to pass updating Nodes and their reserved CIDRs to the background // This increases a throughput of CIDR assignment by not blocking on long operations. - nodeCIDRUpdateChannel chan nodeAndCIDR + nodeCIDRUpdateChannel chan nodeReservedCIDRs recorder record.EventRecorder - // Keep a set of nodes that are currectly being processed to avoid races in CIDR allocation lock sync.Mutex nodesInProcessing sets.String } -// NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDR for node +// NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDRs for node (one from each of clusterCIDRs) // Caller must ensure subNetMaskSize is not less than cluster CIDR mask size. -// Caller must always pass in a list of existing nodes so the new allocator +// Caller must always pass in a list of existing nodes so the new allocator. +// Caller must ensure that ClusterCIDRs are semantically correct e.g (1 for non DualStack, 2 for DualStack etc..) // can initialize its CIDR map. NodeList is only nil in testing. -func NewCIDRRangeAllocator(client clientset.Interface, nodeInformer informers.NodeInformer, clusterCIDR *net.IPNet, serviceCIDR *net.IPNet, subNetMaskSize int, nodeList *v1.NodeList) (CIDRAllocator, error) { +func NewCIDRRangeAllocator(client clientset.Interface, nodeInformer informers.NodeInformer, clusterCIDRs []*net.IPNet, serviceCIDR *net.IPNet, subNetMaskSize int, nodeList *v1.NodeList) (CIDRAllocator, error) { if client == nil { klog.Fatalf("kubeClient is nil when starting NodeController") } @@ -78,17 +83,24 @@ func NewCIDRRangeAllocator(client clientset.Interface, nodeInformer informers.No klog.V(0).Infof("Sending events to api server.") eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")}) - set, err := cidrset.NewCIDRSet(clusterCIDR, subNetMaskSize) - if err != nil { - return nil, err + // create a cidrSet for each cidr we operate on + // cidrSet are mapped to clusterCIDR by index + cidrSets := make([]*cidrset.CidrSet, len(clusterCIDRs)) + for idx, cidr := range clusterCIDRs { + cidrSet, err := cidrset.NewCIDRSet(cidr, subNetMaskSize) + if err != nil { + return nil, err + } + cidrSets[idx] = cidrSet } + ra := &rangeAllocator{ client: client, - cidrs: set, - clusterCIDR: clusterCIDR, + clusterCIDRs: clusterCIDRs, + cidrSets: cidrSets, nodeLister: nodeInformer.Lister(), nodesSynced: nodeInformer.Informer().HasSynced, - nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize), + nodeCIDRUpdateChannel: make(chan nodeReservedCIDRs, cidrUpdateQueueSize), recorder: recorder, nodesInProcessing: sets.NewString(), } @@ -101,16 +113,14 @@ func NewCIDRRangeAllocator(client clientset.Interface, nodeInformer informers.No if nodeList != nil { for _, node := range nodeList.Items { - if node.Spec.PodCIDR == "" { - klog.Infof("Node %v has no CIDR, ignoring", node.Name) + if len(node.Spec.PodCIDRs) == 0 { + klog.V(4).Infof("Node 
%v has no CIDR, ignoring", node.Name) continue - } else { - klog.Infof("Node %v has CIDR %s, occupying it in CIDR map", - node.Name, node.Spec.PodCIDR) } - if err := ra.occupyCIDR(&node); err != nil { + klog.V(4).Infof("Node %v has CIDR %s, occupying it in CIDR map", node.Name, node.Spec.PodCIDR) + if err := ra.occupyCIDRs(&node); err != nil { // This will happen if: - // 1. We find garbage in the podCIDR field. Retrying is useless. + // 1. We find garbage in the podCIDRs field. Retrying is useless. // 2. CIDR out of range: This means a node CIDR has changed. // This error will keep crashing controller-manager. return nil, err @@ -121,26 +131,26 @@ func NewCIDRRangeAllocator(client clientset.Interface, nodeInformer informers.No nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: nodeutil.CreateAddNodeHandler(ra.AllocateOrOccupyCIDR), UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error { - // If the PodCIDR is not empty we either: - // - already processed a Node that already had a CIDR after NC restarted + // If the PodCIDRs list is not empty we either: + // - already processed a Node that already had CIDRs after NC restarted // (cidr is marked as used), - // - already processed a Node successfully and allocated a CIDR for it + // - already processed a Node successfully and allocated CIDRs for it // (cidr is marked as used), // - already processed a Node but we did saw a "timeout" response and // request eventually got through in this case we haven't released - // the allocated CIDR (cidr is still marked as used). + // the allocated CIDRs (cidr is still marked as used). // There's a possible error here: - // - NC sees a new Node and assigns a CIDR X to it, + // - NC sees a new Node and assigns CIDRs X,Y.. to it, // - Update Node call fails with a timeout, // - Node is updated by some other component, NC sees an update and - // assigns CIDR Y to the Node, - // - Both CIDR X and CIDR Y are marked as used in the local cache, - // even though Node sees only CIDR Y - // The problem here is that in in-memory cache we see CIDR X as marked, + // assigns CIDRs A,B.. to the Node, + // - Both CIDR X,Y.. and CIDR A,B.. are marked as used in the local cache, + // even though Node sees only CIDR A,B.. + // The problem here is that in in-memory cache we see CIDR X,Y.. as marked, // which prevents it from being assigned to any new node. The cluster // state is correct. // Restart of NC fixes the issue. - if newNode.Spec.PodCIDR == "" { + if len(newNode.Spec.PodCIDRs) == 0 { return ra.AllocateOrOccupyCIDR(newNode) } return nil @@ -176,7 +186,7 @@ func (r *rangeAllocator) worker(stopChan <-chan struct{}) { klog.Warning("Channel nodeCIDRUpdateChannel was unexpectedly closed") return } - if err := r.updateCIDRAllocation(workItem); err != nil { + if err := r.updateCIDRsAllocation(workItem); err != nil { // Requeue the failed node for update again. r.nodeCIDRUpdateChannel <- workItem } @@ -202,17 +212,20 @@ func (r *rangeAllocator) removeNodeFromProcessing(nodeName string) { r.nodesInProcessing.Delete(nodeName) } -func (r *rangeAllocator) occupyCIDR(node *v1.Node) error { +// marks node.PodCIDRs[...] 
as used in allocator's tracked cidrSet
+func (r *rangeAllocator) occupyCIDRs(node *v1.Node) error {
 	defer r.removeNodeFromProcessing(node.Name)
-	if node.Spec.PodCIDR == "" {
+	if len(node.Spec.PodCIDRs) == 0 {
 		return nil
 	}
-	_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
-	if err != nil {
-		return fmt.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR)
-	}
-	if err := r.cidrs.Occupy(podCIDR); err != nil {
-		return fmt.Errorf("failed to mark cidr as occupied: %v", err)
+	for idx, cidr := range node.Spec.PodCIDRs {
+		_, podCIDR, err := net.ParseCIDR(cidr)
+		if err != nil {
+			return fmt.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR)
+		}
+		if err := r.cidrSets[idx].Occupy(podCIDR); err != nil {
+			return fmt.Errorf("failed to mark cidr[%v] at idx [%v] as occupied for node: %v: %v", podCIDR, idx, node.Name, err)
+		}
 	}
 	return nil
 }
@@ -228,41 +241,53 @@ func (r *rangeAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
 		klog.V(2).Infof("Node %v is already in a process of CIDR assignment.", node.Name)
 		return nil
 	}
-	if node.Spec.PodCIDR != "" {
-		return r.occupyCIDR(node)
+
+	if len(node.Spec.PodCIDRs) > 0 {
+		return r.occupyCIDRs(node)
 	}
-	podCIDR, err := r.cidrs.AllocateNext()
-	if err != nil {
-		r.removeNodeFromProcessing(node.Name)
-		nodeutil.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
-		return fmt.Errorf("failed to allocate cidr: %v", err)
+	// allocate and queue the assignment
+	allocated := nodeReservedCIDRs{
+		nodeName:       node.Name,
+		allocatedCIDRs: make([]*net.IPNet, len(r.cidrSets)),
 	}
-	klog.V(4).Infof("Putting node %s with CIDR %s into the work queue", node.Name, podCIDR)
-	r.nodeCIDRUpdateChannel <- nodeAndCIDR{
-		nodeName: node.Name,
-		cidr:     podCIDR,
+
+	for idx := range r.cidrSets {
+		podCIDR, err := r.cidrSets[idx].AllocateNext()
+		if err != nil {
+			r.removeNodeFromProcessing(node.Name)
+			nodeutil.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
+			return fmt.Errorf("failed to allocate cidr from cluster cidr at idx:%v: %v", idx, err)
+		}
+		allocated.allocatedCIDRs[idx] = podCIDR
+	}
+
+	// queue the assignment
+	klog.V(4).Infof("Putting node %s with CIDR %v into the work queue", node.Name, allocated.allocatedCIDRs)
+	r.nodeCIDRUpdateChannel <- allocated
+	return nil
+}
+
+// ReleaseCIDR marks node.podCIDRs[...] 
as unused in our tracked cidrSets +func (r *rangeAllocator) ReleaseCIDR(node *v1.Node) error { + if node == nil || len(node.Spec.PodCIDRs) == 0 { + return nil + } + + for idx, cidr := range node.Spec.PodCIDRs { + _, podCIDR, err := net.ParseCIDR(cidr) + if err != nil { + return fmt.Errorf("Failed to parse CIDR %s on Node %v: %v", cidr, node.Name, err) + } + + klog.V(4).Infof("release CIDR %s for node:%v", cidr, node.Name) + if err = r.cidrSets[idx].Release(podCIDR); err != nil { + return fmt.Errorf("Error when releasing CIDR %v: %v", cidr, err) + } } return nil } -func (r *rangeAllocator) ReleaseCIDR(node *v1.Node) error { - if node == nil || node.Spec.PodCIDR == "" { - return nil - } - _, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR) - if err != nil { - return fmt.Errorf("Failed to parse CIDR %s on Node %v: %v", node.Spec.PodCIDR, node.Name, err) - } - - klog.V(4).Infof("release CIDR %s", node.Spec.PodCIDR) - if err = r.cidrs.Release(podCIDR); err != nil { - return fmt.Errorf("Error when releasing CIDR %v: %v", node.Spec.PodCIDR, err) - } - return err -} - -// Marks all CIDRs with subNetMaskSize that belongs to serviceCIDR as used, +// Marks all CIDRs with subNetMaskSize that belongs to serviceCIDR as used across all cidrs // so that they won't be assignable. func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) { // Checks if service CIDR has a nonempty intersection with cluster @@ -270,57 +295,87 @@ func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) { // clusterCIDR's Mask applied (this means that clusterCIDR contains // serviceCIDR) or vice versa (which means that serviceCIDR contains // clusterCIDR). - if !r.clusterCIDR.Contains(serviceCIDR.IP.Mask(r.clusterCIDR.Mask)) && !serviceCIDR.Contains(r.clusterCIDR.IP.Mask(serviceCIDR.Mask)) { - return - } + for idx, cidr := range r.clusterCIDRs { + if !cidr.Contains(serviceCIDR.IP.Mask(cidr.Mask)) && !serviceCIDR.Contains(cidr.IP.Mask(serviceCIDR.Mask)) { + continue + } - if err := r.cidrs.Occupy(serviceCIDR); err != nil { - klog.Errorf("Error filtering out service cidr %v: %v", serviceCIDR, err) + // at this point, len(cidrSet) == len(clusterCidr) + if err := r.cidrSets[idx].Occupy(serviceCIDR); err != nil { + klog.Errorf("Error filtering out service cidr out cluster cidr:%v (index:%v) %v: %v", cidr, idx, serviceCIDR, err) + } } } -// updateCIDRAllocation assigns CIDR to Node and sends an update to the API server. -func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error { +// updateCIDRsAllocation assigns CIDR to Node and sends an update to the API server. +func (r *rangeAllocator) updateCIDRsAllocation(data nodeReservedCIDRs) error { var err error var node *v1.Node defer r.removeNodeFromProcessing(data.nodeName) - - podCIDR := data.cidr.String() - + cidrsString := cidrsAsString(data.allocatedCIDRs) node, err = r.nodeLister.Get(data.nodeName) if err != nil { - klog.Errorf("Failed while getting node %v for updating Node.Spec.PodCIDR: %v", data.nodeName, err) + klog.Errorf("Failed while getting node %v for updating Node.Spec.PodCIDRs: %v", data.nodeName, err) return err } - if node.Spec.PodCIDR == podCIDR { - klog.V(4).Infof("Node %v already has allocated CIDR %v. It matches the proposed one.", node.Name, podCIDR) - return nil - } - if node.Spec.PodCIDR != "" { - klog.Errorf("Node %v already has a CIDR allocated %v. 
Releasing the new one %v.", node.Name, node.Spec.PodCIDR, podCIDR)
-		if err := r.cidrs.Release(data.cidr); err != nil {
-			klog.Errorf("Error when releasing CIDR %v", podCIDR)
+	// if cidr list matches the proposed.
+	// then we possibly updated this node
+	// and just failed to ack the success.
+	if len(node.Spec.PodCIDRs) == len(data.allocatedCIDRs) {
+		match := true
+		for idx, cidr := range cidrsString {
+			if node.Spec.PodCIDRs[idx] != cidr {
+				match = false
+				break
+			}
 		}
-		return nil
-	}
-	// If we reached here, it means that the node has no CIDR currently assigned. So we set it.
-	for i := 0; i < cidrUpdateRetries; i++ {
-		if err = utilnode.PatchNodeCIDR(r.client, types.NodeName(node.Name), podCIDR); err == nil {
-			klog.Infof("Set node %v PodCIDR to %v", node.Name, podCIDR)
+		if match {
+			klog.V(4).Infof("Node %v already has allocated CIDR %v. It matches the proposed one.", node.Name, data.allocatedCIDRs)
 			return nil
 		}
 	}
-	klog.Errorf("Failed to update node %v PodCIDR to %v after multiple attempts: %v", node.Name, podCIDR, err)
+
+	// node has cidrs, release the reserved
+	if len(node.Spec.PodCIDRs) != 0 {
+		klog.Errorf("Node %v already has a CIDR allocated %v. Releasing the new one.", node.Name, node.Spec.PodCIDRs)
+		for idx, cidr := range data.allocatedCIDRs {
+			if releaseErr := r.cidrSets[idx].Release(cidr); releaseErr != nil {
+				klog.Errorf("Error when releasing CIDR idx:%v value: %v err:%v", idx, cidr, releaseErr)
+			}
+		}
+		return nil
+	}
+
+	// If we reached here, it means that the node has no CIDR currently assigned. So we set it.
+	for i := 0; i < cidrUpdateRetries; i++ {
+		if err = utilnode.PatchNodeCIDRs(r.client, types.NodeName(node.Name), cidrsString); err == nil {
+			klog.Infof("Set node %v PodCIDR to %v", node.Name, cidrsString)
+			return nil
+		}
+	}
+	// failed release back to the pool
+	klog.Errorf("Failed to update node %v PodCIDR to %v after multiple attempts: %v", node.Name, cidrsString, err)
 	nodeutil.RecordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed")
 	// We accept the fact that we may leak CIDRs here. This is safer than releasing
 	// them in case when we don't know if request went through.
 	// NodeController restart will return all falsely allocated CIDRs to the pool.
 	if !apierrors.IsServerTimeout(err) {
 		klog.Errorf("CIDR assignment for node %v failed: %v. 
Releasing allocated CIDR", node.Name, err) - if releaseErr := r.cidrs.Release(data.cidr); releaseErr != nil { - klog.Errorf("Error releasing allocated CIDR for node %v: %v", node.Name, releaseErr) + for idx, cidr := range data.allocatedCIDRs { + if releaseErr := r.cidrSets[idx].Release(cidr); releaseErr != nil { + klog.Errorf("Error releasing allocated CIDR for node %v: %v", node.Name, releaseErr) + } } } return err } + +// converts a slice of cidrs into ,, +func cidrsAsString(inCIDRs []*net.IPNet) []string { + outCIDRs := make([]string, len(inCIDRs)) + for idx, inCIDR := range inCIDRs { + outCIDRs[idx] = inCIDR.String() + } + return outCIDRs +} diff --git a/pkg/controller/nodeipam/ipam/range_allocator_test.go b/pkg/controller/nodeipam/ipam/range_allocator_test.go index eadb608831f..8bcf9de6f7d 100644 --- a/pkg/controller/nodeipam/ipam/range_allocator_test.go +++ b/pkg/controller/nodeipam/ipam/range_allocator_test.go @@ -59,16 +59,20 @@ func getFakeNodeInformer(fakeNodeHandler *testutil.FakeNodeHandler) coreinformer return fakeNodeInformer } +type testCase struct { + description string + fakeNodeHandler *testutil.FakeNodeHandler + clusterCIDRs []*net.IPNet + serviceCIDR *net.IPNet + subNetMaskSize int + // key is index of the cidr allocated + expectedAllocatedCIDR map[int]string + allocatedCIDRs map[int][]string +} + func TestAllocateOrOccupyCIDRSuccess(t *testing.T) { - testCases := []struct { - description string - fakeNodeHandler *testutil.FakeNodeHandler - clusterCIDR *net.IPNet - serviceCIDR *net.IPNet - subNetMaskSize int - expectedAllocatedCIDR string - allocatedCIDRs []string - }{ + // all tests operate on a single node + testCases := []testCase{ { description: "When there's no ServiceCIDR return first CIDR in range", fakeNodeHandler: &testutil.FakeNodeHandler{ @@ -81,13 +85,15 @@ func TestAllocateOrOccupyCIDRSuccess(t *testing.T) { }, Clientset: fake.NewSimpleClientset(), }, - clusterCIDR: func() *net.IPNet { + clusterCIDRs: func() []*net.IPNet { _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/24") - return clusterCIDR + return []*net.IPNet{clusterCIDR} }(), - serviceCIDR: nil, - subNetMaskSize: 30, - expectedAllocatedCIDR: "127.123.234.0/30", + serviceCIDR: nil, + subNetMaskSize: 30, + expectedAllocatedCIDR: map[int]string{ + 0: "127.123.234.0/30", + }, }, { description: "Correctly filter out ServiceCIDR", @@ -101,17 +107,19 @@ func TestAllocateOrOccupyCIDRSuccess(t *testing.T) { }, Clientset: fake.NewSimpleClientset(), }, - clusterCIDR: func() *net.IPNet { + clusterCIDRs: func() []*net.IPNet { _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/24") - return clusterCIDR + return []*net.IPNet{clusterCIDR} }(), serviceCIDR: func() *net.IPNet { - _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/26") - return clusterCIDR + _, serviceCIDR, _ := net.ParseCIDR("127.123.234.0/26") + return serviceCIDR }(), subNetMaskSize: 30, // it should return first /30 CIDR after service range - expectedAllocatedCIDR: "127.123.234.64/30", + expectedAllocatedCIDR: map[int]string{ + 0: "127.123.234.64/30", + }, }, { description: "Correctly ignore already allocated CIDRs", @@ -125,31 +133,100 @@ func TestAllocateOrOccupyCIDRSuccess(t *testing.T) { }, Clientset: fake.NewSimpleClientset(), }, - clusterCIDR: func() *net.IPNet { + clusterCIDRs: func() []*net.IPNet { _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/24") - return clusterCIDR + return []*net.IPNet{clusterCIDR} }(), serviceCIDR: func() *net.IPNet { - _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/26") - return clusterCIDR + _, serviceCIDR, 
_ := net.ParseCIDR("127.123.234.0/26") + return serviceCIDR + }(), + subNetMaskSize: 30, + allocatedCIDRs: map[int][]string{ + 0: {"127.123.234.64/30", "127.123.234.68/30", "127.123.234.72/30", "127.123.234.80/30"}, + }, + expectedAllocatedCIDR: map[int]string{ + 0: "127.123.234.76/30", + }, + }, + { + description: "Dualstack CIDRs v4,v6", + fakeNodeHandler: &testutil.FakeNodeHandler{ + Existing: []*v1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node0", + }, + }, + }, + Clientset: fake.NewSimpleClientset(), + }, + clusterCIDRs: func() []*net.IPNet { + _, clusterCIDRv4, _ := net.ParseCIDR("127.123.234.0/8") + _, clusterCIDRv6, _ := net.ParseCIDR("ace:cab:deca::/8") + return []*net.IPNet{clusterCIDRv4, clusterCIDRv6} + }(), + serviceCIDR: func() *net.IPNet { + _, serviceCIDR, _ := net.ParseCIDR("127.123.234.0/26") + return serviceCIDR + }(), + }, + { + description: "Dualstack CIDRs v6,v4", + fakeNodeHandler: &testutil.FakeNodeHandler{ + Existing: []*v1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node0", + }, + }, + }, + Clientset: fake.NewSimpleClientset(), + }, + clusterCIDRs: func() []*net.IPNet { + _, clusterCIDRv4, _ := net.ParseCIDR("127.123.234.0/8") + _, clusterCIDRv6, _ := net.ParseCIDR("ace:cab:deca::/8") + return []*net.IPNet{clusterCIDRv6, clusterCIDRv4} + }(), + serviceCIDR: func() *net.IPNet { + _, serviceCIDR, _ := net.ParseCIDR("127.123.234.0/26") + return serviceCIDR + }(), + }, + + { + description: "Dualstack CIDRs, more than two", + fakeNodeHandler: &testutil.FakeNodeHandler{ + Existing: []*v1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node0", + }, + }, + }, + Clientset: fake.NewSimpleClientset(), + }, + clusterCIDRs: func() []*net.IPNet { + _, clusterCIDRv4, _ := net.ParseCIDR("127.123.234.0/8") + _, clusterCIDRv6, _ := net.ParseCIDR("ace:cab:deca::/8") + _, clusterCIDRv4_2, _ := net.ParseCIDR("10.0.0.0/8") + return []*net.IPNet{clusterCIDRv4, clusterCIDRv6, clusterCIDRv4_2} + }(), + serviceCIDR: func() *net.IPNet { + _, serviceCIDR, _ := net.ParseCIDR("127.123.234.0/26") + return serviceCIDR }(), - subNetMaskSize: 30, - allocatedCIDRs: []string{"127.123.234.64/30", "127.123.234.68/30", "127.123.234.72/30", "127.123.234.80/30"}, - expectedAllocatedCIDR: "127.123.234.76/30", }, } - testFunc := func(tc struct { - description string - fakeNodeHandler *testutil.FakeNodeHandler - clusterCIDR *net.IPNet - serviceCIDR *net.IPNet - subNetMaskSize int - expectedAllocatedCIDR string - allocatedCIDRs []string - }) { + // test function + testFunc := func(tc testCase) { // Initialize the range allocator. 
- allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, getFakeNodeInformer(tc.fakeNodeHandler), tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize, nil) + allocator, err := NewCIDRRangeAllocator(tc.fakeNodeHandler, getFakeNodeInformer(tc.fakeNodeHandler), tc.clusterCIDRs, tc.serviceCIDR, tc.subNetMaskSize, nil) + if err != nil { + t.Errorf("%v: failed to create CIDRRangeAllocator with error %v", tc.description, err) + return + } rangeAllocator, ok := allocator.(*rangeAllocator) if !ok { t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description) @@ -160,50 +237,51 @@ func TestAllocateOrOccupyCIDRSuccess(t *testing.T) { go allocator.Run(wait.NeverStop) // this is a bit of white box testing - for _, allocated := range tc.allocatedCIDRs { - _, cidr, err := net.ParseCIDR(allocated) - if err != nil { - t.Fatalf("%v: unexpected error when parsing CIDR %v: %v", tc.description, allocated, err) + // pre allocate the cidrs as per the test + for idx, allocatedList := range tc.allocatedCIDRs { + for _, allocated := range allocatedList { + _, cidr, err := net.ParseCIDR(allocated) + if err != nil { + t.Fatalf("%v: unexpected error when parsing CIDR %v: %v", tc.description, allocated, err) + } + if err = rangeAllocator.cidrSets[idx].Occupy(cidr); err != nil { + t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err) + } } - if err = rangeAllocator.cidrs.Occupy(cidr); err != nil { - t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err) + if err := allocator.AllocateOrOccupyCIDR(tc.fakeNodeHandler.Existing[0]); err != nil { + t.Errorf("%v: unexpected error in AllocateOrOccupyCIDR: %v", tc.description, err) + } + if err := waitForUpdatedNodeWithTimeout(tc.fakeNodeHandler, 1, wait.ForeverTestTimeout); err != nil { + t.Fatalf("%v: timeout while waiting for Node update: %v", tc.description, err) } } - if err := allocator.AllocateOrOccupyCIDR(tc.fakeNodeHandler.Existing[0]); err != nil { - t.Errorf("%v: unexpected error in AllocateOrOccupyCIDR: %v", tc.description, err) + + if len(tc.expectedAllocatedCIDR) == 0 { + // nothing further expected + return } - if err := waitForUpdatedNodeWithTimeout(tc.fakeNodeHandler, 1, wait.ForeverTestTimeout); err != nil { - t.Fatalf("%v: timeout while waiting for Node update: %v", tc.description, err) - } - found := false - seenCIDRs := []string{} for _, updatedNode := range tc.fakeNodeHandler.GetUpdatedNodesCopy() { - seenCIDRs = append(seenCIDRs, updatedNode.Spec.PodCIDR) - if updatedNode.Spec.PodCIDR == tc.expectedAllocatedCIDR { - found = true - break + if len(updatedNode.Spec.PodCIDRs) == 0 { + continue // not assigned yet + } + //match + for podCIDRIdx, expectedPodCIDR := range tc.expectedAllocatedCIDR { + if updatedNode.Spec.PodCIDRs[podCIDRIdx] != expectedPodCIDR { + t.Errorf("%v: Unable to find allocated CIDR %v, found updated Nodes with CIDRs: %v", tc.description, expectedPodCIDR, updatedNode.Spec.PodCIDRs) + break + } } - } - if !found { - t.Errorf("%v: Unable to find allocated CIDR %v, found updated Nodes with CIDRs: %v", - tc.description, tc.expectedAllocatedCIDR, seenCIDRs) } } + // run the test cases for _, tc := range testCases { testFunc(tc) } } func TestAllocateOrOccupyCIDRFailure(t *testing.T) { - testCases := []struct { - description string - fakeNodeHandler *testutil.FakeNodeHandler - clusterCIDR *net.IPNet - serviceCIDR *net.IPNet - subNetMaskSize int - allocatedCIDRs []string - }{ + testCases := []testCase{ { description: "When 
there's no ServiceCIDR return first CIDR in range", fakeNodeHandler: &testutil.FakeNodeHandler{ @@ -216,26 +294,24 @@ func TestAllocateOrOccupyCIDRFailure(t *testing.T) { }, Clientset: fake.NewSimpleClientset(), }, - clusterCIDR: func() *net.IPNet { + clusterCIDRs: func() []*net.IPNet { _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/28") - return clusterCIDR + return []*net.IPNet{clusterCIDR} }(), serviceCIDR: nil, subNetMaskSize: 30, - allocatedCIDRs: []string{"127.123.234.0/30", "127.123.234.4/30", "127.123.234.8/30", "127.123.234.12/30"}, + allocatedCIDRs: map[int][]string{ + 0: {"127.123.234.0/30", "127.123.234.4/30", "127.123.234.8/30", "127.123.234.12/30"}, + }, }, } - testFunc := func(tc struct { - description string - fakeNodeHandler *testutil.FakeNodeHandler - clusterCIDR *net.IPNet - serviceCIDR *net.IPNet - subNetMaskSize int - allocatedCIDRs []string - }) { + testFunc := func(tc testCase) { // Initialize the range allocator. - allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, getFakeNodeInformer(tc.fakeNodeHandler), tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize, nil) + allocator, err := NewCIDRRangeAllocator(tc.fakeNodeHandler, getFakeNodeInformer(tc.fakeNodeHandler), tc.clusterCIDRs, tc.serviceCIDR, tc.subNetMaskSize, nil) + if err != nil { + t.Logf("%v: failed to create CIDRRangeAllocator with error %v", tc.description, err) + } rangeAllocator, ok := allocator.(*rangeAllocator) if !ok { t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description) @@ -246,14 +322,16 @@ func TestAllocateOrOccupyCIDRFailure(t *testing.T) { go allocator.Run(wait.NeverStop) // this is a bit of white box testing - for _, allocated := range tc.allocatedCIDRs { - _, cidr, err := net.ParseCIDR(allocated) - if err != nil { - t.Fatalf("%v: unexpected error when parsing CIDR %v: %v", tc.description, allocated, err) - } - err = rangeAllocator.cidrs.Occupy(cidr) - if err != nil { - t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err) + for setIdx, allocatedList := range tc.allocatedCIDRs { + for _, allocated := range allocatedList { + _, cidr, err := net.ParseCIDR(allocated) + if err != nil { + t.Fatalf("%v: unexpected error when parsing CIDR %v: %v", tc.description, cidr, err) + } + err = rangeAllocator.cidrSets[setIdx].Occupy(cidr) + if err != nil { + t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, cidr, err) + } } } if err := allocator.AllocateOrOccupyCIDR(tc.fakeNodeHandler.Existing[0]); err == nil { @@ -264,15 +342,21 @@ func TestAllocateOrOccupyCIDRFailure(t *testing.T) { if len(tc.fakeNodeHandler.GetUpdatedNodesCopy()) != 0 { t.Fatalf("%v: unexpected update of nodes: %v", tc.description, tc.fakeNodeHandler.GetUpdatedNodesCopy()) } - seenCIDRs := []string{} - for _, updatedNode := range tc.fakeNodeHandler.GetUpdatedNodesCopy() { - if updatedNode.Spec.PodCIDR != "" { - seenCIDRs = append(seenCIDRs, updatedNode.Spec.PodCIDR) - } + if len(tc.expectedAllocatedCIDR) == 0 { + // nothing further expected + return } - if len(seenCIDRs) != 0 { - t.Errorf("%v: Seen assigned CIDRs when not expected: %v", - tc.description, seenCIDRs) + for _, updatedNode := range tc.fakeNodeHandler.GetUpdatedNodesCopy() { + if len(updatedNode.Spec.PodCIDRs) == 0 { + continue // not assigned yet + } + //match + for podCIDRIdx, expectedPodCIDR := range tc.expectedAllocatedCIDR { + if updatedNode.Spec.PodCIDRs[podCIDRIdx] == expectedPodCIDR { + t.Errorf("%v: found cidr %v that should not be allocated on 
node with CIDRs:%v", tc.description, expectedPodCIDR, updatedNode.Spec.PodCIDRs) + break + } + } } } for _, tc := range testCases { @@ -280,18 +364,20 @@ func TestAllocateOrOccupyCIDRFailure(t *testing.T) { } } +type releaseTestCase struct { + description string + fakeNodeHandler *testutil.FakeNodeHandler + clusterCIDRs []*net.IPNet + serviceCIDR *net.IPNet + subNetMaskSize int + expectedAllocatedCIDRFirstRound map[int]string + expectedAllocatedCIDRSecondRound map[int]string + allocatedCIDRs map[int][]string + cidrsToRelease [][]string +} + func TestReleaseCIDRSuccess(t *testing.T) { - testCases := []struct { - description string - fakeNodeHandler *testutil.FakeNodeHandler - clusterCIDR *net.IPNet - serviceCIDR *net.IPNet - subNetMaskSize int - expectedAllocatedCIDRFirstRound string - expectedAllocatedCIDRSecondRound string - allocatedCIDRs []string - cidrsToRelease []string - }{ + testCases := []releaseTestCase{ { description: "Correctly release preallocated CIDR", fakeNodeHandler: &testutil.FakeNodeHandler{ @@ -304,16 +390,22 @@ func TestReleaseCIDRSuccess(t *testing.T) { }, Clientset: fake.NewSimpleClientset(), }, - clusterCIDR: func() *net.IPNet { + clusterCIDRs: func() []*net.IPNet { _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/28") - return clusterCIDR + return []*net.IPNet{clusterCIDR} }(), - serviceCIDR: nil, - subNetMaskSize: 30, - allocatedCIDRs: []string{"127.123.234.0/30", "127.123.234.4/30", "127.123.234.8/30", "127.123.234.12/30"}, - expectedAllocatedCIDRFirstRound: "", - cidrsToRelease: []string{"127.123.234.4/30"}, - expectedAllocatedCIDRSecondRound: "127.123.234.4/30", + serviceCIDR: nil, + subNetMaskSize: 30, + allocatedCIDRs: map[int][]string{ + 0: {"127.123.234.0/30", "127.123.234.4/30", "127.123.234.8/30", "127.123.234.12/30"}, + }, + expectedAllocatedCIDRFirstRound: nil, + cidrsToRelease: [][]string{ + {"127.123.234.4/30"}, + }, + expectedAllocatedCIDRSecondRound: map[int]string{ + 0: "127.123.234.4/30", + }, }, { description: "Correctly recycle CIDR", @@ -327,32 +419,30 @@ func TestReleaseCIDRSuccess(t *testing.T) { }, Clientset: fake.NewSimpleClientset(), }, - clusterCIDR: func() *net.IPNet { + clusterCIDRs: func() []*net.IPNet { _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/28") - return clusterCIDR + return []*net.IPNet{clusterCIDR} }(), - serviceCIDR: nil, - subNetMaskSize: 30, - allocatedCIDRs: []string{"127.123.234.4/30", "127.123.234.8/30", "127.123.234.12/30"}, - expectedAllocatedCIDRFirstRound: "127.123.234.0/30", - cidrsToRelease: []string{"127.123.234.0/30"}, - expectedAllocatedCIDRSecondRound: "127.123.234.0/30", + serviceCIDR: nil, + subNetMaskSize: 30, + allocatedCIDRs: map[int][]string{ + 0: {"127.123.234.4/30", "127.123.234.8/30", "127.123.234.12/30"}, + }, + expectedAllocatedCIDRFirstRound: map[int]string{ + 0: "127.123.234.0/30", + }, + cidrsToRelease: [][]string{ + {"127.123.234.0/30"}, + }, + expectedAllocatedCIDRSecondRound: map[int]string{ + 0: "127.123.234.0/30", + }, }, } - testFunc := func(tc struct { - description string - fakeNodeHandler *testutil.FakeNodeHandler - clusterCIDR *net.IPNet - serviceCIDR *net.IPNet - subNetMaskSize int - expectedAllocatedCIDRFirstRound string - expectedAllocatedCIDRSecondRound string - allocatedCIDRs []string - cidrsToRelease []string - }) { + testFunc := func(tc releaseTestCase) { // Initialize the range allocator. 
- allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, getFakeNodeInformer(tc.fakeNodeHandler), tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize, nil) + allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, getFakeNodeInformer(tc.fakeNodeHandler), tc.clusterCIDRs, tc.serviceCIDR, tc.subNetMaskSize, nil) rangeAllocator, ok := allocator.(*rangeAllocator) if !ok { t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description) @@ -363,18 +453,21 @@ func TestReleaseCIDRSuccess(t *testing.T) { go allocator.Run(wait.NeverStop) // this is a bit of white box testing - for _, allocated := range tc.allocatedCIDRs { - _, cidr, err := net.ParseCIDR(allocated) - if err != nil { - t.Fatalf("%v: unexpected error when parsing CIDR %v: %v", tc.description, allocated, err) - } - err = rangeAllocator.cidrs.Occupy(cidr) - if err != nil { - t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err) + for setIdx, allocatedList := range tc.allocatedCIDRs { + for _, allocated := range allocatedList { + _, cidr, err := net.ParseCIDR(allocated) + if err != nil { + t.Fatalf("%v: unexpected error when parsing CIDR %v: %v", tc.description, allocated, err) + } + err = rangeAllocator.cidrSets[setIdx].Occupy(cidr) + if err != nil { + t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err) + } } } + err := allocator.AllocateOrOccupyCIDR(tc.fakeNodeHandler.Existing[0]) - if tc.expectedAllocatedCIDRFirstRound != "" { + if len(tc.expectedAllocatedCIDRFirstRound) != 0 { if err != nil { t.Fatalf("%v: unexpected error in AllocateOrOccupyCIDR: %v", tc.description, err) } @@ -391,20 +484,18 @@ func TestReleaseCIDRSuccess(t *testing.T) { t.Fatalf("%v: unexpected update of nodes: %v", tc.description, tc.fakeNodeHandler.GetUpdatedNodesCopy()) } } - for _, cidrToRelease := range tc.cidrsToRelease { nodeToRelease := v1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: "node0", }, } - nodeToRelease.Spec.PodCIDR = cidrToRelease + nodeToRelease.Spec.PodCIDRs = cidrToRelease err = allocator.ReleaseCIDR(&nodeToRelease) if err != nil { t.Fatalf("%v: unexpected error in ReleaseCIDR: %v", tc.description, err) } } - if err = allocator.AllocateOrOccupyCIDR(tc.fakeNodeHandler.Existing[0]); err != nil { t.Fatalf("%v: unexpected error in AllocateOrOccupyCIDR: %v", tc.description, err) } @@ -412,20 +503,24 @@ func TestReleaseCIDRSuccess(t *testing.T) { t.Fatalf("%v: timeout while waiting for Node update: %v", tc.description, err) } - found := false - seenCIDRs := []string{} + if len(tc.expectedAllocatedCIDRSecondRound) == 0 { + // nothing further expected + return + } for _, updatedNode := range tc.fakeNodeHandler.GetUpdatedNodesCopy() { - seenCIDRs = append(seenCIDRs, updatedNode.Spec.PodCIDR) - if updatedNode.Spec.PodCIDR == tc.expectedAllocatedCIDRSecondRound { - found = true - break + if len(updatedNode.Spec.PodCIDRs) == 0 { + continue // not assigned yet + } + //match + for podCIDRIdx, expectedPodCIDR := range tc.expectedAllocatedCIDRSecondRound { + if updatedNode.Spec.PodCIDRs[podCIDRIdx] != expectedPodCIDR { + t.Errorf("%v: found cidr %v that should not be allocated on node with CIDRs:%v", tc.description, expectedPodCIDR, updatedNode.Spec.PodCIDRs) + break + } } } - if !found { - t.Errorf("%v: Unable to find allocated CIDR %v, found updated Nodes with CIDRs: %v", - tc.description, tc.expectedAllocatedCIDRSecondRound, seenCIDRs) - } } + for _, tc := range testCases { testFunc(tc) } diff --git 
a/pkg/controller/nodeipam/node_ipam_controller.go b/pkg/controller/nodeipam/node_ipam_controller.go index baaf1c34438..337e7ecfd54 100644 --- a/pkg/controller/nodeipam/node_ipam_controller.go +++ b/pkg/controller/nodeipam/node_ipam_controller.go @@ -55,10 +55,10 @@ const ( type Controller struct { allocatorType ipam.CIDRAllocatorType - cloud cloudprovider.Interface - clusterCIDR *net.IPNet - serviceCIDR *net.IPNet - kubeClient clientset.Interface + cloud cloudprovider.Interface + clusterCIDRs []*net.IPNet + serviceCIDR *net.IPNet + kubeClient clientset.Interface // Method for easy mocking in unittest. lookupIP func(host string) ([]net.IP, error) @@ -79,7 +79,7 @@ func NewNodeIpamController( nodeInformer coreinformers.NodeInformer, cloud cloudprovider.Interface, kubeClient clientset.Interface, - clusterCIDR *net.IPNet, + clusterCIDRs []*net.IPNet, serviceCIDR *net.IPNet, nodeCIDRMaskSize int, allocatorType ipam.CIDRAllocatorType) (*Controller, error) { @@ -101,13 +101,22 @@ func NewNodeIpamController( metrics.RegisterMetricAndTrackRateLimiterUsage("node_ipam_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()) } + // Cloud CIDR allocator does not rely on clusterCIDR or nodeCIDRMaskSize for allocation. if allocatorType != ipam.CloudAllocatorType { - // Cloud CIDR allocator does not rely on clusterCIDR or nodeCIDRMaskSize for allocation. - if clusterCIDR == nil { + if len(clusterCIDRs) == 0 { klog.Fatal("Controller: Must specify --cluster-cidr if --allocate-node-cidrs is set") } - if maskSize, _ := clusterCIDR.Mask.Size(); maskSize > nodeCIDRMaskSize { - klog.Fatal("Controller: Invalid --cluster-cidr, mask size of cluster CIDR must be less than --node-cidr-mask-size") + + // TODO: (khenidak) IPv6DualStack beta: + // - modify mask to allow flexible masks for IPv4 and IPv6 + // - for alpha status they are the same + + // for each cidr, node mask size must be < cidr mask + for _, cidr := range clusterCIDRs { + mask := cidr.Mask + if maskSize, _ := mask.Size(); maskSize > nodeCIDRMaskSize { + klog.Fatal("Controller: Invalid --cluster-cidr, mask size of cluster CIDR must be less than --node-cidr-mask-size") + } } } @@ -115,7 +124,7 @@ func NewNodeIpamController( cloud: cloud, kubeClient: kubeClient, lookupIP: net.LookupIP, - clusterCIDR: clusterCIDR, + clusterCIDRs: clusterCIDRs, serviceCIDR: serviceCIDR, allocatorType: allocatorType, } @@ -133,7 +142,16 @@ func NewNodeIpamController( case ipam.IPAMFromCloudAllocatorType: cfg.Mode = nodesync.SyncFromCloud } - ipamc, err := ipam.NewController(cfg, kubeClient, cloud, clusterCIDR, serviceCIDR, nodeCIDRMaskSize) + + // we may end up here with no cidr at all in case of FromCloud/FromCluster + var cidr *net.IPNet + if len(clusterCIDRs) > 0 { + cidr = clusterCIDRs[0] + } + if len(clusterCIDRs) > 1 { + klog.Warningf("Multiple cidrs were configured with FromCluster or FromCloud. 
cidrs except first one were discarded") + } + ipamc, err := ipam.NewController(cfg, kubeClient, cloud, cidr, serviceCIDR, nodeCIDRMaskSize) if err != nil { klog.Fatalf("Error creating ipam controller: %v", err) } @@ -142,8 +160,7 @@ func NewNodeIpamController( } } else { var err error - ic.cidrAllocator, err = ipam.New( - kubeClient, cloud, nodeInformer, ic.allocatorType, ic.clusterCIDR, ic.serviceCIDR, nodeCIDRMaskSize) + ic.cidrAllocator, err = ipam.New(kubeClient, cloud, nodeInformer, ic.allocatorType, clusterCIDRs, ic.serviceCIDR, nodeCIDRMaskSize) if err != nil { return nil, err } diff --git a/pkg/controller/nodeipam/node_ipam_controller_test.go b/pkg/controller/nodeipam/node_ipam_controller_test.go index 50238804e1a..3a5d1b23f08 100644 --- a/pkg/controller/nodeipam/node_ipam_controller_test.go +++ b/pkg/controller/nodeipam/node_ipam_controller_test.go @@ -20,6 +20,7 @@ import ( "net" "os" "os/exec" + "strings" "testing" "k8s.io/api/core/v1" @@ -30,9 +31,10 @@ import ( "k8s.io/kubernetes/pkg/controller/nodeipam/ipam" "k8s.io/kubernetes/pkg/controller/testutil" "k8s.io/legacy-cloud-providers/gce" + netutils "k8s.io/utils/net" ) -func newTestNodeIpamController(clusterCIDR, serviceCIDR *net.IPNet, nodeCIDRMaskSize int, allocatorType ipam.CIDRAllocatorType) (*Controller, error) { +func newTestNodeIpamController(clusterCIDR []*net.IPNet, serviceCIDR *net.IPNet, nodeCIDRMaskSize int, allocatorType ipam.CIDRAllocatorType) (*Controller, error) { clientSet := fake.NewSimpleClientset() fakeNodeHandler := &testutil.FakeNodeHandler{ Existing: []*v1.Node{ @@ -67,6 +69,7 @@ func TestNewNodeIpamControllerWithCIDRMasks(t *testing.T) { wantFatal bool }{ {"valid_range_allocator", "10.0.0.0/21", "10.1.0.0/21", 24, ipam.RangeAllocatorType, false}, + {"valid_range_allocator_dualstack", "10.0.0.0/21,2000::/10", "10.1.0.0/21", 24, ipam.RangeAllocatorType, false}, {"valid_cloud_allocator", "10.0.0.0/21", "10.1.0.0/21", 24, ipam.CloudAllocatorType, false}, {"valid_ipam_from_cluster", "10.0.0.0/21", "10.1.0.0/21", 24, ipam.IPAMFromClusterAllocatorType, false}, {"valid_ipam_from_cloud", "10.0.0.0/21", "10.1.0.0/21", 24, ipam.IPAMFromCloudAllocatorType, false}, @@ -76,11 +79,12 @@ func TestNewNodeIpamControllerWithCIDRMasks(t *testing.T) { {"invalid_CIDR_smaller_than_mask_other_allocators", "10.0.0.0/26", "10.1.0.0/21", 24, ipam.IPAMFromCloudAllocatorType, true}, } { t.Run(tc.desc, func(t *testing.T) { - _, clusterCIDRIpNet, _ := net.ParseCIDR(tc.clusterCIDR) + clusterCidrs, _ := netutils.ParseCIDRs(strings.Split(tc.clusterCIDR, ",")) _, serviceCIDRIpNet, _ := net.ParseCIDR(tc.serviceCIDR) + if os.Getenv("EXIT_ON_FATAL") == "1" { // This is the subprocess which runs the actual code. - newTestNodeIpamController(clusterCIDRIpNet, serviceCIDRIpNet, tc.maskSize, tc.allocatorType) + newTestNodeIpamController(clusterCidrs, serviceCIDRIpNet, tc.maskSize, tc.allocatorType) return } // This is the host process that monitors the exit code of the subprocess. 
@@ -96,7 +100,7 @@ func TestNewNodeIpamControllerWithCIDRMasks(t *testing.T) {
 				gotFatal = !exitErr.Success()
 			}
 			if gotFatal != tc.wantFatal {
-				t.Errorf("newTestNodeIpamController(%v, %v, %v, %v) : gotFatal = %t ; wantFatal = %t", clusterCIDRIpNet, serviceCIDRIpNet, tc.maskSize, tc.allocatorType, gotFatal, tc.wantFatal)
+				t.Errorf("newTestNodeIpamController(%v, %v, %v, %v) : gotFatal = %t ; wantFatal = %t", clusterCidrs, serviceCIDRIpNet, tc.maskSize, tc.allocatorType, gotFatal, tc.wantFatal)
 			}
 		})
 	}
diff --git a/pkg/util/node/node.go b/pkg/util/node/node.go
index 087a0bc82dd..c9ae667d790 100644
--- a/pkg/util/node/node.go
+++ b/pkg/util/node/node.go
@@ -175,6 +175,27 @@ func PatchNodeCIDR(c clientset.Interface, node types.NodeName, cidr string) erro
 	return nil
 }
 
+// PatchNodeCIDRs patches the specified node's CIDR list to the given values.
+func PatchNodeCIDRs(c clientset.Interface, node types.NodeName, cidrs []string) error {
+	rawCidrs, err := json.Marshal(cidrs)
+	if err != nil {
+		return fmt.Errorf("failed to json.Marshal CIDRs: %v", err)
+	}
+
+	rawCidr, err := json.Marshal(cidrs[0])
+	if err != nil {
+		return fmt.Errorf("failed to json.Marshal CIDR: %v", err)
+	}
+
+	// set the pod cidrs list and set the old pod cidr field
+	patchBytes := []byte(fmt.Sprintf(`{"spec":{"podCIDR":%s , "podCIDRs":%s}}`, rawCidr, rawCidrs))
+
+	if _, err := c.CoreV1().Nodes().Patch(string(node), types.StrategicMergePatchType, patchBytes); err != nil {
+		return fmt.Errorf("failed to patch node CIDR: %v", err)
+	}
+	return nil
+}
+
 // PatchNodeStatus patches node status.
 func PatchNodeStatus(c v1core.CoreV1Interface, nodeName types.NodeName, oldNode *v1.Node, newNode *v1.Node) (*v1.Node, []byte, error) {
 	patchBytes, err := preparePatchBytesforNodeStatus(nodeName, oldNode, newNode)
diff --git a/test/integration/ipamperf/ipam_test.go b/test/integration/ipamperf/ipam_test.go
index afaaa75f812..09d69f44aa6 100644
--- a/test/integration/ipamperf/ipam_test.go
+++ b/test/integration/ipamperf/ipam_test.go
@@ -52,7 +52,7 @@ func setupAllocator(apiURL string, config *Config, clusterCIDR, serviceCIDR *net
 	sharedInformer := informers.NewSharedInformerFactory(clientSet, 1*time.Hour)
 	ipamController, err := nodeipam.NewNodeIpamController(
 		sharedInformer.Core().V1().Nodes(), config.Cloud, clientSet,
-		clusterCIDR, serviceCIDR, subnetMaskSize, config.AllocatorType,
+		[]*net.IPNet{clusterCIDR}, serviceCIDR, subnetMaskSize, config.AllocatorType,
 	)
 	if err != nil {
 		return nil, shutdownFunc, err
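
The new processCIDRs helper in cmd/kube-controller-manager/app/core.go is what turns the comma-separated --cluster-cidr flag into typed CIDRs and decides whether they form a dual-stack configuration. Below is a minimal, standalone sketch of the same parsing path, using only the k8s.io/utils/net helpers the patch itself relies on (ParseCIDRs and IsDualStackCIDRs); the flag value is illustrative.

package main

import (
	"fmt"
	"strings"

	netutils "k8s.io/utils/net"
)

func main() {
	// Illustrative --cluster-cidr value: one IPv4 and one IPv6 CIDR.
	clusterCIDRFlag := "10.244.0.0/16,fd00:10:244::/56"

	// Same two steps processCIDRs performs: split the flag, parse every element.
	cidrs, err := netutils.ParseCIDRs(strings.Split(strings.TrimSpace(clusterCIDRFlag), ","))
	if err != nil {
		panic(err)
	}

	// Parsing already succeeded, so the error from IsDualStackCIDRs can be ignored,
	// which is exactly what processCIDRs does.
	dualStack, _ := netutils.IsDualStackCIDRs(cidrs)
	fmt.Printf("parsed %d cidrs, dualStack=%v\n", len(cidrs), dualStack)
}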
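
startNodeIpamController then rejects configurations that parse but do not make sense: more than one CIDR requires the IPv6DualStack feature gate, multiple CIDRs must actually span both IP families, and at most two CIDRs are accepted. The sketch below restates those checks as a standalone helper; validateClusterCIDRs is a hypothetical name and the dualStackEnabled argument stands in for the real feature-gate lookup.

package main

import (
	"fmt"
	"net"

	netutils "k8s.io/utils/net"
)

// validateClusterCIDRs mirrors the three failure cases checked in startNodeIpamController:
// the dual-stack gate, one CIDR per IP family, and an upper bound of two CIDRs.
func validateClusterCIDRs(cidrs []*net.IPNet, dualStackEnabled bool) error {
	if len(cidrs) > 1 && !dualStackEnabled {
		return fmt.Errorf("%d cluster CIDRs given but the IPv6DualStack feature is not enabled", len(cidrs))
	}
	if len(cidrs) > 1 {
		if dualStack, _ := netutils.IsDualStackCIDRs(cidrs); !dualStack {
			return fmt.Errorf("%d cluster CIDRs given but they do not cover both IP families", len(cidrs))
		}
	}
	if len(cidrs) > 2 {
		return fmt.Errorf("%d cluster CIDRs given, at most 2 are allowed", len(cidrs))
	}
	return nil
}

func main() {
	cidrs, _ := netutils.ParseCIDRs([]string{"10.244.0.0/16", "fd00:10:244::/56"})
	fmt.Println(validateClusterCIDRs(cidrs, true)) // <nil>
}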
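
Inside the range allocator the change is mostly bookkeeping: one cidrset.CidrSet per cluster CIDR, index-aligned with the clusterCIDRs slice, and a nodeReservedCIDRs value carrying one reserved CIDR per set through the update channel to the worker that patches the node. The following is a trimmed-down model of that reserve-then-queue flow; it reuses the nodeReservedCIDRs shape from the patch but replaces the real allocator and worker with inline stand-ins.

package main

import (
	"fmt"
	"net"
)

// nodeReservedCIDRs mirrors the type added in range_allocator.go: CIDRs are
// reserved first, then the named node is patched with them in the background.
type nodeReservedCIDRs struct {
	allocatedCIDRs []*net.IPNet
	nodeName       string
}

func main() {
	// Stand-ins for what cidrSets[idx].AllocateNext() would hand back,
	// one CIDR per cluster CIDR / IP family.
	_, v4, _ := net.ParseCIDR("10.244.1.0/24")
	_, v6, _ := net.ParseCIDR("fd00:10:244:1::/80")

	// The real allocator sends reservations over a buffered channel
	// (nodeCIDRUpdateChannel) that background workers drain.
	updates := make(chan nodeReservedCIDRs, 1)
	updates <- nodeReservedCIDRs{nodeName: "node0", allocatedCIDRs: []*net.IPNet{v4, v6}}

	// The worker converts the reservation to strings (cidrsAsString) and
	// patches Node.Spec.PodCIDRs; here we only print what would be patched.
	data := <-updates
	cidrs := make([]string, len(data.allocatedCIDRs))
	for i, c := range data.allocatedCIDRs {
		cidrs[i] = c.String()
	}
	fmt.Printf("node %s -> podCIDRs %v\n", data.nodeName, cidrs)
}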
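
NewNodeIpamController still applies a single --node-cidr-mask-size to every cluster CIDR (the TODO notes that per-family masks are left for beta), and it fatals when any cluster CIDR mask is larger than the node mask. A small worked check of that rule, and of the resulting per-family capacity of 2^(nodeMask - clusterMask) node CIDRs; the CIDR values are illustrative.

package main

import (
	"fmt"
	"net"
)

func main() {
	nodeCIDRMaskSize := 24 // --node-cidr-mask-size, shared by both families in this alpha patch

	for _, s := range []string{"10.244.0.0/16", "fd00:10:244::/16"} {
		_, cidr, _ := net.ParseCIDR(s)
		clusterMask, _ := cidr.Mask.Size()
		if clusterMask > nodeCIDRMaskSize {
			// This is the case the controller refuses with a fatal error.
			fmt.Printf("%s: invalid, cluster mask /%d is larger than node mask /%d\n", s, clusterMask, nodeCIDRMaskSize)
			continue
		}
		fmt.Printf("%s: room for %d node CIDRs of size /%d\n", s, 1<<uint(nodeCIDRMaskSize-clusterMask), nodeCIDRMaskSize)
	}
}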
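
Finally, the new PatchNodeCIDRs helper writes both the legacy singular field and the new list in one strategic merge patch, so clients that still read Node.Spec.PodCIDR keep seeing the first entry of the list. The sketch below only builds and prints the patch body, with illustrative CIDRs, to show the JSON shape that ends up on the wire.

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Illustrative dual-stack allocation for one node.
	cidrs := []string{"10.244.1.0/24", "fd00:10:244:1::/80"}

	rawCidrs, _ := json.Marshal(cidrs)   // -> ["10.244.1.0/24","fd00:10:244:1::/80"]
	rawCidr, _ := json.Marshal(cidrs[0]) // -> "10.244.1.0/24"

	// Same format string PatchNodeCIDRs uses: podCIDR keeps the first entry for
	// compatibility, podCIDRs carries the full list.
	patch := fmt.Sprintf(`{"spec":{"podCIDR":%s , "podCIDRs":%s}}`, rawCidr, rawCidrs)
	fmt.Println(patch)
}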