diff --git a/pkg/controller/node/cidr_allocator.go b/pkg/controller/node/cidr_allocator.go index 1555bfc71ce..2bf1ac08cdc 100644 --- a/pkg/controller/node/cidr_allocator.go +++ b/pkg/controller/node/cidr_allocator.go @@ -20,10 +20,13 @@ import ( "errors" "fmt" "net" + "sync" "k8s.io/kubernetes/pkg/api" + apierrors "k8s.io/kubernetes/pkg/api/errors" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/record" + "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/wait" "github.com/golang/glog" @@ -60,6 +63,9 @@ type rangeAllocator struct { // This increases a throughput of CIDR assignment by not blocking on long operations. nodeCIDRUpdateChannel chan nodeAndCIDR recorder record.EventRecorder + // Keep a set of nodes that are currently being processed to avoid races in CIDR allocation + sync.Mutex + nodesInProcessing sets.String } // NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDR for node @@ -77,6 +83,7 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s clusterCIDR: clusterCIDR, nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize), recorder: recorder, + nodesInProcessing: sets.NewString(), } if serviceCIDR != nil { @@ -122,7 +129,24 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s return ra, nil } +func (r *rangeAllocator) insertNodeToProcessing(nodeName string) bool { + r.Lock() + defer r.Unlock() + if r.nodesInProcessing.Has(nodeName) { + return false + } + r.nodesInProcessing.Insert(nodeName) + return true +} + +func (r *rangeAllocator) removeNodeFromProcessing(nodeName string) { + r.Lock() + defer r.Unlock() + r.nodesInProcessing.Delete(nodeName) +} + func (r *rangeAllocator) occupyCIDR(node *api.Node) error { + defer r.removeNodeFromProcessing(node.Name) if node.Spec.PodCIDR == "" { return nil } @@ -138,12 +162,22 @@ func (r *rangeAllocator) occupyCIDR(node *api.Node) error { // AllocateOrOccupyCIDR 
looks at the given node, assigns it a valid CIDR // if it doesn't currently have one or mark the CIDR as used if the node already have one. +// WARNING: If you add any return statements to this function or defer any more work from it, +// you have to handle nodesInProcessing correctly. func (r *rangeAllocator) AllocateOrOccupyCIDR(node *api.Node) error { + if node == nil { + return nil + } + if !r.insertNodeToProcessing(node.Name) { + glog.V(2).Infof("Node %v is already in a process of CIDR assignment.", node.Name) + return nil + } if node.Spec.PodCIDR != "" { return r.occupyCIDR(node) } podCIDR, err := r.cidrs.allocateNext() if err != nil { + r.removeNodeFromProcessing(node.Name) recordNodeStatusChange(r.recorder, node, "CIDRNotAvailable") return fmt.Errorf("failed to allocate cidr: %v", err) } @@ -173,8 +207,8 @@ func (r *rangeAllocator) ReleaseCIDR(node *api.Node) error { return err } -// Marks all CIDRs with subNetMaskSize that belongs to serviceCIDR as used, so that they won't be -// assignable. +// Marks all CIDRs with subNetMaskSize that belongs to serviceCIDR as used, +// so that they won't be assignable. func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) { // Checks if service CIDR has a nonempty intersection with cluster CIDR. It is the case if either // clusterCIDR contains serviceCIDR with clusterCIDR's Mask applied (this means that clusterCIDR contains serviceCIDR) @@ -192,6 +226,7 @@ func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) { func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error { var err error var node *api.Node + defer r.removeNodeFromProcessing(data.nodeName) for rep := 0; rep < podCIDRUpdateRetry; rep++ { // TODO: change it to using PATCH instead of full Node updates. 
node, err = r.client.Core().Nodes().Get(data.nodeName) @@ -217,9 +252,14 @@ func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error { } if err != nil { recordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed") - glog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", data.nodeName, err) - if releaseErr := r.cidrs.release(data.cidr); releaseErr != nil { - glog.Errorf("Error releasing allocated CIDR for node %v: %v", data.nodeName, releaseErr) + // We accept the fact that we may leak CIDRs here. This is safer than releasing + // them when we don't know whether the request went through. + // NodeController restart will return all falsely allocated CIDRs to the pool. + if !apierrors.IsServerTimeout(err) { + glog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", data.nodeName, err) + if releaseErr := r.cidrs.release(data.cidr); releaseErr != nil { + glog.Errorf("Error releasing allocated CIDR for node %v: %v", data.nodeName, releaseErr) + } } } return err diff --git a/pkg/controller/node/cidr_set.go b/pkg/controller/node/cidr_set.go index 353c4f0161c..c4d9aaa8900 100644 --- a/pkg/controller/node/cidr_set.go +++ b/pkg/controller/node/cidr_set.go @@ -78,26 +78,13 @@ func (s *cidrSet) allocateNext() (*net.IPNet, error) { }, nil } -func (s *cidrSet) release(cidr *net.IPNet) error { - used, err := s.getIndexForCIDR(cidr) - if err != nil { - return err - } - - s.Lock() - defer s.Unlock() - s.used.SetBit(&s.used, used, 0) - - return nil -} - -func (s *cidrSet) occupy(cidr *net.IPNet) (err error) { - begin, end := 0, s.maxCIDRs +func (s *cidrSet) getBeginingAndEndIndices(cidr *net.IPNet) (begin, end int, err error) { + begin, end = 0, s.maxCIDRs cidrMask := cidr.Mask maskSize, _ := cidrMask.Size() if !s.clusterCIDR.Contains(cidr.IP.Mask(s.clusterCIDR.Mask)) && !cidr.Contains(s.clusterCIDR.IP.Mask(cidr.Mask)) { - return fmt.Errorf("cidr %v is out the range of cluster cidr %v", cidr, s.clusterCIDR) + return -1, 
-1, fmt.Errorf("cidr %v is out the range of cluster cidr %v", cidr, s.clusterCIDR) } if s.clusterMaskSize < maskSize { @@ -107,7 +94,7 @@ func (s *cidrSet) occupy(cidr *net.IPNet) (err error) { Mask: subNetMask, }) if err != nil { - return err + return -1, -1, err } ip := make([]byte, 4) @@ -118,9 +105,30 @@ func (s *cidrSet) occupy(cidr *net.IPNet) (err error) { Mask: subNetMask, }) if err != nil { - return err + return -1, -1, err } } + return begin, end, nil +} + +func (s *cidrSet) release(cidr *net.IPNet) error { + begin, end, err := s.getBeginingAndEndIndices(cidr) + if err != nil { + return err + } + s.Lock() + defer s.Unlock() + for i := begin; i <= end; i++ { + s.used.SetBit(&s.used, i, 0) + } + return nil +} + +func (s *cidrSet) occupy(cidr *net.IPNet) (err error) { + begin, end, err := s.getBeginingAndEndIndices(cidr) + if err != nil { + return err + } s.Lock() defer s.Unlock() diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go index 7fded001aad..b7a96fc4989 100644 --- a/pkg/controller/node/nodecontroller.go +++ b/pkg/controller/node/nodecontroller.go @@ -244,6 +244,34 @@ func NewNodeController( glog.Errorf("Error allocating CIDR: %v", err) } }, + UpdateFunc: func(_, obj interface{}) { + node := obj.(*api.Node) + // If the PodCIDR is not empty we either: + // - already processed a Node that already had a CIDR after NC restarted + // (cidr is marked as used), + // - already processed a Node successfully and allocated a CIDR for it + // (cidr is marked as used), + // - already processed a Node but we saw a "timeout" response and the + // request eventually got through; in this case we haven't released + // the allocated CIDR (cidr is still marked as used). 
+ // There's a possible error here: + // - NC sees a new Node and assigns a CIDR X to it, + // - Update Node call fails with a timeout, + // - Node is updated by some other component, NC sees an update and + // assigns CIDR Y to the Node, + // - Both CIDR X and CIDR Y are marked as used in the local cache, + // even though Node sees only CIDR Y. + // The problem here is that in the in-memory cache we see CIDR X as marked, + // which prevents it from being assigned to any new node. The cluster + // state is correct. + // Restarting NC fixes the issue. + if node.Spec.PodCIDR == "" { + err := nc.cidrAllocator.AllocateOrOccupyCIDR(node) + if err != nil { + glog.Errorf("Error allocating CIDR: %v", err) + } + } + }, DeleteFunc: func(obj interface{}) { node := obj.(*api.Node) err := nc.cidrAllocator.ReleaseCIDR(node)