Don't allow node controller to allocate into service CIDR range

This commit is contained in:
mqliang
2016-05-16 18:57:44 +08:00
parent 69b8453fa0
commit cf7a3475f3
11 changed files with 189 additions and 22 deletions

View File

@@ -35,6 +35,7 @@ type CIDRAllocator interface {
}
type rangeAllocator struct {
clusterCIDR *net.IPNet
clusterIP net.IP
clusterMaskSize int
subNetMaskSize int
@@ -50,6 +51,7 @@ func NewCIDRRangeAllocator(clusterCIDR *net.IPNet, subNetMaskSize int) CIDRAlloc
clusterMaskSize, _ := clusterMask.Size()
ra := &rangeAllocator{
clusterCIDR: clusterCIDR,
clusterIP: clusterCIDR.IP.To4(),
clusterMaskSize: clusterMaskSize,
subNetMaskSize: subNetMaskSize,
@@ -87,7 +89,7 @@ func (r *rangeAllocator) AllocateNext() (*net.IPNet, error) {
}
func (r *rangeAllocator) Release(cidr *net.IPNet) error {
used, err := r.getBitforCIDR(cidr)
used, err := r.getIndexForCIDR(cidr)
if err != nil {
return err
}
@@ -99,25 +101,54 @@ func (r *rangeAllocator) Release(cidr *net.IPNet) error {
return nil
}
func (r *rangeAllocator) Occupy(cidr *net.IPNet) error {
used, err := r.getBitforCIDR(cidr)
if err != nil {
return err
func (r *rangeAllocator) MaxCIDRs() int {
return r.maxCIDRs
}
func (r *rangeAllocator) Occupy(cidr *net.IPNet) (err error) {
begin, end := 0, r.maxCIDRs
cidrMask := cidr.Mask
maskSize, _ := cidrMask.Size()
if r.clusterCIDR.Contains(cidr.IP.Mask(r.clusterCIDR.Mask)) && r.clusterMaskSize < maskSize {
subNetMask := net.CIDRMask(r.subNetMaskSize, 32)
begin, err = r.getIndexForCIDR(&net.IPNet{
IP: cidr.IP.To4().Mask(subNetMask),
Mask: subNetMask,
})
if err != nil {
return err
}
ip := make([]byte, 4)
ipInt := binary.BigEndian.Uint32(cidr.IP) | (^binary.BigEndian.Uint32(cidr.Mask))
binary.BigEndian.PutUint32(ip, ipInt)
end, err = r.getIndexForCIDR(&net.IPNet{
IP: net.IP(ip).To4().Mask(subNetMask),
Mask: subNetMask,
})
if err != nil {
return err
}
}
r.lock.Lock()
defer r.lock.Unlock()
r.used.SetBit(&r.used, used, 1)
for i := begin; i <= end; i++ {
r.used.SetBit(&r.used, i, 1)
}
return nil
}
func (r *rangeAllocator) getBitforCIDR(cidr *net.IPNet) (int, error) {
used := (binary.BigEndian.Uint32(r.clusterIP) ^ binary.BigEndian.Uint32(cidr.IP.To4())) >> uint32(32-r.subNetMaskSize)
func (r *rangeAllocator) getIndexForCIDR(cidr *net.IPNet) (int, error) {
cidrIndex := (binary.BigEndian.Uint32(r.clusterIP) ^ binary.BigEndian.Uint32(cidr.IP.To4())) >> uint32(32-r.subNetMaskSize)
if used > uint32(r.maxCIDRs) {
if cidrIndex >= uint32(r.maxCIDRs) {
return 0, fmt.Errorf("CIDR: %v is out of the range of CIDR allocator", cidr)
}
return int(used), nil
return int(cidrIndex), nil
}

View File

@@ -18,6 +18,7 @@ package node
import (
"github.com/golang/glog"
"math/big"
"net"
"reflect"
"testing"
@@ -223,7 +224,7 @@ func TestGetBitforCIDR(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
got, err := ra.getBitforCIDR(subnetCIDR)
got, err := ra.getIndexForCIDR(subnetCIDR)
if err == nil && tc.expectErr {
glog.Errorf("expected error but got null")
continue
@@ -239,3 +240,110 @@ func TestGetBitforCIDR(t *testing.T) {
}
}
}
func TestOccupy(t *testing.T) {
cases := []struct {
clusterCIDRStr string
subNetMaskSize int
subNetCIDRStr string
expectedUsedBegin int
expectedUsedEnd int
expectErr bool
}{
{
clusterCIDRStr: "127.0.0.0/8",
subNetMaskSize: 16,
subNetCIDRStr: "127.0.0.0/8",
expectedUsedBegin: 0,
expectedUsedEnd: 256,
expectErr: false,
},
{
clusterCIDRStr: "127.0.0.0/8",
subNetMaskSize: 16,
subNetCIDRStr: "127.0.0.0/2",
expectedUsedBegin: 0,
expectedUsedEnd: 256,
expectErr: false,
},
{
clusterCIDRStr: "127.0.0.0/8",
subNetMaskSize: 16,
subNetCIDRStr: "127.0.0.0/16",
expectedUsedBegin: 0,
expectedUsedEnd: 0,
expectErr: false,
},
{
clusterCIDRStr: "127.0.0.0/8",
subNetMaskSize: 32,
subNetCIDRStr: "127.0.0.0/16",
expectedUsedBegin: 0,
expectedUsedEnd: 65535,
expectErr: false,
},
{
clusterCIDRStr: "127.0.0.0/7",
subNetMaskSize: 16,
subNetCIDRStr: "127.0.0.0/15",
expectedUsedBegin: 256,
expectedUsedEnd: 257,
expectErr: false,
},
{
clusterCIDRStr: "127.0.0.0/7",
subNetMaskSize: 15,
subNetCIDRStr: "127.0.0.0/15",
expectedUsedBegin: 128,
expectedUsedEnd: 128,
expectErr: false,
},
{
clusterCIDRStr: "127.0.0.0/7",
subNetMaskSize: 18,
subNetCIDRStr: "127.0.0.0/15",
expectedUsedBegin: 1024,
expectedUsedEnd: 1031,
expectErr: false,
},
}
for _, tc := range cases {
_, clusterCIDR, err := net.ParseCIDR(tc.clusterCIDRStr)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
clusterMask := clusterCIDR.Mask
clusterMaskSize, _ := clusterMask.Size()
ra := &rangeAllocator{
clusterCIDR: clusterCIDR,
clusterIP: clusterCIDR.IP.To4(),
clusterMaskSize: clusterMaskSize,
subNetMaskSize: tc.subNetMaskSize,
maxCIDRs: 1 << uint32(tc.subNetMaskSize-clusterMaskSize),
}
_, subnetCIDR, err := net.ParseCIDR(tc.subNetCIDRStr)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
err = ra.Occupy(subnetCIDR)
if err == nil && tc.expectErr {
t.Errorf("expected error but got none")
continue
}
if err != nil && !tc.expectErr {
t.Errorf("unexpected error: %v", err)
}
expectedUsed := big.Int{}
for i := tc.expectedUsedBegin; i <= tc.expectedUsedEnd; i++ {
expectedUsed.SetBit(&expectedUsed, i, 1)
}
if expectedUsed.Cmp(&ra.used) != 0 {
t.Errorf("error")
}
}
}

View File

@@ -73,6 +73,7 @@ type NodeController struct {
allocateNodeCIDRs bool
cloud cloudprovider.Interface
clusterCIDR *net.IPNet
serviceCIDR *net.IPNet
deletingPodsRateLimiter flowcontrol.RateLimiter
knownNodeSet sets.String
kubeClient clientset.Interface
@@ -146,6 +147,7 @@ func NewNodeController(
nodeStartupGracePeriod time.Duration,
nodeMonitorPeriod time.Duration,
clusterCIDR *net.IPNet,
serviceCIDR *net.IPNet,
allocateNodeCIDRs bool) *NodeController {
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controllermanager"})
@@ -192,6 +194,7 @@ func NewNodeController(
lookupIP: net.LookupIP,
now: unversioned.Now,
clusterCIDR: clusterCIDR,
serviceCIDR: serviceCIDR,
allocateNodeCIDRs: allocateNodeCIDRs,
forcefullyDeletePod: func(p *api.Pod) error { return forcefullyDeletePod(kubeClient, p) },
nodeExistsInCloudProvider: func(nodeName string) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) },
@@ -264,6 +267,9 @@ func NewNodeController(
// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *NodeController) Run(period time.Duration) {
nc.filterOutServiceRange()
go nc.nodeController.Run(wait.NeverStop)
go nc.podController.Run(wait.NeverStop)
go nc.daemonSetController.Run(wait.NeverStop)
@@ -334,6 +340,16 @@ func (nc *NodeController) Run(period time.Duration) {
go wait.Until(nc.cleanupOrphanedPods, 30*time.Second, wait.NeverStop)
}
func (nc *NodeController) filterOutServiceRange() {
if !nc.clusterCIDR.Contains(nc.serviceCIDR.IP.Mask(nc.clusterCIDR.Mask)) {
return
}
if err := nc.cidrAllocator.Occupy(nc.serviceCIDR); err != nil {
glog.Errorf("Error filtering out service cidr: %v", err)
}
}
// allocateOrOccupyCIDR looks at each new observed node, assigns it a valid CIDR
// if it doesn't currently have one or mark the CIDR as used if the node already have one.
func (nc *NodeController) allocateOrOccupyCIDR(obj interface{}) {

View File

@@ -660,7 +660,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
for _, item := range table {
nodeController := NewNodeController(nil, item.fakeNodeHandler,
evictionTimeout, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, false)
nodeController.now = func() unversioned.Time { return fakeNow }
for _, ds := range item.daemonSets {
nodeController.daemonSetStore.Add(&ds)
@@ -731,7 +731,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) {
nodeController := NewNodeController(nil, fnh, 10*time.Minute,
flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(),
testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
testNodeMonitorPeriod, nil, false)
testNodeMonitorPeriod, nil, nil, false)
nodeController.cloud = &fakecloud.FakeCloud{}
nodeController.now = func() unversioned.Time { return unversioned.Date(2016, 1, 1, 12, 0, 0, 0, time.UTC) }
nodeController.nodeExistsInCloudProvider = func(nodeName string) (bool, error) {
@@ -963,7 +963,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
for i, item := range table {
nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(),
flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, false)
nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
@@ -1113,7 +1113,7 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) {
for i, item := range table {
nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(),
flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, false)
nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("Case[%d] unexpected error: %v", i, err)
@@ -1195,7 +1195,7 @@ func TestNodeDeletion(t *testing.T) {
}
nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(),
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, false)
nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
@@ -1298,7 +1298,7 @@ func TestCheckPod(t *testing.T) {
},
}
nc := NewNodeController(nil, nil, 0, nil, nil, 0, 0, 0, nil, false)
nc := NewNodeController(nil, nil, 0, nil, nil, 0, 0, 0, nil, nil, false)
nc.nodeStore.Store = cache.NewStore(cache.MetaNamespaceKeyFunc)
nc.nodeStore.Store.Add(&api.Node{
ObjectMeta: api.ObjectMeta{
@@ -1375,7 +1375,7 @@ func TestCleanupOrphanedPods(t *testing.T) {
newPod("b", "bar"),
newPod("c", "gone"),
}
nc := NewNodeController(nil, nil, 0, nil, nil, 0, 0, 0, nil, false)
nc := NewNodeController(nil, nil, 0, nil, nil, 0, 0, 0, nil, nil, false)
nc.nodeStore.Store.Add(newNode("foo"))
nc.nodeStore.Store.Add(newNode("bar"))