Mirror of https://github.com/k3s-io/kubernetes.git (synced 2026-01-06 07:57:35 +00:00)
Retry assigning CIDRs
@@ -20,10 +20,13 @@ import (
     "errors"
     "fmt"
     "net"
+    "sync"

     "k8s.io/kubernetes/pkg/api"
+    apierrors "k8s.io/kubernetes/pkg/api/errors"
     clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
     "k8s.io/kubernetes/pkg/client/record"
+    "k8s.io/kubernetes/pkg/util/sets"
     "k8s.io/kubernetes/pkg/util/wait"

     "github.com/golang/glog"
@@ -60,6 +63,9 @@ type rangeAllocator struct {
     // This increases the throughput of CIDR assignment by not blocking on long operations.
     nodeCIDRUpdateChannel chan nodeAndCIDR
     recorder              record.EventRecorder
+    // Keep a set of nodes that are currently being processed to avoid races in CIDR allocation
+    sync.Mutex
+    nodesInProcessing sets.String
 }

 // NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDR for node
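The two added struct fields rely on Go's embedding: because rangeAllocator now embeds sync.Mutex, the allocator itself gains Lock and Unlock methods, and nodesInProcessing is only meant to be touched while that lock is held. A minimal, self-contained sketch of the idiom (illustrative names only, with a plain map standing in for sets.String):

package main

import (
    "fmt"
    "sync"
)

// guarded embeds sync.Mutex the same way rangeAllocator now does: the struct
// itself gets Lock/Unlock methods, so no separately named mutex field is
// needed. The map plays the role of sets.String in this sketch.
type guarded struct {
    sync.Mutex
    inProcessing map[string]bool
}

func main() {
    g := &guarded{inProcessing: map[string]bool{}}

    // Every read or write of the set happens between Lock and Unlock.
    g.Lock()
    g.inProcessing["node-1"] = true
    busy := g.inProcessing["node-1"]
    g.Unlock()

    fmt.Println(busy) // true
}

The helpers that actually manage this set under the lock, insertNodeToProcessing and removeNodeFromProcessing, appear further down in this diff.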
@@ -77,6 +83,7 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s
         clusterCIDR:           clusterCIDR,
         nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize),
         recorder:              recorder,
+        nodesInProcessing:     sets.NewString(),
     }

     if serviceCIDR != nil {
@@ -122,7 +129,24 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s
     return ra, nil
 }

+func (r *rangeAllocator) insertNodeToProcessing(nodeName string) bool {
+    r.Lock()
+    defer r.Unlock()
+    if r.nodesInProcessing.Has(nodeName) {
+        return false
+    }
+    r.nodesInProcessing.Insert(nodeName)
+    return true
+}
+
+func (r *rangeAllocator) removeNodeFromProcessing(nodeName string) {
+    r.Lock()
+    defer r.Unlock()
+    r.nodesInProcessing.Delete(nodeName)
+}
+
 func (r *rangeAllocator) occupyCIDR(node *api.Node) error {
+    defer r.removeNodeFromProcessing(node.Name)
     if node.Spec.PodCIDR == "" {
         return nil
     }
@@ -138,12 +162,22 @@ func (r *rangeAllocator) occupyCIDR(node *api.Node) error {

 // AllocateOrOccupyCIDR looks at the given node, assigns it a valid CIDR
 // if it doesn't currently have one or marks the CIDR as used if the node already has one.
+// WARNING: If you're adding any return calls or defer any more work from this function
+// you have to handle nodesInProcessing correctly.
 func (r *rangeAllocator) AllocateOrOccupyCIDR(node *api.Node) error {
     if node == nil {
         return nil
     }
+    if !r.insertNodeToProcessing(node.Name) {
+        glog.V(2).Infof("Node %v is already in a process of CIDR assignment.", node.Name)
+        return nil
+    }
     if node.Spec.PodCIDR != "" {
         return r.occupyCIDR(node)
     }
     podCIDR, err := r.cidrs.allocateNext()
     if err != nil {
+        r.removeNodeFromProcessing(node.Name)
         recordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
         return fmt.Errorf("failed to allocate cidr: %v", err)
     }
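The WARNING comment encodes an ownership rule: whoever successfully claims a node via insertNodeToProcessing must also guarantee its removal on every path, either directly (the allocateNext failure branch, or the defer in occupyCIDR) or by handing the node to the update worker, which removes it once the API update finishes (see the defer added to updateCIDRAllocation below). A hedged, self-contained sketch of that hand-off shape, with invented names (toyAllocator, claim, release) and a sleep standing in for the API call; it is an analogy, not the controller's actual wiring:

package main

import (
    "fmt"
    "sync"
    "time"
)

// toyAllocator illustrates the contract only; names and channel wiring are
// invented for this sketch.
type toyAllocator struct {
    sync.Mutex
    inProcessing map[string]bool
    updates      chan string
}

func (a *toyAllocator) claim(node string) bool {
    a.Lock()
    defer a.Unlock()
    if a.inProcessing[node] {
        return false
    }
    a.inProcessing[node] = true
    return true
}

func (a *toyAllocator) release(node string) {
    a.Lock()
    defer a.Unlock()
    delete(a.inProcessing, node)
}

// allocateOrOccupy claims the node, then either returns early or hands the
// node to the worker, which becomes responsible for the release.
func (a *toyAllocator) allocateOrOccupy(node string) {
    if !a.claim(node) {
        fmt.Println(node, "is already being processed, skipping")
        return
    }
    a.updates <- node
}

func (a *toyAllocator) worker() {
    for node := range a.updates {
        time.Sleep(20 * time.Millisecond) // stand-in for the Node update call
        fmt.Println("CIDR assignment finished for", node)
        a.release(node) // mirrors the defer in updateCIDRAllocation
    }
}

func main() {
    a := &toyAllocator{inProcessing: map[string]bool{}, updates: make(chan string, 4)}
    go a.worker()
    a.allocateOrOccupy("node-1")
    a.allocateOrOccupy("node-1") // dropped: first pass still in flight
    time.Sleep(100 * time.Millisecond)
    a.allocateOrOccupy("node-1") // accepted again after the worker released it
    time.Sleep(100 * time.Millisecond)
}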
@@ -173,8 +207,8 @@ func (r *rangeAllocator) ReleaseCIDR(node *api.Node) error {
     return err
 }

-// Marks all CIDRs with subNetMaskSize that belongs to serviceCIDR as used, so that they won't be
-// assignable.
+// Marks all CIDRs with subNetMaskSize that belongs to serviceCIDR as used,
+// so that they won't be assignable.
 func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) {
     // Checks if service CIDR has a nonempty intersection with cluster CIDR. It is the case if either
     // clusterCIDR contains serviceCIDR with clusterCIDR's Mask applied (this means that clusterCIDR contains serviceCIDR)
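The comment above describes the service/cluster CIDR intersection test in prose. A standalone sketch of that test (the helper name cidrsOverlap is ours, not from this file): two CIDR blocks overlap exactly when one network contains the other's base address after that address is truncated to the containing network's mask.

package main

import (
    "fmt"
    "net"
)

// cidrsOverlap reports whether two CIDR blocks have a nonempty intersection.
// Since CIDR blocks are either nested or disjoint, it is enough to check
// containment of the masked base address in both directions.
func cidrsOverlap(a, b *net.IPNet) bool {
    return a.Contains(b.IP.Mask(a.Mask)) || b.Contains(a.IP.Mask(b.Mask))
}

func main() {
    // Errors ignored because the inputs are fixed, well-formed literals.
    _, cluster, _ := net.ParseCIDR("10.244.0.0/16")
    _, service, _ := net.ParseCIDR("10.244.10.0/24")
    _, other, _ := net.ParseCIDR("192.168.0.0/24")
    fmt.Println(cidrsOverlap(cluster, service)) // true: service range sits inside cluster range
    fmt.Println(cidrsOverlap(cluster, other))   // false: disjoint ranges
}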
@@ -192,6 +226,7 @@ func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) {
 func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
     var err error
     var node *api.Node
+    defer r.removeNodeFromProcessing(data.nodeName)
     for rep := 0; rep < podCIDRUpdateRetry; rep++ {
         // TODO: change it to using PATCH instead of full Node updates.
         node, err = r.client.Core().Nodes().Get(data.nodeName)
@@ -217,9 +252,14 @@ func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
     }
     if err != nil {
         recordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed")
-        glog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", data.nodeName, err)
-        if releaseErr := r.cidrs.release(data.cidr); releaseErr != nil {
-            glog.Errorf("Error releasing allocated CIDR for node %v: %v", data.nodeName, releaseErr)
+        // We accept the fact that we may leak CIDRs here. This is safer than releasing
+        // them in case when we don't know if the request went through.
+        // NodeController restart will return all falsely allocated CIDRs to the pool.
+        if !apierrors.IsServerTimeout(err) {
+            glog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", data.nodeName, err)
+            if releaseErr := r.cidrs.release(data.cidr); releaseErr != nil {
+                glog.Errorf("Error releasing allocated CIDR for node %v: %v", data.nodeName, releaseErr)
+            }
         }
     }
     return err
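The change above is the core of the retry behavior: after podCIDRUpdateRetry failed attempts, the allocated CIDR is released only when the final error is definitely not a server timeout, because after a timeout the update may in fact have landed, and releasing would let the same CIDR be handed to a second node; as the comment notes, a NodeController restart reclaims any CIDRs leaked this way. A minimal sketch of that shape, with invented names throughout (updateWithRetry, updateRetries, isTimeout) standing in for the real client call and apierrors.IsServerTimeout:

package main

import (
    "errors"
    "fmt"
)

const updateRetries = 5

// updateWithRetry retries the update, and on final failure releases the
// reservation only when the error cannot be a timeout: after a timeout the
// write may have succeeded server-side, so keeping (leaking) the reservation
// is safer than releasing it and risking a double assignment.
func updateWithRetry(update func() error, release func(), isTimeout func(error) bool) error {
    var err error
    for i := 0; i < updateRetries; i++ {
        if err = update(); err == nil {
            return nil
        }
    }
    if !isTimeout(err) {
        // The request definitely failed, so the reservation can be returned.
        release()
    }
    // On timeout the outcome is unknown; the reservation is kept and gets
    // reconciled when the controller restarts and re-reads the node objects.
    return err
}

func main() {
    errTimeout := errors.New("server timeout")
    err := updateWithRetry(
        func() error { return errTimeout },                   // update that always times out
        func() { fmt.Println("released CIDR") },              // not called in this run
        func(e error) bool { return errors.Is(e, errTimeout) },
    )
    fmt.Println("final error:", err)
}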