List all nodes and occupy cidr map before starting allocations
This commit is contained in:
parent 51629f50b7
commit 2f9516db30
@@ -238,10 +238,13 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
     if err != nil {
         glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err)
     }
-    nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
+    nodeController, err := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
         s.PodEvictionTimeout.Duration, s.DeletingPodsQps, s.NodeMonitorGracePeriod.Duration,
         s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR,
         int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
+    if err != nil {
+        glog.Fatalf("Failed to initialize nodecontroller: %v", err)
+    }
     nodeController.Run(s.NodeSyncPeriod.Duration)
     time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
@@ -153,9 +153,12 @@ func (s *CMServer) Run(_ []string) error {
     }
     _, clusterCIDR, _ := net.ParseCIDR(s.ClusterCIDR)
     _, serviceCIDR, _ := net.ParseCIDR(s.ServiceCIDR)
-    nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
+    nodeController, err := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
         s.PodEvictionTimeout.Duration, s.DeletingPodsQps,
         s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
+    if err != nil {
+        glog.Fatalf("Failed to initialize nodecontroller: %v", err)
+    }
     nodeController.Run(s.NodeSyncPeriod.Duration)

     nodeStatusUpdaterController := node.NewStatusUpdater(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-status-controller")), s.NodeMonitorPeriod.Duration, time.Now)
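Both call sites above follow the same pattern: NewNodeController now returns a (controller, error) pair, and the controller manager treats a construction failure as fatal. A rough, self-contained sketch of that caller-side pattern, where the types and the newNodeController function are placeholders rather than the real Kubernetes API, might look like this:

package main

import (
    "errors"
    "log"
    "time"
)

// fakeController stands in for the real *NodeController; only the calling
// pattern matters here.
type fakeController struct{}

func (c *fakeController) Run(period time.Duration) { log.Printf("running every %v", period) }

// newNodeController mirrors the new two-value signature: the controller is
// only usable if the CIDR map could be initialized.
func newNodeController(allocateCIDRs bool) (*fakeController, error) {
    if allocateCIDRs {
        // In the real code, this is where listing nodes or occupying CIDRs can fail.
        return nil, errors.New("could not initialize CIDR map")
    }
    return &fakeController{}, nil
}

func main() {
    nc, err := newNodeController(false)
    if err != nil {
        // Matches the diff's behavior: a constructor failure is fatal.
        log.Fatalf("Failed to initialize nodecontroller: %v", err)
    }
    nc.Run(10 * time.Second)
}

Per the rest of the diff, the real constructor can only fail on the allocateNodeCIDRs path, since that is the path that has to list nodes and seed the CIDR map.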
@@ -64,7 +64,9 @@ type rangeAllocator struct {

 // NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDR for node
 // Caller must ensure subNetMaskSize is not less than cluster CIDR mask size.
-func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, serviceCIDR *net.IPNet, subNetMaskSize int) CIDRAllocator {
+// Caller must always pass in a list of existing nodes so the new allocator
+// can initialize its CIDR map. NodeList is only nil in testing.
+func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, serviceCIDR *net.IPNet, subNetMaskSize int, nodeList *api.NodeList) (CIDRAllocator, error) {
     eventBroadcaster := record.NewBroadcaster()
     recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "cidrAllocator"})
     eventBroadcaster.StartLogging(glog.Infof)
@@ -82,6 +84,24 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s
     } else {
         glog.V(0).Info("No Service CIDR provided. Skipping filtering out service addresses.")
     }
+
+    if nodeList != nil {
+        for _, node := range nodeList.Items {
+            if node.Spec.PodCIDR == "" {
+                glog.Infof("Node %v has no CIDR, ignoring", node.Name)
+                continue
+            } else {
+                glog.Infof("Node %v has CIDR %s, occupying it in CIDR map", node.Name, node.Spec.PodCIDR)
+            }
+            if err := ra.occupyCIDR(&node); err != nil {
+                // This will happen if:
+                // 1. We find garbage in the podCIDR field. Retrying is useless.
+                // 2. CIDR out of range: This means a node CIDR has changed.
+                // This error will keep crashing controller-manager.
+                return nil, err
+            }
+        }
+    }
     for i := 0; i < cidrUpdateWorkers; i++ {
         go func(stopChan <-chan struct{}) {
             for {
@@ -99,21 +119,28 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s
         }(wait.NeverStop)
     }

-    return ra
+    return ra, nil
 }

+func (r *rangeAllocator) occupyCIDR(node *api.Node) error {
+    if node.Spec.PodCIDR == "" {
+        return nil
+    }
+    _, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
+    if err != nil {
+        return fmt.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR)
+    }
+    if err := r.cidrs.occupy(podCIDR); err != nil {
+        return fmt.Errorf("failed to mark cidr as occupied: %v", err)
+    }
+    return nil
+}
+
 // AllocateOrOccupyCIDR looks at the given node, assigns it a valid CIDR
 // if it doesn't currently have one or mark the CIDR as used if the node already have one.
 func (r *rangeAllocator) AllocateOrOccupyCIDR(node *api.Node) error {
     if node.Spec.PodCIDR != "" {
-        _, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
-        if err != nil {
-            return fmt.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR)
-        }
-        if err := r.cidrs.occupy(podCIDR); err != nil {
-            return fmt.Errorf("failed to mark cidr as occupied: %v", err)
-        }
-        return nil
+        return r.occupyCIDR(node)
     }
     podCIDR, err := r.cidrs.allocateNext()
     if err != nil {
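The core idea in this file is that the allocator walks the existing nodes and marks every already-assigned PodCIDR as used before it hands out anything new, so a restarted controller-manager cannot double-allocate a range. A toy, self-contained sketch of that occupy-then-allocate pattern (the allocator below is a simple string set, not the real bitmap-backed cidrSet) could look like this:

package main

import "fmt"

// toyAllocator is an illustrative stand-in for the real rangeAllocator: it
// tracks used CIDR strings in a set and hands out from a fixed candidate list.
type toyAllocator struct {
    candidates []string
    used       map[string]bool
}

// occupy marks a CIDR that some node already owns, so it is never reallocated.
func (a *toyAllocator) occupy(cidr string) error {
    if a.used[cidr] {
        return fmt.Errorf("cidr %s already occupied", cidr)
    }
    a.used[cidr] = true
    return nil
}

// allocateNext returns the first candidate CIDR that is not yet occupied.
func (a *toyAllocator) allocateNext() (string, error) {
    for _, c := range a.candidates {
        if !a.used[c] {
            a.used[c] = true
            return c, nil
        }
    }
    return "", fmt.Errorf("range exhausted")
}

func main() {
    a := &toyAllocator{
        candidates: []string{"10.0.0.0/24", "10.0.1.0/24", "10.0.2.0/24"},
        used:       map[string]bool{},
    }
    // Step 1: occupy the CIDRs that existing nodes already report, as the
    // constructor in the diff does with nodeList.
    for _, c := range []string{"10.0.0.0/24", "10.0.1.0/24"} {
        if err := a.occupy(c); err != nil {
            panic(err) // the real code returns the error and crashes controller-manager
        }
    }
    // Step 2: new allocations skip everything that was pre-occupied.
    next, _ := a.allocateNext()
    fmt.Println("next free CIDR:", next) // prints 10.0.2.0/24
}

This is also why the constructor treats an occupy failure as fatal: if an existing PodCIDR cannot be parsed or falls outside the cluster range, continuing would risk handing the same range to another node.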
@@ -128,7 +128,7 @@ func TestAllocateOrOccupyCIDRSuccess(t *testing.T) {
         expectedAllocatedCIDR string
         allocatedCIDRs        []string
     }) {
-        allocator := NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize)
+        allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize, nil)
         // this is a bit of white box testing
         for _, allocated := range tc.allocatedCIDRs {
             _, cidr, err := net.ParseCIDR(allocated)
@@ -209,7 +209,7 @@ func TestAllocateOrOccupyCIDRFailure(t *testing.T) {
         subNetMaskSize int
         allocatedCIDRs []string
     }) {
-        allocator := NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize)
+        allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize, nil)
         // this is a bit of white box testing
         for _, allocated := range tc.allocatedCIDRs {
             _, cidr, err := net.ParseCIDR(allocated)
@@ -320,7 +320,7 @@ func TestReleaseCIDRSuccess(t *testing.T) {
         allocatedCIDRs []string
         cidrsToRelease []string
     }) {
-        allocator := NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize)
+        allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize, nil)
         // this is a bit of white box testing
         for _, allocated := range tc.allocatedCIDRs {
             _, cidr, err := net.ParseCIDR(allocated)
@@ -34,6 +34,7 @@ import (
     "k8s.io/kubernetes/pkg/cloudprovider"
     "k8s.io/kubernetes/pkg/controller"
     "k8s.io/kubernetes/pkg/controller/framework"
+    "k8s.io/kubernetes/pkg/fields"
     "k8s.io/kubernetes/pkg/labels"
     "k8s.io/kubernetes/pkg/runtime"
     "k8s.io/kubernetes/pkg/util/flowcontrol"
@@ -58,6 +59,8 @@ const (
     nodeEvictionPeriod = 100 * time.Millisecond
     // Burst value for all eviction rate limiters
     evictionRateLimiterBurst = 1
+    // The amount of time the nodecontroller polls on the list nodes endpoint.
+    apiserverStartupGracePeriod = 10 * time.Minute
 )

 type zoneState string
@@ -140,6 +143,9 @@ type NodeController struct {
 }

 // NewNodeController returns a new node controller to sync instances from cloudprovider.
+// This method returns an error if it is unable to initialize the CIDR bitmap with
+// podCIDRs it has already allocated to nodes. Since we don't allow podCIDR changes
+// currently, this should be handled as a fatal error.
 func NewNodeController(
     cloud cloudprovider.Interface,
     kubeClient clientset.Interface,
@@ -151,7 +157,7 @@ func NewNodeController(
     clusterCIDR *net.IPNet,
     serviceCIDR *net.IPNet,
     nodeCIDRMaskSize int,
-    allocateNodeCIDRs bool) *NodeController {
+    allocateNodeCIDRs bool) (*NodeController, error) {
     eventBroadcaster := record.NewBroadcaster()
     recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controllermanager"})
     eventBroadcaster.StartLogging(glog.Infof)
@@ -277,10 +283,30 @@ func NewNodeController(
     )

     if allocateNodeCIDRs {
-        nc.cidrAllocator = NewCIDRRangeAllocator(kubeClient, clusterCIDR, serviceCIDR, nodeCIDRMaskSize)
+        var nodeList *api.NodeList
+        var err error
+        // We must poll because apiserver might not be up. This error causes
+        // controller manager to restart.
+        if pollErr := wait.Poll(10*time.Second, apiserverStartupGracePeriod, func() (bool, error) {
+            nodeList, err = kubeClient.Core().Nodes().List(api.ListOptions{
+                FieldSelector: fields.Everything(),
+                LabelSelector: labels.Everything(),
+            })
+            if err != nil {
+                glog.Errorf("Failed to list all nodes: %v", err)
+                return false, nil
+            }
+            return true, nil
+        }); pollErr != nil {
+            return nil, fmt.Errorf("Failed to list all nodes in %v, cannot proceed without updating CIDR map", apiserverStartupGracePeriod)
+        }
+        nc.cidrAllocator, err = NewCIDRRangeAllocator(kubeClient, clusterCIDR, serviceCIDR, nodeCIDRMaskSize, nodeList)
+        if err != nil {
+            return nil, err
+        }
     }

-    return nc
+    return nc, nil
 }

 // Run starts an asynchronous loop that monitors the status of cluster nodes.
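The new constructor refuses to build a CIDR allocator until it has seen the full node list, polling the apiserver for up to apiserverStartupGracePeriod before giving up. A standalone sketch of that poll-with-deadline pattern in plain Go, where listNodes is a stand-in for the real kubeClient.Core().Nodes().List call and the intervals are shrunk from the diff's 10s/10min values, might be:

package main

import (
    "errors"
    "fmt"
    "log"
    "time"
)

// listNodes is a placeholder for the real node List call; it fails a few
// times to simulate an apiserver that is still coming up.
var attempts int

func listNodes() ([]string, error) {
    attempts++
    if attempts < 3 {
        return nil, errors.New("connection refused")
    }
    return []string{"node-a", "node-b"}, nil
}

// pollForNodes retries listNodes every interval until it succeeds or the
// grace period expires, roughly what wait.Poll does in the diff.
func pollForNodes(interval, gracePeriod time.Duration) ([]string, error) {
    deadline := time.Now().Add(gracePeriod)
    for {
        nodes, err := listNodes()
        if err == nil {
            return nodes, nil
        }
        log.Printf("Failed to list all nodes: %v", err)
        if time.Now().Add(interval).After(deadline) {
            return nil, fmt.Errorf("failed to list all nodes in %v, cannot proceed without updating CIDR map", gracePeriod)
        }
        time.Sleep(interval)
    }
}

func main() {
    nodes, err := pollForNodes(10*time.Millisecond, time.Second)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("seeding CIDR map from nodes:", nodes)
}

Returning an error instead of allocating with an empty map means the controller manager keeps restarting until the apiserver can serve the node list, which is the safe failure mode here.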
@@ -600,7 +600,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
     }

     for _, item := range table {
-        nodeController := NewNodeController(nil, item.fakeNodeHandler,
+        nodeController, _ := NewNodeController(nil, item.fakeNodeHandler,
             evictionTimeout, testRateLimiterQPS, testNodeMonitorGracePeriod,
             testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
         nodeController.now = func() unversioned.Time { return fakeNow }
@@ -673,7 +673,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) {
         Clientset:      fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node0")}}),
         deleteWaitChan: make(chan struct{}),
     }
-    nodeController := NewNodeController(nil, fnh, 10*time.Minute,
+    nodeController, _ := NewNodeController(nil, fnh, 10*time.Minute,
         testRateLimiterQPS,
         testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
         testNodeMonitorPeriod, nil, nil, 0, false)
@@ -907,7 +907,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
     }

     for i, item := range table {
-        nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
+        nodeController, _ := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
             testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
         nodeController.now = func() unversioned.Time { return fakeNow }
         if err := nodeController.monitorNodeStatus(); err != nil {
@@ -1057,7 +1057,7 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) {
     }

     for i, item := range table {
-        nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
+        nodeController, _ := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
             testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
         nodeController.now = func() unversioned.Time { return fakeNow }
         if err := nodeController.monitorNodeStatus(); err != nil {
@@ -1139,7 +1139,7 @@ func TestNodeDeletion(t *testing.T) {
         Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}),
     }

-    nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
+    nodeController, _ := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
         testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
         testNodeMonitorPeriod, nil, nil, 0, false)
     nodeController.now = func() unversioned.Time { return fakeNow }
@@ -1243,7 +1243,7 @@ func TestCheckPod(t *testing.T) {
         },
     }

-    nc := NewNodeController(nil, nil, 0, 0, 0, 0, 0, nil, nil, 0, false)
+    nc, _ := NewNodeController(nil, nil, 0, 0, 0, 0, 0, nil, nil, 0, false)
     nc.nodeStore.Store = cache.NewStore(cache.MetaNamespaceKeyFunc)
     nc.nodeStore.Store.Add(&api.Node{
         ObjectMeta: api.ObjectMeta{
@@ -1310,7 +1310,7 @@ func TestCleanupOrphanedPods(t *testing.T) {
         newPod("b", "bar"),
         newPod("c", "gone"),
     }
-    nc := NewNodeController(nil, nil, 0, 0, 0, 0, 0, nil, nil, 0, false)
+    nc, _ := NewNodeController(nil, nil, 0, 0, 0, 0, 0, nil, nil, 0, false)

     nc.nodeStore.Store.Add(newNode("foo"))
     nc.nodeStore.Store.Add(newNode("bar"))