Move CIDR allocation logic away from nodecontroller.go
@@ -57,13 +57,8 @@ var (
 const (
 	// nodeStatusUpdateRetry controls the number of retries of writing NodeStatus update.
 	nodeStatusUpdateRetry = 5
-	// podCIDRUpdateRetry controls the number of retries of writing Node.Spec.PodCIDR update.
-	podCIDRUpdateRetry = 5
 	// controls how often NodeController will try to evict Pods from non-responsive Nodes.
 	nodeEvictionPeriod = 100 * time.Millisecond
-	// controls how many NodeSpec updates NC can process in any moment.
-	cidrUpdateWorkers   = 10
-	cidrUpdateQueueSize = 5000
 )

 type nodeStatusData struct {
@@ -72,11 +67,6 @@ type nodeStatusData struct {
 	status api.NodeStatus
 }

-type nodeAndCIDR struct {
-	nodeName string
-	cidr     *net.IPNet
-}
-
 type NodeController struct {
 	allocateNodeCIDRs bool
 	cloud             cloudprovider.Interface
@@ -142,8 +132,6 @@ type NodeController struct {
 	// It is enabled when all Nodes observed by the NodeController are NotReady and disabled
 	// when NC sees any healthy Node. This is a temporary fix for v1.3.
 	networkSegmentationMode bool
-
-	nodeCIDRUpdateChannel chan nodeAndCIDR
 }

 // NewNodeController returns a new node controller to sync instances from cloudprovider.
@@ -206,7 +194,6 @@ func NewNodeController(
 		allocateNodeCIDRs:         allocateNodeCIDRs,
 		forcefullyDeletePod:       func(p *api.Pod) error { return forcefullyDeletePod(kubeClient, p) },
 		nodeExistsInCloudProvider: func(nodeName string) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) },
-		nodeCIDRUpdateChannel:     make(chan nodeAndCIDR, cidrUpdateQueueSize),
 	}

 	nc.podStore.Indexer, nc.podController = framework.NewIndexerInformer(
@@ -233,8 +220,20 @@ func NewNodeController(
 	nodeEventHandlerFuncs := framework.ResourceEventHandlerFuncs{}
 	if nc.allocateNodeCIDRs {
 		nodeEventHandlerFuncs = framework.ResourceEventHandlerFuncs{
-			AddFunc:    nc.allocateOrOccupyCIDR,
-			DeleteFunc: nc.recycleCIDR,
+			AddFunc: func(obj interface{}) {
+				node := obj.(*api.Node)
+				err := nc.cidrAllocator.AllocateOrOccupyCIDR(node)
+				if err != nil {
+					glog.Errorf("Error allocating CIDR: %v", err)
+				}
+			},
+			DeleteFunc: func(obj interface{}) {
+				node := obj.(*api.Node)
+				err := nc.cidrAllocator.ReleaseCIDR(node)
+				if err != nil {
+					glog.Errorf("Error releasing CIDR: %v", err)
+				}
+			},
 		}
 	}

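The Add and Delete handlers above no longer point at NodeController methods; they delegate to nc.cidrAllocator. A minimal sketch of the contract those call sites imply follows — only AllocateOrOccupyCIDR and ReleaseCIDR appear in this diff, and the interface name, package, and import path are assumptions, since the new cidr_allocator.go file is not part of this excerpt.

package node

import "k8s.io/kubernetes/pkg/api"

// CIDRAllocator sketches the behaviour the event handlers above rely on.
// Method names come from the call sites in the diff; everything else is assumed.
type CIDRAllocator interface {
	// AllocateOrOccupyCIDR either assigns a fresh pod CIDR to a newly observed
	// node or marks the node's existing Node.Spec.PodCIDR as occupied.
	AllocateOrOccupyCIDR(node *api.Node) error
	// ReleaseCIDR returns a deleted node's pod CIDR to the free pool.
	ReleaseCIDR(node *api.Node) error
}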
@@ -267,7 +266,7 @@ func NewNodeController(
 	)

 	if allocateNodeCIDRs {
-		nc.cidrAllocator = NewCIDRRangeAllocator(clusterCIDR, nodeCIDRMaskSize)
+		nc.cidrAllocator = NewCIDRRangeAllocator(kubeClient, clusterCIDR, serviceCIDR, nodeCIDRMaskSize)
 	}

 	return nc
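NewCIDRRangeAllocator now also receives the client and the service CIDR. That lines up with the hunks below: the controller-side code that wrote Node.Spec.PodCIDR (updateCIDRAllocation) and that occupied the service range (filterOutServiceRange) is deleted, so those responsibilities presumably move behind the allocator. Its capacity is still fixed by clusterCIDR and nodeCIDRMaskSize alone; a small self-contained illustration of that arithmetic, with arbitrary example values not taken from this commit:

package main

import (
	"fmt"
	"net"
)

func main() {
	// Example values only; any clusterCIDR / nodeCIDRMaskSize pair works the same way.
	_, clusterCIDR, _ := net.ParseCIDR("10.244.0.0/16")
	nodeCIDRMaskSize := 24

	clusterMaskSize, _ := clusterCIDR.Mask.Size()
	// Each node gets one /24 carved out of the /16, so 2^(24-16) = 256 nodes can be served.
	maxNodeCIDRs := 1 << uint(nodeCIDRMaskSize-clusterMaskSize)
	fmt.Printf("cluster %s with node mask /%d => %d allocatable node CIDRs\n",
		clusterCIDR, nodeCIDRMaskSize, maxNodeCIDRs)
}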
@@ -275,14 +274,6 @@ func NewNodeController(

 // Run starts an asynchronous loop that monitors the status of cluster nodes.
 func (nc *NodeController) Run(period time.Duration) {
-	if nc.allocateNodeCIDRs {
-		if nc.serviceCIDR != nil {
-			nc.filterOutServiceRange()
-		} else {
-			glog.Info("No Service CIDR provided. Skipping filtering out service addresses.")
-		}
-	}
-
 	go nc.nodeController.Run(wait.NeverStop)
 	go nc.podController.Run(wait.NeverStop)
 	go nc.daemonSetController.Run(wait.NeverStop)
@@ -351,107 +342,6 @@ func (nc *NodeController) Run(period time.Duration) {
 	}, nodeEvictionPeriod, wait.NeverStop)

 	go wait.Until(nc.cleanupOrphanedPods, 30*time.Second, wait.NeverStop)
-
-	for i := 0; i < cidrUpdateWorkers; i++ {
-		go func(stopChan <-chan struct{}) {
-			for {
-				select {
-				case workItem, ok := <-nc.nodeCIDRUpdateChannel:
-					if !ok {
-						glog.Warning("NodeCIDRUpdateChannel read returned false.")
-						return
-					}
-					nc.updateCIDRAllocation(workItem)
-				case <-stopChan:
-					glog.V(0).Info("StopChannel is closed.")
-					return
-				}
-			}
-		}(wait.NeverStop)
-	}
-}
-
-func (nc *NodeController) filterOutServiceRange() {
-	if !nc.clusterCIDR.Contains(nc.serviceCIDR.IP.Mask(nc.clusterCIDR.Mask)) && !nc.serviceCIDR.Contains(nc.clusterCIDR.IP.Mask(nc.serviceCIDR.Mask)) {
-		return
-	}
-
-	if err := nc.cidrAllocator.Occupy(nc.serviceCIDR); err != nil {
-		glog.Errorf("Error filtering out service cidr: %v", err)
-	}
-}
-
-func (nc *NodeController) updateCIDRAllocation(data nodeAndCIDR) {
-	var err error
-	var node *api.Node
-	for rep := 0; rep < podCIDRUpdateRetry; rep++ {
-		node, err = nc.kubeClient.Core().Nodes().Get(data.nodeName)
-		if err != nil {
-			glog.Errorf("Failed while getting node %v to retry updating Node.Spec.PodCIDR: %v", data.nodeName, err)
-			continue
-		}
-		node.Spec.PodCIDR = data.cidr.String()
-		if _, err := nc.kubeClient.Core().Nodes().Update(node); err != nil {
-			glog.Errorf("Failed while updating Node.Spec.PodCIDR (%d retries left): %v", podCIDRUpdateRetry-rep-1, err)
-		} else {
-			break
-		}
-	}
-	if err != nil {
-		nc.recordNodeStatusChange(node, "CIDRAssignmentFailed")
-		glog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", data.nodeName, err)
-		err := nc.cidrAllocator.Release(data.cidr)
-		glog.Errorf("Error releasing allocated CIDR for node %v: %v", data.nodeName, err)
-	}
-}
-
-// allocateOrOccupyCIDR looks at each new observed node, assigns it a valid CIDR
-// if it doesn't currently have one or mark the CIDR as used if the node already have one.
-func (nc *NodeController) allocateOrOccupyCIDR(obj interface{}) {
-	node := obj.(*api.Node)
-	if node.Spec.PodCIDR != "" {
-		_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
-		if err != nil {
-			glog.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR)
-			return
-		}
-		if err := nc.cidrAllocator.Occupy(podCIDR); err != nil {
-			glog.Errorf("failed to mark cidr as occupied :%v", err)
-			return
-		}
-		return
-	}
-	podCIDR, err := nc.cidrAllocator.AllocateNext()
-	if err != nil {
-		nc.recordNodeStatusChange(node, "CIDRNotAvailable")
-		return
-	}
-
-	glog.V(4).Infof("Putting node %s with CIDR %s into the work queue", node.Name, podCIDR)
-	nc.nodeCIDRUpdateChannel <- nodeAndCIDR{
-		nodeName: node.Name,
-		cidr:     podCIDR,
-	}
-}
-
-// recycleCIDR recycles the CIDR of a removed node
-func (nc *NodeController) recycleCIDR(obj interface{}) {
-	node := obj.(*api.Node)
-
-	if node.Spec.PodCIDR == "" {
-		return
-	}
-
-	_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
-	if err != nil {
-		glog.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR)
-		return
-	}
-
-	glog.V(4).Infof("recycle node %s CIDR %s", node.Name, podCIDR)
-	if err := nc.cidrAllocator.Release(podCIDR); err != nil {
-		glog.Errorf("failed to release cidr: %v", err)
-	}
 }

 var gracefulDeletionVersion = version.MustParse("v1.1.0")
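The functions removed above (filterOutServiceRange, updateCIDRAllocation, allocateOrOccupyCIDR, recycleCIDR) all sat on top of a fixed-size range allocator: carve the cluster CIDR into subnets of nodeCIDRMaskSize bits, mark some as used, hand out the first free one. The sketch below illustrates that technique in a self-contained, IPv4-only form; it is not the implementation this commit moves into cidr_allocator.go, and every name in it is illustrative.

// Illustrative sketch of a fixed-size CIDR range allocator (IPv4 only).
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"net"
)

type rangeAllocator struct {
	clusterCIDR *net.IPNet
	subMaskSize int          // e.g. 24 for one /24 per node
	maxCIDRs    int          // number of node CIDRs that fit into clusterCIDR
	used        map[int]bool // subnet index -> occupied
}

func newRangeAllocator(clusterCIDR *net.IPNet, subMaskSize int) *rangeAllocator {
	clusterMaskSize, _ := clusterCIDR.Mask.Size()
	return &rangeAllocator{
		clusterCIDR: clusterCIDR,
		subMaskSize: subMaskSize,
		maxCIDRs:    1 << uint(subMaskSize-clusterMaskSize),
		used:        map[int]bool{},
	}
}

// cidrAt returns the i-th subnet of the cluster CIDR.
func (r *rangeAllocator) cidrAt(i int) *net.IPNet {
	base := binary.BigEndian.Uint32(r.clusterCIDR.IP.To4())
	ip := make(net.IP, 4)
	binary.BigEndian.PutUint32(ip, base|uint32(i)<<uint(32-r.subMaskSize))
	return &net.IPNet{IP: ip, Mask: net.CIDRMask(r.subMaskSize, 32)}
}

// indexOf maps a node CIDR back to its slot within the cluster CIDR.
func (r *rangeAllocator) indexOf(cidr *net.IPNet) int {
	base := binary.BigEndian.Uint32(r.clusterCIDR.IP.To4())
	return int((binary.BigEndian.Uint32(cidr.IP.To4()) - base) >> uint(32-r.subMaskSize))
}

// AllocateNext hands out the first free node CIDR.
func (r *rangeAllocator) AllocateNext() (*net.IPNet, error) {
	for i := 0; i < r.maxCIDRs; i++ {
		if !r.used[i] {
			r.used[i] = true
			return r.cidrAt(i), nil
		}
	}
	return nil, errors.New("CIDR range is exhausted")
}

// Occupy marks an already-assigned CIDR as used, e.g. a Node.Spec.PodCIDR
// already set on an existing node.
func (r *rangeAllocator) Occupy(cidr *net.IPNet) { r.used[r.indexOf(cidr)] = true }

// Release returns a CIDR to the pool when its node is deleted.
func (r *rangeAllocator) Release(cidr *net.IPNet) { delete(r.used, r.indexOf(cidr)) }

func main() {
	_, clusterCIDR, _ := net.ParseCIDR("10.244.0.0/16")
	r := newRangeAllocator(clusterCIDR, 24)
	a, _ := r.AllocateNext() // 10.244.0.0/24
	b, _ := r.AllocateNext() // 10.244.1.0/24
	fmt.Println(a, b)
	r.Release(a)
	c, _ := r.AllocateNext() // 10.244.0.0/24 again
	fmt.Println(c)
}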
@@ -625,7 +515,7 @@ func (nc *NodeController) monitorNodeStatus() error {

 			// Report node event.
 			if currentReadyCondition.Status != api.ConditionTrue && observedReadyCondition.Status == api.ConditionTrue {
-				nc.recordNodeStatusChange(node, "NodeNotReady")
+				recordNodeStatusChange(nc.recorder, node, "NodeNotReady")
 				if err = nc.markAllPodsNotReady(node.Name); err != nil {
 					utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err))
 				}
@@ -721,7 +611,7 @@ func (nc *NodeController) recordNodeEvent(nodeName, eventtype, reason, event str
 	nc.recorder.Eventf(ref, eventtype, reason, "Node %s event: %s", nodeName, event)
 }

-func (nc *NodeController) recordNodeStatusChange(node *api.Node, new_status string) {
+func recordNodeStatusChange(recorder record.EventRecorder, node *api.Node, new_status string) {
 	ref := &api.ObjectReference{
 		Kind: "Node",
 		Name: node.Name,
@@ -731,7 +621,7 @@ func (nc *NodeController) recordNodeStatusChange(node *api.Node, new_status stri
 	glog.V(2).Infof("Recording status change %s event message for node %s", new_status, node.Name)
 	// TODO: This requires a transaction, either both node status is updated
 	// and event is recorded or neither should happen, see issue #6055.
-	nc.recorder.Eventf(ref, api.EventTypeNormal, new_status, "Node %s status is now: %s", node.Name, new_status)
+	recorder.Eventf(ref, api.EventTypeNormal, new_status, "Node %s status is now: %s", node.Name, new_status)
 }

 // For a given node checks its conditions and tries to update it. Returns grace period to which given node