If CIDRAllocatorType is set to `CloudCIDRAllocator`, CIDR allocation is instead performed by the external cloud provider, and the node controller is only responsible for reflecting the allocation into the node spec.

- Splits the rangeAllocator off from the cidr_allocator.go file.
- Adds cloudCIDRAllocator, which is used when the cloud provider allocates the CIDR ranges externally (GCE support only).
- Updates RBAC permissions for the node controller to include PATCH.
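As an illustration of the split described above, here is a minimal sketch of how a controller could choose an allocator based on CIDRAllocatorType. It assumes the imports of the file below; only NewCIDRRangeAllocator is defined in this file, while the allocator-type strings, the newCIDRAllocator helper, and the NewCloudCIDRAllocator constructor are assumptions made for this sketch, not code from this change.

// Sketch only: allocator selection by CIDRAllocatorType. "CloudCIDRAllocator",
// "RangeAllocator", and NewCloudCIDRAllocator are illustrative names; only
// NewCIDRRangeAllocator exists in the file below.
func newCIDRAllocator(
	allocatorType string,
	client clientset.Interface,
	clusterCIDR, serviceCIDR *net.IPNet,
	subNetMaskSize int,
	nodeList *v1.NodeList,
) (CIDRAllocator, error) {
	switch allocatorType {
	case "CloudCIDRAllocator":
		// The cloud provider hands out the CIDRs; the node controller only
		// mirrors the allocation into Node.Spec.PodCIDR (hypothetical constructor).
		return NewCloudCIDRAllocator(client)
	case "RangeAllocator":
		// In-process allocation from the cluster CIDR, as implemented below.
		return NewCIDRRangeAllocator(client, clusterCIDR, serviceCIDR, subNetMaskSize, nodeList)
	default:
		return nil, fmt.Errorf("unknown CIDR allocator type: %q", allocatorType)
	}
}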
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package node

import (
	"fmt"
	"net"
	"sync"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	clientv1 "k8s.io/client-go/pkg/api/v1"
	"k8s.io/client-go/tools/record"
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/v1"
	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"

	"github.com/golang/glog"
)

// TODO: figure out good settings for these constants.
const (
	// controls how many NodeSpec updates NC can process concurrently.
	cidrUpdateWorkers   = 10
	cidrUpdateQueueSize = 5000
	// podCIDRUpdateRetry controls the number of retries of writing Node.Spec.PodCIDR update.
	podCIDRUpdateRetry = 5
)

type rangeAllocator struct {
	client      clientset.Interface
	cidrs       *cidrSet
	clusterCIDR *net.IPNet
	maxCIDRs    int
	// Channel that is used to pass updated Nodes with assigned CIDRs to the background workers.
	// This increases the throughput of CIDR assignment by not blocking on long operations.
	nodeCIDRUpdateChannel chan nodeAndCIDR
	recorder              record.EventRecorder
	// Keep a set of nodes that are currently being processed to avoid races in CIDR allocation.
	sync.Mutex
	nodesInProcessing sets.String
}

// NewCIDRRangeAllocator returns a CIDRAllocator that allocates CIDRs for nodes.
// Caller must ensure subNetMaskSize is not less than cluster CIDR mask size.
// Caller must always pass in a list of existing nodes so the new allocator
// can initialize its CIDR map. NodeList is only nil in testing.
func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, serviceCIDR *net.IPNet, subNetMaskSize int, nodeList *v1.NodeList) (CIDRAllocator, error) {
	eventBroadcaster := record.NewBroadcaster()
	recorder := eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "cidrAllocator"})
	eventBroadcaster.StartLogging(glog.Infof)
	if client != nil {
		glog.V(0).Infof("Sending events to api server.")
		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(client.Core().RESTClient()).Events("")})
	} else {
		glog.Fatalf("kubeClient is nil when starting NodeController")
	}

	ra := &rangeAllocator{
		client:                client,
		cidrs:                 newCIDRSet(clusterCIDR, subNetMaskSize),
		clusterCIDR:           clusterCIDR,
		nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize),
		recorder:              recorder,
		nodesInProcessing:     sets.NewString(),
	}

	if serviceCIDR != nil {
		ra.filterOutServiceRange(serviceCIDR)
	} else {
		glog.V(0).Info("No Service CIDR provided. Skipping filtering out service addresses.")
	}

	if nodeList != nil {
		for _, node := range nodeList.Items {
			if node.Spec.PodCIDR == "" {
				glog.Infof("Node %v has no CIDR, ignoring", node.Name)
				continue
			} else {
				glog.Infof("Node %v has CIDR %s, occupying it in CIDR map",
					node.Name, node.Spec.PodCIDR)
			}
			if err := ra.occupyCIDR(&node); err != nil {
				// This will happen if:
				// 1. We find garbage in the podCIDR field. Retrying is useless.
				// 2. CIDR out of range: This means a node CIDR has changed.
				// This error will keep crashing controller-manager.
				return nil, err
			}
		}
	}
	// Start background workers that pull allocated CIDRs off the queue and
	// write them back to the corresponding Node objects.
	for i := 0; i < cidrUpdateWorkers; i++ {
		go func(stopChan <-chan struct{}) {
			for {
				select {
				case workItem, ok := <-ra.nodeCIDRUpdateChannel:
					if !ok {
						glog.Warning("NodeCIDRUpdateChannel read returned false.")
						return
					}
					ra.updateCIDRAllocation(workItem)
				case <-stopChan:
					return
				}
			}
		}(wait.NeverStop)
	}

	return ra, nil
}

// insertNodeToProcessing marks nodeName as being processed. It returns false
// if the node is already being processed, so callers can skip duplicate work.
func (r *rangeAllocator) insertNodeToProcessing(nodeName string) bool {
	r.Lock()
	defer r.Unlock()
	if r.nodesInProcessing.Has(nodeName) {
		return false
	}
	r.nodesInProcessing.Insert(nodeName)
	return true
}

// removeNodeFromProcessing clears the in-processing mark for nodeName.
func (r *rangeAllocator) removeNodeFromProcessing(nodeName string) {
	r.Lock()
	defer r.Unlock()
	r.nodesInProcessing.Delete(nodeName)
}

// occupyCIDR marks the CIDR already recorded in node.Spec.PodCIDR as used in
// the local CIDR set so it will not be handed out again.
func (r *rangeAllocator) occupyCIDR(node *v1.Node) error {
	defer r.removeNodeFromProcessing(node.Name)
	if node.Spec.PodCIDR == "" {
		return nil
	}
	_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
	if err != nil {
		return fmt.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR)
	}
	if err := r.cidrs.occupy(podCIDR); err != nil {
		return fmt.Errorf("failed to mark cidr as occupied: %v", err)
	}
	return nil
}

// WARNING: If you're adding any return calls or deferring any more work from
// this function, you have to make sure nodesInProcessing is handled correctly.
func (r *rangeAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
	if node == nil {
		return nil
	}
	if !r.insertNodeToProcessing(node.Name) {
		glog.V(2).Infof("Node %v is already in a process of CIDR assignment.", node.Name)
		return nil
	}
	if node.Spec.PodCIDR != "" {
		return r.occupyCIDR(node)
	}
	podCIDR, err := r.cidrs.allocateNext()
	if err != nil {
		r.removeNodeFromProcessing(node.Name)
		recordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
		return fmt.Errorf("failed to allocate cidr: %v", err)
	}

	glog.V(10).Infof("Putting node %s with CIDR %s into the work queue", node.Name, podCIDR)
	r.nodeCIDRUpdateChannel <- nodeAndCIDR{
		nodeName: node.Name,
		cidr:     podCIDR,
	}
	return nil
}

// ReleaseCIDR returns the CIDR recorded in node.Spec.PodCIDR to the pool of
// assignable ranges.
func (r *rangeAllocator) ReleaseCIDR(node *v1.Node) error {
	if node == nil || node.Spec.PodCIDR == "" {
		return nil
	}
	_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
	if err != nil {
		return fmt.Errorf("Failed to parse CIDR %s on Node %v: %v", node.Spec.PodCIDR, node.Name, err)
	}

	glog.V(4).Infof("release CIDR %s", node.Spec.PodCIDR)
	if err = r.cidrs.release(podCIDR); err != nil {
		return fmt.Errorf("Error when releasing CIDR %v: %v", node.Spec.PodCIDR, err)
	}
	return err
}

// Marks all CIDRs with subNetMaskSize that belong to serviceCIDR as used,
// so that they won't be assignable.
func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) {
	// Checks if service CIDR has a nonempty intersection with cluster
	// CIDR. It is the case if either clusterCIDR contains serviceCIDR with
	// clusterCIDR's Mask applied (this means that clusterCIDR contains
	// serviceCIDR) or vice versa (which means that serviceCIDR contains
	// clusterCIDR).
	if !r.clusterCIDR.Contains(serviceCIDR.IP.Mask(r.clusterCIDR.Mask)) && !serviceCIDR.Contains(r.clusterCIDR.IP.Mask(serviceCIDR.Mask)) {
		return
	}

	if err := r.cidrs.occupy(serviceCIDR); err != nil {
		glog.Errorf("Error filtering out service cidr %v: %v", serviceCIDR, err)
	}
}

// updateCIDRAllocation assigns the allocated CIDR to the Node and sends an
// update to the API server.
func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
	var err error
	var node *v1.Node
	defer r.removeNodeFromProcessing(data.nodeName)
	for rep := 0; rep < podCIDRUpdateRetry; rep++ {
		// TODO: change it to using PATCH instead of full Node updates.
		node, err = r.client.Core().Nodes().Get(data.nodeName, metav1.GetOptions{})
		if err != nil {
			glog.Errorf("Failed while getting node %v to retry updating Node.Spec.PodCIDR: %v", data.nodeName, err)
			continue
		}
		if node.Spec.PodCIDR != "" {
			glog.Errorf("Node %v already has allocated CIDR %v. Releasing assigned one if different.", node.Name, node.Spec.PodCIDR)
			if node.Spec.PodCIDR != data.cidr.String() {
				if err := r.cidrs.release(data.cidr); err != nil {
					glog.Errorf("Error when releasing CIDR %v", data.cidr.String())
				}
			}
			return nil
		}
		node.Spec.PodCIDR = data.cidr.String()
		// Assign to the outer err (not a shadowed variable) so that an update
		// failure after exhausting the retries is handled below.
		if _, err = r.client.Core().Nodes().Update(node); err != nil {
			glog.Errorf("Failed while updating Node.Spec.PodCIDR (%d retries left): %v", podCIDRUpdateRetry-rep-1, err)
		} else {
			break
		}
	}
	if err != nil {
		recordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed")
		// We accept the fact that we may leak CIDRs here. This is safer than
		// releasing them when we don't know whether the request went through.
		// A NodeController restart will return all falsely allocated CIDRs to the pool.
		if !apierrors.IsServerTimeout(err) {
			glog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", data.nodeName, err)
			if releaseErr := r.cidrs.release(data.cidr); releaseErr != nil {
				glog.Errorf("Error releasing allocated CIDR for node %v: %v", data.nodeName, releaseErr)
			}
		}
	}
	return err
}
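
For context, here is a short usage sketch of how the allocator above might be driven from node lifecycle events. The handler functions and their wiring are hypothetical and not part of this file; AllocateOrOccupyCIDR and ReleaseCIDR are the methods implemented above, and the CIDRAllocator interface (defined elsewhere) is assumed to expose them.

// Sketch only: hooking the allocator into node add/delete events. Informer
// setup is omitted; handleNodeAdd and handleNodeDelete are hypothetical hooks.
func handleNodeAdd(alloc CIDRAllocator, node *v1.Node) {
	// AllocateOrOccupyCIDR either records an existing Node.Spec.PodCIDR in the
	// local cidrSet or allocates a new range and queues the API update.
	if err := alloc.AllocateOrOccupyCIDR(node); err != nil {
		glog.Errorf("CIDR allocation for node %v failed: %v", node.Name, err)
	}
}

func handleNodeDelete(alloc CIDRAllocator, node *v1.Node) {
	// ReleaseCIDR returns the node's range to the pool so it can be reused.
	if err := alloc.ReleaseCIDR(node); err != nil {
		glog.Errorf("Releasing CIDR of node %v failed: %v", node.Name, err)
	}
}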