diff --git a/pkg/controller/node/ipam/BUILD b/pkg/controller/node/ipam/BUILD
index 3f8e3d110c9..f2319adf8ad 100644
--- a/pkg/controller/node/ipam/BUILD
+++ b/pkg/controller/node/ipam/BUILD
@@ -37,7 +37,6 @@ go_library(
         "timeout.go",
     ],
     deps = [
-        "//pkg/api:go_default_library",
         "//pkg/cloudprovider:go_default_library",
        "//pkg/cloudprovider/providers/gce:go_default_library",
         "//pkg/controller/node/ipam/cidrset:go_default_library",
diff --git a/pkg/controller/node/ipam/cidr_allocator.go b/pkg/controller/node/ipam/cidr_allocator.go
index 9acfeadb031..6fa8a684e06 100644
--- a/pkg/controller/node/ipam/cidr_allocator.go
+++ b/pkg/controller/node/ipam/cidr_allocator.go
@@ -62,7 +62,7 @@ const (
 	apiserverStartupGracePeriod = 10 * time.Minute
 
 	// The no. of NodeSpec updates NC can process concurrently.
-	cidrUpdateWorkers = 10
+	cidrUpdateWorkers = 30
 
 	// The max no. of NodeSpec updates that can be enqueued.
 	cidrUpdateQueueSize = 5000
diff --git a/pkg/controller/node/ipam/cloud_cidr_allocator.go b/pkg/controller/node/ipam/cloud_cidr_allocator.go
index da0d5e6fb4b..5403ebb91a1 100644
--- a/pkg/controller/node/ipam/cloud_cidr_allocator.go
+++ b/pkg/controller/node/ipam/cloud_cidr_allocator.go
@@ -18,19 +18,23 @@ package ipam
 
 import (
 	"fmt"
+	"net"
 	"sync"
 
 	"github.com/golang/glog"
 
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/wait"
 	informers "k8s.io/client-go/informers/core/v1"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
 
 	"k8s.io/api/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
-	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/client-go/kubernetes/scheme"
+	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/kubernetes/pkg/cloudprovider"
 	"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
 	"k8s.io/kubernetes/pkg/controller/node/util"
@@ -42,84 +46,158 @@ import (
 // deallocation is delegated to the external provider, and the controller
 // merely takes the assignment and updates the node spec.
 type cloudCIDRAllocator struct {
-	lock sync.Mutex
-
 	client clientset.Interface
 	cloud  *gce.GCECloud
-	recorder record.EventRecorder
+
+	// Channel that is used to pass updating Nodes with assigned CIDRs to the background
+	// This increases a throughput of CIDR assignment by not blocking on long operations.
+	nodeCIDRUpdateChannel chan nodeAndCIDR
+	recorder              record.EventRecorder
+
+	// Keep a set of nodes that are currectly being processed to avoid races in CIDR allocation
+	lock              sync.Mutex
+	nodesInProcessing sets.String
 }
 
 var _ CIDRAllocator = (*cloudCIDRAllocator)(nil)
 
 // NewCloudCIDRAllocator creates a new cloud CIDR allocator.
-func NewCloudCIDRAllocator(
-	client clientset.Interface,
-	cloud cloudprovider.Interface) (ca CIDRAllocator, err error) {
+func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Interface) (CIDRAllocator, error) {
+	if client == nil {
+		glog.Fatalf("kubeClient is nil when starting NodeController")
+	}
+
+	eventBroadcaster := record.NewBroadcaster()
+	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cidrAllocator"})
+	eventBroadcaster.StartLogging(glog.Infof)
+	glog.V(0).Infof("Sending events to api server.")
+	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(client.Core().RESTClient()).Events("")})
 
 	gceCloud, ok := cloud.(*gce.GCECloud)
 	if !ok {
-		err = fmt.Errorf("cloudCIDRAllocator does not support %v provider", cloud.ProviderName())
-		return
+		err := fmt.Errorf("cloudCIDRAllocator does not support %v provider", cloud.ProviderName())
+		return nil, err
 	}
 
-	ca = &cloudCIDRAllocator{
+	ca := &cloudCIDRAllocator{
 		client: client,
 		cloud:  gceCloud,
-		recorder: record.NewBroadcaster().NewRecorder(
-			api.Scheme,
-			v1.EventSource{Component: "cidrAllocator"}),
+		nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize),
+		recorder:              recorder,
+		nodesInProcessing:     sets.NewString(),
+	}
+
+	for i := 0; i < cidrUpdateWorkers; i++ {
+		// TODO: Take stopChan as an argument to NewCloudCIDRAllocator and pass it to the worker.
+		go ca.worker(wait.NeverStop)
 	}
 
 	glog.V(0).Infof("Using cloud CIDR allocator (provider: %v)", cloud.ProviderName())
-
-	return
+	return ca, nil
 }
 
+func (ca *cloudCIDRAllocator) worker(stopChan <-chan struct{}) {
+	for {
+		select {
+		case workItem, ok := <-ca.nodeCIDRUpdateChannel:
+			if !ok {
+				glog.Warning("Channel nodeCIDRUpdateChannel was unexpectedly closed")
+				return
+			}
+			ca.updateCIDRAllocation(workItem)
+		case <-stopChan:
+			return
+		}
+	}
+}
+
+func (ca *cloudCIDRAllocator) insertNodeToProcessing(nodeName string) bool {
+	ca.lock.Lock()
+	defer ca.lock.Unlock()
+	if ca.nodesInProcessing.Has(nodeName) {
+		return false
+	}
+	ca.nodesInProcessing.Insert(nodeName)
+	return true
+}
+
+func (ca *cloudCIDRAllocator) removeNodeFromProcessing(nodeName string) {
+	ca.lock.Lock()
+	defer ca.lock.Unlock()
+	ca.nodesInProcessing.Delete(nodeName)
+}
+
+// WARNING: If you're adding any return calls or defer any more work from this
+// function you have to make sure to update nodesInProcessing properly with the
+// disposition of the node when the work is done.
 func (ca *cloudCIDRAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
-	glog.V(2).Infof("Updating PodCIDR for node %v", node.Name)
-
+	if node == nil {
+		return nil
+	}
+	if !ca.insertNodeToProcessing(node.Name) {
+		glog.V(2).Infof("Node %v is already in a process of CIDR assignment.", node.Name)
+		return nil
+	}
 	cidrs, err := ca.cloud.AliasRanges(types.NodeName(node.Name))
-
 	if err != nil {
+		ca.removeNodeFromProcessing(node.Name)
 		util.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
 		return fmt.Errorf("failed to allocate cidr: %v", err)
 	}
-
 	if len(cidrs) == 0 {
+		ca.removeNodeFromProcessing(node.Name)
 		util.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
-		glog.V(2).Infof("Node %v has no CIDRs", node.Name)
-		return fmt.Errorf("failed to allocate cidr (none exist)")
+		return fmt.Errorf("failed to allocate cidr: Node %v has no CIDRs", node.Name)
	}
-
-	node, err = ca.client.Core().Nodes().Get(node.Name, metav1.GetOptions{})
+	_, cidr, err := net.ParseCIDR(cidrs[0])
 	if err != nil {
-		glog.Errorf("Could not get Node object from Kubernetes: %v", err)
-		return err
+		return fmt.Errorf("failed to parse string '%s' as a CIDR: %v", cidrs[0], err)
 	}
 
-	podCIDR := cidrs[0]
+	glog.V(4).Infof("Putting node %s with CIDR %s into the work queue", node.Name, cidrs[0])
+	ca.nodeCIDRUpdateChannel <- nodeAndCIDR{
+		nodeName: node.Name,
+		cidr:     cidr,
+	}
+	return nil
+}
 
-	if node.Spec.PodCIDR != "" {
-		if node.Spec.PodCIDR == podCIDR {
-			glog.V(3).Infof("Node %v has PodCIDR %v", node.Name, podCIDR)
-			return nil
+// updateCIDRAllocation assigns CIDR to Node and sends an update to the API server.
+func (ca *cloudCIDRAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
+	var err error
+	var node *v1.Node
+	defer ca.removeNodeFromProcessing(data.nodeName)
+	podCIDR := data.cidr.String()
+	for rep := 0; rep < cidrUpdateRetries; rep++ {
+		// TODO: change it to using PATCH instead of full Node updates.
+		node, err = ca.client.Core().Nodes().Get(data.nodeName, metav1.GetOptions{})
+		if err != nil {
+			glog.Errorf("Failed while getting node %v to retry updating Node.Spec.PodCIDR: %v", data.nodeName, err)
+			continue
 		}
-		glog.Errorf("PodCIDR cannot be reassigned, node %v spec has %v, but cloud provider has assigned %v",
-			node.Name, node.Spec.PodCIDR, podCIDR)
-		// We fall through and set the CIDR despite this error. This
-		// implements the same logic as implemented in the
-		// rangeAllocator.
-		//
-		// See https://github.com/kubernetes/kubernetes/pull/42147#discussion_r103357248
+		if node.Spec.PodCIDR != "" {
+			if node.Spec.PodCIDR == podCIDR {
+				glog.V(4).Infof("Node %v already has allocated CIDR %v. It matches the proposed one.", node.Name, podCIDR)
+				return nil
+			}
+			glog.Errorf("PodCIDR being reassigned! Node %v spec has %v, but cloud provider has assigned %v",
+				node.Name, node.Spec.PodCIDR, podCIDR)
+			// We fall through and set the CIDR despite this error. This
+			// implements the same logic as implemented in the
+			// rangeAllocator.
+			//
+			// See https://github.com/kubernetes/kubernetes/pull/42147#discussion_r103357248
+		}
+		node.Spec.PodCIDR = podCIDR
+		if _, err = ca.client.Core().Nodes().Update(node); err == nil {
+			glog.Infof("Set node %v PodCIDR to %v", node.Name, podCIDR)
+			break
+		}
+		glog.Errorf("Failed to update node %v PodCIDR to %v (%d retries left): %v", node.Name, podCIDR, cidrUpdateRetries-rep-1, err)
 	}
-
-	node.Spec.PodCIDR = cidrs[0]
-	if _, err := ca.client.Core().Nodes().Update(node); err == nil {
-		glog.V(2).Infof("Node %v PodCIDR set to %v", node.Name, podCIDR)
-	} else {
-		glog.Errorf("Could not update node %v PodCIDR to %v: %v",
-			node.Name, podCIDR, err)
+	if err != nil {
+		util.RecordNodeStatusChange(ca.recorder, node, "CIDRAssignmentFailed")
+		glog.Errorf("CIDR assignment for node %v failed: %v.", data.nodeName, err)
 		return err
 	}
@@ -131,10 +209,8 @@ func (ca *cloudCIDRAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
 		LastTransitionTime: metav1.Now(),
 	})
 	if err != nil {
-		glog.Errorf("Error setting route status for node %v: %v",
-			node.Name, err)
+		glog.Errorf("Error setting route status for node %v: %v", node.Name, err)
 	}
-
 	return err
 }
diff --git a/pkg/controller/node/ipam/range_allocator.go b/pkg/controller/node/ipam/range_allocator.go
index 338d2020e0a..cacc47a8617 100644
--- a/pkg/controller/node/ipam/range_allocator.go
+++ b/pkg/controller/node/ipam/range_allocator.go
@@ -44,12 +44,14 @@ type rangeAllocator struct {
 	cidrs       *cidrset.CidrSet
 	clusterCIDR *net.IPNet
 	maxCIDRs    int
+
 	// Channel that is used to pass updating Nodes with assigned CIDRs to the background
 	// This increases a throughput of CIDR assignment by not blocking on long operations.
 	nodeCIDRUpdateChannel chan nodeAndCIDR
 	recorder              record.EventRecorder
+
 	// Keep a set of nodes that are currectly being processed to avoid races in CIDR allocation
-	sync.Mutex
+	lock sync.Mutex
 	nodesInProcessing sets.String
 }
 
@@ -58,15 +60,15 @@ type rangeAllocator struct {
 // Caller must always pass in a list of existing nodes so the new allocator
 // can initialize its CIDR map. NodeList is only nil in testing.
 func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, serviceCIDR *net.IPNet, subNetMaskSize int, nodeList *v1.NodeList) (CIDRAllocator, error) {
+	if client == nil {
+		glog.Fatalf("kubeClient is nil when starting NodeController")
+	}
+
 	eventBroadcaster := record.NewBroadcaster()
 	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cidrAllocator"})
 	eventBroadcaster.StartLogging(glog.Infof)
-	if client != nil {
-		glog.V(0).Infof("Sending events to api server.")
-		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(client.Core().RESTClient()).Events("")})
-	} else {
-		glog.Fatalf("kubeClient is nil when starting NodeController")
-	}
+	glog.V(0).Infof("Sending events to api server.")
+	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(client.Core().RESTClient()).Events("")})
 
 	ra := &rangeAllocator{
 		client:      client,
@@ -102,28 +104,31 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s
 		}
 	}
 	for i := 0; i < cidrUpdateWorkers; i++ {
-		go func(stopChan <-chan struct{}) {
-			for {
-				select {
-				case workItem, ok := <-ra.nodeCIDRUpdateChannel:
-					if !ok {
-						glog.Warning("NodeCIDRUpdateChannel read returned false.")
-						return
-					}
-					ra.updateCIDRAllocation(workItem)
-				case <-stopChan:
-					return
-				}
-			}
-		}(wait.NeverStop)
+		// TODO: Take stopChan as an argument to NewCIDRRangeAllocator and pass it to the worker.
+		go ra.worker(wait.NeverStop)
 	}
 
 	return ra, nil
 }
 
