Merge pull request #52285 from shyamjvs/cidr-allocation-swag

Automatic merge from submit-queue (batch tested with PRs 52679, 52285). If you want to cherry-pick this change to another branch, please follow the instructions here: https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md

Improve cloud-cidr-allocator's performance

Fixes https://github.com/kubernetes/kubernetes/issues/52284

This implements the changes I suggested in that issue, and also brings the cloud CIDR allocator closer in structure to the range allocator.
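To make the shape of the change easier to follow, here is a minimal, self-contained Go sketch of the pattern the diff introduces into the cloud allocator (and keeps in the range allocator): a buffered work channel drained by a fixed pool of worker goroutines, plus a mutex-guarded set of nodes currently in processing so the same node is not handled twice concurrently. The type and method names echo the diff; the map-based set, the stubbed update, and the numbers in `main` are illustrative assumptions, not the controller's actual code.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// nodeAndCIDR mirrors the work-item type in the diff: a node name plus the CIDR to assign.
type nodeAndCIDR struct {
	nodeName string
	cidr     string
}

// allocator is a simplified stand-in for the cloudCIDRAllocator/rangeAllocator in the diff.
type allocator struct {
	// Buffered channel that decouples the cheap CIDR lookup from the slow node update.
	nodeCIDRUpdateChannel chan nodeAndCIDR
	// Mutex-guarded set of nodes currently being processed, to avoid duplicate work.
	lock              sync.Mutex
	nodesInProcessing map[string]bool
}

func newAllocator(workers, queueSize int, stopCh <-chan struct{}) *allocator {
	a := &allocator{
		nodeCIDRUpdateChannel: make(chan nodeAndCIDR, queueSize),
		nodesInProcessing:     map[string]bool{},
	}
	for i := 0; i < workers; i++ {
		go a.worker(stopCh)
	}
	return a
}

// worker drains the queue until the channel is closed or stopCh fires.
func (a *allocator) worker(stopCh <-chan struct{}) {
	for {
		select {
		case item, ok := <-a.nodeCIDRUpdateChannel:
			if !ok {
				return
			}
			a.updateCIDRAllocation(item)
		case <-stopCh:
			return
		}
	}
}

func (a *allocator) insertNodeToProcessing(name string) bool {
	a.lock.Lock()
	defer a.lock.Unlock()
	if a.nodesInProcessing[name] {
		return false
	}
	a.nodesInProcessing[name] = true
	return true
}

func (a *allocator) removeNodeFromProcessing(name string) {
	a.lock.Lock()
	defer a.lock.Unlock()
	delete(a.nodesInProcessing, name)
}

// AllocateOrOccupyCIDR only marks the node as in-flight and enqueues the slow update.
func (a *allocator) AllocateOrOccupyCIDR(nodeName, cidr string) {
	if !a.insertNodeToProcessing(nodeName) {
		return // node is already being processed
	}
	a.nodeCIDRUpdateChannel <- nodeAndCIDR{nodeName: nodeName, cidr: cidr}
}

// updateCIDRAllocation stands in for the retried Node.Spec.PodCIDR write to the API server.
func (a *allocator) updateCIDRAllocation(item nodeAndCIDR) {
	defer a.removeNodeFromProcessing(item.nodeName)
	fmt.Printf("setting PodCIDR of %s to %s\n", item.nodeName, item.cidr)
}

func main() {
	stop := make(chan struct{})
	a := newAllocator(4, 100, stop)
	a.AllocateOrOccupyCIDR("node-1", "10.0.0.0/24")
	a.AllocateOrOccupyCIDR("node-1", "10.0.0.0/24") // typically skipped: node-1 is still in flight
	time.Sleep(100 * time.Millisecond)
	close(stop)
}
```

In the real allocator the worker count, queue size, and retry count come from the `cidrUpdateWorkers`, `cidrUpdateQueueSize`, and `cidrUpdateRetries` constants added in the diff, and `updateCIDRAllocation` retries the full Node update against the API server.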

cc @kubernetes/sig-network-pr-reviews @kubernetes/sig-scalability-misc @wojtek-t @bowei
Commit dc37cb005d by Kubernetes Submit Queue, 2017-09-19 18:27:47 -07:00 (committed by GitHub)
4 changed files with 179 additions and 93 deletions


@@ -37,7 +37,6 @@ go_library(
"timeout.go",
],
deps = [
"//pkg/api:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/cloudprovider/providers/gce:go_default_library",
"//pkg/controller/node/ipam/cidrset:go_default_library",


@@ -54,9 +54,21 @@ const (
// IPAMFromCloudAllocatorType uses the ipam controller sync'ing the node
// CIDR range allocations from the cloud to the cluster.
IPAMFromCloudAllocatorType = "IPAMFromCloud"
)
// TODO: figure out the good setting for those constants.
const (
// The amount of time the nodecontroller polls on the list nodes endpoint.
apiserverStartupGracePeriod = 10 * time.Minute
// The no. of NodeSpec updates NC can process concurrently.
cidrUpdateWorkers = 30
// The max no. of NodeSpec updates that can be enqueued.
cidrUpdateQueueSize = 5000
// cidrUpdateRetries is the no. of times a NodeSpec update will be retried before dropping it.
cidrUpdateRetries = 10
)
// CIDRAllocator is an interface implemented by things that know how


@@ -18,19 +18,23 @@ package ipam
import (
"fmt"
"net"
"sync"
"github.com/golang/glog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/api/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/api"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
"k8s.io/kubernetes/pkg/controller/node/util"
@@ -42,84 +46,158 @@ import (
// deallocation is delegated to the external provider, and the controller
// merely takes the assignment and updates the node spec.
type cloudCIDRAllocator struct {
lock sync.Mutex
client clientset.Interface
cloud *gce.GCECloud
recorder record.EventRecorder
// Channel that is used to pass updating Nodes with assigned CIDRs to the background
// This increases the throughput of CIDR assignment by not blocking on long operations.
nodeCIDRUpdateChannel chan nodeAndCIDR
recorder record.EventRecorder
// Keep a set of nodes that are currently being processed to avoid races in CIDR allocation
lock sync.Mutex
nodesInProcessing sets.String
}
var _ CIDRAllocator = (*cloudCIDRAllocator)(nil)
// NewCloudCIDRAllocator creates a new cloud CIDR allocator.
func NewCloudCIDRAllocator(
client clientset.Interface,
cloud cloudprovider.Interface) (ca CIDRAllocator, err error) {
func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Interface) (CIDRAllocator, error) {
if client == nil {
glog.Fatalf("kubeClient is nil when starting NodeController")
}
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cidrAllocator"})
eventBroadcaster.StartLogging(glog.Infof)
glog.V(0).Infof("Sending events to api server.")
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(client.Core().RESTClient()).Events("")})
gceCloud, ok := cloud.(*gce.GCECloud)
if !ok {
err = fmt.Errorf("cloudCIDRAllocator does not support %v provider", cloud.ProviderName())
return
err := fmt.Errorf("cloudCIDRAllocator does not support %v provider", cloud.ProviderName())
return nil, err
}
ca = &cloudCIDRAllocator{
ca := &cloudCIDRAllocator{
client: client,
cloud: gceCloud,
recorder: record.NewBroadcaster().NewRecorder(
api.Scheme,
v1.EventSource{Component: "cidrAllocator"}),
nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize),
recorder: recorder,
nodesInProcessing: sets.NewString(),
}
for i := 0; i < cidrUpdateWorkers; i++ {
// TODO: Take stopChan as an argument to NewCloudCIDRAllocator and pass it to the worker.
go ca.worker(wait.NeverStop)
}
glog.V(0).Infof("Using cloud CIDR allocator (provider: %v)", cloud.ProviderName())
return
return ca, nil
}
func (ca *cloudCIDRAllocator) worker(stopChan <-chan struct{}) {
for {
select {
case workItem, ok := <-ca.nodeCIDRUpdateChannel:
if !ok {
glog.Warning("Channel nodeCIDRUpdateChannel was unexpectedly closed")
return
}
ca.updateCIDRAllocation(workItem)
case <-stopChan:
return
}
}
}
func (ca *cloudCIDRAllocator) insertNodeToProcessing(nodeName string) bool {
ca.lock.Lock()
defer ca.lock.Unlock()
if ca.nodesInProcessing.Has(nodeName) {
return false
}
ca.nodesInProcessing.Insert(nodeName)
return true
}
func (ca *cloudCIDRAllocator) removeNodeFromProcessing(nodeName string) {
ca.lock.Lock()
defer ca.lock.Unlock()
ca.nodesInProcessing.Delete(nodeName)
}
// WARNING: If you're adding any return calls or defer any more work from this
// function you have to make sure to update nodesInProcessing properly with the
// disposition of the node when the work is done.
func (ca *cloudCIDRAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
glog.V(2).Infof("Updating PodCIDR for node %v", node.Name)
if node == nil {
return nil
}
if !ca.insertNodeToProcessing(node.Name) {
glog.V(2).Infof("Node %v is already in a process of CIDR assignment.", node.Name)
return nil
}
cidrs, err := ca.cloud.AliasRanges(types.NodeName(node.Name))
if err != nil {
ca.removeNodeFromProcessing(node.Name)
util.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
return fmt.Errorf("failed to allocate cidr: %v", err)
}
if len(cidrs) == 0 {
ca.removeNodeFromProcessing(node.Name)
util.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
glog.V(2).Infof("Node %v has no CIDRs", node.Name)
return fmt.Errorf("failed to allocate cidr (none exist)")
return fmt.Errorf("failed to allocate cidr: Node %v has no CIDRs", node.Name)
}
node, err = ca.client.Core().Nodes().Get(node.Name, metav1.GetOptions{})
_, cidr, err := net.ParseCIDR(cidrs[0])
if err != nil {
glog.Errorf("Could not get Node object from Kubernetes: %v", err)
return err
return fmt.Errorf("failed to parse string '%s' as a CIDR: %v", cidrs[0], err)
}
podCIDR := cidrs[0]
glog.V(4).Infof("Putting node %s with CIDR %s into the work queue", node.Name, cidrs[0])
ca.nodeCIDRUpdateChannel <- nodeAndCIDR{
nodeName: node.Name,
cidr: cidr,
}
return nil
}
if node.Spec.PodCIDR != "" {
if node.Spec.PodCIDR == podCIDR {
glog.V(3).Infof("Node %v has PodCIDR %v", node.Name, podCIDR)
return nil
// updateCIDRAllocation assigns CIDR to Node and sends an update to the API server.
func (ca *cloudCIDRAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
var err error
var node *v1.Node
defer ca.removeNodeFromProcessing(data.nodeName)
podCIDR := data.cidr.String()
for rep := 0; rep < cidrUpdateRetries; rep++ {
// TODO: change it to using PATCH instead of full Node updates.
node, err = ca.client.Core().Nodes().Get(data.nodeName, metav1.GetOptions{})
if err != nil {
glog.Errorf("Failed while getting node %v to retry updating Node.Spec.PodCIDR: %v", data.nodeName, err)
continue
}
glog.Errorf("PodCIDR cannot be reassigned, node %v spec has %v, but cloud provider has assigned %v",
node.Name, node.Spec.PodCIDR, podCIDR)
// We fall through and set the CIDR despite this error. This
// implements the same logic as implemented in the
// rangeAllocator.
//
// See https://github.com/kubernetes/kubernetes/pull/42147#discussion_r103357248
if node.Spec.PodCIDR != "" {
if node.Spec.PodCIDR == podCIDR {
glog.V(4).Infof("Node %v already has allocated CIDR %v. It matches the proposed one.", node.Name, podCIDR)
return nil
}
glog.Errorf("PodCIDR being reassigned! Node %v spec has %v, but cloud provider has assigned %v",
node.Name, node.Spec.PodCIDR, podCIDR)
// We fall through and set the CIDR despite this error. This
// implements the same logic as implemented in the
// rangeAllocator.
//
// See https://github.com/kubernetes/kubernetes/pull/42147#discussion_r103357248
}
node.Spec.PodCIDR = podCIDR
if _, err = ca.client.Core().Nodes().Update(node); err == nil {
glog.Infof("Set node %v PodCIDR to %v", node.Name, podCIDR)
break
}
glog.Errorf("Failed to update node %v PodCIDR to %v (%d retries left): %v", node.Name, podCIDR, cidrUpdateRetries-rep-1, err)
}
node.Spec.PodCIDR = cidrs[0]
if _, err := ca.client.Core().Nodes().Update(node); err == nil {
glog.V(2).Infof("Node %v PodCIDR set to %v", node.Name, podCIDR)
} else {
glog.Errorf("Could not update node %v PodCIDR to %v: %v",
node.Name, podCIDR, err)
if err != nil {
util.RecordNodeStatusChange(ca.recorder, node, "CIDRAssignmentFailed")
glog.Errorf("CIDR assignment for node %v failed: %v.", data.nodeName, err)
return err
}
@@ -131,10 +209,8 @@ func (ca *cloudCIDRAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
LastTransitionTime: metav1.Now(),
})
if err != nil {
glog.Errorf("Error setting route status for node %v: %v",
node.Name, err)
glog.Errorf("Error setting route status for node %v: %v", node.Name, err)
}
return err
}


@@ -39,26 +39,19 @@ import (
"k8s.io/kubernetes/pkg/controller/node/util"
)
// TODO: figure out the good setting for those constants.
const (
// controls how many NodeSpec updates NC can process concurrently.
cidrUpdateWorkers = 10
cidrUpdateQueueSize = 5000
// podCIDRUpdateRetry controls the number of retries of writing Node.Spec.PodCIDR update.
podCIDRUpdateRetry = 5
)
type rangeAllocator struct {
client clientset.Interface
cidrs *cidrset.CidrSet
clusterCIDR *net.IPNet
maxCIDRs int
// Channel that is used to pass updating Nodes with assigned CIDRs to the background
// This increases the throughput of CIDR assignment by not blocking on long operations.
nodeCIDRUpdateChannel chan nodeAndCIDR
recorder record.EventRecorder
// Keep a set of nodes that are currently being processed to avoid races in CIDR allocation
sync.Mutex
lock sync.Mutex
nodesInProcessing sets.String
}
@@ -67,15 +60,15 @@ type rangeAllocator struct {
// Caller must always pass in a list of existing nodes so the new allocator
// can initialize its CIDR map. NodeList is only nil in testing.
func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, serviceCIDR *net.IPNet, subNetMaskSize int, nodeList *v1.NodeList) (CIDRAllocator, error) {
if client == nil {
glog.Fatalf("kubeClient is nil when starting NodeController")
}
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cidrAllocator"})
eventBroadcaster.StartLogging(glog.Infof)
if client != nil {
glog.V(0).Infof("Sending events to api server.")
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(client.Core().RESTClient()).Events("")})
} else {
glog.Fatalf("kubeClient is nil when starting NodeController")
}
glog.V(0).Infof("Sending events to api server.")
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(client.Core().RESTClient()).Events("")})
ra := &rangeAllocator{
client: client,
@@ -111,28 +104,31 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s
}
}
for i := 0; i < cidrUpdateWorkers; i++ {
go func(stopChan <-chan struct{}) {
for {
select {
case workItem, ok := <-ra.nodeCIDRUpdateChannel:
if !ok {
glog.Warning("NodeCIDRUpdateChannel read returned false.")
return
}
ra.updateCIDRAllocation(workItem)
case <-stopChan:
return
}
}
}(wait.NeverStop)
// TODO: Take stopChan as an argument to NewCIDRRangeAllocator and pass it to the worker.
go ra.worker(wait.NeverStop)
}
return ra, nil
}
func (r *rangeAllocator) worker(stopChan <-chan struct{}) {
for {
select {
case workItem, ok := <-r.nodeCIDRUpdateChannel:
if !ok {
glog.Warning("Channel nodeCIDRUpdateChannel was unexpectedly closed")
return
}
r.updateCIDRAllocation(workItem)
case <-stopChan:
return
}
}
}
func (r *rangeAllocator) insertNodeToProcessing(nodeName string) bool {
r.Lock()
defer r.Unlock()
r.lock.Lock()
defer r.lock.Unlock()
if r.nodesInProcessing.Has(nodeName) {
return false
}
@@ -141,8 +137,8 @@ func (r *rangeAllocator) insertNodeToProcessing(nodeName string) bool {
}
func (r *rangeAllocator) removeNodeFromProcessing(nodeName string) {
r.Lock()
defer r.Unlock()
r.lock.Lock()
defer r.lock.Unlock()
r.nodesInProcessing.Delete(nodeName)
}
@@ -162,7 +158,8 @@ func (r *rangeAllocator) occupyCIDR(node *v1.Node) error {
}
// WARNING: If you're adding any return calls or defer any more work from this
// function you have to handle correctly nodesInProcessing.
// function you have to make sure to update nodesInProcessing properly with the
// disposition of the node when the work is done.
func (r *rangeAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
if node == nil {
return nil
@@ -181,7 +178,7 @@ func (r *rangeAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
return fmt.Errorf("failed to allocate cidr: %v", err)
}
glog.V(10).Infof("Putting node %s with CIDR %s into the work queue", node.Name, podCIDR)
glog.V(4).Infof("Putting node %s with CIDR %s into the work queue", node.Name, podCIDR)
r.nodeCIDRUpdateChannel <- nodeAndCIDR{
nodeName: node.Name,
cidr: podCIDR,
@@ -222,12 +219,14 @@ func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) {
}
}
// Assigns CIDR to Node and sends an update to the API server.
// updateCIDRAllocation assigns CIDR to Node and sends an update to the API server.
func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
var err error
var node *v1.Node
defer r.removeNodeFromProcessing(data.nodeName)
for rep := 0; rep < podCIDRUpdateRetry; rep++ {
podCIDR := data.cidr.String()
for rep := 0; rep < cidrUpdateRetries; rep++ {
// TODO: change it to using PATCH instead of full Node updates.
node, err = r.client.Core().Nodes().Get(data.nodeName, metav1.GetOptions{})
if err != nil {
@@ -236,21 +235,21 @@ func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
}
if node.Spec.PodCIDR != "" {
glog.V(4).Infof("Node %v already has allocated CIDR %v. Releasing assigned one if different.", node.Name, node.Spec.PodCIDR)
if node.Spec.PodCIDR != data.cidr.String() {
if node.Spec.PodCIDR != podCIDR {
glog.Errorf("Node %q PodCIDR seems to have changed (original=%v, current=%v), releasing original and occupying new CIDR",
node.Name, node.Spec.PodCIDR, data.cidr.String())
node.Name, node.Spec.PodCIDR, podCIDR)
if err := r.cidrs.Release(data.cidr); err != nil {
glog.Errorf("Error when releasing CIDR %v", data.cidr.String())
glog.Errorf("Error when releasing CIDR %v", podCIDR)
}
}
return nil
}
node.Spec.PodCIDR = data.cidr.String()
if _, err := r.client.Core().Nodes().Update(node); err != nil {
glog.Errorf("Failed while updating Node.Spec.PodCIDR (%d retries left): %v", podCIDRUpdateRetry-rep-1, err)
} else {
node.Spec.PodCIDR = podCIDR
if _, err = r.client.Core().Nodes().Update(node); err == nil {
glog.Infof("Set node %v PodCIDR to %v", node.Name, podCIDR)
break
}
glog.Errorf("Failed to update node %v PodCIDR to %v (%d retries left): %v", node.Name, podCIDR, cidrUpdateRetries-rep-1, err)
}
if err != nil {
util.RecordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed")