cidr allocator

mqliang 2016-01-27 11:53:09 +08:00
parent b7a31ad261
commit 69b8453fa0
3 changed files with 457 additions and 54 deletions

View File

@@ -0,0 +1,123 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package node
import (
"encoding/binary"
"errors"
"fmt"
"math/big"
"net"
"sync"
)
var errCIDRRangeNoCIDRsRemaining = errors.New("CIDR allocation failed; there are no remaining CIDRs left to allocate in the accepted range")
// CIDRAllocator is an interface implemented by things that know how to allocate/occupy/recycle CIDRs for nodes.
type CIDRAllocator interface {
AllocateNext() (*net.IPNet, error)
Occupy(*net.IPNet) error
Release(*net.IPNet) error
}
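// rangeAllocator tracks allocations in a big.Int bitmap and guards it with a
// mutex, so a single allocator can safely be shared by the node controller's
// informer callbacks.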
type rangeAllocator struct {
clusterIP net.IP
clusterMaskSize int
subNetMaskSize int
maxCIDRs int
used big.Int
lock sync.Mutex
}
// NewCIDRRangeAllocator returns a CIDRAllocator that allocates CIDRs for nodes.
// Caller must ensure subNetMaskSize is not less than cluster CIDR mask size.
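// For example, a clusterCIDR of 10.0.0.0/8 with subNetMaskSize 16 yields
// 1<<(16-8) = 256 allocatable /16 subnets (10.0.0.0/16 ... 10.255.0.0/16).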
func NewCIDRRangeAllocator(clusterCIDR *net.IPNet, subNetMaskSize int) CIDRAllocator {
clusterMask := clusterCIDR.Mask
clusterMaskSize, _ := clusterMask.Size()
ra := &rangeAllocator{
clusterIP: clusterCIDR.IP.To4(),
clusterMaskSize: clusterMaskSize,
subNetMaskSize: subNetMaskSize,
maxCIDRs: 1 << uint32(subNetMaskSize-clusterMaskSize),
}
return ra
}
func (r *rangeAllocator) AllocateNext() (*net.IPNet, error) {
r.lock.Lock()
defer r.lock.Unlock()
nextUnused := -1
for i := 0; i < r.maxCIDRs; i++ {
if r.used.Bit(i) == 0 {
nextUnused = i
break
}
}
if nextUnused == -1 {
return nil, errCIDRRangeNoCIDRsRemaining
}
r.used.SetBit(&r.used, nextUnused, 1)
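// The bitmap index occupies the bits between the cluster mask and the subnet
// mask; shift it into place and OR it onto the cluster base address. E.g. for
// a 10.0.0.0/8 cluster with /16 subnets, index 5 becomes 10.5.0.0/16.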
j := uint32(nextUnused) << uint32(32-r.subNetMaskSize)
ipInt := (binary.BigEndian.Uint32(r.clusterIP)) | j
ip := make([]byte, 4)
binary.BigEndian.PutUint32(ip, ipInt)
return &net.IPNet{
IP: ip,
Mask: net.CIDRMask(r.subNetMaskSize, 32),
}, nil
}
func (r *rangeAllocator) Release(cidr *net.IPNet) error {
used, err := r.getBitforCIDR(cidr)
if err != nil {
return err
}
r.lock.Lock()
defer r.lock.Unlock()
r.used.SetBit(&r.used, used, 0)
return nil
}
func (r *rangeAllocator) Occupy(cidr *net.IPNet) error {
used, err := r.getBitforCIDR(cidr)
if err != nil {
return err
}
r.lock.Lock()
defer r.lock.Unlock()
r.used.SetBit(&r.used, used, 1)
return nil
}
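// getBitforCIDR maps a subnet CIDR back to its index in the bitmap: XORing
// with the cluster base address isolates the subnet bits, and shifting away
// the host bits leaves the index. This is the reverse of the math in
// AllocateNext.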
func (r *rangeAllocator) getBitforCIDR(cidr *net.IPNet) (int, error) {
used := (binary.BigEndian.Uint32(r.clusterIP) ^ binary.BigEndian.Uint32(cidr.IP.To4())) >> uint32(32-r.subNetMaskSize)
if used >= uint32(r.maxCIDRs) {
return 0, fmt.Errorf("CIDR: %v is out of the range of CIDR allocator", cidr)
}
return int(used), nil
}
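
For orientation, here is a minimal usage sketch of the allocator above (not part of this commit; the 10.0.0.0/16 range and the exampleUsage name are illustrative only):

package node

import (
    "fmt"
    "net"
)

// exampleUsage walks through the allocate/occupy/release cycle against a
// 10.0.0.0/16 cluster range split into /24 node subnets, i.e.
// 1<<(24-16) = 256 allocatable CIDRs.
func exampleUsage() {
    _, clusterCIDR, _ := net.ParseCIDR("10.0.0.0/16")
    a := NewCIDRRangeAllocator(clusterCIDR, 24)

    // Mark a CIDR that is already recorded on an existing node as used.
    _, inUse, _ := net.ParseCIDR("10.0.0.0/24")
    if err := a.Occupy(inUse); err != nil {
        fmt.Printf("occupy failed: %v\n", err)
    }

    // Hand out the next free CIDR; with 10.0.0.0/24 occupied this is 10.0.1.0/24.
    cidr, err := a.AllocateNext()
    if err != nil {
        fmt.Printf("allocate failed: %v\n", err)
        return
    }
    fmt.Printf("allocated %s\n", cidr)

    // Return the CIDR to the pool when the node goes away.
    _ = a.Release(cidr)
}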

View File

@@ -0,0 +1,241 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package node
import (
"net"
"reflect"
"testing"
)
func TestRangeAllocatorFullyAllocated(t *testing.T) {
_, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/30")
a := NewCIDRRangeAllocator(clusterCIDR, 30)
p, err := a.AllocateNext()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if p.String() != "127.123.234.0/30" {
t.Fatalf("unexpected allocated cidr: %s", p.String())
}
_, err = a.AllocateNext()
if err == nil {
t.Fatalf("expected error because of fully-allocated range")
}
a.Release(p)
p, err = a.AllocateNext()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if p.String() != "127.123.234.0/30" {
t.Fatalf("unexpected allocated cidr: %s", p.String())
}
_, err = a.AllocateNext()
if err == nil {
t.Fatalf("expected error because of fully-allocated range")
}
}
func TestRangeAllocator_RandomishAllocation(t *testing.T) {
_, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/16")
a := NewCIDRRangeAllocator(clusterCIDR, 24)
// allocate all the CIDRs
var err error
cidrs := make([]*net.IPNet, 256)
for i := 0; i < 256; i++ {
cidrs[i], err = a.AllocateNext()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
_, err = a.AllocateNext()
if err == nil {
t.Fatalf("expected error because of fully-allocated range")
}
// release them all
for i := 0; i < 256; i++ {
a.Release(cidrs[i])
}
// allocate the CIDRs again
rcidrs := make([]*net.IPNet, 256)
for i := 0; i < 256; i++ {
rcidrs[i], err = a.AllocateNext()
if err != nil {
t.Fatalf("unexpected error: %d, %v", i, err)
}
}
_, err = a.AllocateNext()
if err == nil {
t.Fatalf("expected error because of fully-allocated range")
}
if !reflect.DeepEqual(cidrs, rcidrs) {
t.Fatalf("expected re-allocated cidrs are the same collection")
}
}
func TestRangeAllocator_AllocationOccupied(t *testing.T) {
_, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/16")
a := NewCIDRRangeAllocator(clusterCIDR, 24)
// allocate all the CIDRs
var err error
cidrs := make([]*net.IPNet, 256)
for i := 0; i < 256; i++ {
cidrs[i], err = a.AllocateNext()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
_, err = a.AllocateNext()
if err == nil {
t.Fatalf("expected error because of fully-allocated range")
}
// release them all
for i := 0; i < 256; i++ {
a.Release(cidrs[i])
}
// occupy the last 128 CIDRs
for i := 128; i < 256; i++ {
a.Occupy(cidrs[i])
}
// allocate the first 128 CIDRs again
rcidrs := make([]*net.IPNet, 128)
for i := 0; i < 128; i++ {
rcidrs[i], err = a.AllocateNext()
if err != nil {
t.Fatalf("unexpected error: %d, %v", i, err)
}
}
_, err = a.AllocateNext()
if err == nil {
t.Fatalf("expected error because of fully-allocated range")
}
// check that Occupy() worked properly
for i := 128; i < 256; i++ {
rcidrs = append(rcidrs, cidrs[i])
}
if !reflect.DeepEqual(cidrs, rcidrs) {
t.Fatalf("expected re-allocated cidrs are the same collection")
}
}
func TestGetBitforCIDR(t *testing.T) {
cases := []struct {
clusterCIDRStr string
subNetMaskSize int
subNetCIDRStr string
expectedBit int
expectErr bool
}{
{
clusterCIDRStr: "127.0.0.0/8",
subNetMaskSize: 16,
subNetCIDRStr: "127.0.0.0/16",
expectedBit: 0,
expectErr: false,
},
{
clusterCIDRStr: "127.0.0.0/8",
subNetMaskSize: 16,
subNetCIDRStr: "127.123.0.0/16",
expectedBit: 123,
expectErr: false,
},
{
clusterCIDRStr: "127.0.0.0/8",
subNetMaskSize: 16,
subNetCIDRStr: "127.168.0.0/16",
expectedBit: 168,
expectErr: false,
},
{
clusterCIDRStr: "127.0.0.0/8",
subNetMaskSize: 16,
subNetCIDRStr: "127.224.0.0/16",
expectedBit: 224,
expectErr: false,
},
{
clusterCIDRStr: "192.168.0.0/16",
subNetMaskSize: 24,
subNetCIDRStr: "192.168.12.0/24",
expectedBit: 12,
expectErr: false,
},
{
clusterCIDRStr: "192.168.0.0/16",
subNetMaskSize: 24,
subNetCIDRStr: "192.168.151.0/24",
expectedBit: 151,
expectErr: false,
},
{
clusterCIDRStr: "192.168.0.0/16",
subNetMaskSize: 24,
subNetCIDRStr: "127.168.224.0/24",
expectErr: true,
},
}
for _, tc := range cases {
_, clusterCIDR, err := net.ParseCIDR(tc.clusterCIDRStr)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
clusterMask := clusterCIDR.Mask
clusterMaskSize, _ := clusterMask.Size()
ra := &rangeAllocator{
clusterIP: clusterCIDR.IP.To4(),
clusterMaskSize: clusterMaskSize,
subNetMaskSize: tc.subNetMaskSize,
maxCIDRs: 1 << uint32(tc.subNetMaskSize-clusterMaskSize),
}
_, subnetCIDR, err := net.ParseCIDR(tc.subNetCIDRStr)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
got, err := ra.getBitforCIDR(subnetCIDR)
if err == nil && tc.expectErr {
t.Errorf("expected error but got nil")
continue
}
if err != nil && !tc.expectErr {
t.Errorf("unexpected error: %v", err)
continue
}
if got != tc.expectedBit {
t.Errorf("expected %v, but got %v", tc.expectedBit, got)
}
}
}
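
With the off-by-one guard in getBitforCIDR tightened to >=, a boundary case like the following could be appended to the table above (a hypothetical addition, not part of this commit); 192.169.0.0/24 maps to index 256, which equals maxCIDRs for a /16 cluster split into /24 subnets and must be rejected:

{
    clusterCIDRStr: "192.168.0.0/16",
    subNetMaskSize: 24,
    subNetCIDRStr:  "192.169.0.0/24",
    expectErr:      true,
},

The tests can then be run with something like go test -run 'TestRangeAllocator|TestGetBitforCIDR' ./pkg/controller/node/ (the package path is assumed from the package name; adjust it to the repository layout).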

View File

@@ -57,6 +57,8 @@ var (
const (
// nodeStatusUpdateRetry controls the number of retries of writing NodeStatus update.
nodeStatusUpdateRetry = 5
// podCIDRUpdateRetry controls the number of retries of writing Node.Spec.PodCIDR update.
podCIDRUpdateRetry = 5
// controls how often NodeController will try to evict Pods from non-responsive Nodes.
nodeEvictionPeriod = 100 * time.Millisecond
)
@@ -121,6 +123,8 @@ type NodeController struct {
// DaemonSet framework and store
daemonSetController *framework.Controller
daemonSetStore cache.StoreToDaemonSetLister
// allocate/recycle CIDRs for nodes if allocateNodeCIDRs == true
cidrAllocator CIDRAllocator
forcefullyDeletePod func(*api.Pod) error
nodeExistsInCloudProvider func(string) (bool, error)
@@ -157,8 +161,17 @@ func NewNodeController(
metrics.RegisterMetricAndTrackRateLimiterUsage("node_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
}
if allocateNodeCIDRs && clusterCIDR == nil {
glog.Fatal("NodeController: Must specify clusterCIDR if allocateNodeCIDRs == true.")
if allocateNodeCIDRs {
if clusterCIDR == nil {
glog.Fatal("NodeController: Must specify clusterCIDR if allocateNodeCIDRs == true.")
}
mask := clusterCIDR.Mask
// TODO(mqliang): Make pod CIDR mask size configurable.
// For now, we assume the podCIDR mask size is 24, so make sure the
// clusterCIDR mask size is not larger than 24.
if maskSize, _ := mask.Size(); maskSize > 24 {
glog.Fatal("NodeController: Invalid clusterCIDR, mask size must not be larger than 24.")
}
}
evictorLock := sync.Mutex{}
@@ -204,6 +217,15 @@ func NewNodeController(
// they'll get the benefits they expect. It will also reserve the name for future refactorings.
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
nodeEventHandlerFuncs := framework.ResourceEventHandlerFuncs{}
if nc.allocateNodeCIDRs {
nodeEventHandlerFuncs = framework.ResourceEventHandlerFuncs{
AddFunc: nc.allocateOrOccupyCIDR,
DeleteFunc: nc.recycleCIDR,
}
}
nc.nodeStore.Store, nc.nodeController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
@@ -215,8 +237,9 @@ func NewNodeController(
},
&api.Node{},
controller.NoResyncPeriodFunc(),
framework.ResourceEventHandlerFuncs{},
nodeEventHandlerFuncs,
)
nc.daemonSetStore.Store, nc.daemonSetController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
@@ -230,6 +253,12 @@ func NewNodeController(
controller.NoResyncPeriodFunc(),
framework.ResourceEventHandlerFuncs{},
)
if allocateNodeCIDRs {
// TODO(mqliang): make pod CIDR mask size configurable, for now set it to 24.
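// Given the mask-size check in NewNodeController above, the clusterCIDR mask
// is at most /24, so this allocator manages 2^(24 - cluster mask size) node
// CIDRs, e.g. 256 for a /16 clusterCIDR.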
nc.cidrAllocator = NewCIDRRangeAllocator(clusterCIDR, 24)
}
return nc
}
@@ -305,17 +334,68 @@ func (nc *NodeController) Run(period time.Duration) {
go wait.Until(nc.cleanupOrphanedPods, 30*time.Second, wait.NeverStop)
}
// Generates num pod CIDRs that could be assigned to nodes.
func generateCIDRs(clusterCIDR *net.IPNet, num int) sets.String {
res := sets.NewString()
cidrIP := clusterCIDR.IP.To4()
for i := 0; i < num; i++ {
// TODO: Make the CIDRs configurable.
b1 := byte(i >> 8)
b2 := byte(i % 256)
res.Insert(fmt.Sprintf("%d.%d.%d.0/24", cidrIP[0], cidrIP[1]+b1, cidrIP[2]+b2))
// allocateOrOccupyCIDR looks at each newly observed node, assigns it a valid CIDR
// if it doesn't currently have one, or marks the CIDR as used if the node already has one.
func (nc *NodeController) allocateOrOccupyCIDR(obj interface{}) {
node := obj.(*api.Node)
if node.Spec.PodCIDR != "" {
_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
if err != nil {
glog.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR)
return
}
if err := nc.cidrAllocator.Occupy(podCIDR); err != nil {
glog.Errorf("failed to mark cidr as occupied :%v", err)
return
}
return
}
podCIDR, err := nc.cidrAllocator.AllocateNext()
if err != nil {
nc.recordNodeStatusChange(node, "CIDRNotAvailable")
return
}
glog.V(4).Infof("Assigning node %s CIDR %s", node.Name, podCIDR)
for rep := 0; rep < podCIDRUpdateRetry; rep++ {
node.Spec.PodCIDR = podCIDR.String()
if _, err = nc.kubeClient.Core().Nodes().Update(node); err == nil {
break
}
glog.Errorf("Failed while updating Node.Spec.PodCIDR: %v", err)
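// The update failed; re-GET the node (ours might be stale by now) and retry.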
node, err = nc.kubeClient.Core().Nodes().Get(node.Name)
if err != nil {
glog.Errorf("Failed while getting node %v to retry updating Node.Spec.PodCIDR: %v", node.Name, err)
break
}
}
if err != nil {
glog.Errorf("Update PodCIDR of node %v from NodeController exceeds retry count.", node.Name)
nc.recordNodeStatusChange(node, "CIDRAssignmentFailed")
glog.Errorf("CIDR assignment for node %v failed: %v", node.Name, err)
}
}
// recycleCIDR recycles the CIDR of a removed node
func (nc *NodeController) recycleCIDR(obj interface{}) {
node := obj.(*api.Node)
if node.Spec.PodCIDR == "" {
return
}
_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
if err != nil {
glog.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR)
return
}
glog.V(4).Infof("recycle node %s CIDR %s", node.Name, podCIDR)
if err := nc.cidrAllocator.Release(podCIDR); err != nil {
glog.Errorf("failed to release cidr: %v", err)
}
return res
}
// getCondition returns a condition object for the specific condition
@@ -450,11 +530,6 @@ func (nc *NodeController) monitorNodeStatus() error {
}
}
if nc.allocateNodeCIDRs {
// TODO (cjcullen): Use pkg/controller/framework to watch nodes and
// reduce lists/decouple this from monitoring status.
nc.reconcileNodeCIDRs(nodes)
}
seenReady := false
for i := range nodes.Items {
var gracePeriod time.Duration
@@ -590,42 +665,6 @@ func (nc *NodeController) forcefullyDeleteNode(nodeName string) error {
return nil
}
// reconcileNodeCIDRs looks at each node and assigns it a valid CIDR
// if it doesn't currently have one.
func (nc *NodeController) reconcileNodeCIDRs(nodes *api.NodeList) {
glog.V(4).Infof("Reconciling cidrs for %d nodes", len(nodes.Items))
// TODO(roberthbailey): This seems inefficient. Why re-calculate CIDRs
// on each sync period?
availableCIDRs := generateCIDRs(nc.clusterCIDR, len(nodes.Items))
for _, node := range nodes.Items {
if node.Spec.PodCIDR != "" {
glog.V(4).Infof("CIDR %s is already being used by node %s", node.Spec.PodCIDR, node.Name)
availableCIDRs.Delete(node.Spec.PodCIDR)
}
}
for _, node := range nodes.Items {
if node.Spec.PodCIDR == "" {
// Re-GET node (because ours might be stale by now).
n, err := nc.kubeClient.Core().Nodes().Get(node.Name)
if err != nil {
glog.Errorf("Failed to get node %q: %v", node.Name, err)
continue
}
podCIDR, found := availableCIDRs.PopAny()
if !found {
nc.recordNodeStatusChange(n, "CIDRNotAvailable")
continue
}
glog.V(1).Infof("Assigning node %s CIDR %s", n.Name, podCIDR)
n.Spec.PodCIDR = podCIDR
if _, err := nc.kubeClient.Core().Nodes().Update(n); err != nil {
nc.recordNodeStatusChange(&node, "CIDRAssignmentFailed")
}
}
}
}
func (nc *NodeController) recordNodeEvent(nodeName, eventtype, reason, event string) {
ref := &api.ObjectReference{
Kind: "Node",