Add concurrency to cloud CIDR allocator & make it non-blocking on NodeSpec updates

Shyam Jeedigunta committed on 2017-09-11 18:39:55 +02:00
commit 5d864aa3c2
parent 9be91e42c7
4 changed files with 167 additions and 84 deletions


@@ -37,7 +37,6 @@ go_library(
         "timeout.go",
     ],
     deps = [
-        "//pkg/api:go_default_library",
         "//pkg/cloudprovider:go_default_library",
         "//pkg/cloudprovider/providers/gce:go_default_library",
         "//pkg/controller/node/ipam/cidrset:go_default_library",


@@ -62,7 +62,7 @@ const (
     apiserverStartupGracePeriod = 10 * time.Minute
     // The no. of NodeSpec updates NC can process concurrently.
-    cidrUpdateWorkers = 10
+    cidrUpdateWorkers = 30
     // The max no. of NodeSpec updates that can be enqueued.
     cidrUpdateQueueSize = 5000
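
The two constants above work together: cidrUpdateQueueSize bounds how many pending NodeSpec updates can sit in the allocator's buffered channel, while cidrUpdateWorkers bounds how many of them are written concurrently (raised here from 10 to 30). A minimal standalone Go sketch of that queue-plus-worker-pool shape, with hypothetical names rather than the controller's own types:

// Illustrative sketch only; not the NodeController code.
package main

import (
	"fmt"
	"sync"
)

const (
	cidrUpdateWorkers   = 30   // upper bound on concurrent updates
	cidrUpdateQueueSize = 5000 // upper bound on queued, not-yet-processed updates
)

func main() {
	// Buffered channel: enqueuing is non-blocking until the queue is full.
	queue := make(chan string, cidrUpdateQueueSize)

	var wg sync.WaitGroup
	for i := 0; i < cidrUpdateWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each worker drains the shared queue until it is closed.
			for nodeName := range queue {
				fmt.Println("processing", nodeName)
			}
		}()
	}

	for i := 0; i < 100; i++ {
		queue <- fmt.Sprintf("node-%d", i)
	}
	close(queue)
	wg.Wait()
}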


@@ -18,19 +18,23 @@ package ipam
 import (
 	"fmt"
+	"net"
 	"sync"

 	"github.com/golang/glog"

 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/wait"
 	informers "k8s.io/client-go/informers/core/v1"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"

 	"k8s.io/api/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
-	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/client-go/kubernetes/scheme"
+	v1core "k8s.io/client-go/kubernetes/typed/core/v1"

 	"k8s.io/kubernetes/pkg/cloudprovider"
 	"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
 	"k8s.io/kubernetes/pkg/controller/node/util"
@@ -42,84 +46,158 @@ import (
 // deallocation is delegated to the external provider, and the controller
 // merely takes the assignment and updates the node spec.
 type cloudCIDRAllocator struct {
-	lock sync.Mutex
-
 	client clientset.Interface
 	cloud  *gce.GCECloud

-	recorder record.EventRecorder
+	// Channel that is used to pass updating Nodes with assigned CIDRs to the background
+	// This increases the throughput of CIDR assignment by not blocking on long operations.
+	nodeCIDRUpdateChannel chan nodeAndCIDR
+	recorder              record.EventRecorder
+
+	// Keep a set of nodes that are currently being processed to avoid races in CIDR allocation
+	lock              sync.Mutex
+	nodesInProcessing sets.String
 }

 var _ CIDRAllocator = (*cloudCIDRAllocator)(nil)

 // NewCloudCIDRAllocator creates a new cloud CIDR allocator.
-func NewCloudCIDRAllocator(
-	client clientset.Interface,
-	cloud cloudprovider.Interface) (ca CIDRAllocator, err error) {
+func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Interface) (CIDRAllocator, error) {
+	if client == nil {
+		glog.Fatalf("kubeClient is nil when starting NodeController")
+	}
+
+	eventBroadcaster := record.NewBroadcaster()
+	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cidrAllocator"})
+	eventBroadcaster.StartLogging(glog.Infof)
+	glog.V(0).Infof("Sending events to api server.")
+	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(client.Core().RESTClient()).Events("")})

 	gceCloud, ok := cloud.(*gce.GCECloud)
 	if !ok {
-		err = fmt.Errorf("cloudCIDRAllocator does not support %v provider", cloud.ProviderName())
-		return
+		err := fmt.Errorf("cloudCIDRAllocator does not support %v provider", cloud.ProviderName())
+		return nil, err
 	}

-	ca = &cloudCIDRAllocator{
-		client: client,
-		cloud:  gceCloud,
-		recorder: record.NewBroadcaster().NewRecorder(
-			api.Scheme,
-			v1.EventSource{Component: "cidrAllocator"}),
+	ca := &cloudCIDRAllocator{
+		client:                client,
+		cloud:                 gceCloud,
+		nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize),
+		recorder:              recorder,
+		nodesInProcessing:     sets.NewString(),
 	}

+	for i := 0; i < cidrUpdateWorkers; i++ {
+		// TODO: Take stopChan as an argument to NewCloudCIDRAllocator and pass it to the worker.
+		go ca.worker(wait.NeverStop)
+	}
+
 	glog.V(0).Infof("Using cloud CIDR allocator (provider: %v)", cloud.ProviderName())
-
-	return
+	return ca, nil
 }

+func (ca *cloudCIDRAllocator) worker(stopChan <-chan struct{}) {
+	for {
+		select {
+		case workItem, ok := <-ca.nodeCIDRUpdateChannel:
+			if !ok {
+				glog.Warning("Channel nodeCIDRUpdateChannel was unexpectedly closed")
+				return
+			}
+			ca.updateCIDRAllocation(workItem)
+		case <-stopChan:
+			return
+		}
+	}
+}
+
+func (ca *cloudCIDRAllocator) insertNodeToProcessing(nodeName string) bool {
+	ca.lock.Lock()
+	defer ca.lock.Unlock()
+	if ca.nodesInProcessing.Has(nodeName) {
+		return false
+	}
+	ca.nodesInProcessing.Insert(nodeName)
+	return true
+}
+
+func (ca *cloudCIDRAllocator) removeNodeFromProcessing(nodeName string) {
+	ca.lock.Lock()
+	defer ca.lock.Unlock()
+	ca.nodesInProcessing.Delete(nodeName)
+}
+
+// WARNING: If you're adding any return calls or defer any more work from this
+// function you have to make sure to update nodesInProcessing properly with the
+// disposition of the node when the work is done.
 func (ca *cloudCIDRAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
-	glog.V(2).Infof("Updating PodCIDR for node %v", node.Name)
+	if node == nil {
+		return nil
+	}
+	if !ca.insertNodeToProcessing(node.Name) {
+		glog.V(2).Infof("Node %v is already in a process of CIDR assignment.", node.Name)
+		return nil
+	}

 	cidrs, err := ca.cloud.AliasRanges(types.NodeName(node.Name))
 	if err != nil {
+		ca.removeNodeFromProcessing(node.Name)
 		util.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
 		return fmt.Errorf("failed to allocate cidr: %v", err)
 	}
 	if len(cidrs) == 0 {
+		ca.removeNodeFromProcessing(node.Name)
 		util.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
-		glog.V(2).Infof("Node %v has no CIDRs", node.Name)
-		return fmt.Errorf("failed to allocate cidr (none exist)")
+		return fmt.Errorf("failed to allocate cidr: Node %v has no CIDRs", node.Name)
 	}
-
-	node, err = ca.client.Core().Nodes().Get(node.Name, metav1.GetOptions{})
+	_, cidr, err := net.ParseCIDR(cidrs[0])
 	if err != nil {
-		glog.Errorf("Could not get Node object from Kubernetes: %v", err)
-		return err
+		return fmt.Errorf("failed to parse string '%s' as a CIDR: %v", cidrs[0], err)
 	}

-	podCIDR := cidrs[0]
-
-	if node.Spec.PodCIDR != "" {
-		if node.Spec.PodCIDR == podCIDR {
-			glog.V(3).Infof("Node %v has PodCIDR %v", node.Name, podCIDR)
-			return nil
+	glog.V(4).Infof("Putting node %s with CIDR %s into the work queue", node.Name, cidrs[0])
+	ca.nodeCIDRUpdateChannel <- nodeAndCIDR{
+		nodeName: node.Name,
+		cidr:     cidr,
+	}
+	return nil
+}
+
+// updateCIDRAllocation assigns CIDR to Node and sends an update to the API server.
+func (ca *cloudCIDRAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
+	var err error
+	var node *v1.Node
+	defer ca.removeNodeFromProcessing(data.nodeName)
+
+	podCIDR := data.cidr.String()
+	for rep := 0; rep < cidrUpdateRetries; rep++ {
+		// TODO: change it to using PATCH instead of full Node updates.
+		node, err = ca.client.Core().Nodes().Get(data.nodeName, metav1.GetOptions{})
+		if err != nil {
+			glog.Errorf("Failed while getting node %v to retry updating Node.Spec.PodCIDR: %v", data.nodeName, err)
+			continue
 		}
-		glog.Errorf("PodCIDR cannot be reassigned, node %v spec has %v, but cloud provider has assigned %v",
-			node.Name, node.Spec.PodCIDR, podCIDR)
-		// We fall through and set the CIDR despite this error. This
-		// implements the same logic as implemented in the
-		// rangeAllocator.
-		//
-		// See https://github.com/kubernetes/kubernetes/pull/42147#discussion_r103357248
-	}

-	node.Spec.PodCIDR = cidrs[0]
-	if _, err := ca.client.Core().Nodes().Update(node); err == nil {
-		glog.V(2).Infof("Node %v PodCIDR set to %v", node.Name, podCIDR)
-	} else {
-		glog.Errorf("Could not update node %v PodCIDR to %v: %v",
-			node.Name, podCIDR, err)
+		if node.Spec.PodCIDR != "" {
+			if node.Spec.PodCIDR == podCIDR {
+				glog.V(4).Infof("Node %v already has allocated CIDR %v. It matches the proposed one.", node.Name, podCIDR)
+				return nil
+			}
+			glog.Errorf("PodCIDR being reassigned! Node %v spec has %v, but cloud provider has assigned %v",
+				node.Name, node.Spec.PodCIDR, podCIDR)
+			// We fall through and set the CIDR despite this error. This
+			// implements the same logic as implemented in the
+			// rangeAllocator.
+			//
+			// See https://github.com/kubernetes/kubernetes/pull/42147#discussion_r103357248
+		}
+
+		node.Spec.PodCIDR = podCIDR
+		if _, err = ca.client.Core().Nodes().Update(node); err == nil {
+			glog.Infof("Set node %v PodCIDR to %v", node.Name, podCIDR)
+			break
+		}
+		glog.Errorf("Failed to update node %v PodCIDR to %v (%d retries left): %v", node.Name, podCIDR, cidrUpdateRetries-rep-1, err)
+	}
+	if err != nil {
+		util.RecordNodeStatusChange(ca.recorder, node, "CIDRAssignmentFailed")
+		glog.Errorf("CIDR assignment for node %v failed: %v.", data.nodeName, err)
 		return err
 	}
@@ -131,10 +209,8 @@ func (ca *cloudCIDRAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
 		LastTransitionTime: metav1.Now(),
 	})
 	if err != nil {
-		glog.Errorf("Error setting route status for node %v: %v",
-			node.Name, err)
+		glog.Errorf("Error setting route status for node %v: %v", node.Name, err)
 	}
 	return err
 }
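
The change above splits allocation into a non-blocking front end (AllocateOrOccupyCIDR validates, deduplicates via nodesInProcessing, and enqueues) and background workers that perform the slow Get/Update against the API server. A minimal standalone sketch of that hand-off pattern, with hypothetical names in place of the real types:

// Illustrative sketch only; not the allocator itself.
package main

import (
	"fmt"
	"sync"
)

type allocator struct {
	updates chan string // queued node names, handed off to background workers

	lock     sync.Mutex
	inFlight map[string]bool // nodes currently being processed
}

func (a *allocator) markInFlight(name string) bool {
	a.lock.Lock()
	defer a.lock.Unlock()
	if a.inFlight[name] {
		return false // an update for this node is already queued or running
	}
	a.inFlight[name] = true
	return true
}

func (a *allocator) clearInFlight(name string) {
	a.lock.Lock()
	defer a.lock.Unlock()
	delete(a.inFlight, name)
}

// Enqueue is the non-blocking front end: it deduplicates and hands the node
// name to the workers without doing any slow work itself.
func (a *allocator) Enqueue(name string) {
	if !a.markInFlight(name) {
		return // drop duplicate work while an update is still in flight
	}
	a.updates <- name
}

// worker mirrors the select loop above: drain the queue, stop when the queue
// is closed or the stop channel fires, and clear the in-flight marker when done.
func (a *allocator) worker(stop <-chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case name, ok := <-a.updates:
			if !ok {
				return
			}
			fmt.Println("slow update for", name) // stands in for the Get/Update retry loop
			a.clearInFlight(name)
		case <-stop:
			return
		}
	}
}

func main() {
	a := &allocator{updates: make(chan string, 16), inFlight: map[string]bool{}}
	stop := make(chan struct{})
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go a.worker(stop, &wg)
	}
	a.Enqueue("node-a")
	a.Enqueue("node-a") // deduplicated while the first update is pending
	a.Enqueue("node-b")
	close(a.updates) // in the real allocator the queue lives for the controller's lifetime
	wg.Wait()
}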


@@ -44,12 +44,14 @@ type rangeAllocator struct {
 	cidrs       *cidrset.CidrSet
 	clusterCIDR *net.IPNet
 	maxCIDRs    int
+
 	// Channel that is used to pass updating Nodes with assigned CIDRs to the background
 	// This increases the throughput of CIDR assignment by not blocking on long operations.
 	nodeCIDRUpdateChannel chan nodeAndCIDR
 	recorder              record.EventRecorder
+
 	// Keep a set of nodes that are currently being processed to avoid races in CIDR allocation
-	sync.Mutex
+	lock              sync.Mutex
 	nodesInProcessing sets.String
 }
@@ -58,15 +60,15 @@ type rangeAllocator struct {
 // Caller must always pass in a list of existing nodes so the new allocator
 // can initialize its CIDR map. NodeList is only nil in testing.
 func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, serviceCIDR *net.IPNet, subNetMaskSize int, nodeList *v1.NodeList) (CIDRAllocator, error) {
+	if client == nil {
+		glog.Fatalf("kubeClient is nil when starting NodeController")
+	}
+
 	eventBroadcaster := record.NewBroadcaster()
 	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cidrAllocator"})
 	eventBroadcaster.StartLogging(glog.Infof)
-	if client != nil {
-		glog.V(0).Infof("Sending events to api server.")
-		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(client.Core().RESTClient()).Events("")})
-	} else {
-		glog.Fatalf("kubeClient is nil when starting NodeController")
-	}
+	glog.V(0).Infof("Sending events to api server.")
+	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(client.Core().RESTClient()).Events("")})

 	ra := &rangeAllocator{
 		client: client,
@@ -102,28 +104,31 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, serviceCIDR *net.IPNet, subNetMaskSize int, nodeList *v1.NodeList) (CIDRAllocator, error) {
 		}
 	}
 	for i := 0; i < cidrUpdateWorkers; i++ {
-		go func(stopChan <-chan struct{}) {
-			for {
-				select {
-				case workItem, ok := <-ra.nodeCIDRUpdateChannel:
-					if !ok {
-						glog.Warning("NodeCIDRUpdateChannel read returned false.")
-						return
-					}
-					ra.updateCIDRAllocation(workItem)
-				case <-stopChan:
-					return
-				}
-			}
-		}(wait.NeverStop)
+		// TODO: Take stopChan as an argument to NewCIDRRangeAllocator and pass it to the worker.
+		go ra.worker(wait.NeverStop)
 	}
 	return ra, nil
 }

+func (r *rangeAllocator) worker(stopChan <-chan struct{}) {
+	for {
+		select {
+		case workItem, ok := <-r.nodeCIDRUpdateChannel:
+			if !ok {
+				glog.Warning("Channel nodeCIDRUpdateChannel was unexpectedly closed")
+				return
+			}
+			r.updateCIDRAllocation(workItem)
+		case <-stopChan:
+			return
+		}
+	}
+}
+
 func (r *rangeAllocator) insertNodeToProcessing(nodeName string) bool {
-	r.Lock()
-	defer r.Unlock()
+	r.lock.Lock()
+	defer r.lock.Unlock()
 	if r.nodesInProcessing.Has(nodeName) {
 		return false
 	}
@@ -132,8 +137,8 @@ func (r *rangeAllocator) insertNodeToProcessing(nodeName string) bool {
 }

 func (r *rangeAllocator) removeNodeFromProcessing(nodeName string) {
-	r.Lock()
-	defer r.Unlock()
+	r.lock.Lock()
+	defer r.lock.Unlock()
 	r.nodesInProcessing.Delete(nodeName)
 }
@@ -153,7 +158,8 @@ func (r *rangeAllocator) occupyCIDR(node *v1.Node) error {
 }

 // WARNING: If you're adding any return calls or defer any more work from this
-// function you have to handle correctly nodesInProcessing.
+// function you have to make sure to update nodesInProcessing properly with the
+// disposition of the node when the work is done.
 func (r *rangeAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
 	if node == nil {
 		return nil
@@ -172,7 +178,7 @@ func (r *rangeAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
 		return fmt.Errorf("failed to allocate cidr: %v", err)
 	}

-	glog.V(10).Infof("Putting node %s with CIDR %s into the work queue", node.Name, podCIDR)
+	glog.V(4).Infof("Putting node %s with CIDR %s into the work queue", node.Name, podCIDR)
 	r.nodeCIDRUpdateChannel <- nodeAndCIDR{
 		nodeName: node.Name,
 		cidr:     podCIDR,
@@ -213,11 +219,13 @@ func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) {
 	}
 }

-// Assigns CIDR to Node and sends an update to the API server.
+// updateCIDRAllocation assigns CIDR to Node and sends an update to the API server.
 func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
 	var err error
 	var node *v1.Node
 	defer r.removeNodeFromProcessing(data.nodeName)
+
+	podCIDR := data.cidr.String()
 	for rep := 0; rep < cidrUpdateRetries; rep++ {
 		// TODO: change it to using PATCH instead of full Node updates.
 		node, err = r.client.Core().Nodes().Get(data.nodeName, metav1.GetOptions{})
@@ -227,21 +235,21 @@ func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
 		}
 		if node.Spec.PodCIDR != "" {
 			glog.V(4).Infof("Node %v already has allocated CIDR %v. Releasing assigned one if different.", node.Name, node.Spec.PodCIDR)
-			if node.Spec.PodCIDR != data.cidr.String() {
+			if node.Spec.PodCIDR != podCIDR {
 				glog.Errorf("Node %q PodCIDR seems to have changed (original=%v, current=%v), releasing original and occupying new CIDR",
-					node.Name, node.Spec.PodCIDR, data.cidr.String())
+					node.Name, node.Spec.PodCIDR, podCIDR)
 				if err := r.cidrs.Release(data.cidr); err != nil {
-					glog.Errorf("Error when releasing CIDR %v", data.cidr.String())
+					glog.Errorf("Error when releasing CIDR %v", podCIDR)
 				}
 			}
 			return nil
 		}
-		node.Spec.PodCIDR = data.cidr.String()
-		if _, err := r.client.Core().Nodes().Update(node); err != nil {
-			glog.Errorf("Failed while updating Node.Spec.PodCIDR (%d retries left): %v", cidrUpdateRetries-rep-1, err)
-		} else {
+		node.Spec.PodCIDR = podCIDR
+		if _, err = r.client.Core().Nodes().Update(node); err == nil {
+			glog.Infof("Set node %v PodCIDR to %v", node.Name, podCIDR)
 			break
 		}
+		glog.Errorf("Failed to update node %v PodCIDR to %v (%d retries left): %v", node.Name, podCIDR, cidrUpdateRetries-rep-1, err)
 	}
 	if err != nil {
 		util.RecordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed")
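
Both allocators now share the same bounded retry shape in updateCIDRAllocation: refetch the Node, skip the write if the CIDR already matches, otherwise write and stop on the first success, reporting the last error if every attempt fails. A generic standalone sketch of that loop, with hypothetical get/update callbacks standing in for the client-go calls:

// Illustrative sketch only; not the controller's retry loop.
package main

import (
	"errors"
	"fmt"
)

const updateRetries = 3

// updateWithRetry refetches current state, re-applies the desired value, and
// stops as soon as one write succeeds.
func updateWithRetry(get func() (string, error), update func(string) error, want string) error {
	var err error
	for rep := 0; rep < updateRetries; rep++ {
		var current string
		if current, err = get(); err != nil {
			continue // could not refetch; retry with a fresh read
		}
		if current == want {
			return nil // already in the desired state, nothing to write
		}
		if err = update(want); err == nil {
			break // write accepted
		}
		fmt.Printf("update failed (%d retries left): %v\n", updateRetries-rep-1, err)
	}
	return err // nil on success, last error otherwise
}

func main() {
	calls := 0
	get := func() (string, error) { return "", nil }
	update := func(string) error {
		calls++
		if calls < 2 {
			return errors.New("transient conflict") // first write fails, second succeeds
		}
		return nil
	}
	fmt.Println("result:", updateWithRetry(get, update, "10.0.0.0/24"))
}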