Introduce MultiCIDRRangeAllocator

MultiCIDRRangeAllocator is a new Range Allocator which makes using
multiple ClusterCIDRs possible. It consists of two controllers, one for
reconciling the ClusterCIDR API objects and the other for allocating
Pod CIDRs to the nodes.

The allocation is based on the rules defined in
https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/2593-multiple-cluster-cidrs
This commit is contained in:
Sarvesh Rangnekar 2022-08-05 19:24:18 +00:00
parent 02d944d046
commit 5b801ba9f9
13 changed files with 3198 additions and 77 deletions

View File

@ -39,4 +39,5 @@ rules:
- k8s.io/kubernetes/pkg/util/taints - k8s.io/kubernetes/pkg/util/taints
- k8s.io/kubernetes/pkg/proxy/util - k8s.io/kubernetes/pkg/proxy/util
- k8s.io/kubernetes/pkg/proxy/util/testing - k8s.io/kubernetes/pkg/proxy/util/testing
- k8s.io/kubernetes/pkg/util/slice
- k8s.io/kubernetes/pkg/util/sysctl - k8s.io/kubernetes/pkg/util/sysctl

View File

@ -26,6 +26,8 @@ import (
"net" "net"
"strings" "strings"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers/networking/v1alpha1"
cloudprovider "k8s.io/cloud-provider" cloudprovider "k8s.io/cloud-provider"
"k8s.io/cloud-provider/app" "k8s.io/cloud-provider/app"
cloudcontrollerconfig "k8s.io/cloud-provider/app/config" cloudcontrollerconfig "k8s.io/cloud-provider/app/config"
@ -36,6 +38,7 @@ import (
nodeipamcontroller "k8s.io/kubernetes/pkg/controller/nodeipam" nodeipamcontroller "k8s.io/kubernetes/pkg/controller/nodeipam"
nodeipamconfig "k8s.io/kubernetes/pkg/controller/nodeipam/config" nodeipamconfig "k8s.io/kubernetes/pkg/controller/nodeipam/config"
"k8s.io/kubernetes/pkg/controller/nodeipam/ipam" "k8s.io/kubernetes/pkg/controller/nodeipam/ipam"
"k8s.io/kubernetes/pkg/features"
netutils "k8s.io/utils/net" netutils "k8s.io/utils/net"
) )
@ -120,8 +123,14 @@ func startNodeIpamController(initContext app.ControllerInitContext, ccmConfig *c
return nil, false, err return nil, false, err
} }
var clusterCIDRInformer v1alpha1.ClusterCIDRInformer
if utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRRangeAllocator) {
clusterCIDRInformer = ctx.InformerFactory.Networking().V1alpha1().ClusterCIDRs()
}
nodeIpamController, err := nodeipamcontroller.NewNodeIpamController( nodeIpamController, err := nodeipamcontroller.NewNodeIpamController(
ctx.InformerFactory.Core().V1().Nodes(), ctx.InformerFactory.Core().V1().Nodes(),
clusterCIDRInformer,
cloud, cloud,
ctx.ClientBuilder.ClientOrDie(initContext.ClientName), ctx.ClientBuilder.ClientOrDie(initContext.ClientName),
clusterCIDRs, clusterCIDRs,

View File

@ -27,7 +27,9 @@ import (
"strings" "strings"
"time" "time"
"k8s.io/client-go/informers/networking/v1alpha1"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
@ -153,8 +155,14 @@ func startNodeIpamController(ctx context.Context, controllerContext ControllerCo
return nil, false, err return nil, false, err
} }
var clusterCIDRInformer v1alpha1.ClusterCIDRInformer
if utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRRangeAllocator) {
clusterCIDRInformer = controllerContext.InformerFactory.Networking().V1alpha1().ClusterCIDRs()
}
nodeIpamController, err := nodeipamcontroller.NewNodeIpamController( nodeIpamController, err := nodeipamcontroller.NewNodeIpamController(
controllerContext.InformerFactory.Core().V1().Nodes(), controllerContext.InformerFactory.Core().V1().Nodes(),
clusterCIDRInformer,
controllerContext.Cloud, controllerContext.Cloud,
controllerContext.ClientBuilder.ClientOrDie("node-controller"), controllerContext.ClientBuilder.ClientOrDie("node-controller"),
clusterCIDRs, clusterCIDRs,

View File

@ -22,16 +22,18 @@ import (
"net" "net"
"time" "time"
"k8s.io/klog/v2"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
informers "k8s.io/client-go/informers/core/v1" informers "k8s.io/client-go/informers/core/v1"
networkinginformers "k8s.io/client-go/informers/networking/v1alpha1"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
cloudprovider "k8s.io/cloud-provider" cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
) )
// CIDRAllocatorType is the type of the allocator to use. // CIDRAllocatorType is the type of the allocator to use.
@ -41,6 +43,9 @@ const (
// RangeAllocatorType is the allocator that uses an internal CIDR // RangeAllocatorType is the allocator that uses an internal CIDR
// range allocator to do node CIDR range allocations. // range allocator to do node CIDR range allocations.
RangeAllocatorType CIDRAllocatorType = "RangeAllocator" RangeAllocatorType CIDRAllocatorType = "RangeAllocator"
// MultiCIDRRangeAllocatorType is the allocator that uses an internal CIDR
// range allocator to do node CIDR range allocations.
MultiCIDRRangeAllocatorType CIDRAllocatorType = "MultiCIDRRangeAllocator"
// CloudAllocatorType is the allocator that uses cloud platform // CloudAllocatorType is the allocator that uses cloud platform
// support to do node CIDR range allocations. // support to do node CIDR range allocations.
CloudAllocatorType CIDRAllocatorType = "CloudAllocator" CloudAllocatorType CIDRAllocatorType = "CloudAllocator"
@ -87,7 +92,7 @@ type CIDRAllocator interface {
// CIDR if it doesn't currently have one or mark the CIDR as used if // CIDR if it doesn't currently have one or mark the CIDR as used if
// the node already have one. // the node already have one.
AllocateOrOccupyCIDR(node *v1.Node) error AllocateOrOccupyCIDR(node *v1.Node) error
// ReleaseCIDR releases the CIDR of the removed node // ReleaseCIDR releases the CIDR of the removed node.
ReleaseCIDR(node *v1.Node) error ReleaseCIDR(node *v1.Node) error
// Run starts all the working logic of the allocator. // Run starts all the working logic of the allocator.
Run(stopCh <-chan struct{}) Run(stopCh <-chan struct{})
@ -96,18 +101,25 @@ type CIDRAllocator interface {
// CIDRAllocatorParams is parameters that's required for creating new // CIDRAllocatorParams is parameters that's required for creating new
// cidr range allocator. // cidr range allocator.
type CIDRAllocatorParams struct { type CIDRAllocatorParams struct {
// ClusterCIDRs is list of cluster cidrs // ClusterCIDRs is list of cluster cidrs.
ClusterCIDRs []*net.IPNet ClusterCIDRs []*net.IPNet
// ServiceCIDR is primary service cidr for cluster // ServiceCIDR is primary service cidr for cluster.
ServiceCIDR *net.IPNet ServiceCIDR *net.IPNet
// SecondaryServiceCIDR is secondary service cidr for cluster // SecondaryServiceCIDR is secondary service cidr for cluster.
SecondaryServiceCIDR *net.IPNet SecondaryServiceCIDR *net.IPNet
// NodeCIDRMaskSizes is list of node cidr mask sizes // NodeCIDRMaskSizes is list of node cidr mask sizes.
NodeCIDRMaskSizes []int NodeCIDRMaskSizes []int
} }
// CIDRs are reserved, then node resource is patched with them.
// nodeReservedCIDRs holds the reservation info for a node.
type nodeReservedCIDRs struct {
allocatedCIDRs []*net.IPNet
nodeName string
}
// New creates a new CIDR range allocator. // New creates a new CIDR range allocator.
func New(kubeClient clientset.Interface, cloud cloudprovider.Interface, nodeInformer informers.NodeInformer, allocatorType CIDRAllocatorType, allocatorParams CIDRAllocatorParams) (CIDRAllocator, error) { func New(kubeClient clientset.Interface, cloud cloudprovider.Interface, nodeInformer informers.NodeInformer, clusterCIDRInformer networkinginformers.ClusterCIDRInformer, allocatorType CIDRAllocatorType, allocatorParams CIDRAllocatorParams) (CIDRAllocator, error) {
nodeList, err := listNodes(kubeClient) nodeList, err := listNodes(kubeClient)
if err != nil { if err != nil {
return nil, err return nil, err
@ -116,6 +128,12 @@ func New(kubeClient clientset.Interface, cloud cloudprovider.Interface, nodeInfo
switch allocatorType { switch allocatorType {
case RangeAllocatorType: case RangeAllocatorType:
return NewCIDRRangeAllocator(kubeClient, nodeInformer, allocatorParams, nodeList) return NewCIDRRangeAllocator(kubeClient, nodeInformer, allocatorParams, nodeList)
case MultiCIDRRangeAllocatorType:
if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRRangeAllocator) {
return nil, fmt.Errorf("invalid CIDR allocator type: %v, feature gate %v must be enabled", allocatorType, features.MultiCIDRRangeAllocator)
}
return NewMultiCIDRRangeAllocator(kubeClient, nodeInformer, clusterCIDRInformer, allocatorParams, nodeList, nil)
case CloudAllocatorType: case CloudAllocatorType:
return NewCloudCIDRAllocator(kubeClient, cloud, nodeInformer) return NewCloudCIDRAllocator(kubeClient, cloud, nodeInformer)
default: default:
@ -144,3 +162,12 @@ func listNodes(kubeClient clientset.Interface) (*v1.NodeList, error) {
} }
return nodeList, nil return nodeList, nil
} }
// ipnetToStringList converts a slice of net.IPNet into a list of CIDR in string format
func ipnetToStringList(inCIDRs []*net.IPNet) []string {
outCIDRs := make([]string, len(inCIDRs))
for idx, inCIDR := range inCIDRs {
outCIDRs[idx] = inCIDR.String()
}
return outCIDRs
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -41,13 +41,6 @@ import (
controllerutil "k8s.io/kubernetes/pkg/controller/util/node" controllerutil "k8s.io/kubernetes/pkg/controller/util/node"
) )
// cidrs are reserved, then node resource is patched with them
// this type holds the reservation info for a node
type nodeReservedCIDRs struct {
allocatedCIDRs []*net.IPNet
nodeName string
}
type rangeAllocator struct { type rangeAllocator struct {
client clientset.Interface client clientset.Interface
// cluster cidrs as passed in during controller creation // cluster cidrs as passed in during controller creation
@ -333,7 +326,7 @@ func (r *rangeAllocator) updateCIDRsAllocation(data nodeReservedCIDRs) error {
var err error var err error
var node *v1.Node var node *v1.Node
defer r.removeNodeFromProcessing(data.nodeName) defer r.removeNodeFromProcessing(data.nodeName)
cidrsString := cidrsAsString(data.allocatedCIDRs) cidrsString := ipnetToStringList(data.allocatedCIDRs)
node, err = r.nodeLister.Get(data.nodeName) node, err = r.nodeLister.Get(data.nodeName)
if err != nil { if err != nil {
klog.Errorf("Failed while getting node %v for updating Node.Spec.PodCIDRs: %v", data.nodeName, err) klog.Errorf("Failed while getting node %v for updating Node.Spec.PodCIDRs: %v", data.nodeName, err)
@ -391,12 +384,3 @@ func (r *rangeAllocator) updateCIDRsAllocation(data nodeReservedCIDRs) error {
} }
return err return err
} }
// converts a slice of cidrs into <c-1>,<c-2>,<c-n>
func cidrsAsString(inCIDRs []*net.IPNet) []string {
outCIDRs := make([]string, len(inCIDRs))
for idx, inCIDR := range inCIDRs {
outCIDRs[idx] = inCIDR.String()
}
return outCIDRs
}

View File

@ -25,40 +25,12 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/test"
"k8s.io/kubernetes/pkg/controller/testutil" "k8s.io/kubernetes/pkg/controller/testutil"
netutils "k8s.io/utils/net" netutils "k8s.io/utils/net"
) )
const testNodePollInterval = 10 * time.Millisecond
var alwaysReady = func() bool { return true }
func waitForUpdatedNodeWithTimeout(nodeHandler *testutil.FakeNodeHandler, number int, timeout time.Duration) error {
return wait.Poll(nodePollInterval, timeout, func() (bool, error) {
if len(nodeHandler.GetUpdatedNodesCopy()) >= number {
return true, nil
}
return false, nil
})
}
// Creates a fakeNodeInformer using the provided fakeNodeHandler.
func getFakeNodeInformer(fakeNodeHandler *testutil.FakeNodeHandler) coreinformers.NodeInformer {
fakeClient := &fake.Clientset{}
fakeInformerFactory := informers.NewSharedInformerFactory(fakeClient, controller.NoResyncPeriodFunc())
fakeNodeInformer := fakeInformerFactory.Core().V1().Nodes()
for _, node := range fakeNodeHandler.Existing {
fakeNodeInformer.Informer().GetStore().Add(node)
}
return fakeNodeInformer
}
type testCase struct { type testCase struct {
description string description string
fakeNodeHandler *testutil.FakeNodeHandler fakeNodeHandler *testutil.FakeNodeHandler
@ -305,7 +277,7 @@ func TestOccupyPreExistingCIDR(t *testing.T) {
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) { t.Run(tc.description, func(t *testing.T) {
// Initialize the range allocator. // Initialize the range allocator.
fakeNodeInformer := getFakeNodeInformer(tc.fakeNodeHandler) fakeNodeInformer := test.FakeNodeInformer(tc.fakeNodeHandler)
nodeList, _ := tc.fakeNodeHandler.List(context.TODO(), metav1.ListOptions{}) nodeList, _ := tc.fakeNodeHandler.List(context.TODO(), metav1.ListOptions{})
_, err := NewCIDRRangeAllocator(tc.fakeNodeHandler, fakeNodeInformer, tc.allocatorParams, nodeList) _, err := NewCIDRRangeAllocator(tc.fakeNodeHandler, fakeNodeInformer, tc.allocatorParams, nodeList)
if err == nil && tc.ctrlCreateFail { if err == nil && tc.ctrlCreateFail {
@ -321,7 +293,7 @@ func TestOccupyPreExistingCIDR(t *testing.T) {
func TestAllocateOrOccupyCIDRSuccess(t *testing.T) { func TestAllocateOrOccupyCIDRSuccess(t *testing.T) {
// Non-parallel test (overrides global var) // Non-parallel test (overrides global var)
oldNodePollInterval := nodePollInterval oldNodePollInterval := nodePollInterval
nodePollInterval = testNodePollInterval nodePollInterval = test.NodePollInterval
defer func() { defer func() {
nodePollInterval = oldNodePollInterval nodePollInterval = oldNodePollInterval
}() }()
@ -537,7 +509,7 @@ func TestAllocateOrOccupyCIDRSuccess(t *testing.T) {
// test function // test function
testFunc := func(tc testCase) { testFunc := func(tc testCase) {
fakeNodeInformer := getFakeNodeInformer(tc.fakeNodeHandler) fakeNodeInformer := test.FakeNodeInformer(tc.fakeNodeHandler)
nodeList, _ := tc.fakeNodeHandler.List(context.TODO(), metav1.ListOptions{}) nodeList, _ := tc.fakeNodeHandler.List(context.TODO(), metav1.ListOptions{})
// Initialize the range allocator. // Initialize the range allocator.
allocator, err := NewCIDRRangeAllocator(tc.fakeNodeHandler, fakeNodeInformer, tc.allocatorParams, nodeList) allocator, err := NewCIDRRangeAllocator(tc.fakeNodeHandler, fakeNodeInformer, tc.allocatorParams, nodeList)
@ -550,7 +522,7 @@ func TestAllocateOrOccupyCIDRSuccess(t *testing.T) {
t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description) t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description)
return return
} }
rangeAllocator.nodesSynced = alwaysReady rangeAllocator.nodesSynced = test.AlwaysReady
rangeAllocator.recorder = testutil.NewFakeRecorder() rangeAllocator.recorder = testutil.NewFakeRecorder()
go allocator.Run(wait.NeverStop) go allocator.Run(wait.NeverStop)
@ -580,7 +552,7 @@ func TestAllocateOrOccupyCIDRSuccess(t *testing.T) {
if updateCount != 1 { if updateCount != 1 {
t.Fatalf("test error: all tests must update exactly one node") t.Fatalf("test error: all tests must update exactly one node")
} }
if err := waitForUpdatedNodeWithTimeout(tc.fakeNodeHandler, updateCount, wait.ForeverTestTimeout); err != nil { if err := test.WaitForUpdatedNodeWithTimeout(tc.fakeNodeHandler, updateCount, wait.ForeverTestTimeout); err != nil {
t.Fatalf("%v: timeout while waiting for Node update: %v", tc.description, err) t.Fatalf("%v: timeout while waiting for Node update: %v", tc.description, err)
} }
@ -639,7 +611,7 @@ func TestAllocateOrOccupyCIDRFailure(t *testing.T) {
testFunc := func(tc testCase) { testFunc := func(tc testCase) {
// Initialize the range allocator. // Initialize the range allocator.
allocator, err := NewCIDRRangeAllocator(tc.fakeNodeHandler, getFakeNodeInformer(tc.fakeNodeHandler), tc.allocatorParams, nil) allocator, err := NewCIDRRangeAllocator(tc.fakeNodeHandler, test.FakeNodeInformer(tc.fakeNodeHandler), tc.allocatorParams, nil)
if err != nil { if err != nil {
t.Logf("%v: failed to create CIDRRangeAllocator with error %v", tc.description, err) t.Logf("%v: failed to create CIDRRangeAllocator with error %v", tc.description, err)
} }
@ -648,7 +620,7 @@ func TestAllocateOrOccupyCIDRFailure(t *testing.T) {
t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description) t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description)
return return
} }
rangeAllocator.nodesSynced = alwaysReady rangeAllocator.nodesSynced = test.AlwaysReady
rangeAllocator.recorder = testutil.NewFakeRecorder() rangeAllocator.recorder = testutil.NewFakeRecorder()
go allocator.Run(wait.NeverStop) go allocator.Run(wait.NeverStop)
@ -708,7 +680,7 @@ type releaseTestCase struct {
func TestReleaseCIDRSuccess(t *testing.T) { func TestReleaseCIDRSuccess(t *testing.T) {
// Non-parallel test (overrides global var) // Non-parallel test (overrides global var)
oldNodePollInterval := nodePollInterval oldNodePollInterval := nodePollInterval
nodePollInterval = testNodePollInterval nodePollInterval = test.NodePollInterval
defer func() { defer func() {
nodePollInterval = oldNodePollInterval nodePollInterval = oldNodePollInterval
}() }()
@ -784,13 +756,13 @@ func TestReleaseCIDRSuccess(t *testing.T) {
testFunc := func(tc releaseTestCase) { testFunc := func(tc releaseTestCase) {
// Initialize the range allocator. // Initialize the range allocator.
allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, getFakeNodeInformer(tc.fakeNodeHandler), tc.allocatorParams, nil) allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, test.FakeNodeInformer(tc.fakeNodeHandler), tc.allocatorParams, nil)
rangeAllocator, ok := allocator.(*rangeAllocator) rangeAllocator, ok := allocator.(*rangeAllocator)
if !ok { if !ok {
t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description) t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description)
return return
} }
rangeAllocator.nodesSynced = alwaysReady rangeAllocator.nodesSynced = test.AlwaysReady
rangeAllocator.recorder = testutil.NewFakeRecorder() rangeAllocator.recorder = testutil.NewFakeRecorder()
go allocator.Run(wait.NeverStop) go allocator.Run(wait.NeverStop)
@ -813,7 +785,7 @@ func TestReleaseCIDRSuccess(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("%v: unexpected error in AllocateOrOccupyCIDR: %v", tc.description, err) t.Fatalf("%v: unexpected error in AllocateOrOccupyCIDR: %v", tc.description, err)
} }
if err := waitForUpdatedNodeWithTimeout(tc.fakeNodeHandler, 1, wait.ForeverTestTimeout); err != nil { if err := test.WaitForUpdatedNodeWithTimeout(tc.fakeNodeHandler, 1, wait.ForeverTestTimeout); err != nil {
t.Fatalf("%v: timeout while waiting for Node update: %v", tc.description, err) t.Fatalf("%v: timeout while waiting for Node update: %v", tc.description, err)
} }
} else { } else {
@ -841,7 +813,7 @@ func TestReleaseCIDRSuccess(t *testing.T) {
if err = allocator.AllocateOrOccupyCIDR(tc.fakeNodeHandler.Existing[0]); err != nil { if err = allocator.AllocateOrOccupyCIDR(tc.fakeNodeHandler.Existing[0]); err != nil {
t.Fatalf("%v: unexpected error in AllocateOrOccupyCIDR: %v", tc.description, err) t.Fatalf("%v: unexpected error in AllocateOrOccupyCIDR: %v", tc.description, err)
} }
if err := waitForUpdatedNodeWithTimeout(tc.fakeNodeHandler, 1, wait.ForeverTestTimeout); err != nil { if err := test.WaitForUpdatedNodeWithTimeout(tc.fakeNodeHandler, 1, wait.ForeverTestTimeout); err != nil {
t.Fatalf("%v: timeout while waiting for Node update: %v", tc.description, err) t.Fatalf("%v: timeout while waiting for Node update: %v", tc.description, err)
} }

View File

@ -18,10 +18,21 @@ package test
import ( import (
"net" "net"
"time"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/testutil"
netutils "k8s.io/utils/net" netutils "k8s.io/utils/net"
) )
const NodePollInterval = 10 * time.Millisecond
var AlwaysReady = func() bool { return true }
// MustParseCIDR returns the CIDR range parsed from s or panics if the string // MustParseCIDR returns the CIDR range parsed from s or panics if the string
// cannot be parsed. // cannot be parsed.
func MustParseCIDR(s string) *net.IPNet { func MustParseCIDR(s string) *net.IPNet {
@ -31,3 +42,25 @@ func MustParseCIDR(s string) *net.IPNet {
} }
return ret return ret
} }
// FakeNodeInformer creates a fakeNodeInformer using the provided fakeNodeHandler.
func FakeNodeInformer(fakeNodeHandler *testutil.FakeNodeHandler) coreinformers.NodeInformer {
fakeClient := &fake.Clientset{}
fakeInformerFactory := informers.NewSharedInformerFactory(fakeClient, controller.NoResyncPeriodFunc())
fakeNodeInformer := fakeInformerFactory.Core().V1().Nodes()
for _, node := range fakeNodeHandler.Existing {
fakeNodeInformer.Informer().GetStore().Add(node)
}
return fakeNodeInformer
}
func WaitForUpdatedNodeWithTimeout(nodeHandler *testutil.FakeNodeHandler, number int, timeout time.Duration) error {
return wait.Poll(NodePollInterval, timeout, func() (bool, error) {
if len(nodeHandler.GetUpdatedNodesCopy()) >= number {
return true, nil
}
return false, nil
})
}

View File

@ -20,20 +20,18 @@ import (
"net" "net"
"time" "time"
"k8s.io/klog/v2"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
coreinformers "k8s.io/client-go/informers/core/v1"
networkinginformers "k8s.io/client-go/informers/networking/v1alpha1"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1" v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
cloudprovider "k8s.io/cloud-provider" cloudprovider "k8s.io/cloud-provider"
controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers" controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers"
"k8s.io/component-base/metrics/prometheus/ratelimiter" "k8s.io/component-base/metrics/prometheus/ratelimiter"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller/nodeipam/ipam" "k8s.io/kubernetes/pkg/controller/nodeipam/ipam"
) )
@ -74,6 +72,7 @@ type Controller struct {
// currently, this should be handled as a fatal error. // currently, this should be handled as a fatal error.
func NewNodeIpamController( func NewNodeIpamController(
nodeInformer coreinformers.NodeInformer, nodeInformer coreinformers.NodeInformer,
clusterCIDRInformer networkinginformers.ClusterCIDRInformer,
cloud cloudprovider.Interface, cloud cloudprovider.Interface,
kubeClient clientset.Interface, kubeClient clientset.Interface,
clusterCIDRs []*net.IPNet, clusterCIDRs []*net.IPNet,
@ -136,7 +135,7 @@ func NewNodeIpamController(
NodeCIDRMaskSizes: nodeCIDRMaskSizes, NodeCIDRMaskSizes: nodeCIDRMaskSizes,
} }
ic.cidrAllocator, err = ipam.New(kubeClient, cloud, nodeInformer, ic.allocatorType, allocatorParams) ic.cidrAllocator, err = ipam.New(kubeClient, cloud, nodeInformer, clusterCIDRInformer, ic.allocatorType, allocatorParams)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -48,6 +48,7 @@ func newTestNodeIpamController(clusterCIDR []*net.IPNet, serviceCIDR *net.IPNet,
fakeClient := &fake.Clientset{} fakeClient := &fake.Clientset{}
fakeInformerFactory := informers.NewSharedInformerFactory(fakeClient, controller.NoResyncPeriodFunc()) fakeInformerFactory := informers.NewSharedInformerFactory(fakeClient, controller.NoResyncPeriodFunc())
fakeNodeInformer := fakeInformerFactory.Core().V1().Nodes() fakeNodeInformer := fakeInformerFactory.Core().V1().Nodes()
fakeClusterCIDRInformer := fakeInformerFactory.Networking().V1alpha1().ClusterCIDRs()
for _, node := range fakeNodeHandler.Existing { for _, node := range fakeNodeHandler.Existing {
fakeNodeInformer.Informer().GetStore().Add(node) fakeNodeInformer.Informer().GetStore().Add(node)
@ -55,7 +56,7 @@ func newTestNodeIpamController(clusterCIDR []*net.IPNet, serviceCIDR *net.IPNet,
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues()) fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
return NewNodeIpamController( return NewNodeIpamController(
fakeNodeInformer, fakeGCE, clientSet, fakeNodeInformer, fakeClusterCIDRInformer, fakeGCE, clientSet,
clusterCIDR, serviceCIDR, secondaryServiceCIDR, nodeCIDRMaskSizes, allocatorType, clusterCIDR, serviceCIDR, secondaryServiceCIDR, nodeCIDRMaskSizes, allocatorType,
) )
} }
@ -78,6 +79,9 @@ func TestNewNodeIpamControllerWithCIDRMasks(t *testing.T) {
{"valid_range_allocator_dualstack", "10.0.0.0/21,2000::/10", "10.1.0.0/21", emptyServiceCIDR, []int{24, 98}, ipam.RangeAllocatorType, false}, {"valid_range_allocator_dualstack", "10.0.0.0/21,2000::/10", "10.1.0.0/21", emptyServiceCIDR, []int{24, 98}, ipam.RangeAllocatorType, false},
{"valid_range_allocator_dualstack_dualstackservice", "10.0.0.0/21,2000::/10", "10.1.0.0/21", "3000::/10", []int{24, 98}, ipam.RangeAllocatorType, false}, {"valid_range_allocator_dualstack_dualstackservice", "10.0.0.0/21,2000::/10", "10.1.0.0/21", "3000::/10", []int{24, 98}, ipam.RangeAllocatorType, false},
{"valid_multi_cidr_range_allocator", "10.0.0.0/21", "10.1.0.0/21", emptyServiceCIDR, []int{24}, ipam.MultiCIDRRangeAllocatorType, false},
{"valid_multi_cidr_range_allocator_dualstack", "10.0.0.0/21,2000::/10", "10.1.0.0/21", emptyServiceCIDR, []int{24, 98}, ipam.MultiCIDRRangeAllocatorType, false},
{"valid_cloud_allocator", "10.0.0.0/21", "10.1.0.0/21", emptyServiceCIDR, []int{24}, ipam.CloudAllocatorType, false}, {"valid_cloud_allocator", "10.0.0.0/21", "10.1.0.0/21", emptyServiceCIDR, []int{24}, ipam.CloudAllocatorType, false},
{"valid_ipam_from_cluster", "10.0.0.0/21", "10.1.0.0/21", emptyServiceCIDR, []int{24}, ipam.IPAMFromClusterAllocatorType, false}, {"valid_ipam_from_cluster", "10.0.0.0/21", "10.1.0.0/21", emptyServiceCIDR, []int{24}, ipam.IPAMFromClusterAllocatorType, false},
{"valid_ipam_from_cloud", "10.0.0.0/21", "10.1.0.0/21", emptyServiceCIDR, []int{24}, ipam.IPAMFromCloudAllocatorType, false}, {"valid_ipam_from_cloud", "10.0.0.0/21", "10.1.0.0/21", emptyServiceCIDR, []int{24}, ipam.IPAMFromCloudAllocatorType, false},

View File

@ -569,6 +569,13 @@ const (
// Enables the usage of different protocols in the same Service with type=LoadBalancer // Enables the usage of different protocols in the same Service with type=LoadBalancer
MixedProtocolLBService featuregate.Feature = "MixedProtocolLBService" MixedProtocolLBService featuregate.Feature = "MixedProtocolLBService"
// owner: @sarveshr7
// kep: http://kep.k8s.io/2593
// alpha: v1.25
//
// Enables the MultiCIDR Range allocator.
MultiCIDRRangeAllocator featuregate.Feature = "MultiCIDRRangeAllocator"
// owner: @rikatz // owner: @rikatz
// kep: http://kep.k8s.io/2079 // kep: http://kep.k8s.io/2079
// alpha: v1.21 // alpha: v1.21
@ -997,6 +1004,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
MixedProtocolLBService: {Default: true, PreRelease: featuregate.Beta}, MixedProtocolLBService: {Default: true, PreRelease: featuregate.Beta},
MultiCIDRRangeAllocator: {Default: false, PreRelease: featuregate.Alpha},
NetworkPolicyEndPort: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.27 NetworkPolicyEndPort: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.27
NetworkPolicyStatus: {Default: false, PreRelease: featuregate.Alpha}, NetworkPolicyStatus: {Default: false, PreRelease: featuregate.Alpha},

View File

@ -50,8 +50,10 @@ func setupAllocator(kubeConfig *restclient.Config, config *Config, clusterCIDR,
sharedInformer := informers.NewSharedInformerFactory(clientSet, 1*time.Hour) sharedInformer := informers.NewSharedInformerFactory(clientSet, 1*time.Hour)
ipamController, err := nodeipam.NewNodeIpamController( ipamController, err := nodeipam.NewNodeIpamController(
sharedInformer.Core().V1().Nodes(), config.Cloud, clientSet, sharedInformer.Core().V1().Nodes(),
[]*net.IPNet{clusterCIDR}, serviceCIDR, nil, []int{subnetMaskSize}, config.AllocatorType, sharedInformer.Networking().V1alpha1().ClusterCIDRs(),
config.Cloud, clientSet, []*net.IPNet{clusterCIDR}, serviceCIDR, nil,
[]int{subnetMaskSize}, config.AllocatorType,
) )
if err != nil { if err != nil {
return nil, shutdownFunc, err return nil, shutdownFunc, err