From 69b8453fa08d0c9b38fd5658f7bfd18d9c644f98 Mon Sep 17 00:00:00 2001 From: mqliang Date: Wed, 27 Jan 2016 11:53:09 +0800 Subject: [PATCH] cidr allocator --- pkg/controller/node/cidr_allocator.go | 123 +++++++++++ pkg/controller/node/cidr_allocator_test.go | 241 +++++++++++++++++++++ pkg/controller/node/nodecontroller.go | 147 ++++++++----- 3 files changed, 457 insertions(+), 54 deletions(-) create mode 100644 pkg/controller/node/cidr_allocator.go create mode 100644 pkg/controller/node/cidr_allocator_test.go diff --git a/pkg/controller/node/cidr_allocator.go b/pkg/controller/node/cidr_allocator.go new file mode 100644 index 00000000000..2dac42da7a9 --- /dev/null +++ b/pkg/controller/node/cidr_allocator.go @@ -0,0 +1,123 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package node + +import ( + "encoding/binary" + "errors" + "fmt" + "math/big" + "net" + "sync" +) + +var errCIDRRangeNoCIDRsRemaining = errors.New("CIDR allocation failed; there are no remaining CIDRs left to allocate in the accepted range") + +// CIDRAllocator is an interface implemented by things that know how to allocate/occupy/recycle CIDR for nodes. +type CIDRAllocator interface { + AllocateNext() (*net.IPNet, error) + Occupy(*net.IPNet) error + Release(*net.IPNet) error +} + +type rangeAllocator struct { + clusterIP net.IP + clusterMaskSize int + subNetMaskSize int + maxCIDRs int + used big.Int + lock sync.Mutex +} + +// NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDR for node +// Caller must ensure subNetMaskSize is not less than cluster CIDR mask size. 
+func NewCIDRRangeAllocator(clusterCIDR *net.IPNet, subNetMaskSize int) CIDRAllocator { + clusterMask := clusterCIDR.Mask + clusterMaskSize, _ := clusterMask.Size() + + ra := &rangeAllocator{ + clusterIP: clusterCIDR.IP.To4(), + clusterMaskSize: clusterMaskSize, + subNetMaskSize: subNetMaskSize, + maxCIDRs: 1 << uint32(subNetMaskSize-clusterMaskSize), + } + return ra +} + +func (r *rangeAllocator) AllocateNext() (*net.IPNet, error) { + r.lock.Lock() + defer r.lock.Unlock() + + nextUnused := -1 + for i := 0; i < r.maxCIDRs; i++ { + if r.used.Bit(i) == 0 { + nextUnused = i + break + } + } + if nextUnused == -1 { + return nil, errCIDRRangeNoCIDRsRemaining + } + + r.used.SetBit(&r.used, nextUnused, 1) + + j := uint32(nextUnused) << uint32(32-r.subNetMaskSize) + ipInt := (binary.BigEndian.Uint32(r.clusterIP)) | j + ip := make([]byte, 4) + binary.BigEndian.PutUint32(ip, ipInt) + + return &net.IPNet{ + IP: ip, + Mask: net.CIDRMask(r.subNetMaskSize, 32), + }, nil +} + +func (r *rangeAllocator) Release(cidr *net.IPNet) error { + used, err := r.getBitforCIDR(cidr) + if err != nil { + return err + } + + r.lock.Lock() + defer r.lock.Unlock() + r.used.SetBit(&r.used, used, 0) + + return nil +} + +func (r *rangeAllocator) Occupy(cidr *net.IPNet) error { + used, err := r.getBitforCIDR(cidr) + if err != nil { + return err + } + + r.lock.Lock() + defer r.lock.Unlock() + r.used.SetBit(&r.used, used, 1) + + return nil +} + +func (r *rangeAllocator) getBitforCIDR(cidr *net.IPNet) (int, error) { + used := (binary.BigEndian.Uint32(r.clusterIP) ^ binary.BigEndian.Uint32(cidr.IP.To4())) >> uint32(32-r.subNetMaskSize) + + if used > uint32(r.maxCIDRs) { + return 0, fmt.Errorf("CIDR: %v is out of the range of CIDR allocator", cidr) + } + + return int(used), nil +} diff --git a/pkg/controller/node/cidr_allocator_test.go b/pkg/controller/node/cidr_allocator_test.go new file mode 100644 index 00000000000..cf1cfe451a7 --- /dev/null +++ b/pkg/controller/node/cidr_allocator_test.go @@ -0,0 +1,241 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package node + +import ( + "github.com/golang/glog" + "net" + "reflect" + "testing" +) + +func TestRangeAllocatorFullyAllocated(t *testing.T) { + _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/30") + a := NewCIDRRangeAllocator(clusterCIDR, 30) + p, err := a.AllocateNext() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if p.String() != "127.123.234.0/30" { + t.Fatalf("unexpected allocated cidr: %s", p.String()) + } + + _, err = a.AllocateNext() + if err == nil { + t.Fatalf("expected error because of fully-allocated range") + } + + a.Release(p) + p, err = a.AllocateNext() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if p.String() != "127.123.234.0/30" { + t.Fatalf("unexpected allocated cidr: %s", p.String()) + } + _, err = a.AllocateNext() + if err == nil { + t.Fatalf("expected error because of fully-allocated range") + } +} + +func TestRangeAllocator_RandomishAllocation(t *testing.T) { + _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/16") + a := NewCIDRRangeAllocator(clusterCIDR, 24) + + // allocate all the CIDRs + var err error + cidrs := make([]*net.IPNet, 256) + + for i := 0; i < 256; i++ { + cidrs[i], err = a.AllocateNext() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + } + + _, err = a.AllocateNext() + if err == nil { + t.Fatalf("expected error because of fully-allocated range") + } + // release them all + for i := 0; i < 256; i++ { + a.Release(cidrs[i]) + } + + // allocate the CIDRs again + rcidrs := make([]*net.IPNet, 256) + for i := 0; i < 256; i++ { + rcidrs[i], err = a.AllocateNext() + if err != nil { + t.Fatalf("unexpected error: %d, %v", i, err) + } + } + _, err = a.AllocateNext() + if err == nil { + t.Fatalf("expected error because of fully-allocated range") + } + + if !reflect.DeepEqual(cidrs, rcidrs) { + t.Fatalf("expected re-allocated cidrs are the same collection") + } +} + +func TestRangeAllocator_AllocationOccupied(t *testing.T) { + _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/16") + a := NewCIDRRangeAllocator(clusterCIDR, 24) + + // allocate all the CIDRs + var err error + cidrs := make([]*net.IPNet, 256) + + for i := 0; i < 256; i++ { + cidrs[i], err = a.AllocateNext() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + } + + _, err = a.AllocateNext() + if err == nil { + t.Fatalf("expected error because of fully-allocated range") + } + // release them all + for i := 0; i < 256; i++ { + a.Release(cidrs[i]) + } + // occupy the last 128 CIDRs + for i := 128; i < 256; i++ { + a.Occupy(cidrs[i]) + } + + // allocate the first 128 CIDRs again + rcidrs := make([]*net.IPNet, 128) + for i := 0; i < 128; i++ { + rcidrs[i], err = a.AllocateNext() + if err != nil { + t.Fatalf("unexpected error: %d, %v", i, err) + } + } + _, err = a.AllocateNext() + if err == nil { + t.Fatalf("expected error because of fully-allocated range") + } + + // check Occupy() work properly + for i := 128; i < 256; i++ { + rcidrs = append(rcidrs, cidrs[i]) + } + if !reflect.DeepEqual(cidrs, rcidrs) { + t.Fatalf("expected re-allocated cidrs are the same collection") + } +} + +func TestGetBitforCIDR(t *testing.T) { + cases := []struct { + clusterCIDRStr string + subNetMaskSize int + subNetCIDRStr string + expectedBit int + expectErr bool + }{ + { + clusterCIDRStr: "127.0.0.0/8", + subNetMaskSize: 16, + subNetCIDRStr: "127.0.0.0/16", + expectedBit: 0, + expectErr: false, + }, + { + clusterCIDRStr: "127.0.0.0/8", + subNetMaskSize: 16, + subNetCIDRStr: "127.123.0.0/16", + expectedBit: 123, + expectErr: false, + }, + { + 
clusterCIDRStr: "127.0.0.0/8", + subNetMaskSize: 16, + subNetCIDRStr: "127.168.0.0/16", + expectedBit: 168, + expectErr: false, + }, + { + clusterCIDRStr: "127.0.0.0/8", + subNetMaskSize: 16, + subNetCIDRStr: "127.224.0.0/16", + expectedBit: 224, + expectErr: false, + }, + { + clusterCIDRStr: "192.168.0.0/16", + subNetMaskSize: 24, + subNetCIDRStr: "192.168.12.0/24", + expectedBit: 12, + expectErr: false, + }, + { + clusterCIDRStr: "192.168.0.0/16", + subNetMaskSize: 24, + subNetCIDRStr: "192.168.151.0/24", + expectedBit: 151, + expectErr: false, + }, + { + clusterCIDRStr: "192.168.0.0/16", + subNetMaskSize: 24, + subNetCIDRStr: "127.168.224.0/24", + expectErr: true, + }, + } + + for _, tc := range cases { + _, clusterCIDR, err := net.ParseCIDR(tc.clusterCIDRStr) + clusterMask := clusterCIDR.Mask + clusterMaskSize, _ := clusterMask.Size() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + ra := &rangeAllocator{ + clusterIP: clusterCIDR.IP.To4(), + clusterMaskSize: clusterMaskSize, + subNetMaskSize: tc.subNetMaskSize, + maxCIDRs: 1 << uint32(tc.subNetMaskSize-clusterMaskSize), + } + + _, subnetCIDR, err := net.ParseCIDR(tc.subNetCIDRStr) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + got, err := ra.getBitforCIDR(subnetCIDR) + if err == nil && tc.expectErr { + glog.Errorf("expected error but got null") + continue + } + + if err != nil && !tc.expectErr { + glog.Errorf("unexpected error: %v", err) + continue + } + + if got != tc.expectedBit { + glog.Errorf("expected %v, but got %v", tc.expectedBit, got) + } + } +} diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go index 9f65749a42c..9432d6d6a85 100644 --- a/pkg/controller/node/nodecontroller.go +++ b/pkg/controller/node/nodecontroller.go @@ -57,6 +57,8 @@ var ( const ( // nodeStatusUpdateRetry controls the number of retries of writing NodeStatus update. nodeStatusUpdateRetry = 5 + // podCIDRUpdateRetry controls the number of retries of writing Node.Spec.PodCIDR update. + podCIDRUpdateRetry = 5 // controls how often NodeController will try to evict Pods from non-responsive Nodes. nodeEvictionPeriod = 100 * time.Millisecond ) @@ -121,6 +123,8 @@ type NodeController struct { // DaemonSet framework and store daemonSetController *framework.Controller daemonSetStore cache.StoreToDaemonSetLister + // allocate/recycle CIDRs for node if allocateNodeCIDRs == true + cidrAllocator CIDRAllocator forcefullyDeletePod func(*api.Pod) error nodeExistsInCloudProvider func(string) (bool, error) @@ -157,8 +161,17 @@ func NewNodeController( metrics.RegisterMetricAndTrackRateLimiterUsage("node_controller", kubeClient.Core().GetRESTClient().GetRateLimiter()) } - if allocateNodeCIDRs && clusterCIDR == nil { - glog.Fatal("NodeController: Must specify clusterCIDR if allocateNodeCIDRs == true.") + if allocateNodeCIDRs { + if clusterCIDR == nil { + glog.Fatal("NodeController: Must specify clusterCIDR if allocateNodeCIDRs == true.") + } + mask := clusterCIDR.Mask + // TODO(mqliang): Make pod CIDR mask size configurable. + // For now, we assume podCIDR mask size is 24, so make sure the + // clusterCIDR mask size is larger than 24. + if maskSize, _ := mask.Size(); maskSize > 24 { + glog.Fatal("NodeController: Invalid clusterCIDR, mask size must be less than 24.") + } } evictorLock := sync.Mutex{} @@ -204,6 +217,15 @@ func NewNodeController( // they'll get the benefits they expect. It will also reserve the name for future refactorings. 
 		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
 	)
+
+	nodeEventHandlerFuncs := framework.ResourceEventHandlerFuncs{}
+	if nc.allocateNodeCIDRs {
+		nodeEventHandlerFuncs = framework.ResourceEventHandlerFuncs{
+			AddFunc:    nc.allocateOrOccupyCIDR,
+			DeleteFunc: nc.recycleCIDR,
+		}
+	}
+
 	nc.nodeStore.Store, nc.nodeController = framework.NewInformer(
 		&cache.ListWatch{
 			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
@@ -215,8 +237,9 @@
 		},
 		&api.Node{},
 		controller.NoResyncPeriodFunc(),
-		framework.ResourceEventHandlerFuncs{},
+		nodeEventHandlerFuncs,
 	)
+
 	nc.daemonSetStore.Store, nc.daemonSetController = framework.NewInformer(
 		&cache.ListWatch{
 			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
@@ -230,6 +253,12 @@
 		controller.NoResyncPeriodFunc(),
 		framework.ResourceEventHandlerFuncs{},
 	)
+
+	if allocateNodeCIDRs {
+		// TODO(mqliang): make pod CIDR mask size configurable, for now set it to 24.
+		nc.cidrAllocator = NewCIDRRangeAllocator(clusterCIDR, 24)
+	}
+
 	return nc
 }
 
@@ -305,17 +334,68 @@ func (nc *NodeController) Run(period time.Duration) {
 	go wait.Until(nc.cleanupOrphanedPods, 30*time.Second, wait.NeverStop)
 }
 
-// Generates num pod CIDRs that could be assigned to nodes.
-func generateCIDRs(clusterCIDR *net.IPNet, num int) sets.String {
-	res := sets.NewString()
-	cidrIP := clusterCIDR.IP.To4()
-	for i := 0; i < num; i++ {
-		// TODO: Make the CIDRs configurable.
-		b1 := byte(i >> 8)
-		b2 := byte(i % 256)
-		res.Insert(fmt.Sprintf("%d.%d.%d.0/24", cidrIP[0], cidrIP[1]+b1, cidrIP[2]+b2))
+// allocateOrOccupyCIDR looks at each newly observed node, assigns it a valid CIDR
+// if it doesn't currently have one, or marks the CIDR as used if the node already has one.
+func (nc *NodeController) allocateOrOccupyCIDR(obj interface{}) {
+	node := obj.(*api.Node)
+
+	if node.Spec.PodCIDR != "" {
+		_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
+		if err != nil {
+			glog.Errorf("failed to parse CIDR %s of node %s", node.Spec.PodCIDR, node.Name)
+			return
+		}
+		if err := nc.cidrAllocator.Occupy(podCIDR); err != nil {
+			glog.Errorf("failed to mark cidr as occupied: %v", err)
+			return
+		}
+		return
+	}
+
+	podCIDR, err := nc.cidrAllocator.AllocateNext()
+	if err != nil {
+		nc.recordNodeStatusChange(node, "CIDRNotAvailable")
+		return
+	}
+
+	glog.V(4).Infof("Assigning node %s CIDR %s", node.Name, podCIDR)
+	for rep := 0; rep < podCIDRUpdateRetry; rep++ {
+		node.Spec.PodCIDR = podCIDR.String()
+		if _, err = nc.kubeClient.Core().Nodes().Update(node); err == nil {
+			break
+		}
+		glog.Errorf("Failed while updating Node.Spec.PodCIDR: %v", err)
+		node, err = nc.kubeClient.Core().Nodes().Get(node.Name)
+		if err != nil {
+			glog.Errorf("Failed while getting node %v to retry updating Node.Spec.PodCIDR: %v", node.Name, err)
+			break
+		}
+	}
+	if err != nil {
+		glog.Errorf("Update PodCIDR of node %v from NodeController exceeds retry count.", node.Name)
+		nc.recordNodeStatusChange(node, "CIDRAssignmentFailed")
+		glog.Errorf("CIDR assignment for node %v failed: %v", node.Name, err)
+	}
+}
+
+// recycleCIDR recycles the CIDR of a removed node
+func (nc *NodeController) recycleCIDR(obj interface{}) {
+	node := obj.(*api.Node)
+
+	if node.Spec.PodCIDR == "" {
+		return
+	}
+
+	_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
+	if err != nil {
+		glog.Errorf("failed to parse CIDR %s of node %s", node.Spec.PodCIDR, node.Name)
+		return
+	}
+
+	glog.V(4).Infof("recycle node %s CIDR %s", node.Name, podCIDR)
+	if err :=
nc.cidrAllocator.Release(podCIDR); err != nil { + glog.Errorf("failed to release cidr: %v", err) } - return res } // getCondition returns a condition object for the specific condition @@ -450,11 +530,6 @@ func (nc *NodeController) monitorNodeStatus() error { } } - if nc.allocateNodeCIDRs { - // TODO (cjcullen): Use pkg/controller/framework to watch nodes and - // reduce lists/decouple this from monitoring status. - nc.reconcileNodeCIDRs(nodes) - } seenReady := false for i := range nodes.Items { var gracePeriod time.Duration @@ -590,42 +665,6 @@ func (nc *NodeController) forcefullyDeleteNode(nodeName string) error { return nil } -// reconcileNodeCIDRs looks at each node and assigns it a valid CIDR -// if it doesn't currently have one. -func (nc *NodeController) reconcileNodeCIDRs(nodes *api.NodeList) { - glog.V(4).Infof("Reconciling cidrs for %d nodes", len(nodes.Items)) - // TODO(roberthbailey): This seems inefficient. Why re-calculate CIDRs - // on each sync period? - availableCIDRs := generateCIDRs(nc.clusterCIDR, len(nodes.Items)) - for _, node := range nodes.Items { - if node.Spec.PodCIDR != "" { - glog.V(4).Infof("CIDR %s is already being used by node %s", node.Spec.PodCIDR, node.Name) - availableCIDRs.Delete(node.Spec.PodCIDR) - } - } - for _, node := range nodes.Items { - if node.Spec.PodCIDR == "" { - // Re-GET node (because ours might be stale by now). - n, err := nc.kubeClient.Core().Nodes().Get(node.Name) - if err != nil { - glog.Errorf("Failed to get node %q: %v", node.Name, err) - continue - } - podCIDR, found := availableCIDRs.PopAny() - if !found { - nc.recordNodeStatusChange(n, "CIDRNotAvailable") - continue - } - glog.V(1).Infof("Assigning node %s CIDR %s", n.Name, podCIDR) - n.Spec.PodCIDR = podCIDR - if _, err := nc.kubeClient.Core().Nodes().Update(n); err != nil { - nc.recordNodeStatusChange(&node, "CIDRAssignmentFailed") - } - } - - } -} - func (nc *NodeController) recordNodeEvent(nodeName, eventtype, reason, event string) { ref := &api.ObjectReference{ Kind: "Node",
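
For context, a minimal sketch of how the allocator introduced by this patch is intended to be driven. The 10.244.0.0/16 cluster CIDR, the standalone main wrapper, and the k8s.io/kubernetes/pkg/controller/node import path are assumptions made for the example and are not part of the patch; in the controller itself these calls happen from the node informer's Add and Delete handlers.

	// Illustrative sketch only; not part of the patch.
	package main

	import (
		"fmt"
		"net"

		// Assumed import path for the package touched by this patch.
		"k8s.io/kubernetes/pkg/controller/node"
	)

	func main() {
		// Cluster CIDR 10.244.0.0/16 split into /24 pod CIDRs gives 256 possible subnets.
		_, clusterCIDR, _ := net.ParseCIDR("10.244.0.0/16")
		a := node.NewCIDRRangeAllocator(clusterCIDR, 24)

		// A node that already carries a pod CIDR gets it marked as occupied (Add event path).
		_, inUse, _ := net.ParseCIDR("10.244.5.0/24")
		if err := a.Occupy(inUse); err != nil {
			fmt.Println("occupy failed:", err)
		}

		// A new node without a pod CIDR gets the next free /24.
		podCIDR, err := a.AllocateNext()
		if err != nil {
			fmt.Println("allocation failed:", err)
			return
		}
		fmt.Println("assigned", podCIDR) // 10.244.0.0/24 for the first allocation

		// When the node is deleted, its CIDR is returned to the pool (Delete event path).
		if err := a.Release(podCIDR); err != nil {
			fmt.Println("release failed:", err)
		}
	}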