diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go
index e2c10325007..6e97c6c563a 100644
--- a/pkg/controller/node/nodecontroller.go
+++ b/pkg/controller/node/nodecontroller.go
@@ -20,6 +20,8 @@ import (
 	"errors"
 	"fmt"
 	"net"
+	"strconv"
+	"strings"
 	"sync"
 	"time"

@@ -111,6 +113,12 @@ type NodeController struct {
 	nodeStore           cache.StoreToNodeLister
 	forcefullyDeletePod func(*api.Pod)
+	availableCIDRs sets.Int
+	// needSync denotes whether the node controller was newly started or restarted (after a crash)
+	needSync bool
+	// maxCIDRs is the maximum number of CIDRs we can give out, computed from nc.clusterCIDR
+	maxCIDRs int
+	generatedCIDR bool
 }

 // NewNodeController returns a new node controller to sync instances from cloudprovider.
@@ -158,6 +166,10 @@ func NewNodeController(
 		clusterCIDR:         clusterCIDR,
 		allocateNodeCIDRs:   allocateNodeCIDRs,
 		forcefullyDeletePod: func(p *api.Pod) { forcefullyDeletePod(kubeClient, p) },
+		availableCIDRs:      make(sets.Int),
+		needSync:            true,
+		maxCIDRs:            0,
+		generatedCIDR:       false,
 	}

 	nc.podStore.Store, nc.podController = framework.NewInformer(
@@ -192,10 +204,26 @@ func NewNodeController(
 	return nc
 }

+// generateAvailableCIDRs computes the index of every CIDR that can be carved
+// out of nc.clusterCIDR and inserts those indices into the availableCIDRs set.
+func (nc *NodeController) generateAvailableCIDRs() {
+	nc.generatedCIDR = true
+	// Generate all available CIDRs up front: the number of /24 subnets is
+	// bounded, so the set stays small and uses well under 1MB of memory.
+	cidrIP := nc.clusterCIDR.IP.To4()
+	nc.maxCIDRs = (256-int(cidrIP[1]))*256 - int(cidrIP[2])
+
+	for i := 0; i <= nc.maxCIDRs; i++ {
+		nc.availableCIDRs.Insert(i)
+	}
+}
+
 // Run starts an asynchronous loop that monitors the status of cluster nodes.
 func (nc *NodeController) Run(period time.Duration) {
 	go nc.nodeController.Run(util.NeverStop)
 	go nc.podController.Run(util.NeverStop)
+
 	// Incorporate the results of node status pushed from kubelet to master.
 	go util.Until(func() {
 		if err := nc.monitorNodeStatus(); err != nil {
@@ -260,19 +288,34 @@ func (nc *NodeController) Run(period time.Duration) {
 	}, nodeEvictionPeriod, util.NeverStop)
 }

-// Generates num pod CIDRs that could be assigned to nodes.
-func generateCIDRs(clusterCIDR *net.IPNet, num int) sets.String {
-	res := sets.NewString()
+// translateCIDRs translates a pod CIDR index into the CIDR string that can be
+// assigned to a node, handling third-octet overflow so the CIDR stays valid.
+func translateCIDRs(clusterCIDR *net.IPNet, num int) string {
 	cidrIP := clusterCIDR.IP.To4()
-	for i := 0; i < num; i++ {
-		// TODO: Make the CIDRs configurable.
-		b1 := byte(i >> 8)
-		b2 := byte(i % 256)
-		res.Insert(fmt.Sprintf("%d.%d.%d.0/24", cidrIP[0], cidrIP[1]+b1, cidrIP[2]+b2))
+	// TODO: Make the CIDRs configurable.
+	b1 := (num / 256) + int(cidrIP[1])
+	b2 := (num % 256) + int(cidrIP[2])
+	if b2 > 255 {
+		b2 = b2 % 256
+		b1 = b1 + 1
 	}
+	res := fmt.Sprintf("%d.%d.%d.0/24", cidrIP[0], b1, b2)
 	return res
 }
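To make the index arithmetic in `translateCIDRs` concrete, here is a minimal standalone sketch (the package layout and helper name are hypothetical, not part of the patch) showing how an index maps to a /24 and how overflow carries into the second octet:

```go
package main

import (
	"fmt"
	"net"
)

// translate mirrors the patch's translateCIDRs arithmetic: index num selects
// the num-th /24 after clusterCIDR, carrying third-octet overflow into the
// second octet.
func translate(clusterCIDR *net.IPNet, num int) string {
	cidrIP := clusterCIDR.IP.To4()
	b1 := (num / 256) + int(cidrIP[1])
	b2 := (num % 256) + int(cidrIP[2])
	if b2 > 255 {
		b2 %= 256
		b1++
	}
	return fmt.Sprintf("%d.%d.%d.0/24", cidrIP[0], b1, b2)
}

func main() {
	_, cluster, _ := net.ParseCIDR("10.244.0.0/16")
	fmt.Println(translate(cluster, 0))   // 10.244.0.0/24
	fmt.Println(translate(cluster, 255)) // 10.244.255.0/24
	fmt.Println(translate(cluster, 256)) // 10.245.0.0/24 (carry into the second octet)
}
```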
+// translateCIDRtoIndex translates a CIDR string back into its CIDR index,
+// returning -1 if the CIDR is malformed.
+func (nc *NodeController) translateCIDRtoIndex(freeCIDR string) int {
+	CIDRsArray := strings.Split(freeCIDR, ".")
+	if len(CIDRsArray) < 3 {
+		return -1
+	}
+	cidrIP := nc.clusterCIDR.IP.To4()
+	CIDRsIndexOne, err := strconv.Atoi(CIDRsArray[1])
+	if err != nil {
+		return -1
+	}
+	CIDRsIndexTwo, err := strconv.Atoi(CIDRsArray[2])
+	if err != nil {
+		return -1
+	}
+	release := (CIDRsIndexOne-int(cidrIP[1]))*256 + CIDRsIndexTwo - int(cidrIP[2])
+	return release
+}
+
 // getCondition returns a condition object for the specific condition
 // type, nil if the condition is not set.
 func (nc *NodeController) getCondition(status *api.NodeStatus, conditionType api.NodeConditionType) *api.NodeCondition {
@@ -350,6 +393,17 @@ func forcefullyDeletePod(c client.Interface, pod *api.Pod) {
 	}
 }

+// releaseCIDR translates a CIDR back into its index and returns the index to
+// the availableCIDRs set so it can be reassigned.
+func (nc *NodeController) releaseCIDR(freeCIDR string) {
+	release := nc.translateCIDRtoIndex(freeCIDR)
+	if release >= 0 && release <= nc.maxCIDRs {
+		nc.availableCIDRs.Insert(release)
+	} else {
+		glog.V(4).Infof("CIDR %s is invalid", freeCIDR)
+	}
+}
+
 // monitorNodeStatus verifies node status are constantly updated by kubelet, and if not,
 // post "NodeReady==ConditionUnknown". It also evicts all pods if node is not ready or
 // not reachable for a long period of time.
@@ -460,11 +514,14 @@ func (nc *NodeController) monitorNodeStatus() error {
 					nc.evictPods(node.Name)
 					continue
 				}
-
-				if err := nc.kubeClient.Nodes().Delete(node.Name); err != nil {
+				assignedCIDR := node.Spec.PodCIDR
+				if err = nc.kubeClient.Nodes().Delete(node.Name); err != nil {
 					glog.Errorf("Unable to delete node %s: %v", node.Name, err)
 					continue
 				}
+				// Return the deleted node's pod CIDR to the pool.
+				if assignedCIDR != "" {
+					nc.releaseCIDR(assignedCIDR)
+				}
 			}
 		}
 	}
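The release path is easiest to sanity-check by round-tripping an index through both translations. Another standalone sketch (hypothetical names; a plain map stands in for the controller's `sets.Int`):

```go
package main

import (
	"fmt"
	"net"
	"strconv"
	"strings"
)

// toIndex mirrors translateCIDRtoIndex: recover the allocation index from a
// CIDR string of the form a.b.c.0/24, relative to the cluster CIDR.
func toIndex(clusterCIDR *net.IPNet, cidr string) int {
	parts := strings.Split(cidr, ".")
	if len(parts) < 3 {
		return -1
	}
	base := clusterCIDR.IP.To4()
	b1, err1 := strconv.Atoi(parts[1])
	b2, err2 := strconv.Atoi(parts[2])
	if err1 != nil || err2 != nil {
		return -1
	}
	return (b1-int(base[1]))*256 + b2 - int(base[2])
}

func main() {
	_, cluster, _ := net.ParseCIDR("10.244.0.0/16")
	available := map[int]struct{}{} // stand-in for sets.Int

	// Releasing "10.245.3.0/24" should return index 259 to the pool.
	idx := toIndex(cluster, "10.245.3.0/24")
	available[idx] = struct{}{}
	fmt.Println(idx) // 259 == (245-244)*256 + 3
}
```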
@@ -474,25 +531,43 @@ func (nc *NodeController) monitorNodeStatus() error {

 // reconcileNodeCIDRs looks at each node and assigns it a valid CIDR
 // if it doesn't currently have one.
+// It draws from the availableCIDRs set first and only generates CIDRs when
+// the set has never been populated.
 func (nc *NodeController) reconcileNodeCIDRs(nodes *api.NodeList) {
 	glog.V(4).Infof("Reconciling cidrs for %d nodes", len(nodes.Items))
-	// TODO(roberthbailey): This seems inefficient. Why re-calculate CIDRs
-	// on each sync period?
-	availableCIDRs := generateCIDRs(nc.clusterCIDR, len(nodes.Items))
-	for _, node := range nodes.Items {
-		if node.Spec.PodCIDR != "" {
-			glog.V(4).Infof("CIDR %s is already being used by node %s", node.Spec.PodCIDR, node.Name)
-			availableCIDRs.Delete(node.Spec.PodCIDR)
+	// Check whether this node controller was newly started or restarted after a
+	// crash; this branch runs only once in either case.
+	if nc.needSync {
+		// Restore availableCIDRs: generate every CIDR index, then drop the
+		// indices already assigned to nodes. A node whose CIDR is not in the
+		// set is cleared so that it receives a fresh assignment below.
+		if !nc.generatedCIDR {
+			nc.generateAvailableCIDRs()
+		}
+		for _, node := range nodes.Items {
+			if node.Spec.PodCIDR != "" {
+				if nc.availableCIDRs.Has(nc.translateCIDRtoIndex(node.Spec.PodCIDR)) {
+					nc.availableCIDRs.Delete(nc.translateCIDRtoIndex(node.Spec.PodCIDR))
+				} else {
+					glog.V(4).Infof("Node %s has an invalid CIDR %s, will reassign", node.Name, node.Spec.PodCIDR)
+					node.Spec.PodCIDR = ""
+					if _, err := nc.kubeClient.Nodes().Update(&node); err != nil {
+						nc.recordNodeStatusChange(&node, "CIDRAssignmentFailed")
+					}
+				}
+			}
 		}
+		nc.needSync = false
 	}
 	for _, node := range nodes.Items {
 		if node.Spec.PodCIDR == "" {
-			podCIDR, found := availableCIDRs.PopAny()
+			CIDRsNum, found := nc.availableCIDRs.PopAny()
+			if !found && !nc.generatedCIDR {
+				nc.generateAvailableCIDRs()
+				CIDRsNum, found = nc.availableCIDRs.PopAny()
+			}
 			if !found {
 				nc.recordNodeStatusChange(&node, "CIDRNotAvailable")
 				continue
 			}
-			glog.V(4).Infof("Assigning node %s CIDR %s", node.Name, podCIDR)
+			podCIDR := translateCIDRs(nc.clusterCIDR, CIDRsNum)
+			glog.V(4).Infof("Assigning node %s CIDR %s", node.Name, podCIDR)
 			node.Spec.PodCIDR = podCIDR
 			if _, err := nc.kubeClient.Nodes().Update(&node); err != nil {
 				nc.recordNodeStatusChange(&node, "CIDRAssignmentFailed")
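Taken together, the patch turns CIDR management into a free-list of integer indices: generate once, pop to assign, insert to release. A toy end-to-end simulation under the same assumptions (hypothetical allocator type, lowest-index pop instead of the arbitrary element that `sets.Int.PopAny` returns):

```go
package main

import (
	"fmt"
	"net"
)

// allocator is a toy stand-in for the controller's availableCIDRs bookkeeping.
type allocator struct {
	available map[int]bool
	max       int
	base      net.IP
}

func newAllocator(clusterCIDR *net.IPNet) *allocator {
	ip := clusterCIDR.IP.To4()
	// Same bound that the patch computes in generateAvailableCIDRs.
	a := &allocator{
		available: map[int]bool{},
		max:       (256-int(ip[1]))*256 - int(ip[2]),
		base:      ip,
	}
	for i := 0; i <= a.max; i++ {
		a.available[i] = true
	}
	return a
}

// assign pops the lowest free index and renders it as a /24.
func (a *allocator) assign() (string, bool) {
	for i := 0; i <= a.max; i++ {
		if a.available[i] {
			delete(a.available, i)
			// Carry handling omitted for brevity; see translateCIDRs above.
			return fmt.Sprintf("%d.%d.%d.0/24", a.base[0], int(a.base[1])+i/256, int(a.base[2])+i%256), true
		}
	}
	return "", false
}

// release returns an index to the pool, mirroring releaseCIDR.
func (a *allocator) release(i int) {
	if i >= 0 && i <= a.max {
		a.available[i] = true
	}
}

func main() {
	_, cluster, _ := net.ParseCIDR("10.244.0.0/16")
	a := newAllocator(cluster)
	cidr, _ := a.assign() // node joins: 10.244.0.0/24
	fmt.Println(cidr)
	a.release(0)         // node deleted: index 0 becomes reusable
	cidr, _ = a.assign() // next node gets 10.244.0.0/24 again
	fmt.Println(cidr)
}
```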