mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 04:33:26 +00:00
Merge pull request #29101 from gmarek/allocator2
Automatic merge from submit-queue Retry assigning CIDRs Fix #28879, ref #29064 cc @bgrant0607 @bprashanth @alex-mohr
This commit is contained in:
commit
8c6336b12a
@ -20,10 +20,13 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
|
apierrors "k8s.io/kubernetes/pkg/api/errors"
|
||||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||||
"k8s.io/kubernetes/pkg/client/record"
|
"k8s.io/kubernetes/pkg/client/record"
|
||||||
|
"k8s.io/kubernetes/pkg/util/sets"
|
||||||
"k8s.io/kubernetes/pkg/util/wait"
|
"k8s.io/kubernetes/pkg/util/wait"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
@ -60,6 +63,9 @@ type rangeAllocator struct {
|
|||||||
// This increases a throughput of CIDR assignment by not blocking on long operations.
|
// This increases a throughput of CIDR assignment by not blocking on long operations.
|
||||||
nodeCIDRUpdateChannel chan nodeAndCIDR
|
nodeCIDRUpdateChannel chan nodeAndCIDR
|
||||||
recorder record.EventRecorder
|
recorder record.EventRecorder
|
||||||
|
// Keep a set of nodes that are currently being processed to avoid races in CIDR allocation
|
||||||
|
sync.Mutex
|
||||||
|
nodesInProcessing sets.String
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDR for node
|
// NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDR for node
|
||||||
@ -77,6 +83,7 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s
|
|||||||
clusterCIDR: clusterCIDR,
|
clusterCIDR: clusterCIDR,
|
||||||
nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize),
|
nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize),
|
||||||
recorder: recorder,
|
recorder: recorder,
|
||||||
|
nodesInProcessing: sets.NewString(),
|
||||||
}
|
}
|
||||||
|
|
||||||
if serviceCIDR != nil {
|
if serviceCIDR != nil {
|
||||||
@ -122,7 +129,24 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s
|
|||||||
return ra, nil
|
return ra, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *rangeAllocator) insertNodeToProcessing(nodeName string) bool {
|
||||||
|
r.Lock()
|
||||||
|
defer r.Unlock()
|
||||||
|
if r.nodesInProcessing.Has(nodeName) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
r.nodesInProcessing.Insert(nodeName)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rangeAllocator) removeNodeFromProcessing(nodeName string) {
|
||||||
|
r.Lock()
|
||||||
|
defer r.Unlock()
|
||||||
|
r.nodesInProcessing.Delete(nodeName)
|
||||||
|
}
|
||||||
|
|
||||||
func (r *rangeAllocator) occupyCIDR(node *api.Node) error {
|
func (r *rangeAllocator) occupyCIDR(node *api.Node) error {
|
||||||
|
defer r.removeNodeFromProcessing(node.Name)
|
||||||
if node.Spec.PodCIDR == "" {
|
if node.Spec.PodCIDR == "" {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -138,12 +162,22 @@ func (r *rangeAllocator) occupyCIDR(node *api.Node) error {
|
|||||||
|
|
||||||
// AllocateOrOccupyCIDR looks at the given node, assigns it a valid CIDR
|
// AllocateOrOccupyCIDR looks at the given node, assigns it a valid CIDR
|
||||||
// if it doesn't currently have one or mark the CIDR as used if the node already have one.
|
// if it doesn't currently have one or mark the CIDR as used if the node already have one.
|
||||||
|
// WARNING: If you're adding any return calls or defer any more work from this function
|
||||||
|
// you have to handle nodesInProcessing correctly.
|
||||||
func (r *rangeAllocator) AllocateOrOccupyCIDR(node *api.Node) error {
|
func (r *rangeAllocator) AllocateOrOccupyCIDR(node *api.Node) error {
|
||||||
|
if node == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if !r.insertNodeToProcessing(node.Name) {
|
||||||
|
glog.V(2).Infof("Node %v is already in a process of CIDR assignment.", node.Name)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
if node.Spec.PodCIDR != "" {
|
if node.Spec.PodCIDR != "" {
|
||||||
return r.occupyCIDR(node)
|
return r.occupyCIDR(node)
|
||||||
}
|
}
|
||||||
podCIDR, err := r.cidrs.allocateNext()
|
podCIDR, err := r.cidrs.allocateNext()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
r.removeNodeFromProcessing(node.Name)
|
||||||
recordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
|
recordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
|
||||||
return fmt.Errorf("failed to allocate cidr: %v", err)
|
return fmt.Errorf("failed to allocate cidr: %v", err)
|
||||||
}
|
}
|
||||||
@ -173,8 +207,8 @@ func (r *rangeAllocator) ReleaseCIDR(node *api.Node) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Marks all CIDRs with subNetMaskSize that belongs to serviceCIDR as used, so that they won't be
|
// Marks all CIDRs with subNetMaskSize that belongs to serviceCIDR as used,
|
||||||
// assignable.
|
// so that they won't be assignable.
|
||||||
func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) {
|
func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) {
|
||||||
// Checks if service CIDR has a nonempty intersection with cluster CIDR. It is the case if either
|
// Checks if service CIDR has a nonempty intersection with cluster CIDR. It is the case if either
|
||||||
// clusterCIDR contains serviceCIDR with clusterCIDR's Mask applied (this means that clusterCIDR contains serviceCIDR)
|
// clusterCIDR contains serviceCIDR with clusterCIDR's Mask applied (this means that clusterCIDR contains serviceCIDR)
|
||||||
@ -192,6 +226,7 @@ func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) {
|
|||||||
func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
|
func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
|
||||||
var err error
|
var err error
|
||||||
var node *api.Node
|
var node *api.Node
|
||||||
|
defer r.removeNodeFromProcessing(data.nodeName)
|
||||||
for rep := 0; rep < podCIDRUpdateRetry; rep++ {
|
for rep := 0; rep < podCIDRUpdateRetry; rep++ {
|
||||||
// TODO: change it to using PATCH instead of full Node updates.
|
// TODO: change it to using PATCH instead of full Node updates.
|
||||||
node, err = r.client.Core().Nodes().Get(data.nodeName)
|
node, err = r.client.Core().Nodes().Get(data.nodeName)
|
||||||
@ -217,9 +252,14 @@ func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
|
|||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
recordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed")
|
recordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed")
|
||||||
glog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", data.nodeName, err)
|
// We accept the fact that we may leak CIDRs here. This is safer than releasing
|
||||||
if releaseErr := r.cidrs.release(data.cidr); releaseErr != nil {
|
// them in case when we don't know if request went through.
|
||||||
glog.Errorf("Error releasing allocated CIDR for node %v: %v", data.nodeName, releaseErr)
|
// NodeController restart will return all falsely allocated CIDRs to the pool.
|
||||||
|
if !apierrors.IsServerTimeout(err) {
|
||||||
|
glog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", data.nodeName, err)
|
||||||
|
if releaseErr := r.cidrs.release(data.cidr); releaseErr != nil {
|
||||||
|
glog.Errorf("Error releasing allocated CIDR for node %v: %v", data.nodeName, releaseErr)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
@ -78,26 +78,13 @@ func (s *cidrSet) allocateNext() (*net.IPNet, error) {
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *cidrSet) release(cidr *net.IPNet) error {
|
func (s *cidrSet) getBeginingAndEndIndices(cidr *net.IPNet) (begin, end int, err error) {
|
||||||
used, err := s.getIndexForCIDR(cidr)
|
begin, end = 0, s.maxCIDRs
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
s.Lock()
|
|
||||||
defer s.Unlock()
|
|
||||||
s.used.SetBit(&s.used, used, 0)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *cidrSet) occupy(cidr *net.IPNet) (err error) {
|
|
||||||
begin, end := 0, s.maxCIDRs
|
|
||||||
cidrMask := cidr.Mask
|
cidrMask := cidr.Mask
|
||||||
maskSize, _ := cidrMask.Size()
|
maskSize, _ := cidrMask.Size()
|
||||||
|
|
||||||
if !s.clusterCIDR.Contains(cidr.IP.Mask(s.clusterCIDR.Mask)) && !cidr.Contains(s.clusterCIDR.IP.Mask(cidr.Mask)) {
|
if !s.clusterCIDR.Contains(cidr.IP.Mask(s.clusterCIDR.Mask)) && !cidr.Contains(s.clusterCIDR.IP.Mask(cidr.Mask)) {
|
||||||
return fmt.Errorf("cidr %v is out the range of cluster cidr %v", cidr, s.clusterCIDR)
|
return -1, -1, fmt.Errorf("cidr %v is out the range of cluster cidr %v", cidr, s.clusterCIDR)
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.clusterMaskSize < maskSize {
|
if s.clusterMaskSize < maskSize {
|
||||||
@ -107,7 +94,7 @@ func (s *cidrSet) occupy(cidr *net.IPNet) (err error) {
|
|||||||
Mask: subNetMask,
|
Mask: subNetMask,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return -1, -1, err
|
||||||
}
|
}
|
||||||
|
|
||||||
ip := make([]byte, 4)
|
ip := make([]byte, 4)
|
||||||
@ -118,9 +105,30 @@ func (s *cidrSet) occupy(cidr *net.IPNet) (err error) {
|
|||||||
Mask: subNetMask,
|
Mask: subNetMask,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return -1, -1, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return begin, end, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *cidrSet) release(cidr *net.IPNet) error {
|
||||||
|
begin, end, err := s.getBeginingAndEndIndices(cidr)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
s.Lock()
|
||||||
|
defer s.Unlock()
|
||||||
|
for i := begin; i <= end; i++ {
|
||||||
|
s.used.SetBit(&s.used, i, 0)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *cidrSet) occupy(cidr *net.IPNet) (err error) {
|
||||||
|
begin, end, err := s.getBeginingAndEndIndices(cidr)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
s.Lock()
|
s.Lock()
|
||||||
defer s.Unlock()
|
defer s.Unlock()
|
||||||
|
@ -244,6 +244,34 @@ func NewNodeController(
|
|||||||
glog.Errorf("Error allocating CIDR: %v", err)
|
glog.Errorf("Error allocating CIDR: %v", err)
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
UpdateFunc: func(_, obj interface{}) {
|
||||||
|
node := obj.(*api.Node)
|
||||||
|
// If the PodCIDR is not empty we either:
|
||||||
|
// - already processed a Node that already had a CIDR after NC restarted
|
||||||
|
// (cidr is marked as used),
|
||||||
|
// - already processed a Node successfully and allocated a CIDR for it
|
||||||
|
// (cidr is marked as used),
|
||||||
|
// - already processed a Node but we saw a "timeout" response and
|
||||||
|
// request eventually got through; in this case we haven't released
|
||||||
|
// the allocated CIDR (cidr is still marked as used).
|
||||||
|
// There's a possible error here:
|
||||||
|
// - NC sees a new Node and assigns a CIDR X to it,
|
||||||
|
// - Update Node call fails with a timeout,
|
||||||
|
// - Node is updated by some other component, NC sees an update and
|
||||||
|
// assigns CIDR Y to the Node,
|
||||||
|
// - Both CIDR X and CIDR Y are marked as used in the local cache,
|
||||||
|
// even though Node sees only CIDR Y
|
||||||
|
// The problem here is that in in-memory cache we see CIDR X as marked,
|
||||||
|
// which prevents it from being assigned to any new node. The cluster
|
||||||
|
// state is correct.
|
||||||
|
// Restart of NC fixes the issue.
|
||||||
|
if node.Spec.PodCIDR == "" {
|
||||||
|
err := nc.cidrAllocator.AllocateOrOccupyCIDR(node)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Error allocating CIDR: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
DeleteFunc: func(obj interface{}) {
|
DeleteFunc: func(obj interface{}) {
|
||||||
node := obj.(*api.Node)
|
node := obj.(*api.Node)
|
||||||
err := nc.cidrAllocator.ReleaseCIDR(node)
|
err := nc.cidrAllocator.ReleaseCIDR(node)
|
||||||
|
Loading…
Reference in New Issue
Block a user