Migrate CIDR allocators to shared node informer

This commit is contained in:
Shyam Jeedigunta
2017-11-22 19:38:26 +01:00
parent f85649c6cd
commit 263dd1227d
6 changed files with 169 additions and 89 deletions

View File

@@ -25,9 +25,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
informers "k8s.io/client-go/informers/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
@@ -37,6 +38,7 @@ import (
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/node/util"
nodeutil "k8s.io/kubernetes/pkg/util/node"
)
@@ -49,6 +51,12 @@ type cloudCIDRAllocator struct {
client clientset.Interface
cloud *gce.GCECloud
// nodeLister is able to list/get nodes and is populated by the shared informer passed to
// NewCloudCIDRAllocator.
nodeLister corelisters.NodeLister
// nodesSynced returns true if the node shared informer has been synced at least once.
nodesSynced cache.InformerSynced
// Channel that is used to pass updating Nodes to the background.
// This increases the throughput of CIDR assignment by parallelization
// and not blocking on long operations (which shouldn't be done from
@@ -64,7 +72,7 @@ type cloudCIDRAllocator struct {
var _ CIDRAllocator = (*cloudCIDRAllocator)(nil)
// NewCloudCIDRAllocator creates a new cloud CIDR allocator.
func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Interface) (CIDRAllocator, error) {
func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Interface, nodeInformer informers.NodeInformer) (CIDRAllocator, error) {
if client == nil {
glog.Fatalf("kubeClient is nil when starting NodeController")
}
@@ -84,20 +92,45 @@ func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Inter
ca := &cloudCIDRAllocator{
client: client,
cloud: gceCloud,
nodeLister: nodeInformer.Lister(),
nodesSynced: nodeInformer.Informer().HasSynced,
nodeUpdateChannel: make(chan string, cidrUpdateQueueSize),
recorder: recorder,
nodesInProcessing: sets.NewString(),
}
for i := 0; i < cidrUpdateWorkers; i++ {
// TODO: Take stopChan as an argument to NewCloudCIDRAllocator and pass it to the worker.
go ca.worker(wait.NeverStop)
}
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: util.CreateAddNodeHandler(ca.AllocateOrOccupyCIDR),
UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
if newNode.Spec.PodCIDR == "" {
return ca.AllocateOrOccupyCIDR(newNode)
}
return nil
}),
DeleteFunc: util.CreateDeleteNodeHandler(ca.ReleaseCIDR),
})
glog.V(0).Infof("Using cloud CIDR allocator (provider: %v)", cloud.ProviderName())
return ca, nil
}
func (ca *cloudCIDRAllocator) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
glog.Infof("Starting cloud CIDR allocator")
defer glog.Infof("Shutting down cloud CIDR allocator")
if !controller.WaitForCacheSync("cidrallocator", stopCh, ca.nodesSynced) {
return
}
for i := 0; i < cidrUpdateWorkers; i++ {
go ca.worker(stopCh)
}
<-stopCh
}
func (ca *cloudCIDRAllocator) worker(stopChan <-chan struct{}) {
for {
select {
@@ -169,7 +202,7 @@ func (ca *cloudCIDRAllocator) updateCIDRAllocation(nodeName string) error {
for rep := 0; rep < cidrUpdateRetries; rep++ {
// TODO: change it to using PATCH instead of full Node updates.
node, err = ca.client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
node, err = ca.nodeLister.Get(nodeName)
if err != nil {
glog.Errorf("Failed while getting node %v to retry updating Node.Spec.PodCIDR: %v", nodeName, err)
continue
@@ -218,16 +251,3 @@ func (ca *cloudCIDRAllocator) ReleaseCIDR(node *v1.Node) error {
node.Name, node.Spec.PodCIDR)
return nil
}
func (ca *cloudCIDRAllocator) Register(nodeInformer informers.NodeInformer) {
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: util.CreateAddNodeHandler(ca.AllocateOrOccupyCIDR),
UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
if newNode.Spec.PodCIDR == "" {
return ca.AllocateOrOccupyCIDR(newNode)
}
return nil
}),
DeleteFunc: util.CreateDeleteNodeHandler(ca.ReleaseCIDR),
})
}