+func (r *rangeAllocator) worker(stopChan <-chan struct{}) {
+	for {
+		select {
+		case workItem, ok := <-r.nodeCIDRUpdateChannel:
+			if !ok {
+				glog.Warning("Channel nodeCIDRUpdateChannel was unexpectedly closed")
+				return
+			}
+			r.updateCIDRAllocation(workItem)
+		case <-stopChan:
+			return
+		}
+	}
+}
+
 func (r *rangeAllocator) insertNodeToProcessing(nodeName string) bool {
-	r.Lock()
-	defer r.Unlock()
+	r.lock.Lock()
+	defer r.lock.Unlock()
 	if r.nodesInProcessing.Has(nodeName) {
 		return false
 	}
@@ -132,8 +137,8 @@ func (r *rangeAllocator) insertNodeToProcessing(nodeName string) bool {
 }
 
 func (r *rangeAllocator) removeNodeFromProcessing(nodeName string) {
-	r.Lock()
-	defer r.Unlock()
+	r.lock.Lock()
+	defer r.lock.Unlock()
 	r.nodesInProcessing.Delete(nodeName)
 }
 
@@ -153,7 +158,8 @@ func (r *rangeAllocator) occupyCIDR(node *v1.Node) error {
 }
 
 // WARNING: If you're adding any return calls or defer any more work from this
-// function you have to handle correctly nodesInProcessing.
+// function you have to make sure to update nodesInProcessing properly with the
+// disposition of the node when the work is done.
 func (r *rangeAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
 	if node == nil {
 		return nil
@@ -172,7 +178,7 @@ func (r *rangeAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
 		return fmt.Errorf("failed to allocate cidr: %v", err)
 	}
 
-	glog.V(10).Infof("Putting node %s with CIDR %s into the work queue", node.Name, podCIDR)
+	glog.V(4).Infof("Putting node %s with CIDR %s into the work queue", node.Name, podCIDR)
 	r.nodeCIDRUpdateChannel <- nodeAndCIDR{
 		nodeName: node.Name,
 		cidr:     podCIDR,
@@ -213,11 +219,13 @@ func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) {
 	}
 }
 
-// Assigns CIDR to Node and sends an update to the API server.
+// updateCIDRAllocation assigns CIDR to Node and sends an update to the API server.
 func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
 	var err error
 	var node *v1.Node
 	defer r.removeNodeFromProcessing(data.nodeName)
+
+	podCIDR := data.cidr.String()
 	for rep := 0; rep < cidrUpdateRetries; rep++ {
 		// TODO: change it to using PATCH instead of full Node updates.
 		node, err = r.client.Core().Nodes().Get(data.nodeName, metav1.GetOptions{})
@@ -227,21 +235,21 @@ func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
 		}
 		if node.Spec.PodCIDR != "" {
 			glog.V(4).Infof("Node %v already has allocated CIDR %v. Releasing assigned one if different.", node.Name, node.Spec.PodCIDR)
-			if node.Spec.PodCIDR != data.cidr.String() {
+			if node.Spec.PodCIDR != podCIDR {
 				glog.Errorf("Node %q PodCIDR seems to have changed (original=%v, current=%v), releasing original and occupying new CIDR",
-					node.Name, node.Spec.PodCIDR, data.cidr.String())
+					node.Name, node.Spec.PodCIDR, podCIDR)
 				if err := r.cidrs.Release(data.cidr); err != nil {
-					glog.Errorf("Error when releasing CIDR %v", data.cidr.String())
+					glog.Errorf("Error when releasing CIDR %v", podCIDR)
 				}
 			}
 			return nil
 		}
-		node.Spec.PodCIDR = data.cidr.String()
-		if _, err := r.client.Core().Nodes().Update(node); err != nil {
-			glog.Errorf("Failed while updating Node.Spec.PodCIDR (%d retries left): %v", cidrUpdateRetries-rep-1, err)
-		} else {
+		node.Spec.PodCIDR = podCIDR
+		if _, err = r.client.Core().Nodes().Update(node); err == nil {
+			glog.Infof("Set node %v PodCIDR to %v", node.Name, podCIDR)
 			break
 		}
+		glog.Errorf("Failed to update node %v PodCIDR to %v (%d retries left): %v", node.Name, podCIDR, cidrUpdateRetries-rep-1, err)
 	}
 	if err != nil {
 		util.RecordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed")