Merge pull request #56240 from shyamjvs/improve-cidr-allocator

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions here: https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md

Migrate CIDR allocators to shared node informer

Ref https://github.com/kubernetes/kubernetes/issues/52292

/cc @wojtek-t @bowei

Commit 028c4c9399
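The change below moves both CIDR allocators off a custom Register(nodeInformer) hook and onto the standard controller pattern: event handlers are wired into a shared node informer at construction time, and a new Run(stopCh) method waits for the informer cache to sync before starting workers. The sketch below is not taken from the PR; it is a minimal stand-alone illustration of that pattern using only public client-go calls, and runNodeWatcher plus its log messages are invented for the example.

package sketch

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// runNodeWatcher is a hypothetical consumer of a shared node informer. It shows
// the wiring this PR adopts: register event handlers up front, start the
// factory, wait for the cache to sync, and only then read via the lister.
func runNodeWatcher(client kubernetes.Interface, stopCh <-chan struct{}) error {
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	nodeInformer := factory.Core().V1().Nodes()

	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			node := obj.(*v1.Node)
			fmt.Printf("node added: %s (PodCIDR=%q)\n", node.Name, node.Spec.PodCIDR)
		},
		UpdateFunc: func(_, newObj interface{}) {
			node := newObj.(*v1.Node)
			if node.Spec.PodCIDR == "" {
				fmt.Printf("node %s still needs a pod CIDR\n", node.Name)
			}
		},
		DeleteFunc: func(obj interface{}) {
			if node, ok := obj.(*v1.Node); ok {
				fmt.Printf("node deleted: %s\n", node.Name)
			}
		},
	})

	factory.Start(stopCh)

	// Nothing should run against a cold cache; this mirrors the
	// WaitForCacheSync call in the new Run methods below.
	if !cache.WaitForCacheSync(stopCh, nodeInformer.Informer().HasSynced) {
		return fmt.Errorf("timed out waiting for node cache to sync")
	}

	nodes, err := nodeInformer.Lister().List(labels.Everything())
	if err != nil {
		return err
	}
	fmt.Printf("cache synced with %d nodes\n", len(nodes))

	<-stopCh
	return nil
}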
@@ -16,12 +16,15 @@ go_test(
     importpath = "k8s.io/kubernetes/pkg/controller/node/ipam",
     library = ":go_default_library",
     deps = [
+        "//pkg/controller:go_default_library",
         "//pkg/controller/node/ipam/cidrset:go_default_library",
         "//pkg/controller/node/ipam/test:go_default_library",
         "//pkg/controller/testutil:go_default_library",
         "//vendor/k8s.io/api/core/v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
+        "//vendor/k8s.io/client-go/informers:go_default_library",
+        "//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
         "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
     ],
 )
@@ -41,6 +44,7 @@ go_library(
     deps = [
         "//pkg/cloudprovider:go_default_library",
         "//pkg/cloudprovider/providers/gce:go_default_library",
+        "//pkg/controller:go_default_library",
         "//pkg/controller/node/ipam/cidrset:go_default_library",
         "//pkg/controller/node/ipam/sync:go_default_library",
         "//pkg/controller/node/util:go_default_library",
@@ -52,12 +56,14 @@ go_library(
         "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
         "//vendor/k8s.io/client-go/kubernetes:go_default_library",
         "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
         "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
+        "//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
         "//vendor/k8s.io/client-go/tools/cache:go_default_library",
         "//vendor/k8s.io/client-go/tools/record:go_default_library",
         "//vendor/k8s.io/metrics/pkg/client/clientset_generated/clientset/scheme:go_default_library",
@@ -80,12 +80,12 @@ type CIDRAllocator interface {
 	AllocateOrOccupyCIDR(node *v1.Node) error
 	// ReleaseCIDR releases the CIDR of the removed node
 	ReleaseCIDR(node *v1.Node) error
-	// Register allocator with the nodeInformer for updates.
-	Register(nodeInformer informers.NodeInformer)
+	// Run starts all the working logic of the allocator.
+	Run(stopCh <-chan struct{})
 }
 
 // New creates a new CIDR range allocator.
-func New(kubeClient clientset.Interface, cloud cloudprovider.Interface, allocatorType CIDRAllocatorType, clusterCIDR, serviceCIDR *net.IPNet, nodeCIDRMaskSize int) (CIDRAllocator, error) {
+func New(kubeClient clientset.Interface, cloud cloudprovider.Interface, nodeInformer informers.NodeInformer, allocatorType CIDRAllocatorType, clusterCIDR, serviceCIDR *net.IPNet, nodeCIDRMaskSize int) (CIDRAllocator, error) {
 	nodeList, err := listNodes(kubeClient)
 	if err != nil {
 		return nil, err
@@ -93,9 +93,9 @@ func New(kubeClient clientset.Interface, cloud cloudprovider.Interface, allocato
 
 	switch allocatorType {
 	case RangeAllocatorType:
-		return NewCIDRRangeAllocator(kubeClient, clusterCIDR, serviceCIDR, nodeCIDRMaskSize, nodeList)
+		return NewCIDRRangeAllocator(kubeClient, nodeInformer, clusterCIDR, serviceCIDR, nodeCIDRMaskSize, nodeList)
 	case CloudAllocatorType:
-		return NewCloudCIDRAllocator(kubeClient, cloud)
+		return NewCloudCIDRAllocator(kubeClient, cloud, nodeInformer)
 	default:
 		return nil, fmt.Errorf("Invalid CIDR allocator type: %v", allocatorType)
 	}
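For callers, the interface change above splits construction from startup: New receives the shared NodeInformer, and the returned allocator is driven by Run. The following is a hedged sketch of a call site, not code from the PR; the CIDR literals, mask size, and the helper name startRangeAllocator are placeholders, and the import path simply mirrors the importpath shown in the BUILD rule above.

package sketch

import (
	"net"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/pkg/controller/node/ipam"
)

// startRangeAllocator shows the new two-step flow: build the allocator with a
// shared node informer, then start its workers with Run. Inputs are placeholders.
func startRangeAllocator(kubeClient kubernetes.Interface, factory informers.SharedInformerFactory, stopCh <-chan struct{}) error {
	_, clusterCIDR, _ := net.ParseCIDR("10.244.0.0/16") // example cluster CIDR
	_, serviceCIDR, _ := net.ParseCIDR("10.96.0.0/12")  // example service CIDR

	allocator, err := ipam.New(
		kubeClient,
		nil, // the cloud provider is only consulted for the cloud allocator type
		factory.Core().V1().Nodes(),
		ipam.RangeAllocatorType,
		clusterCIDR, serviceCIDR,
		24, // example nodeCIDRMaskSize
	)
	if err != nil {
		return err
	}

	// Run wires nothing new; the informer handlers were registered inside New.
	// It only waits for the node cache and starts the background CIDR workers.
	go allocator.Run(stopCh)
	return nil
}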
@@ -25,9 +25,10 @@ import (
 
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/sets"
-	"k8s.io/apimachinery/pkg/util/wait"
 	informers "k8s.io/client-go/informers/core/v1"
+	corelisters "k8s.io/client-go/listers/core/v1"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
 
@@ -37,6 +38,7 @@ import (
 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/kubernetes/pkg/cloudprovider"
 	"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
+	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/controller/node/util"
 	nodeutil "k8s.io/kubernetes/pkg/util/node"
 )
@@ -49,6 +51,12 @@ type cloudCIDRAllocator struct {
 	client clientset.Interface
 	cloud  *gce.GCECloud
 
+	// nodeLister is able to list/get nodes and is populated by the shared informer passed to
+	// NewCloudCIDRAllocator.
+	nodeLister corelisters.NodeLister
+	// nodesSynced returns true if the node shared informer has been synced at least once.
+	nodesSynced cache.InformerSynced
+
 	// Channel that is used to pass updating Nodes to the background.
 	// This increases the throughput of CIDR assignment by parallelization
 	// and not blocking on long operations (which shouldn't be done from
@@ -64,7 +72,7 @@ type cloudCIDRAllocator struct {
 var _ CIDRAllocator = (*cloudCIDRAllocator)(nil)
 
 // NewCloudCIDRAllocator creates a new cloud CIDR allocator.
-func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Interface) (CIDRAllocator, error) {
+func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Interface, nodeInformer informers.NodeInformer) (CIDRAllocator, error) {
 	if client == nil {
 		glog.Fatalf("kubeClient is nil when starting NodeController")
 	}
@@ -84,20 +92,45 @@ func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Inter
 	ca := &cloudCIDRAllocator{
 		client:            client,
 		cloud:             gceCloud,
+		nodeLister:        nodeInformer.Lister(),
+		nodesSynced:       nodeInformer.Informer().HasSynced,
 		nodeUpdateChannel: make(chan string, cidrUpdateQueueSize),
 		recorder:          recorder,
 		nodesInProcessing: sets.NewString(),
 	}
 
-	for i := 0; i < cidrUpdateWorkers; i++ {
-		// TODO: Take stopChan as an argument to NewCloudCIDRAllocator and pass it to the worker.
-		go ca.worker(wait.NeverStop)
-	}
+	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: util.CreateAddNodeHandler(ca.AllocateOrOccupyCIDR),
+		UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
+			if newNode.Spec.PodCIDR == "" {
+				return ca.AllocateOrOccupyCIDR(newNode)
+			}
+			return nil
+		}),
+		DeleteFunc: util.CreateDeleteNodeHandler(ca.ReleaseCIDR),
+	})
 
 	glog.V(0).Infof("Using cloud CIDR allocator (provider: %v)", cloud.ProviderName())
 	return ca, nil
 }
 
+func (ca *cloudCIDRAllocator) Run(stopCh <-chan struct{}) {
+	defer utilruntime.HandleCrash()
+
+	glog.Infof("Starting cloud CIDR allocator")
+	defer glog.Infof("Shutting down cloud CIDR allocator")
+
+	if !controller.WaitForCacheSync("cidrallocator", stopCh, ca.nodesSynced) {
+		return
+	}
+
+	for i := 0; i < cidrUpdateWorkers; i++ {
+		go ca.worker(stopCh)
+	}
+
+	<-stopCh
+}
+
 func (ca *cloudCIDRAllocator) worker(stopChan <-chan struct{}) {
 	for {
 		select {
@@ -169,7 +202,7 @@ func (ca *cloudCIDRAllocator) updateCIDRAllocation(nodeName string) error {
 
 	for rep := 0; rep < cidrUpdateRetries; rep++ {
 		// TODO: change it to using PATCH instead of full Node updates.
-		node, err = ca.client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
+		node, err = ca.nodeLister.Get(nodeName)
 		if err != nil {
 			glog.Errorf("Failed while getting node %v to retry updating Node.Spec.PodCIDR: %v", nodeName, err)
 			continue
@@ -218,16 +251,3 @@ func (ca *cloudCIDRAllocator) ReleaseCIDR(node *v1.Node) error {
 		node.Name, node.Spec.PodCIDR)
 	return nil
 }
-
-func (ca *cloudCIDRAllocator) Register(nodeInformer informers.NodeInformer) {
-	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-		AddFunc: util.CreateAddNodeHandler(ca.AllocateOrOccupyCIDR),
-		UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
-			if newNode.Spec.PodCIDR == "" {
-				return ca.AllocateOrOccupyCIDR(newNode)
-			}
-			return nil
-		}),
-		DeleteFunc: util.CreateDeleteNodeHandler(ca.ReleaseCIDR),
-	})
-}
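The new Run method above follows the startup shape used throughout kube-controller-manager: recover from panics, block until the node cache has synced, fan out a fixed number of workers, then park on the stop channel. Below is a stripped-down, stand-alone version of that shape, not code from the PR: it calls client-go's cache.WaitForCacheSync directly (the diff's controller.WaitForCacheSync appears to be a logging wrapper around the same call), and workerCount, runLoop, and the work callback are invented for the sketch.

package sketch

import (
	"log"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/tools/cache"
)

const workerCount = 30 // stand-in for cidrUpdateWorkers

// runLoop mirrors the Run methods added in this PR: it must not return until
// stopCh closes, and no worker may start before the informer cache is warm.
func runLoop(stopCh <-chan struct{}, nodesSynced cache.InformerSynced, work func(stop <-chan struct{})) {
	defer utilruntime.HandleCrash()

	log.Print("Starting CIDR allocator")
	defer log.Print("Shutting down CIDR allocator")

	// Workers read from the lister; starting them against a cold cache would
	// make them act on an empty view of the cluster.
	if !cache.WaitForCacheSync(stopCh, nodesSynced) {
		return
	}

	for i := 0; i < workerCount; i++ {
		go work(stopCh)
	}

	<-stopCh
}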
@@ -25,16 +25,16 @@ import (
 
 	"k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/sets"
-	"k8s.io/apimachinery/pkg/util/wait"
 	informers "k8s.io/client-go/informers/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
+	corelisters "k8s.io/client-go/listers/core/v1"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
+	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/controller/node/ipam/cidrset"
 	"k8s.io/kubernetes/pkg/controller/node/util"
 )
@@ -45,6 +45,12 @@ type rangeAllocator struct {
 	clusterCIDR *net.IPNet
 	maxCIDRs    int
 
+	// nodeLister is able to list/get nodes and is populated by the shared informer passed to
+	// NewCloudCIDRAllocator.
+	nodeLister corelisters.NodeLister
+	// nodesSynced returns true if the node shared informer has been synced at least once.
+	nodesSynced cache.InformerSynced
+
 	// Channel that is used to pass updating Nodes with assigned CIDRs to the background
 	// This increases a throughput of CIDR assignment by not blocking on long operations.
 	nodeCIDRUpdateChannel chan nodeAndCIDR
@@ -59,7 +65,7 @@ type rangeAllocator struct {
 // Caller must ensure subNetMaskSize is not less than cluster CIDR mask size.
 // Caller must always pass in a list of existing nodes so the new allocator
 // can initialize its CIDR map. NodeList is only nil in testing.
-func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, serviceCIDR *net.IPNet, subNetMaskSize int, nodeList *v1.NodeList) (CIDRAllocator, error) {
+func NewCIDRRangeAllocator(client clientset.Interface, nodeInformer informers.NodeInformer, clusterCIDR *net.IPNet, serviceCIDR *net.IPNet, subNetMaskSize int, nodeList *v1.NodeList) (CIDRAllocator, error) {
 	if client == nil {
 		glog.Fatalf("kubeClient is nil when starting NodeController")
 	}
@@ -78,6 +84,8 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s
 		client:      client,
 		cidrs:       set,
 		clusterCIDR: clusterCIDR,
+		nodeLister:  nodeInformer.Lister(),
+		nodesSynced: nodeInformer.Informer().HasSynced,
 		nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize),
 		recorder:              recorder,
 		nodesInProcessing:     sets.NewString(),
@@ -107,14 +115,57 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s
 			}
 		}
 	}
-	for i := 0; i < cidrUpdateWorkers; i++ {
-		// TODO: Take stopChan as an argument to NewCIDRRangeAllocator and pass it to the worker.
-		go ra.worker(wait.NeverStop)
-	}
+
+	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: util.CreateAddNodeHandler(ra.AllocateOrOccupyCIDR),
+		UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
+			// If the PodCIDR is not empty we either:
+			// - already processed a Node that already had a CIDR after NC restarted
+			//   (cidr is marked as used),
+			// - already processed a Node successfully and allocated a CIDR for it
+			//   (cidr is marked as used),
+			// - already processed a Node but we did saw a "timeout" response and
+			//   request eventually got through in this case we haven't released
+			//   the allocated CIDR (cidr is still marked as used).
+			// There's a possible error here:
+			// - NC sees a new Node and assigns a CIDR X to it,
+			// - Update Node call fails with a timeout,
+			// - Node is updated by some other component, NC sees an update and
+			//   assigns CIDR Y to the Node,
+			// - Both CIDR X and CIDR Y are marked as used in the local cache,
+			//   even though Node sees only CIDR Y
+			// The problem here is that in in-memory cache we see CIDR X as marked,
+			// which prevents it from being assigned to any new node. The cluster
+			// state is correct.
+			// Restart of NC fixes the issue.
+			if newNode.Spec.PodCIDR == "" {
+				return ra.AllocateOrOccupyCIDR(newNode)
+			}
+			return nil
+		}),
+		DeleteFunc: util.CreateDeleteNodeHandler(ra.ReleaseCIDR),
+	})
 
 	return ra, nil
 }
 
+func (r *rangeAllocator) Run(stopCh <-chan struct{}) {
+	defer utilruntime.HandleCrash()
+
+	glog.Infof("Starting range CIDR allocator")
+	defer glog.Infof("Shutting down range CIDR allocator")
+
+	if !controller.WaitForCacheSync("cidrallocator", stopCh, r.nodesSynced) {
+		return
+	}
+
+	for i := 0; i < cidrUpdateWorkers; i++ {
+		go r.worker(stopCh)
+	}
+
+	<-stopCh
+}
+
 func (r *rangeAllocator) worker(stopChan <-chan struct{}) {
 	for {
 		select {
@@ -232,7 +283,7 @@ func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
 	podCIDR := data.cidr.String()
 	for rep := 0; rep < cidrUpdateRetries; rep++ {
 		// TODO: change it to using PATCH instead of full Node updates.
-		node, err = r.client.CoreV1().Nodes().Get(data.nodeName, metav1.GetOptions{})
+		node, err = r.nodeLister.Get(data.nodeName)
 		if err != nil {
 			glog.Errorf("Failed while getting node %v to retry updating Node.Spec.PodCIDR: %v", data.nodeName, err)
 			continue
@@ -269,35 +320,3 @@ func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
 	}
 	return err
 }
-
-func (r *rangeAllocator) Register(nodeInformer informers.NodeInformer) {
-	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-		AddFunc: util.CreateAddNodeHandler(r.AllocateOrOccupyCIDR),
-		UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
-			// If the PodCIDR is not empty we either:
-			// - already processed a Node that already had a CIDR after NC restarted
-			//   (cidr is marked as used),
-			// - already processed a Node successfully and allocated a CIDR for it
-			//   (cidr is marked as used),
-			// - already processed a Node but we did saw a "timeout" response and
-			//   request eventually got through in this case we haven't released
-			//   the allocated CIDR (cidr is still marked as used).
-			// There's a possible error here:
-			// - NC sees a new Node and assigns a CIDR X to it,
-			// - Update Node call fails with a timeout,
-			// - Node is updated by some other component, NC sees an update and
-			//   assigns CIDR Y to the Node,
-			// - Both CIDR X and CIDR Y are marked as used in the local cache,
-			//   even though Node sees only CIDR Y
-			// The problem here is that in in-memory cache we see CIDR X as marked,
-			// which prevents it from being assigned to any new node. The cluster
-			// state is correct.
-			// Restart of NC fixes the issue.
-			if newNode.Spec.PodCIDR == "" {
-				return r.AllocateOrOccupyCIDR(newNode)
-			}
-			return nil
-		}),
-		DeleteFunc: util.CreateDeleteNodeHandler(r.ReleaseCIDR),
-	})
-}
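The one-line change in updateCIDRAllocation (made in both allocators) swaps a GET against the API server for a read from the informer's local cache; the retry loop keeps its shape and only the data source changes. Below is a hedged side-by-side sketch of the two read paths, not code from the PR: fetchNodeViaAPI uses the pre-context Get signature that matches the client-go vintage in this diff, the function names are invented, and the deep-copy note reflects general client-go guidance rather than anything the diff itself does.

package sketch

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	corelisters "k8s.io/client-go/listers/core/v1"
)

// fetchNodeViaAPI is the old path: every retry is a round-trip to the API
// server, which adds load and latency but always returns the latest object.
func fetchNodeViaAPI(client clientset.Interface, name string) (*v1.Node, error) {
	return client.CoreV1().Nodes().Get(name, metav1.GetOptions{})
}

// fetchNodeViaLister is the new path: the read is served from the shared
// informer's in-memory store, so it is cheap, but it can lag the API server
// by a watch event or two. The returned object is shared cache state, so a
// caller that intends to mutate it should work on a deep copy.
func fetchNodeViaLister(lister corelisters.NodeLister, name string) (*v1.Node, error) {
	node, err := lister.Get(name)
	if err != nil {
		return nil, err
	}
	return node.DeepCopy(), nil
}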
@@ -24,7 +24,10 @@ import (
 	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/informers"
+	coreinformers "k8s.io/client-go/informers/core/v1"
 	"k8s.io/client-go/kubernetes/fake"
+	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/controller/testutil"
 )
 
@@ -32,6 +35,8 @@ const (
 	nodePollInterval = 100 * time.Millisecond
 )
 
+var alwaysReady = func() bool { return true }
+
 func waitForUpdatedNodeWithTimeout(nodeHandler *testutil.FakeNodeHandler, number int, timeout time.Duration) error {
 	return wait.Poll(nodePollInterval, timeout, func() (bool, error) {
 		if len(nodeHandler.GetUpdatedNodesCopy()) >= number {
@@ -41,6 +46,19 @@ func waitForUpdatedNodeWithTimeout(nodeHandler *testutil.FakeNodeHandler, number
 	})
 }
 
+// Creates a fakeNodeInformer using the provided fakeNodeHandler.
+func getFakeNodeInformer(fakeNodeHandler *testutil.FakeNodeHandler) coreinformers.NodeInformer {
+	fakeClient := &fake.Clientset{}
+	fakeInformerFactory := informers.NewSharedInformerFactory(fakeClient, controller.NoResyncPeriodFunc())
+	fakeNodeInformer := fakeInformerFactory.Core().V1().Nodes()
+
+	for _, node := range fakeNodeHandler.Existing {
+		fakeNodeInformer.Informer().GetStore().Add(node)
+	}
+
+	return fakeNodeInformer
+}
+
 func TestAllocateOrOccupyCIDRSuccess(t *testing.T) {
 	testCases := []struct {
 		description string
@@ -130,19 +148,23 @@ func TestAllocateOrOccupyCIDRSuccess(t *testing.T) {
 		expectedAllocatedCIDR string
 		allocatedCIDRs        []string
 	}) {
-		allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize, nil)
+		// Initialize the range allocator.
+		allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, getFakeNodeInformer(tc.fakeNodeHandler), tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize, nil)
+		rangeAllocator, ok := allocator.(*rangeAllocator)
+		if !ok {
+			t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description)
+			return
+		}
+		rangeAllocator.nodesSynced = alwaysReady
+		rangeAllocator.recorder = testutil.NewFakeRecorder()
+		go allocator.Run(wait.NeverStop)
+
 		// this is a bit of white box testing
 		for _, allocated := range tc.allocatedCIDRs {
 			_, cidr, err := net.ParseCIDR(allocated)
 			if err != nil {
 				t.Fatalf("%v: unexpected error when parsing CIDR %v: %v", tc.description, allocated, err)
 			}
-			rangeAllocator, ok := allocator.(*rangeAllocator)
-			if !ok {
-				t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description)
-				return
-			}
-			rangeAllocator.recorder = testutil.NewFakeRecorder()
 			if err = rangeAllocator.cidrs.Occupy(cidr); err != nil {
 				t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err)
 			}
@@ -212,19 +234,23 @@ func TestAllocateOrOccupyCIDRFailure(t *testing.T) {
 		subNetMaskSize int
 		allocatedCIDRs []string
 	}) {
-		allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize, nil)
+		// Initialize the range allocator.
+		allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, getFakeNodeInformer(tc.fakeNodeHandler), tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize, nil)
+		rangeAllocator, ok := allocator.(*rangeAllocator)
+		if !ok {
+			t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description)
+			return
+		}
+		rangeAllocator.nodesSynced = alwaysReady
+		rangeAllocator.recorder = testutil.NewFakeRecorder()
+		go allocator.Run(wait.NeverStop)
+
 		// this is a bit of white box testing
 		for _, allocated := range tc.allocatedCIDRs {
 			_, cidr, err := net.ParseCIDR(allocated)
 			if err != nil {
 				t.Fatalf("%v: unexpected error when parsing CIDR %v: %v", tc.description, allocated, err)
 			}
-			rangeAllocator, ok := allocator.(*rangeAllocator)
-			if !ok {
-				t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description)
-				return
-			}
-			rangeAllocator.recorder = testutil.NewFakeRecorder()
 			err = rangeAllocator.cidrs.Occupy(cidr)
 			if err != nil {
 				t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err)
@@ -324,19 +350,23 @@ func TestReleaseCIDRSuccess(t *testing.T) {
 		allocatedCIDRs []string
 		cidrsToRelease []string
 	}) {
-		allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize, nil)
+		// Initialize the range allocator.
+		allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, getFakeNodeInformer(tc.fakeNodeHandler), tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize, nil)
+		rangeAllocator, ok := allocator.(*rangeAllocator)
+		if !ok {
+			t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description)
+			return
+		}
+		rangeAllocator.nodesSynced = alwaysReady
+		rangeAllocator.recorder = testutil.NewFakeRecorder()
+		go allocator.Run(wait.NeverStop)
+
 		// this is a bit of white box testing
 		for _, allocated := range tc.allocatedCIDRs {
 			_, cidr, err := net.ParseCIDR(allocated)
 			if err != nil {
 				t.Fatalf("%v: unexpected error when parsing CIDR %v: %v", tc.description, allocated, err)
 			}
-			rangeAllocator, ok := allocator.(*rangeAllocator)
-			if !ok {
-				t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description)
-				return
-			}
-			rangeAllocator.recorder = testutil.NewFakeRecorder()
 			err = rangeAllocator.cidrs.Occupy(cidr)
 			if err != nil {
 				t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err)
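The test changes above follow the usual recipe for unit-testing informer-backed controllers: build an informer from a fake clientset, seed its store directly instead of running watch machinery, and stub the "has synced" check so Run does not block. A self-contained sketch of that recipe, not code from the PR; seedNodes and the node names are invented, and the usage comment refers to the unexported fields shown in the diff.

package sketch

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	coreinformers "k8s.io/client-go/informers/core/v1"
	"k8s.io/client-go/kubernetes/fake"
)

// alwaysReady is the same trick as in the test diff: a sync check that never blocks.
var alwaysReady = func() bool { return true }

// seedNodes builds a node informer backed by a fake clientset and pre-populates
// its store, so listers see the nodes without any watches running.
func seedNodes(names ...string) (coreinformers.NodeInformer, error) {
	client := fake.NewSimpleClientset()
	factory := informers.NewSharedInformerFactory(client, 0)
	nodeInformer := factory.Core().V1().Nodes()

	for _, name := range names {
		node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}}
		if err := nodeInformer.Informer().GetStore().Add(node); err != nil {
			return nil, err
		}
	}
	return nodeInformer, nil
}

// Usage in a test would look roughly like:
//
//	nodeInformer, _ := seedNodes("node-0", "node-1")
//	rangeAllocator.nodesSynced = alwaysReady // skip WaitForCacheSync in unit tests
//	go allocator.Run(stopCh)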
@@ -360,11 +360,10 @@ func NewNodeController(
 		} else {
 			var err error
 			nc.cidrAllocator, err = ipam.New(
-				kubeClient, cloud, nc.allocatorType, nc.clusterCIDR, nc.serviceCIDR, nodeCIDRMaskSize)
+				kubeClient, cloud, nodeInformer, nc.allocatorType, nc.clusterCIDR, nc.serviceCIDR, nodeCIDRMaskSize)
 			if err != nil {
 				return nil, err
 			}
-			nc.cidrAllocator.Register(nodeInformer)
 		}
 	}
 
@@ -585,6 +584,12 @@ func (nc *Controller) Run(stopCh <-chan struct{}) {
 		go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, wait.NeverStop)
 	}
 
+	if nc.allocateNodeCIDRs {
+		if nc.allocatorType != ipam.IPAMFromClusterAllocatorType && nc.allocatorType != ipam.IPAMFromCloudAllocatorType {
+			go nc.cidrAllocator.Run(wait.NeverStop)
+		}
+	}
+
 	<-stopCh
 }
 