Merge pull request #51374 from bowei/ip-alias-v2

Automatic merge from submit-queue (batch tested with PRs 51583, 51283, 51374, 51690, 51716)

Add IPAM controller for synchronizing node pod CIDR range allocations between the cluster and the cloud (alpha feature)

```release-note
IPAM controller unifies handling of node pod CIDR range allocation.
It is intended to supersede the logic that is currently in range_allocator 
and cloud_cidr_allocator. (ALPHA FEATURE)

Note: with this change, the other allocators still exist and remain the default.

It supports two modes:
* CIDR range allocations done within the cluster that are then propagated out to the cloud provider.
* Cloud provider managed IPAM that is then reflected into the cluster.
```

Fixes https://github.com/kubernetes/kubernetes/issues/51826
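
For orientation, the two modes above correspond to the NodeSyncMode values added under pkg/controller/node/ipam/sync in this diff. The sketch below is a minimal, hypothetical mapping from the new allocator types to those modes; the real selection happens in the node controller, which is only partially shown here, and the helper name is made up.

```go
package example

import (
	"k8s.io/kubernetes/pkg/controller/node/ipam"
	nodesync "k8s.io/kubernetes/pkg/controller/node/ipam/sync"
)

// syncModeFor sketches how the two alpha allocator types relate to the sync
// modes; this helper is illustrative only and does not exist in the PR.
func syncModeFor(t ipam.CIDRAllocatorType) (nodesync.NodeSyncMode, bool) {
	switch t {
	case ipam.IPAMFromClusterAllocatorType:
		// Cluster-side CIDR allocations are propagated out to the cloud.
		return nodesync.SyncFromCluster, true
	case ipam.IPAMFromCloudAllocatorType:
		// Cloud-managed allocations are reflected into node.Spec.PodCIDR.
		return nodesync.SyncFromCloud, true
	default:
		return "", false
	}
}
```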
Kubernetes Submit Queue 2017-09-02 20:35:24 -07:00 committed by GitHub
commit dceff77669
22 changed files with 1681 additions and 238 deletions

View File

@ -174,7 +174,6 @@ pkg/controller/garbagecollector/metaonly
pkg/controller/job
pkg/controller/namespace
pkg/controller/namespace/deletion
pkg/controller/node
pkg/controller/podautoscaler
pkg/controller/podautoscaler/metrics
pkg/controller/podgc

View File

@ -51,6 +51,7 @@ go_library(
"//pkg/cloudprovider:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/node/ipam:go_default_library",
"//pkg/controller/node/ipam/sync:go_default_library",
"//pkg/controller/node/scheduler:go_default_library",
"//pkg/controller/node/util:go_default_library",
"//pkg/util/metrics:go_default_library",
@ -65,7 +66,6 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",

View File

@ -8,9 +8,15 @@ load(
go_test(
name = "go_default_test",
srcs = ["cidr_allocator_test.go"],
srcs = [
"controller_test.go",
"range_allocator_test.go",
"timeout_test.go",
],
library = ":go_default_library",
deps = [
"//pkg/controller/node/ipam/cidrset:go_default_library",
"//pkg/controller/node/ipam/test:go_default_library",
"//pkg/controller/testutil:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@ -22,28 +28,38 @@ go_test(
go_library(
name = "go_default_library",
srcs = [
"adapter.go",
"cidr_allocator.go",
"cloud_cidr_allocator.go",
"controller.go",
"doc.go",
"range_allocator.go",
"timeout.go",
],
deps = [
"//pkg/api:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/cloudprovider/providers/gce:go_default_library",
"//pkg/controller/node/ipam/cidrset:go_default_library",
"//pkg/controller/node/ipam/sync:go_default_library",
"//pkg/controller/node/util:go_default_library",
"//pkg/util/node:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/metrics/pkg/client/clientset_generated/clientset/scheme:go_default_library",
],
)
@ -59,6 +75,8 @@ filegroup(
srcs = [
":package-srcs",
"//pkg/controller/node/ipam/cidrset:all-srcs",
"//pkg/controller/node/ipam/sync:all-srcs",
"//pkg/controller/node/ipam/test:all-srcs",
],
tags = ["automanaged"],
)

View File

@ -0,0 +1,125 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipam
import (
"context"
"encoding/json"
"net"
"github.com/golang/glog"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/metrics/pkg/client/clientset_generated/clientset/scheme"
)
type adapter struct {
k8s clientset.Interface
cloud *gce.GCECloud
recorder record.EventRecorder
}
func newAdapter(k8s clientset.Interface, cloud *gce.GCECloud) *adapter {
ret := &adapter{
k8s: k8s,
cloud: cloud,
}
broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(glog.Infof)
ret.recorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloudCIDRAllocator"})
glog.V(0).Infof("Sending events to api server.")
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
Interface: v1core.New(k8s.Core().RESTClient()).Events(""),
})
return ret
}
func (a *adapter) Alias(ctx context.Context, nodeName string) (*net.IPNet, error) {
cidrs, err := a.cloud.AliasRanges(types.NodeName(nodeName))
if err != nil {
return nil, err
}
switch len(cidrs) {
case 0:
return nil, nil
case 1:
break
default:
glog.Warningf("Node %q has more than one alias assigned (%v), defaulting to the first", nodeName, cidrs)
}
_, cidrRange, err := net.ParseCIDR(cidrs[0])
if err != nil {
return nil, err
}
return cidrRange, nil
}
func (a *adapter) AddAlias(ctx context.Context, nodeName string, cidrRange *net.IPNet) error {
return a.cloud.AddAliasToInstance(types.NodeName(nodeName), cidrRange)
}
func (a *adapter) Node(ctx context.Context, name string) (*v1.Node, error) {
return a.k8s.Core().Nodes().Get(name, metav1.GetOptions{})
}
func (a *adapter) UpdateNodePodCIDR(ctx context.Context, node *v1.Node, cidrRange *net.IPNet) error {
patch := map[string]interface{}{
"apiVersion": node.APIVersion,
"kind": node.Kind,
"metadata": map[string]interface{}{"name": node.Name},
"spec": map[string]interface{}{"podCIDR": cidrRange.String()},
}
bytes, err := json.Marshal(patch)
if err != nil {
return err
}
_, err = a.k8s.Core().Nodes().Patch(node.Name, types.StrategicMergePatchType, bytes)
return err
}
func (a *adapter) UpdateNodeNetworkUnavailable(nodeName string, unavailable bool) error {
condition := v1.ConditionFalse
if unavailable {
condition = v1.ConditionTrue
}
return nodeutil.SetNodeCondition(a.k8s, types.NodeName(nodeName), v1.NodeCondition{
Type: v1.NodeNetworkUnavailable,
Status: condition,
Reason: "RouteCreated",
Message: "NodeController created an implicit route",
LastTransitionTime: metav1.Now(),
})
}
func (a *adapter) EmitNodeWarningEvent(nodeName, reason, fmt string, args ...interface{}) {
ref := &v1.ObjectReference{Kind: "Node", Name: nodeName}
a.recorder.Eventf(ref, v1.EventTypeNormal, reason, fmt, args...)
}
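
For reference, the strategic-merge-patch body built by UpdateNodePodCIDR above can be reproduced standalone. The node name, PodCIDR, apiVersion, and kind below are hypothetical example values, not data from the PR.

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Mirror of the patch map built in adapter.UpdateNodePodCIDR, filled with
	// made-up values ("v1"/"Node" stand in for whatever the Node object carried).
	patch := map[string]interface{}{
		"apiVersion": "v1",
		"kind":       "Node",
		"metadata":   map[string]interface{}{"name": "node1"},
		"spec":       map[string]interface{}{"podCIDR": "10.1.1.0/24"},
	}
	b, _ := json.Marshal(patch)
	fmt.Println(string(b))
	// Output:
	// {"apiVersion":"v1","kind":"Node","metadata":{"name":"node1"},"spec":{"podCIDR":"10.1.1.0/24"}}
}
```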

View File

@ -17,9 +17,20 @@ limitations under the License.
package ipam
import (
"fmt"
"net"
"time"
v1 "k8s.io/api/core/v1"
"github.com/golang/glog"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
informers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/cloudprovider"
)
type nodeAndCIDR struct {
@ -37,10 +48,19 @@ const (
// CloudAllocatorType is the allocator that uses cloud platform
// support to do node CIDR range allocations.
CloudAllocatorType CIDRAllocatorType = "CloudAllocator"
// IPAMFromClusterAllocatorType uses the IPAM controller to sync the node
// CIDR range allocations from the cluster to the cloud.
IPAMFromClusterAllocatorType = "IPAMFromCluster"
// IPAMFromCloudAllocatorType uses the IPAM controller to sync the node
// CIDR range allocations from the cloud to the cluster.
IPAMFromCloudAllocatorType = "IPAMFromCloud"
// apiserverStartupGracePeriod is how long the node controller keeps polling
// the list-nodes endpoint while waiting for the apiserver to come up.
apiserverStartupGracePeriod = 10 * time.Minute
)
// CIDRAllocator is an interface implemented by things that know how to
// allocate/occupy/recycle CIDR for nodes.
// CIDRAllocator is an interface implemented by things that know how
// to allocate/occupy/recycle CIDR for nodes.
type CIDRAllocator interface {
// AllocateOrOccupyCIDR looks at the given node, assigns it a valid
// CIDR if it doesn't currently have one or mark the CIDR as used if
@ -48,4 +68,45 @@ type CIDRAllocator interface {
AllocateOrOccupyCIDR(node *v1.Node) error
// ReleaseCIDR releases the CIDR of the removed node
ReleaseCIDR(node *v1.Node) error
// Register allocator with the nodeInformer for updates.
Register(nodeInformer informers.NodeInformer)
}
// New creates a new CIDR range allocator.
func New(kubeClient clientset.Interface, cloud cloudprovider.Interface, allocatorType CIDRAllocatorType, clusterCIDR, serviceCIDR *net.IPNet, nodeCIDRMaskSize int) (CIDRAllocator, error) {
nodeList, err := listNodes(kubeClient)
if err != nil {
return nil, err
}
switch allocatorType {
case RangeAllocatorType:
return NewCIDRRangeAllocator(kubeClient, clusterCIDR, serviceCIDR, nodeCIDRMaskSize, nodeList)
case CloudAllocatorType:
return NewCloudCIDRAllocator(kubeClient, cloud)
default:
return nil, fmt.Errorf("Invalid CIDR allocator type: %v", allocatorType)
}
}
func listNodes(kubeClient clientset.Interface) (*v1.NodeList, error) {
var nodeList *v1.NodeList
// We must poll because apiserver might not be up. This error causes
// controller manager to restart.
if pollErr := wait.Poll(10*time.Second, apiserverStartupGracePeriod, func() (bool, error) {
var err error
nodeList, err = kubeClient.Core().Nodes().List(metav1.ListOptions{
FieldSelector: fields.Everything().String(),
LabelSelector: labels.Everything().String(),
})
if err != nil {
glog.Errorf("Failed to list all nodes: %v", err)
return false, nil
}
return true, nil
}); pollErr != nil {
return nil, fmt.Errorf("Failed to list all nodes in %v, cannot proceed without updating CIDR map",
apiserverStartupGracePeriod)
}
return nodeList, nil
}

View File

@ -102,7 +102,8 @@ func (s *CidrSet) indexToCIDRBlock(index int) *net.IPNet {
}
}
// AllocateNext allocates the next free CIDR range.
// AllocateNext allocates the next free CIDR range. This will set the range
// as occupied and return the allocated range.
func (s *CidrSet) AllocateNext() (*net.IPNet, error) {
s.Lock()
defer s.Unlock()
@ -186,7 +187,8 @@ func (s *CidrSet) Release(cidr *net.IPNet) error {
return nil
}
// Occupy marks the given CIDR range as used.
// Occupy marks the given CIDR range as used. Occupy does not check if the CIDR
// range was previously used.
func (s *CidrSet) Occupy(cidr *net.IPNet) (err error) {
begin, end, err := s.getBeginingAndEndIndices(cidr)
if err != nil {
@ -203,21 +205,24 @@ func (s *CidrSet) Occupy(cidr *net.IPNet) (err error) {
}
func (s *CidrSet) getIndexForCIDR(cidr *net.IPNet) (int, error) {
var cidrIndex uint32
if cidr.IP.To4() != nil {
cidrIndex = (binary.BigEndian.Uint32(s.clusterIP) ^ binary.BigEndian.Uint32(cidr.IP.To4())) >> uint32(32-s.subNetMaskSize)
if cidrIndex >= uint32(s.maxCIDRs) {
return 0, fmt.Errorf("CIDR: %v is out of the range of CIDR allocator", cidr)
}
} else if cidr.IP.To16() != nil {
cidrIndex64 := (binary.BigEndian.Uint64(s.clusterIP) ^ binary.BigEndian.Uint64(cidr.IP.To16())) >> uint64(64-s.subNetMaskSize)
if cidrIndex64 >= uint64(s.maxCIDRs) {
return 0, fmt.Errorf("CIDR: %v is out of the range of CIDR allocator", cidr)
}
cidrIndex = uint32(cidrIndex64)
} else {
return 0, fmt.Errorf("invalid CIDR block: %v", cidr)
}
return int(cidrIndex), nil
return s.getIndexForIP(cidr.IP)
}
func (s *CidrSet) getIndexForIP(ip net.IP) (int, error) {
if ip.To4() != nil {
cidrIndex := (binary.BigEndian.Uint32(s.clusterIP) ^ binary.BigEndian.Uint32(ip.To4())) >> uint32(32-s.subNetMaskSize)
if cidrIndex >= uint32(s.maxCIDRs) {
return 0, fmt.Errorf("CIDR: %v/%v is out of the range of CIDR allocator", ip, s.subNetMaskSize)
}
return int(cidrIndex), nil
}
if ip.To16() != nil {
cidrIndex := (binary.BigEndian.Uint64(s.clusterIP) ^ binary.BigEndian.Uint64(ip.To16())) >> uint64(64-s.subNetMaskSize)
if cidrIndex >= uint64(s.maxCIDRs) {
return 0, fmt.Errorf("CIDR: %v/%v is out of the range of CIDR allocator", ip, s.subNetMaskSize)
}
return int(cidrIndex), nil
}
return 0, fmt.Errorf("invalid IP: %v", ip)
}
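
The XOR-and-shift in getIndexForIP is easier to follow with concrete numbers. The sketch below reproduces only the IPv4 arithmetic, assuming a 10.1.0.0/16 cluster CIDR carved into /24 node ranges; it does not use the CidrSet type itself.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"net"
)

func main() {
	_, clusterCIDR, _ := net.ParseCIDR("10.1.0.0/16") // cluster range
	const subNetMaskSize = 24                         // per-node /24 blocks

	ip := net.ParseIP("10.1.5.3").To4()
	base := clusterCIDR.IP.To4()

	// XOR keeps only the bits that differ from the cluster base address;
	// shifting out the host bits leaves the index of the /24 block.
	index := (binary.BigEndian.Uint32(base) ^ binary.BigEndian.Uint32(ip)) >> (32 - subNetMaskSize)
	fmt.Println(index) // 5, i.e. the node CIDR 10.1.5.0/24
}
```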

View File

@ -24,7 +24,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/api/core/v1"
@ -142,3 +143,16 @@ func (ca *cloudCIDRAllocator) ReleaseCIDR(node *v1.Node) error {
node.Name, node.Spec.PodCIDR)
return nil
}
func (ca *cloudCIDRAllocator) Register(nodeInformer informers.NodeInformer) {
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: util.CreateAddNodeHandler(ca.AllocateOrOccupyCIDR),
UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
if newNode.Spec.PodCIDR == "" {
return ca.AllocateOrOccupyCIDR(newNode)
}
return nil
}),
DeleteFunc: util.CreateDeleteNodeHandler(ca.ReleaseCIDR),
})
}

View File

@ -0,0 +1,210 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipam
import (
"fmt"
"net"
"sync"
"time"
"github.com/golang/glog"
"k8s.io/api/core/v1"
informers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
"k8s.io/kubernetes/pkg/controller/node/ipam/cidrset"
nodesync "k8s.io/kubernetes/pkg/controller/node/ipam/sync"
"k8s.io/kubernetes/pkg/controller/node/util"
)
// Config for the IPAM controller.
type Config struct {
// Resync is the default timeout duration when there are no errors.
Resync time.Duration
// MaxBackoff is the maximum timeout when in an error backoff state.
MaxBackoff time.Duration
// InitialRetry is the initial retry interval when an error is reported.
InitialRetry time.Duration
// Mode to use to synchronize.
Mode nodesync.NodeSyncMode
}
// Controller is the controller for synchronizing cluster and cloud node
// pod CIDR range assignments.
type Controller struct {
config *Config
adapter *adapter
lock sync.Mutex
syncers map[string]*nodesync.NodeSync
set *cidrset.CidrSet
}
// NewController returns a new instance of the IPAM controller.
func NewController(
config *Config,
kubeClient clientset.Interface,
cloud cloudprovider.Interface,
clusterCIDR, serviceCIDR *net.IPNet,
nodeCIDRMaskSize int) (*Controller, error) {
if !nodesync.IsValidMode(config.Mode) {
return nil, fmt.Errorf("invalid IPAM controller mode %q", config.Mode)
}
gceCloud, ok := cloud.(*gce.GCECloud)
if !ok {
return nil, fmt.Errorf("cloud IPAM controller does not support %q provider", cloud.ProviderName())
}
c := &Controller{
config: config,
adapter: newAdapter(kubeClient, gceCloud),
syncers: make(map[string]*nodesync.NodeSync),
set: cidrset.NewCIDRSet(clusterCIDR, nodeCIDRMaskSize),
}
if err := occupyServiceCIDR(c.set, clusterCIDR, serviceCIDR); err != nil {
return nil, err
}
return c, nil
}
// Start initializes the Controller with the existing list of nodes and
// registers the informers for node changes. This will start synchronization
// of the node and cloud CIDR range allocations.
func (c *Controller) Start(nodeInformer informers.NodeInformer) error {
glog.V(0).Infof("Starting IPAM controller (config=%+v)", c.config)
nodes, err := listNodes(c.adapter.k8s)
if err != nil {
return err
}
for _, node := range nodes.Items {
if node.Spec.PodCIDR != "" {
_, cidrRange, err := net.ParseCIDR(node.Spec.PodCIDR)
if err == nil {
c.set.Occupy(cidrRange)
glog.V(3).Infof("Occupying CIDR for node %q (%v)", node.Name, node.Spec.PodCIDR)
} else {
glog.Errorf("Node %q has an invalid CIDR (%q): %v", node.Name, node.Spec.PodCIDR, err)
}
}
func() {
c.lock.Lock()
defer c.lock.Unlock()
// XXX/bowei -- stagger the start of each sync cycle.
syncer := c.newSyncer(node.Name)
c.syncers[node.Name] = syncer
go syncer.Loop(nil)
}()
}
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: util.CreateAddNodeHandler(c.onAdd),
UpdateFunc: util.CreateUpdateNodeHandler(c.onUpdate),
DeleteFunc: util.CreateDeleteNodeHandler(c.onDelete),
})
return nil
}
// occupyServiceCIDR removes the service CIDR range from the cluster CIDR if it
// intersects.
func occupyServiceCIDR(set *cidrset.CidrSet, clusterCIDR, serviceCIDR *net.IPNet) error {
if clusterCIDR.Contains(serviceCIDR.IP) || serviceCIDR.Contains(clusterCIDR.IP) {
if err := set.Occupy(serviceCIDR); err != nil {
return err
}
}
return nil
}
type nodeState struct {
t Timeout
}
func (ns *nodeState) ReportResult(err error) {
ns.t.Update(err == nil)
}
func (ns *nodeState) ResyncTimeout() time.Duration {
return ns.t.Next()
}
func (c *Controller) newSyncer(name string) *nodesync.NodeSync {
ns := &nodeState{
Timeout{
Resync: c.config.Resync,
MaxBackoff: c.config.MaxBackoff,
InitialRetry: c.config.InitialRetry,
},
}
return nodesync.New(ns, c.adapter, c.adapter, c.config.Mode, name, c.set)
}
func (c *Controller) onAdd(node *v1.Node) error {
c.lock.Lock()
defer c.lock.Unlock()
if syncer, ok := c.syncers[node.Name]; !ok {
syncer = c.newSyncer(node.Name)
c.syncers[node.Name] = syncer
go syncer.Loop(nil)
} else {
glog.Warningf("Add for node %q that already exists", node.Name)
syncer.Update(node)
}
return nil
}
func (c *Controller) onUpdate(_, node *v1.Node) error {
c.lock.Lock()
defer c.lock.Unlock()
if sync, ok := c.syncers[node.Name]; ok {
sync.Update(node)
} else {
glog.Errorf("Received update for non-existant node %q", node.Name)
return fmt.Errorf("unknown node %q", node.Name)
}
return nil
}
func (c *Controller) onDelete(node *v1.Node) error {
c.lock.Lock()
defer c.lock.Unlock()
if syncer, ok := c.syncers[node.Name]; ok {
syncer.Delete(node)
delete(c.syncers, node.Name)
} else {
glog.Warning("Node %q was already deleted", node.Name)
}
return nil
}
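
To show how Config, NewController, and Start fit together, here is a hypothetical wiring sketch. The 30s/10s/250ms values mirror the ipamResyncInterval/ipamMaxBackoff/ipamInitialBackoff constants added to the node controller later in this diff; the client, cloud provider, CIDRs, and informer are assumed to be supplied by the caller.

```go
package example

import (
	"net"
	"time"

	informers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/pkg/cloudprovider"
	"k8s.io/kubernetes/pkg/controller/node/ipam"
	nodesync "k8s.io/kubernetes/pkg/controller/node/ipam/sync"
)

// startIPAMController is an illustrative helper, not code from the PR.
func startIPAMController(kubeClient clientset.Interface, cloud cloudprovider.Interface,
	clusterCIDR, serviceCIDR *net.IPNet, nodeInformer informers.NodeInformer) error {
	conf := &ipam.Config{
		Resync:       30 * time.Second,         // steady-state resync period
		MaxBackoff:   10 * time.Second,         // cap for the error backoff
		InitialRetry: 250 * time.Millisecond,   // first retry after an error
		Mode:         nodesync.SyncFromCluster, // cluster allocations pushed to the cloud
	}
	c, err := ipam.NewController(conf, kubeClient, cloud, clusterCIDR, serviceCIDR, 24)
	if err != nil {
		return err // e.g. non-GCE cloud provider or invalid mode
	}
	// Start lists existing nodes, occupies their PodCIDRs, and registers the
	// informer handlers that create a per-node syncer.
	return c.Start(nodeInformer)
}
```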

View File

@ -0,0 +1,64 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipam
import (
"net"
"testing"
"k8s.io/kubernetes/pkg/controller/node/ipam/cidrset"
"k8s.io/kubernetes/pkg/controller/node/ipam/test"
)
func TestOccupyServiceCIDR(t *testing.T) {
const clusterCIDR = "10.1.0.0/16"
TestCase:
for _, tc := range []struct {
serviceCIDR string
}{
{"10.0.255.0/24"},
{"10.1.0.0/24"},
{"10.1.255.0/24"},
{"10.2.0.0/24"},
} {
serviceCIDR := test.MustParseCIDR(tc.serviceCIDR)
set := cidrset.NewCIDRSet(test.MustParseCIDR(clusterCIDR), 24)
if err := occupyServiceCIDR(set, test.MustParseCIDR(clusterCIDR), serviceCIDR); err != nil {
t.Errorf("test case %+v: occupyServiceCIDR() = %v, want nil", tc, err)
}
// Allocate until full.
var cidrs []*net.IPNet
for {
cidr, err := set.AllocateNext()
if err != nil {
if err == cidrset.ErrCIDRRangeNoCIDRsRemaining {
break
}
t.Errorf("set.AllocateNext() = %v, want %v", err, cidrset.ErrCIDRRangeNoCIDRsRemaining)
continue TestCase
}
cidrs = append(cidrs, cidr)
}
// No allocated CIDR range should intersect with serviceCIDR.
for _, c := range cidrs {
if c.Contains(serviceCIDR.IP) || serviceCIDR.Contains(c.IP) {
t.Errorf("test case %+v: allocated CIDR %v from service range", tc, c)
}
}
}
}

View File

@ -0,0 +1,30 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package ipam provides different allocators for assigning IP ranges to nodes.
// We currently support several kinds of IPAM allocators (these are denoted by
// the CIDRAllocatorType):
// - RangeAllocator is an allocator that assigns PodCIDRs to nodes and works
// in conjunction with the RouteController to configure the network to get
// connectivity.
// - CloudAllocator is an allocator that synchronizes PodCIDRs from IP
// ranges assignments from the underlying cloud platform.
// - (Alpha only) IPAMFromCluster is an allocator that has similar
// functionality to the RangeAllocator but also synchronizes cluster-managed
// ranges into the cloud platform.
// - (Alpha only) IPAMFromCloud is the same as CloudAllocator (synchronizes
// from the cloud into the cluster).
package ipam
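
Since doc.go enumerates the allocator kinds, a small construction sketch may help. Note that New (in cidr_allocator.go above) only accepts RangeAllocatorType and CloudAllocatorType; the alpha modes go through ipam.NewController instead. The helper and its arguments below are hypothetical.

```go
package example

import (
	"net"

	informers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/pkg/cloudprovider"
	"k8s.io/kubernetes/pkg/controller/node/ipam"
)

// startRangeAllocator is an illustrative helper, not code from the PR.
func startRangeAllocator(kubeClient clientset.Interface, cloud cloudprovider.Interface,
	clusterCIDR, serviceCIDR *net.IPNet, nodeInformer informers.NodeInformer) error {
	// New blocks until the apiserver answers the initial node list, then
	// builds the requested allocator.
	alloc, err := ipam.New(kubeClient, cloud, ipam.RangeAllocatorType, clusterCIDR, serviceCIDR, 24)
	if err != nil {
		return err
	}
	// Register hooks the allocator up to node add/update/delete events.
	alloc.Register(nodeInformer)
	return nil
}
```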

View File

@ -28,9 +28,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
informers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/controller/node/ipam/cidrset"
@ -264,3 +266,35 @@ func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
}
return err
}
func (r *rangeAllocator) Register(nodeInformer informers.NodeInformer) {
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: util.CreateAddNodeHandler(r.AllocateOrOccupyCIDR),
UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
// If the PodCIDR is not empty we either:
// - already processed a Node that already had a CIDR after NC restarted
// (cidr is marked as used),
// - already processed a Node successfully and allocated a CIDR for it
// (cidr is marked as used),
// - already processed a Node, saw a "timeout" response, and the request
// eventually got through; in this case we haven't released the allocated
// CIDR (cidr is still marked as used).
// There's a possible error here:
// - NC sees a new Node and assigns a CIDR X to it,
// - Update Node call fails with a timeout,
// - Node is updated by some other component, NC sees an update and
// assigns CIDR Y to the Node,
// - Both CIDR X and CIDR Y are marked as used in the local cache,
// even though Node sees only CIDR Y
// The problem here is that in the in-memory cache we see CIDR X as marked,
// which prevents it from being assigned to any new node. The cluster
// state is correct.
// Restart of NC fixes the issue.
if newNode.Spec.PodCIDR == "" {
return r.AllocateOrOccupyCIDR(newNode)
}
return nil
}),
DeleteFunc: util.CreateDeleteNodeHandler(r.ReleaseCIDR),
})
}

View File

@ -0,0 +1,39 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["sync.go"],
visibility = ["//visibility:public"],
deps = [
"//pkg/controller/node/ipam/cidrset:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["sync_test.go"],
library = ":go_default_library",
deps = [
"//pkg/controller/node/ipam/cidrset:go_default_library",
"//pkg/controller/node/ipam/test:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,386 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sync
import (
"context"
"fmt"
"net"
"time"
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/controller/node/ipam/cidrset"
)
const (
// InvalidPodCIDR is the event recorded when a node is found with an
// invalid PodCIDR.
InvalidPodCIDR = "CloudCIDRAllocatorInvalidPodCIDR"
// InvalidModeEvent is the event recorded when the CIDR range cannot be
// sync'd due to the cluster running in the wrong mode.
InvalidModeEvent = "CloudCIDRAllocatorInvalidMode"
// MismatchEvent is the event recorded when the CIDR range allocated in the
// node spec does not match what has been allocated in the cloud.
MismatchEvent = "CloudCIDRAllocatorMismatch"
)
// cloudAlias is the interface to the cloud platform APIs.
type cloudAlias interface {
// Alias returns the IP alias for the node.
Alias(ctx context.Context, nodeName string) (*net.IPNet, error)
// AddAlias adds an alias to the node.
AddAlias(ctx context.Context, nodeName string, cidrRange *net.IPNet) error
}
// kubeAPI is the interface to the Kubernetes APIs.
type kubeAPI interface {
// Node returns the spec for the Node object.
Node(ctx context.Context, name string) (*v1.Node, error)
// UpdateNodePodCIDR updates the PodCIDR in the Node spec.
UpdateNodePodCIDR(ctx context.Context, node *v1.Node, cidrRange *net.IPNet) error
// UpdateNodeNetworkUnavailable updates the network unavailable status for the node.
UpdateNodeNetworkUnavailable(nodeName string, unavailable bool) error
// EmitNodeWarningEvent emits a warning event for the given node.
EmitNodeWarningEvent(nodeName, reason, fmt string, args ...interface{})
}
// controller is the interface to the controller.
type controller interface {
// ReportResult updates the controller with the result of the latest
// sync operation.
ReportResult(err error)
// ResyncTimeout returns the amount of time to wait before retrying
// a sync with a node.
ResyncTimeout() time.Duration
}
// NodeSyncMode is the mode the cloud CIDR allocator runs in.
type NodeSyncMode string
var (
// SyncFromCloud is the mode that synchronizes the IP allocation from the cloud
// platform to the node.
SyncFromCloud NodeSyncMode = "SyncFromCloud"
// SyncFromCluster is the mode that synchronizes the IP allocation determined
// by the k8s controller to the cloud provider.
SyncFromCluster NodeSyncMode = "SyncFromCluster"
)
// IsValidMode returns true if the given mode is valid.
func IsValidMode(m NodeSyncMode) bool {
switch m {
case SyncFromCloud:
case SyncFromCluster:
default:
return false
}
return true
}
// NodeSync synchronizes the state for a single node in the cluster.
type NodeSync struct {
c controller
cloudAlias cloudAlias
kubeAPI kubeAPI
mode NodeSyncMode
nodeName string
opChan chan syncOp
set *cidrset.CidrSet
}
// New returns a new syncer for a given node.
func New(c controller, cloudAlias cloudAlias, kubeAPI kubeAPI, mode NodeSyncMode, nodeName string, set *cidrset.CidrSet) *NodeSync {
return &NodeSync{
c: c,
cloudAlias: cloudAlias,
kubeAPI: kubeAPI,
mode: mode,
nodeName: nodeName,
opChan: make(chan syncOp, 1),
set: set,
}
}
// Loop runs the sync loop for a given node. done is an optional channel that
// is closed when the Loop() returns.
func (sync *NodeSync) Loop(done chan struct{}) {
glog.V(2).Infof("Starting sync loop for node %q", sync.nodeName)
defer func() {
if done != nil {
close(done)
}
}()
timeout := sync.c.ResyncTimeout()
delayTimer := time.NewTimer(timeout)
glog.V(4).Infof("Resync node %q in %v", sync.nodeName, timeout)
for {
select {
case op, more := <-sync.opChan:
if !more {
glog.V(2).Infof("Stopping sync loop")
return
}
sync.c.ReportResult(op.run(sync))
if !delayTimer.Stop() {
<-delayTimer.C
}
case <-delayTimer.C:
glog.V(4).Infof("Running resync for node %q", sync.nodeName)
sync.c.ReportResult((&updateOp{}).run(sync))
}
timeout := sync.c.ResyncTimeout()
delayTimer.Reset(timeout)
glog.V(4).Infof("Resync node %q in %v", sync.nodeName, timeout)
}
}
// Update causes an update operation on the given node. If node is nil, then
// the syncer will fetch the node spec from the API server before syncing.
//
// This method is safe to call from multiple goroutines.
func (sync *NodeSync) Update(node *v1.Node) {
sync.opChan <- &updateOp{node}
}
// Delete performs the sync operations necessary to remove the node from the
// IPAM state.
//
// This method is safe to call from multiple goroutines.
func (sync *NodeSync) Delete(node *v1.Node) {
sync.opChan <- &deleteOp{node}
close(sync.opChan)
}
// syncOp is the interface for generic sync operation.
type syncOp interface {
// run the requested sync operation.
run(sync *NodeSync) error
}
// updateOp handles creation and updates of a node.
type updateOp struct {
node *v1.Node
}
func (op *updateOp) String() string {
if op.node == nil {
return fmt.Sprintf("updateOp(nil)")
}
return fmt.Sprintf("updateOp(%q,%v)", op.node.Name, op.node.Spec.PodCIDR)
}
func (op *updateOp) run(sync *NodeSync) error {
glog.V(3).Infof("Running updateOp %+v", op)
ctx := context.Background()
if op.node == nil {
glog.V(3).Infof("Getting node spec for %q", sync.nodeName)
node, err := sync.kubeAPI.Node(ctx, sync.nodeName)
if err != nil {
glog.Errorf("Error getting node %q spec: %v", sync.nodeName, err)
return err
}
op.node = node
}
aliasRange, err := sync.cloudAlias.Alias(ctx, sync.nodeName)
if err != nil {
glog.Errorf("Error getting cloud alias for node %q: %v", sync.nodeName, err)
return err
}
switch {
case op.node.Spec.PodCIDR == "" && aliasRange == nil:
err = op.allocateRange(ctx, sync, op.node)
case op.node.Spec.PodCIDR == "" && aliasRange != nil:
err = op.updateNodeFromAlias(ctx, sync, op.node, aliasRange)
case op.node.Spec.PodCIDR != "" && aliasRange == nil:
err = op.updateAliasFromNode(ctx, sync, op.node)
case op.node.Spec.PodCIDR != "" && aliasRange != nil:
err = op.validateRange(ctx, sync, op.node, aliasRange)
}
return err
}
// validateRange checks that the allocated range and the alias range
// match.
func (op *updateOp) validateRange(ctx context.Context, sync *NodeSync, node *v1.Node, aliasRange *net.IPNet) error {
if node.Spec.PodCIDR != aliasRange.String() {
glog.Errorf("Inconsistency detected between node PodCIDR and node alias (%v != %v)",
node.Spec.PodCIDR, aliasRange)
sync.kubeAPI.EmitNodeWarningEvent(node.Name, MismatchEvent,
"Node.Spec.PodCIDR != cloud alias (%v != %v)", node.Spec.PodCIDR, aliasRange)
// User intervention is required in this case, as this is most likely due
// to the user mucking around with their VM aliases on the side.
} else {
glog.V(4).Infof("Node %q CIDR range %v is matches cloud assignment", node.Name, node.Spec.PodCIDR)
}
return nil
}
// updateNodeFromAlias updates the node from the cloud-allocated alias.
func (op *updateOp) updateNodeFromAlias(ctx context.Context, sync *NodeSync, node *v1.Node, aliasRange *net.IPNet) error {
if sync.mode != SyncFromCloud {
sync.kubeAPI.EmitNodeWarningEvent(node.Name, InvalidModeEvent,
"Cannot sync from cloud in mode %q", sync.mode)
return fmt.Errorf("cannot sync from cloud in mode %q", sync.mode)
}
glog.V(2).Infof("Updating node spec with alias range, node.PodCIDR = %v", aliasRange)
if err := sync.set.Occupy(aliasRange); err != nil {
glog.Errorf("Error occupying range %v for node %v", aliasRange, sync.nodeName)
return err
}
if err := sync.kubeAPI.UpdateNodePodCIDR(ctx, node, aliasRange); err != nil {
glog.Errorf("Could not update node %q PodCIDR to %v: %v", node.Name, aliasRange, err)
return err
}
glog.V(2).Infof("Node %q PodCIDR set to %v", node.Name, aliasRange)
if err := sync.kubeAPI.UpdateNodeNetworkUnavailable(node.Name, false); err != nil {
glog.Errorf("Error setting route status for node %q: %v", node.Name, err)
return err
}
if err := sync.kubeAPI.UpdateNodeNetworkUnavailable(node.Name, false); err != nil {
glog.Errorf("Could not update node NetworkUnavailable status to false: %v", err)
return err
}
glog.V(2).Infof("Updated node %q PodCIDR from cloud alias %v", node.Name, aliasRange)
return nil
}
// updateAliasFromNode updates the cloud alias given the node allocation.
func (op *updateOp) updateAliasFromNode(ctx context.Context, sync *NodeSync, node *v1.Node) error {
if sync.mode != SyncFromCluster {
sync.kubeAPI.EmitNodeWarningEvent(
node.Name, InvalidModeEvent, "Cannot sync to cloud in mode %q", sync.mode)
return fmt.Errorf("cannot sync to cloud in mode %q", sync.mode)
}
_, aliasRange, err := net.ParseCIDR(node.Spec.PodCIDR)
if err != nil {
glog.Errorf("Could not parse PodCIDR (%q) for node %q: %v",
node.Spec.PodCIDR, node.Name, err)
return err
}
if err := sync.set.Occupy(aliasRange); err != nil {
glog.Errorf("Error occupying range %v for node %v", aliasRange, sync.nodeName)
return err
}
if err := sync.cloudAlias.AddAlias(ctx, node.Name, aliasRange); err != nil {
glog.Errorf("Could not add alias %v for node %q: %v", aliasRange, node.Name, err)
return err
}
if err := sync.kubeAPI.UpdateNodeNetworkUnavailable(node.Name, false); err != nil {
glog.Errorf("Could not update node NetworkUnavailable status to false: %v", err)
return err
}
glog.V(2).Infof("Updated node %q cloud alias with node spec, node.PodCIDR = %v",
node.Name, node.Spec.PodCIDR)
return nil
}
// allocateRange allocates a new range and updates both the cloud
// platform and the node allocation.
func (op *updateOp) allocateRange(ctx context.Context, sync *NodeSync, node *v1.Node) error {
if sync.mode != SyncFromCluster {
sync.kubeAPI.EmitNodeWarningEvent(node.Name, InvalidModeEvent,
"Cannot allocate CIDRs in mode %q", sync.mode)
return fmt.Errorf("controller cannot allocate CIDRS in mode %q", sync.mode)
}
cidrRange, err := sync.set.AllocateNext()
if err != nil {
return err
}
// If addAlias returns a hard error, cidrRange will be leaked as there
// is no durable record of the range. The missing space will be
// recovered on the next restart of the controller.
if err := sync.cloudAlias.AddAlias(ctx, node.Name, cidrRange); err != nil {
glog.Errorf("Could not add alias %v for node %q: %v", cidrRange, node.Name, err)
return err
}
if err := sync.kubeAPI.UpdateNodePodCIDR(ctx, node, cidrRange); err != nil {
glog.Errorf("Could not update node %q PodCIDR to %v: %v", node.Name, cidrRange, err)
return err
}
if err := sync.kubeAPI.UpdateNodeNetworkUnavailable(node.Name, false); err != nil {
glog.Errorf("Could not update node NetworkUnavailable status to false: %v", err)
return err
}
glog.V(2).Infof("Allocated PodCIDR %v for node %q", cidrRange, node.Name)
return nil
}
// deleteOp handles deletion of a node.
type deleteOp struct {
node *v1.Node
}
func (op *deleteOp) String() string {
if op.node == nil {
return fmt.Sprintf("deleteOp(nil)")
}
return fmt.Sprintf("deleteOp(%q,%v)", op.node.Name, op.node.Spec.PodCIDR)
}
func (op *deleteOp) run(sync *NodeSync) error {
glog.V(3).Infof("Running deleteOp %+v", op)
if op.node.Spec.PodCIDR == "" {
glog.V(2).Infof("Node %q was deleted, node had no PodCIDR range assigned", op.node.Name)
return nil
}
_, cidrRange, err := net.ParseCIDR(op.node.Spec.PodCIDR)
if err != nil {
glog.Errorf("Deleted node %q has an invalid podCIDR %q: %v",
op.node.Name, op.node.Spec.PodCIDR, err)
sync.kubeAPI.EmitNodeWarningEvent(op.node.Name, InvalidPodCIDR,
"Node %q has an invalid PodCIDR: %q", op.node.Name, op.node.Spec.PodCIDR)
return nil
}
sync.set.Release(cidrRange)
glog.V(2).Infof("Node %q was deleted, releasing CIDR range %v",
op.node.Name, op.node.Spec.PodCIDR)
return nil
}

View File

@ -0,0 +1,294 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sync
import (
"context"
"fmt"
"net"
"reflect"
"testing"
"time"
"github.com/golang/glog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/controller/node/ipam/cidrset"
"k8s.io/kubernetes/pkg/controller/node/ipam/test"
"k8s.io/api/core/v1"
)
var (
_, clusterCIDRRange, _ = net.ParseCIDR("10.1.0.0/16")
)
type fakeEvent struct {
nodeName string
reason string
}
type fakeAPIs struct {
aliasRange *net.IPNet
aliasErr error
addAliasErr error
nodeRet *v1.Node
nodeErr error
updateNodeErr error
resyncTimeout time.Duration
reportChan chan struct{}
updateNodeNetworkUnavailableErr error
calls []string
events []fakeEvent
results []error
}
func (f *fakeAPIs) Alias(ctx context.Context, nodeName string) (*net.IPNet, error) {
f.calls = append(f.calls, fmt.Sprintf("alias %v", nodeName))
return f.aliasRange, f.aliasErr
}
func (f *fakeAPIs) AddAlias(ctx context.Context, nodeName string, cidrRange *net.IPNet) error {
f.calls = append(f.calls, fmt.Sprintf("addAlias %v %v", nodeName, cidrRange))
return f.addAliasErr
}
func (f *fakeAPIs) Node(ctx context.Context, name string) (*v1.Node, error) {
f.calls = append(f.calls, fmt.Sprintf("node %v", name))
return f.nodeRet, f.nodeErr
}
func (f *fakeAPIs) UpdateNodePodCIDR(ctx context.Context, node *v1.Node, cidrRange *net.IPNet) error {
f.calls = append(f.calls, fmt.Sprintf("updateNode %v", node))
return f.updateNodeErr
}
func (f *fakeAPIs) UpdateNodeNetworkUnavailable(nodeName string, unavailable bool) error {
f.calls = append(f.calls, fmt.Sprintf("updateNodeNetworkUnavailable %v %v", nodeName, unavailable))
return f.updateNodeNetworkUnavailableErr
}
func (f *fakeAPIs) EmitNodeWarningEvent(nodeName, reason, fmtStr string, args ...interface{}) {
f.events = append(f.events, fakeEvent{nodeName, reason})
}
func (f *fakeAPIs) ReportResult(err error) {
glog.V(2).Infof("ReportResult %v", err)
f.results = append(f.results, err)
if f.reportChan != nil {
f.reportChan <- struct{}{}
}
}
func (f *fakeAPIs) ResyncTimeout() time.Duration {
if f.resyncTimeout == 0 {
return time.Second * 10000
}
return f.resyncTimeout
}
func (f *fakeAPIs) dumpTrace() {
for i, x := range f.calls {
glog.Infof("trace %v: %v", i, x)
}
}
var nodeWithoutCIDRRange = &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
}
var nodeWithCIDRRange = &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
Spec: v1.NodeSpec{PodCIDR: "10.1.1.0/24"},
}
func TestNodeSyncUpdate(t *testing.T) {
t.Parallel()
for _, tc := range []struct {
desc string
mode NodeSyncMode
node *v1.Node
fake fakeAPIs
events []fakeEvent
wantError bool
}{
{
desc: "validate range ==",
mode: SyncFromCloud,
node: nodeWithCIDRRange,
fake: fakeAPIs{
aliasRange: test.MustParseCIDR(nodeWithCIDRRange.Spec.PodCIDR),
},
},
{
desc: "validate range !=",
mode: SyncFromCloud,
node: nodeWithCIDRRange,
fake: fakeAPIs{aliasRange: test.MustParseCIDR("192.168.0.0/24")},
events: []fakeEvent{{"node1", "CloudCIDRAllocatorMismatch"}},
},
{
desc: "update alias from node",
mode: SyncFromCloud,
node: nodeWithCIDRRange,
events: []fakeEvent{{"node1", "CloudCIDRAllocatorInvalidMode"}},
wantError: true,
},
{
desc: "update alias from node",
mode: SyncFromCluster,
node: nodeWithCIDRRange,
// XXX/bowei -- validation
},
{
desc: "update node from alias",
mode: SyncFromCloud,
node: nodeWithoutCIDRRange,
fake: fakeAPIs{aliasRange: test.MustParseCIDR("10.1.2.3/16")},
// XXX/bowei -- validation
},
{
desc: "update node from alias",
mode: SyncFromCluster,
node: nodeWithoutCIDRRange,
fake: fakeAPIs{aliasRange: test.MustParseCIDR("10.1.2.3/16")},
events: []fakeEvent{{"node1", "CloudCIDRAllocatorInvalidMode"}},
wantError: true,
},
{
desc: "allocate range",
mode: SyncFromCloud,
node: nodeWithoutCIDRRange,
events: []fakeEvent{{"node1", "CloudCIDRAllocatorInvalidMode"}},
wantError: true,
},
{
desc: "allocate range",
mode: SyncFromCluster,
node: nodeWithoutCIDRRange,
},
{
desc: "update with node==nil",
mode: SyncFromCluster,
node: nil,
fake: fakeAPIs{
nodeRet: nodeWithCIDRRange,
},
wantError: false,
},
} {
sync := New(&tc.fake, &tc.fake, &tc.fake, tc.mode, "node1", cidrset.NewCIDRSet(clusterCIDRRange, 24))
doneChan := make(chan struct{})
// Do a single step of the loop.
go sync.Loop(doneChan)
sync.Update(tc.node)
close(sync.opChan)
<-doneChan
tc.fake.dumpTrace()
if !reflect.DeepEqual(tc.fake.events, tc.events) {
t.Errorf("%v, %v; fake.events = %#v, want %#v", tc.desc, tc.mode, tc.fake.events, tc.events)
}
var hasError bool
for _, r := range tc.fake.results {
hasError = hasError || (r != nil)
}
if hasError != tc.wantError {
t.Errorf("%v, %v; hasError = %t, errors = %v, want %t",
tc.desc, tc.mode, hasError, tc.fake.events, tc.wantError)
}
}
}
func TestNodeSyncResync(t *testing.T) {
fake := &fakeAPIs{
nodeRet: nodeWithCIDRRange,
resyncTimeout: time.Millisecond,
reportChan: make(chan struct{}),
}
sync := New(fake, fake, fake, SyncFromCluster, "node1", cidrset.NewCIDRSet(clusterCIDRRange, 24))
doneChan := make(chan struct{})
go sync.Loop(doneChan)
<-fake.reportChan
close(sync.opChan)
// Unblock loop().
go func() {
<-fake.reportChan
}()
<-doneChan
fake.dumpTrace()
}
func TestNodeSyncDelete(t *testing.T) {
t.Parallel()
for _, tc := range []struct {
desc string
mode NodeSyncMode
node *v1.Node
fake fakeAPIs
}{
{
desc: "delete",
mode: SyncFromCluster,
node: nodeWithCIDRRange,
},
{
desc: "delete without CIDR range",
mode: SyncFromCluster,
node: nodeWithoutCIDRRange,
},
{
desc: "delete with invalid CIDR range",
mode: SyncFromCluster,
node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
Spec: v1.NodeSpec{PodCIDR: "invalid"},
},
},
} {
sync := New(&tc.fake, &tc.fake, &tc.fake, tc.mode, "node1", cidrset.NewCIDRSet(clusterCIDRRange, 24))
doneChan := make(chan struct{})
// Do a single step of the loop.
go sync.Loop(doneChan)
sync.Delete(tc.node)
<-doneChan
tc.fake.dumpTrace()
/*
if !reflect.DeepEqual(tc.fake.events, tc.events) {
t.Errorf("%v, %v; fake.events = %#v, want %#v", tc.desc, tc.mode, tc.fake.events, tc.events)
}
var hasError bool
for _, r := range tc.fake.results {
hasError = hasError || (r != nil)
}
if hasError != tc.wantError {
t.Errorf("%v, %v; hasError = %t, errors = %v, want %t",
tc.desc, tc.mode, hasError, tc.fake.events, tc.wantError)
}
*/
}
}

View File

@ -0,0 +1,21 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["utils.go"],
visibility = ["//visibility:public"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,31 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package test
import (
"net"
)
// MustParseCIDR returns the CIDR range parsed from s or panics if the string
// cannot be parsed.
func MustParseCIDR(s string) *net.IPNet {
_, ret, err := net.ParseCIDR(s)
if err != nil {
panic(err)
}
return ret
}

View File

@ -0,0 +1,67 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipam
import (
"time"
)
// Timeout manages the resync loop timing for a given node sync operation. The
// timeout changes depending on whether or not there was an error reported for
// the operation. Consecutive errors result in exponential backoff, capped at
// the MaxBackoff timeout.
type Timeout struct {
// Resync is the default timeout duration when there are no errors.
Resync time.Duration
// MaxBackoff is the maximum timeout when in an error backoff state.
MaxBackoff time.Duration
// InitialRetry is the initial retry interval when an error is reported.
InitialRetry time.Duration
// errs is the count of consecutive errors that have occurred.
errs int
// current is the current backoff timeout.
current time.Duration
}
// Update the timeout with the current error state.
func (b *Timeout) Update(ok bool) {
if ok {
b.errs = 0
b.current = b.Resync
return
}
b.errs++
if b.errs == 1 {
b.current = b.InitialRetry
return
}
b.current *= 2
if b.current >= b.MaxBackoff {
b.current = b.MaxBackoff
}
}
// Next returns the next operation timeout based on the current error state.
func (b *Timeout) Next() time.Duration {
if b.errs == 0 {
return b.Resync
}
return b.current
}

View File

@ -0,0 +1,57 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipam
import (
"errors"
"testing"
"time"
)
func TestTimeout(t *testing.T) {
time10s := 10 * time.Second
time5s := 5 * time.Second
timeout := &Timeout{
Resync: time10s,
MaxBackoff: time5s,
InitialRetry: time.Second,
}
for _, testStep := range []struct {
err error
want time.Duration
}{
{nil, time10s},
{nil, time10s},
{errors.New("x"), time.Second},
{errors.New("x"), 2 * time.Second},
{errors.New("x"), 4 * time.Second},
{errors.New("x"), 5 * time.Second},
{errors.New("x"), 5 * time.Second},
{nil, time10s},
{nil, time10s},
{errors.New("x"), time.Second},
{errors.New("x"), 2 * time.Second},
{nil, time10s},
} {
timeout.Update(testStep.err == nil)
next := timeout.Next()
if next != testStep.want {
t.Errorf("timeout.next(%v) = %v, want %v", testStep.err, next, testStep.want)
}
}
}

View File

@ -23,42 +23,42 @@ import (
)
const (
NodeControllerSubsystem = "node_collector"
ZoneHealthStatisticKey = "zone_health"
ZoneSizeKey = "zone_size"
ZoneNoUnhealthyNodesKey = "unhealthy_nodes_in_zone"
EvictionsNumberKey = "evictions_number"
nodeControllerSubsystem = "node_collector"
zoneHealthStatisticKey = "zone_health"
zoneSizeKey = "zone_size"
zoneNoUnhealthyNodesKey = "unhealthy_nodes_in_zone"
evictionsNumberKey = "evictions_number"
)
var (
ZoneHealth = prometheus.NewGaugeVec(
zoneHealth = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: NodeControllerSubsystem,
Name: ZoneHealthStatisticKey,
Subsystem: nodeControllerSubsystem,
Name: zoneHealthStatisticKey,
Help: "Gauge measuring percentage of healthy nodes per zone.",
},
[]string{"zone"},
)
ZoneSize = prometheus.NewGaugeVec(
zoneSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: NodeControllerSubsystem,
Name: ZoneSizeKey,
Subsystem: nodeControllerSubsystem,
Name: zoneSizeKey,
Help: "Gauge measuring number of registered Nodes per zones.",
},
[]string{"zone"},
)
UnhealthyNodes = prometheus.NewGaugeVec(
unhealthyNodes = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: NodeControllerSubsystem,
Name: ZoneNoUnhealthyNodesKey,
Subsystem: nodeControllerSubsystem,
Name: zoneNoUnhealthyNodesKey,
Help: "Gauge measuring number of not Ready Nodes per zones.",
},
[]string{"zone"},
)
EvictionsNumber = prometheus.NewCounterVec(
evictionsNumber = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: NodeControllerSubsystem,
Name: EvictionsNumberKey,
Subsystem: nodeControllerSubsystem,
Name: evictionsNumberKey,
Help: "Number of Node evictions that happened since current instance of NodeController started.",
},
[]string{"zone"},
@ -67,11 +67,12 @@ var (
var registerMetrics sync.Once
// Register the metrics that are to be monitored.
func Register() {
registerMetrics.Do(func() {
prometheus.MustRegister(ZoneHealth)
prometheus.MustRegister(ZoneSize)
prometheus.MustRegister(UnhealthyNodes)
prometheus.MustRegister(EvictionsNumber)
prometheus.MustRegister(zoneHealth)
prometheus.MustRegister(zoneSize)
prometheus.MustRegister(unhealthyNodes)
prometheus.MustRegister(evictionsNumber)
})
}

View File

@ -27,7 +27,6 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@ -49,6 +48,7 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/node/ipam"
nodesync "k8s.io/kubernetes/pkg/controller/node/ipam/sync"
"k8s.io/kubernetes/pkg/controller/node/scheduler"
"k8s.io/kubernetes/pkg/controller/node/util"
"k8s.io/kubernetes/pkg/util/metrics"
@ -66,12 +66,13 @@ func init() {
var (
gracefulDeletionVersion = utilversion.MustParseSemantic("v1.1.0")
// UnreachableTaintTemplate is the taint for when a node becomes unreachable.
UnreachableTaintTemplate = &v1.Taint{
Key: algorithm.TaintNodeUnreachable,
Effect: v1.TaintEffectNoExecute,
}
// NotReadyTaintTemplate is the taint for when a node is not ready for
// executing pods
NotReadyTaintTemplate = &v1.Taint{
Key: algorithm.TaintNodeNotReady,
Effect: v1.TaintEffectNoExecute,
@ -97,15 +98,26 @@ const (
apiserverStartupGracePeriod = 10 * time.Minute
// The amount of time the nodecontroller should sleep between retrying NodeStatus updates
retrySleepTime = 20 * time.Millisecond
// ipamResyncInterval is the amount of time between periodic synchronizations
// of the cloud and node CIDR range assignments.
ipamResyncInterval = 30 * time.Second
// ipamMaxBackoff is the maximum backoff for retrying synchronization of a
// given node in the error state.
ipamMaxBackoff = 10 * time.Second
// ipamInitialBackoff is the initial retry interval for retrying synchronization
// of a given node in the error state.
ipamInitialBackoff = 250 * time.Millisecond
)
type zoneState string
// ZoneState is the state of a given zone.
type ZoneState string
const (
stateInitial = zoneState("Initial")
stateNormal = zoneState("Normal")
stateFullDisruption = zoneState("FullDisruption")
statePartialDisruption = zoneState("PartialDisruption")
stateInitial = ZoneState("Initial")
stateNormal = ZoneState("Normal")
stateFullDisruption = ZoneState("FullDisruption")
statePartialDisruption = ZoneState("PartialDisruption")
)
type nodeStatusData struct {
@ -114,7 +126,8 @@ type nodeStatusData struct {
status v1.NodeStatus
}
type NodeController struct {
// Controller is the controller that manages node related cluster state.
type Controller struct {
allocateNodeCIDRs bool
allocatorType ipam.CIDRAllocatorType
@ -125,10 +138,10 @@ type NodeController struct {
kubeClient clientset.Interface
// Method for easy mocking in unittest.
lookupIP func(host string) ([]net.IP, error)
// Value used if sync_nodes_status=False. NodeController will not proactively
// Value used if sync_nodes_status=False. Controller will not proactively
// sync node status in this case, but will monitor node status updated from kubelet. If
// it doesn't receive update for this amount of time, it will start posting "NodeReady==
// ConditionUnknown". The amount of time before which NodeController start evicting pods
// ConditionUnknown". The amount of time before which Controller start evicting pods
// is controlled via flag 'pod-eviction-timeout'.
// Note: be cautious when changing the constant, it must work with nodeStatusUpdateFrequency
// in kubelet. There are several constraints:
@ -140,7 +153,7 @@ type NodeController struct {
// 2. nodeMonitorGracePeriod can't be too large for user experience - larger value takes
// longer for user to see up-to-date node status.
nodeMonitorGracePeriod time.Duration
// Value controlling NodeController monitoring period, i.e. how often does NodeController
// Value controlling Controller monitoring period, i.e. how often does Controller
// check node status posted from kubelet. This value should be lower than nodeMonitorGracePeriod.
// TODO: Change node status monitor to watch based.
nodeMonitorPeriod time.Duration
@ -170,28 +183,26 @@ type NodeController struct {
daemonSetInformerSynced cache.InformerSynced
podInformerSynced cache.InformerSynced
cidrAllocator ipam.CIDRAllocator
taintManager *scheduler.NoExecuteTaintManager
cidrAllocator ipam.CIDRAllocator
taintManager *scheduler.NoExecuteTaintManager
forcefullyDeletePod func(*v1.Pod) error
nodeExistsInCloudProvider func(types.NodeName) (bool, error)
computeZoneStateFunc func(nodeConditions []*v1.NodeCondition) (int, zoneState)
computeZoneStateFunc func(nodeConditions []*v1.NodeCondition) (int, ZoneState)
enterPartialDisruptionFunc func(nodeNum int) float32
enterFullDisruptionFunc func(nodeNum int) float32
zoneStates map[string]zoneState
zoneStates map[string]ZoneState
evictionLimiterQPS float32
secondaryEvictionLimiterQPS float32
largeClusterThreshold int32
unhealthyZoneThreshold float32
// if set to true NodeController will start TaintManager that will evict Pods from
// if set to true Controller will start TaintManager that will evict Pods from
// tainted nodes, if they're not tolerated.
runTaintManager bool
// if set to true NodeController will taint Nodes with 'TaintNodeNotReady' and 'TaintNodeUnreachable'
// if set to true Controller will taint Nodes with 'TaintNodeNotReady' and 'TaintNodeUnreachable'
// taints instead of evicting Pods itself.
useTaintBasedEvictions bool
@ -225,17 +236,21 @@ func NewNodeController(
allocatorType ipam.CIDRAllocatorType,
runTaintManager bool,
useTaintBasedEvictions bool,
taintNodeByCondition bool,
) (*NodeController, error) {
taintNodeByCondition bool) (*Controller, error) {
if kubeClient == nil {
glog.Fatalf("kubeClient is nil when starting Controller")
}
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "controllermanager"})
eventBroadcaster.StartLogging(glog.Infof)
if kubeClient != nil {
glog.V(0).Infof("Sending events to api server.")
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.Core().RESTClient()).Events("")})
} else {
glog.Fatalf("kubeClient is nil when starting NodeController")
}
glog.V(0).Infof("Sending events to api server.")
eventBroadcaster.StartRecordingToSink(
&v1core.EventSinkImpl{
Interface: v1core.New(kubeClient.Core().RESTClient()).Events(""),
})
if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("node_controller", kubeClient.Core().RESTClient().GetRateLimiter())
@ -243,45 +258,49 @@ func NewNodeController(
if allocateNodeCIDRs {
if clusterCIDR == nil {
glog.Fatal("NodeController: Must specify clusterCIDR if allocateNodeCIDRs == true.")
glog.Fatal("Controller: Must specify clusterCIDR if allocateNodeCIDRs == true.")
}
mask := clusterCIDR.Mask
if maskSize, _ := mask.Size(); maskSize > nodeCIDRMaskSize {
glog.Fatal("NodeController: Invalid clusterCIDR, mask size of clusterCIDR must be less than nodeCIDRMaskSize.")
glog.Fatal("Controller: Invalid clusterCIDR, mask size of clusterCIDR must be less than nodeCIDRMaskSize.")
}
}
nc := &NodeController{
cloud: cloud,
knownNodeSet: make(map[string]*v1.Node),
kubeClient: kubeClient,
recorder: recorder,
podEvictionTimeout: podEvictionTimeout,
maximumGracePeriod: 5 * time.Minute,
zonePodEvictor: make(map[string]*scheduler.RateLimitedTimedQueue),
zoneNoExecuteTainer: make(map[string]*scheduler.RateLimitedTimedQueue),
nodeStatusMap: make(map[string]nodeStatusData),
nodeMonitorGracePeriod: nodeMonitorGracePeriod,
nodeMonitorPeriod: nodeMonitorPeriod,
nodeStartupGracePeriod: nodeStartupGracePeriod,
lookupIP: net.LookupIP,
now: metav1.Now,
clusterCIDR: clusterCIDR,
serviceCIDR: serviceCIDR,
allocateNodeCIDRs: allocateNodeCIDRs,
allocatorType: allocatorType,
forcefullyDeletePod: func(p *v1.Pod) error { return util.ForcefullyDeletePod(kubeClient, p) },
nodeExistsInCloudProvider: func(nodeName types.NodeName) (bool, error) { return util.NodeExistsInCloudProvider(cloud, nodeName) },
nc := &Controller{
cloud: cloud,
knownNodeSet: make(map[string]*v1.Node),
kubeClient: kubeClient,
recorder: recorder,
podEvictionTimeout: podEvictionTimeout,
maximumGracePeriod: 5 * time.Minute,
zonePodEvictor: make(map[string]*scheduler.RateLimitedTimedQueue),
zoneNoExecuteTainer: make(map[string]*scheduler.RateLimitedTimedQueue),
nodeStatusMap: make(map[string]nodeStatusData),
nodeMonitorGracePeriod: nodeMonitorGracePeriod,
nodeMonitorPeriod: nodeMonitorPeriod,
nodeStartupGracePeriod: nodeStartupGracePeriod,
lookupIP: net.LookupIP,
now: metav1.Now,
clusterCIDR: clusterCIDR,
serviceCIDR: serviceCIDR,
allocateNodeCIDRs: allocateNodeCIDRs,
allocatorType: allocatorType,
forcefullyDeletePod: func(p *v1.Pod) error {
return util.ForcefullyDeletePod(kubeClient, p)
},
nodeExistsInCloudProvider: func(nodeName types.NodeName) (bool, error) {
return util.NodeExistsInCloudProvider(cloud, nodeName)
},
evictionLimiterQPS: evictionLimiterQPS,
secondaryEvictionLimiterQPS: secondaryEvictionLimiterQPS,
largeClusterThreshold: largeClusterThreshold,
unhealthyZoneThreshold: unhealthyZoneThreshold,
zoneStates: make(map[string]zoneState),
zoneStates: make(map[string]ZoneState),
runTaintManager: runTaintManager,
useTaintBasedEvictions: useTaintBasedEvictions && runTaintManager,
}
if useTaintBasedEvictions {
glog.Infof("NodeController is using taint based evictions.")
glog.Infof("Controller is using taint based evictions.")
}
nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
@ -326,67 +345,34 @@ func NewNodeController(
nc.podInformerSynced = podInformer.Informer().HasSynced
if nc.allocateNodeCIDRs {
var nodeList *v1.NodeList
var err error
// We must poll because apiserver might not be up. This error causes
// controller manager to restart.
if pollErr := wait.Poll(10*time.Second, apiserverStartupGracePeriod, func() (bool, error) {
nodeList, err = kubeClient.Core().Nodes().List(metav1.ListOptions{
FieldSelector: fields.Everything().String(),
LabelSelector: labels.Everything().String(),
})
if err != nil {
glog.Errorf("Failed to list all nodes: %v", err)
return false, nil
if nc.allocatorType == ipam.IPAMFromClusterAllocatorType || nc.allocatorType == ipam.IPAMFromCloudAllocatorType {
cfg := &ipam.Config{
Resync: ipamResyncInterval,
MaxBackoff: ipamMaxBackoff,
InitialRetry: ipamInitialBackoff,
}
return true, nil
}); pollErr != nil {
return nil, fmt.Errorf("Failed to list all nodes in %v, cannot proceed without updating CIDR map", apiserverStartupGracePeriod)
switch nc.allocatorType {
case ipam.IPAMFromClusterAllocatorType:
cfg.Mode = nodesync.SyncFromCluster
case ipam.IPAMFromCloudAllocatorType:
cfg.Mode = nodesync.SyncFromCloud
}
ipamc, err := ipam.NewController(cfg, kubeClient, cloud, clusterCIDR, serviceCIDR, nodeCIDRMaskSize)
if err != nil {
glog.Fatalf("Error creating ipam controller: %v", err)
}
if err := ipamc.Start(nodeInformer); err != nil {
glog.Fatalf("Error trying to Init(): %v", err)
}
} else {
var err error
nc.cidrAllocator, err = ipam.New(
kubeClient, cloud, nc.allocatorType, nc.clusterCIDR, nc.serviceCIDR, nodeCIDRMaskSize)
if err != nil {
return nil, err
}
nc.cidrAllocator.Register(nodeInformer)
}
switch nc.allocatorType {
case ipam.RangeAllocatorType:
nc.cidrAllocator, err = ipam.NewCIDRRangeAllocator(
kubeClient, clusterCIDR, serviceCIDR, nodeCIDRMaskSize, nodeList)
case ipam.CloudAllocatorType:
nc.cidrAllocator, err = ipam.NewCloudCIDRAllocator(kubeClient, cloud)
default:
return nil, fmt.Errorf("Invalid CIDR allocator type: %v", nc.allocatorType)
}
if err != nil {
return nil, err
}
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: util.CreateAddNodeHandler(nc.cidrAllocator.AllocateOrOccupyCIDR),
UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
// If the PodCIDR is not empty we either:
// - already processed a Node that already had a CIDR after NC restarted
// (cidr is marked as used),
// - already processed a Node successfully and allocated a CIDR for it
// (cidr is marked as used),
// - already processed a Node but we saw a "timeout" response and the
// request eventually got through; in this case we haven't released
// the allocated CIDR (cidr is still marked as used).
// There's a possible error here:
// - NC sees a new Node and assigns a CIDR X to it,
// - Update Node call fails with a timeout,
// - Node is updated by some other component, NC sees an update and
// assigns CIDR Y to the Node,
// - Both CIDR X and CIDR Y are marked as used in the local cache,
// even though Node sees only CIDR Y
// The problem here is that in the in-memory cache we see CIDR X as marked,
// which prevents it from being assigned to any new node. The cluster
// state is correct.
// Restart of NC fixes the issue.
if newNode.Spec.PodCIDR == "" {
return nc.cidrAllocator.AllocateOrOccupyCIDR(newNode)
}
return nil
}),
DeleteFunc: util.CreateDeleteNodeHandler(nc.cidrAllocator.ReleaseCIDR),
})
}
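The hunk above is the heart of this change: for the new IPAM allocator types the controller now builds an ipam.Config, picks a sync direction, and starts the IPAM controller, while every other allocator type keeps the old per-node CIDR allocator registered on the node informer. A condensed sketch of that selection follows; the call signatures are taken from this diff, but the import paths, the helper name wireCIDRAllocator, and the interval constants are assumptions for illustration.
```go
package example

import (
	"net"
	"time"

	coreinformers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"

	"k8s.io/kubernetes/pkg/cloudprovider"
	"k8s.io/kubernetes/pkg/controller/node/ipam"
	nodesync "k8s.io/kubernetes/pkg/controller/node/ipam/sync"
)

// Assumed resync/backoff values, for illustration only.
const (
	ipamResyncInterval = 30 * time.Second
	ipamMaxBackoff     = 10 * time.Second
	ipamInitialBackoff = 250 * time.Millisecond
)

// wireCIDRAllocator (hypothetical helper) condenses the allocator selection
// shown in the hunk above.
func wireCIDRAllocator(
	kubeClient clientset.Interface,
	cloud cloudprovider.Interface,
	nodeInformer coreinformers.NodeInformer,
	allocatorType ipam.CIDRAllocatorType,
	clusterCIDR, serviceCIDR *net.IPNet,
	nodeCIDRMaskSize int,
) (ipam.CIDRAllocator, error) {
	switch allocatorType {
	case ipam.IPAMFromClusterAllocatorType, ipam.IPAMFromCloudAllocatorType:
		// New path: one IPAM controller keeps cluster and cloud in sync,
		// in the direction given by the mode.
		cfg := &ipam.Config{
			Resync:       ipamResyncInterval,
			MaxBackoff:   ipamMaxBackoff,
			InitialRetry: ipamInitialBackoff,
		}
		if allocatorType == ipam.IPAMFromClusterAllocatorType {
			cfg.Mode = nodesync.SyncFromCluster
		} else {
			cfg.Mode = nodesync.SyncFromCloud
		}
		ipamc, err := ipam.NewController(cfg, kubeClient, cloud, clusterCIDR, serviceCIDR, nodeCIDRMaskSize)
		if err != nil {
			return nil, err
		}
		// No separate cidrAllocator is kept in this mode.
		return nil, ipamc.Start(nodeInformer)
	default:
		// Legacy path: a per-node allocator registered on the node informer.
		alloc, err := ipam.New(kubeClient, cloud, allocatorType, clusterCIDR, serviceCIDR, nodeCIDRMaskSize)
		if err != nil {
			return nil, err
		}
		alloc.Register(nodeInformer)
		return alloc, nil
	}
}
```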
if nc.runTaintManager {
@ -427,7 +413,7 @@ func NewNodeController(
return nc, nil
}
func (nc *NodeController) doEvictionPass() {
func (nc *Controller) doEvictionPass() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
for k := range nc.zonePodEvictor {
@ -440,23 +426,23 @@ func (nc *NodeController) doEvictionPass() {
glog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
} else {
zone := utilnode.GetZoneKey(node)
EvictionsNumber.WithLabelValues(zone).Inc()
evictionsNumber.WithLabelValues(zone).Inc()
}
nodeUid, _ := value.UID.(string)
remaining, err := util.DeletePods(nc.kubeClient, nc.recorder, value.Value, nodeUid, nc.daemonSetStore)
nodeUID, _ := value.UID.(string)
remaining, err := util.DeletePods(nc.kubeClient, nc.recorder, value.Value, nodeUID, nc.daemonSetStore)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
return false, 0
}
if remaining {
glog.Infof("Pods awaiting deletion due to NodeController eviction")
glog.Infof("Pods awaiting deletion due to Controller eviction")
}
return true, 0
})
}
}
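The metric vectors used here (evictionsNumber, zoneSize, zoneHealth, unhealthyNodes) are the renamed, unexported counterparts of the old exported names. Below is a rough sketch of how such per-zone vectors are typically declared with the Prometheus client; the subsystem, metric names, and help strings are assumptions for illustration, and the authoritative definitions live in the controller's metrics file.
```go
package example

import "github.com/prometheus/client_golang/prometheus"

// Names, subsystem, and help strings are assumed for illustration.
var (
	zoneSize = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Subsystem: "node_collector",
			Name:      "zone_size",
			Help:      "Number of registered Nodes per zone.",
		},
		[]string{"zone"},
	)
	evictionsNumber = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Subsystem: "node_collector",
			Name:      "evictions_number",
			Help:      "Number of Node evictions since the controller started.",
		},
		[]string{"zone"},
	)
)

func init() {
	prometheus.MustRegister(zoneSize, evictionsNumber)
}

// Usage mirrors the calls in this diff:
//   evictionsNumber.WithLabelValues(zone).Inc()
//   zoneSize.WithLabelValues(zone).Set(float64(numNodes))
```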
func (nc *NodeController) doNoScheduleTaintingPass(node *v1.Node) error {
func (nc *Controller) doNoScheduleTaintingPass(node *v1.Node) error {
// Map node's condition to Taints.
taints := []v1.Taint{}
for _, condition := range node.Status.Conditions {
@ -484,7 +470,7 @@ func (nc *NodeController) doNoScheduleTaintingPass(node *v1.Node) error {
return nil
}
func (nc *NodeController) doNoExecuteTaintingPass() {
func (nc *Controller) doNoExecuteTaintingPass() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
for k := range nc.zoneNoExecuteTainer {
@ -500,7 +486,7 @@ func (nc *NodeController) doNoExecuteTaintingPass() {
return false, 50 * time.Millisecond
} else {
zone := utilnode.GetZoneKey(node)
EvictionsNumber.WithLabelValues(zone).Inc()
evictionsNumber.WithLabelValues(zone).Inc()
}
_, condition := v1node.GetNodeCondition(&node.Status, v1.NodeReady)
// Because we want to mimic NodeStatus.Condition["Ready"] we make "unreachable" and "not ready" taints mutually exclusive.
@ -524,7 +510,7 @@ func (nc *NodeController) doNoExecuteTaintingPass() {
}
// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *NodeController) Run(stopCh <-chan struct{}) {
func (nc *Controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
glog.Infof("Starting node controller")
@ -560,7 +546,7 @@ func (nc *NodeController) Run(stopCh <-chan struct{}) {
}
// addPodEvictorForNewZone checks if a new zone appeared, and if so adds a new evictor.
func (nc *NodeController) addPodEvictorForNewZone(node *v1.Node) {
func (nc *Controller) addPodEvictorForNewZone(node *v1.Node) {
zone := utilnode.GetZoneKey(node)
if _, found := nc.zoneStates[zone]; !found {
nc.zoneStates[zone] = stateInitial
@ -575,14 +561,14 @@ func (nc *NodeController) addPodEvictorForNewZone(node *v1.Node) {
}
// Init the metric for the new zone.
glog.Infof("Initializing eviction metric for zone: %v", zone)
EvictionsNumber.WithLabelValues(zone).Add(0)
evictionsNumber.WithLabelValues(zone).Add(0)
}
}
// monitorNodeStatus verifies that node statuses are constantly updated by kubelet, and if not,
// post "NodeReady==ConditionUnknown". It also evicts all pods if node is not ready or
// not reachable for a long period of time.
func (nc *NodeController) monitorNodeStatus() error {
func (nc *Controller) monitorNodeStatus() error {
// We are listing nodes from local cache as we can tolerate some small delays
// compared to state from etcd and there is eventual consistency anyway.
nodes, err := nc.nodeLister.List(labels.Everything())
@ -596,8 +582,8 @@ func (nc *NodeController) monitorNodeStatus() error {
}
for i := range added {
glog.V(1).Infof("NodeController observed a new Node: %#v", added[i].Name)
util.RecordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", added[i].Name))
glog.V(1).Infof("Controller observed a new Node: %#v", added[i].Name)
util.RecordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in Controller", added[i].Name))
nc.knownNodeSet[added[i].Name] = added[i]
nc.addPodEvictorForNewZone(added[i])
if nc.useTaintBasedEvictions {
@ -608,8 +594,8 @@ func (nc *NodeController) monitorNodeStatus() error {
}
for i := range deleted {
glog.V(1).Infof("NodeController observed a Node deletion: %v", deleted[i].Name)
util.RecordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), v1.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", deleted[i].Name))
glog.V(1).Infof("Controller observed a Node deletion: %v", deleted[i].Name)
util.RecordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), v1.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from Controller", deleted[i].Name))
delete(nc.knownNodeSet, deleted[i].Name)
}
@ -632,7 +618,7 @@ func (nc *NodeController) monitorNodeStatus() error {
}
return false, nil
}); err != nil {
glog.Errorf("Update status of Node %v from NodeController error : %v. "+
glog.Errorf("Update status of Node %v from Controller error : %v. "+
"Skipping - no pods will be evicted.", node.Name, err)
continue
}
@ -752,14 +738,14 @@ func (nc *NodeController) monitorNodeStatus() error {
return nil
}
func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1.NodeCondition, nodes []*v1.Node) {
newZoneStates := map[string]zoneState{}
func (nc *Controller) handleDisruption(zoneToNodeConditions map[string][]*v1.NodeCondition, nodes []*v1.Node) {
newZoneStates := map[string]ZoneState{}
allAreFullyDisrupted := true
for k, v := range zoneToNodeConditions {
ZoneSize.WithLabelValues(k).Set(float64(len(v)))
zoneSize.WithLabelValues(k).Set(float64(len(v)))
unhealthy, newState := nc.computeZoneStateFunc(v)
ZoneHealth.WithLabelValues(k).Set(float64(100*(len(v)-unhealthy)) / float64(len(v)))
UnhealthyNodes.WithLabelValues(k).Set(float64(unhealthy))
zoneHealth.WithLabelValues(k).Set(float64(100*(len(v)-unhealthy)) / float64(len(v)))
unhealthyNodes.WithLabelValues(k).Set(float64(unhealthy))
if newState != stateFullDisruption {
allAreFullyDisrupted = false
}
@ -773,9 +759,9 @@ func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1
allWasFullyDisrupted := true
for k, v := range nc.zoneStates {
if _, have := zoneToNodeConditions[k]; !have {
ZoneSize.WithLabelValues(k).Set(0)
ZoneHealth.WithLabelValues(k).Set(100)
UnhealthyNodes.WithLabelValues(k).Set(0)
zoneSize.WithLabelValues(k).Set(0)
zoneHealth.WithLabelValues(k).Set(100)
unhealthyNodes.WithLabelValues(k).Set(0)
delete(nc.zoneStates, k)
continue
}
@ -793,7 +779,7 @@ func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1
if !allAreFullyDisrupted || !allWasFullyDisrupted {
// We're switching to full disruption mode
if allAreFullyDisrupted {
glog.V(0).Info("NodeController detected that all Nodes are not-Ready. Entering master disruption mode.")
glog.V(0).Info("Controller detected that all Nodes are not-Ready. Entering master disruption mode.")
for i := range nodes {
if nc.useTaintBasedEvictions {
_, err := nc.markNodeAsReachable(nodes[i])
@ -820,7 +806,7 @@ func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1
}
// We're exiting full disruption mode
if allWasFullyDisrupted {
glog.V(0).Info("NodeController detected that some Nodes are Ready. Exiting master disruption mode.")
glog.V(0).Info("Controller detected that some Nodes are Ready. Exiting master disruption mode.")
// When exiting disruption mode update probe timestamps on all Nodes.
now := nc.now()
for i := range nodes {
@ -843,14 +829,14 @@ func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1
if v == newState {
continue
}
glog.V(0).Infof("NodeController detected that zone %v is now in state %v.", k, newState)
glog.V(0).Infof("Controller detected that zone %v is now in state %v.", k, newState)
nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newState)
nc.zoneStates[k] = newState
}
}
}
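The branching in handleDisruption above boils down to comparing the previous and newly computed per-zone states. A condensed, self-contained sketch of that decision follows; the local ZoneState type, the state constant, and the function name are stand-ins declared here so the sketch compiles on its own.
```go
package example

// Local stand-ins so the sketch is self-contained; in the real package these
// come from the node controller itself.
type ZoneState string

const stateFullDisruption ZoneState = "fullDisruption"

// decideDisruptionMode condenses the branching that handleDisruption performs
// over the previous and newly computed per-zone states (sketch only).
func decideDisruptionMode(oldStates, newStates map[string]ZoneState) string {
	allAreFullyDisrupted := true
	for _, s := range newStates {
		if s != stateFullDisruption {
			allAreFullyDisrupted = false
		}
	}
	allWasFullyDisrupted := true
	for _, s := range oldStates {
		if s != stateFullDisruption {
			allWasFullyDisrupted = false
		}
	}
	switch {
	case allAreFullyDisrupted && !allWasFullyDisrupted:
		return "enter master disruption mode: stop evictions, cancel queued work"
	case !allAreFullyDisrupted && allWasFullyDisrupted:
		return "exit master disruption mode: refresh probe timestamps, reset zone limiters"
	default:
		return "update limiters only for zones whose state changed"
	}
}
```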
func (nc *NodeController) setLimiterInZone(zone string, zoneSize int, state zoneState) {
func (nc *Controller) setLimiterInZone(zone string, zoneSize int, state ZoneState) {
switch state {
case stateNormal:
if nc.useTaintBasedEvictions {
@ -879,7 +865,7 @@ func (nc *NodeController) setLimiterInZone(zone string, zoneSize int, state zone
// tryUpdateNodeStatus checks a given node's conditions and tries to update it. Returns grace period to
// which given node is entitled, state of current and last observed Ready Condition, and an error if it occurred.
func (nc *NodeController) tryUpdateNodeStatus(node *v1.Node) (time.Duration, v1.NodeCondition, *v1.NodeCondition, error) {
func (nc *Controller) tryUpdateNodeStatus(node *v1.Node) (time.Duration, v1.NodeCondition, *v1.NodeCondition, error) {
var err error
var gracePeriod time.Duration
var observedReadyCondition v1.NodeCondition
@ -909,7 +895,7 @@ func (nc *NodeController) tryUpdateNodeStatus(node *v1.Node) (time.Duration, v1.
savedNodeStatus, found := nc.nodeStatusMap[node.Name]
// There are following cases to check:
// - both saved and new status have no Ready Condition set - we leave everything as it is,
// - saved status have no Ready Condition, but current one does - NodeController was restarted with Node data already present in etcd,
// - saved status has no Ready Condition, but current one does - Controller was restarted with Node data already present in etcd,
// - saved status has some Ready Condition, but current one does not - it's an error, but we fill it up because that's probably a good thing to do,
// - both saved and current statuses have Ready Conditions and they have the same LastProbeTime - nothing happened on that Node, it may be
// unresponsive, so we leave it as it is,
@ -1036,14 +1022,13 @@ func (nc *NodeController) tryUpdateNodeStatus(node *v1.Node) (time.Duration, v1.
if _, err = nc.kubeClient.Core().Nodes().UpdateStatus(node); err != nil {
glog.Errorf("Error updating node %s: %v", node.Name, err)
return gracePeriod, observedReadyCondition, currentReadyCondition, err
} else {
nc.nodeStatusMap[node.Name] = nodeStatusData{
status: node.Status,
probeTimestamp: nc.nodeStatusMap[node.Name].probeTimestamp,
readyTransitionTimestamp: nc.now(),
}
return gracePeriod, observedReadyCondition, currentReadyCondition, nil
}
nc.nodeStatusMap[node.Name] = nodeStatusData{
status: node.Status,
probeTimestamp: nc.nodeStatusMap[node.Name].probeTimestamp,
readyTransitionTimestamp: nc.now(),
}
return gracePeriod, observedReadyCondition, currentReadyCondition, nil
}
}
@ -1054,7 +1039,7 @@ func (nc *NodeController) tryUpdateNodeStatus(node *v1.Node) (time.Duration, v1.
// 1. added: the nodes that are in 'allNodes', but not in 'knownNodeSet'
// 2. deleted: the nodes that are in 'knownNodeSet', but not in 'allNodes'
// 3. newZoneRepresentatives: the nodes that are in both 'knownNodeSet' and 'allNodes', but have no zone states
func (nc *NodeController) classifyNodes(allNodes []*v1.Node) (added, deleted, newZoneRepresentatives []*v1.Node) {
func (nc *Controller) classifyNodes(allNodes []*v1.Node) (added, deleted, newZoneRepresentatives []*v1.Node) {
for i := range allNodes {
if _, has := nc.knownNodeSet[allNodes[i].Name]; !has {
added = append(added, allNodes[i])
@ -1086,7 +1071,7 @@ func (nc *NodeController) classifyNodes(allNodes []*v1.Node) (added, deleted, ne
// cancelPodEviction removes any queued evictions, typically because the node is available again. It
// returns true if an eviction was queued.
func (nc *NodeController) cancelPodEviction(node *v1.Node) bool {
func (nc *Controller) cancelPodEviction(node *v1.Node) bool {
zone := utilnode.GetZoneKey(node)
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
@ -1100,19 +1085,19 @@ func (nc *NodeController) cancelPodEviction(node *v1.Node) bool {
// evictPods queues an eviction for the provided node name, and returns false if the node is already
// queued for eviction.
func (nc *NodeController) evictPods(node *v1.Node) bool {
func (nc *Controller) evictPods(node *v1.Node) bool {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
return nc.zonePodEvictor[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID))
}
func (nc *NodeController) markNodeForTainting(node *v1.Node) bool {
func (nc *Controller) markNodeForTainting(node *v1.Node) bool {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
return nc.zoneNoExecuteTainer[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID))
}
func (nc *NodeController) markNodeAsReachable(node *v1.Node) (bool, error) {
func (nc *Controller) markNodeAsReachable(node *v1.Node) (bool, error) {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
err := controller.RemoveTaintOffNode(nc.kubeClient, node.Name, node, UnreachableTaintTemplate)
@ -1128,13 +1113,15 @@ func (nc *NodeController) markNodeAsReachable(node *v1.Node) (bool, error) {
return nc.zoneNoExecuteTainer[utilnode.GetZoneKey(node)].Remove(node.Name), nil
}
// Default value for cluster eviction rate - we take nodeNum for consistency with ReducedQPSFunc.
func (nc *NodeController) HealthyQPSFunc(nodeNum int) float32 {
// HealthyQPSFunc returns the default value for cluster eviction rate - we take
// nodeNum for consistency with ReducedQPSFunc.
func (nc *Controller) HealthyQPSFunc(nodeNum int) float32 {
return nc.evictionLimiterQPS
}
// If the cluster is large make evictions slower, if they're small stop evictions altogether.
func (nc *NodeController) ReducedQPSFunc(nodeNum int) float32 {
// ReducedQPSFunc returns the QPS to use when the cluster is large: make
// evictions slower; if the cluster is small, stop evictions altogether.
func (nc *Controller) ReducedQPSFunc(nodeNum int) float32 {
if int32(nodeNum) > nc.largeClusterThreshold {
return nc.secondaryEvictionLimiterQPS
}
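Taken together, the two QPS functions implement the eviction-rate policy their comments describe. A minimal sketch follows; the literal 0 returned for small clusters is inferred from the comment ("stop evictions altogether") rather than shown in this hunk, and the type and method names are illustrative.
```go
package example

// qpsPolicy is an illustrative stand-in for the controller's eviction-rate fields.
type qpsPolicy struct {
	evictionLimiterQPS          float32
	secondaryEvictionLimiterQPS float32
	largeClusterThreshold       int32
}

// healthyQPS returns the default eviction rate; nodeNum is kept for symmetry.
func (p qpsPolicy) healthyQPS(nodeNum int) float32 {
	return p.evictionLimiterQPS
}

// reducedQPS slows evictions for large clusters and stops them for small ones.
func (p qpsPolicy) reducedQPS(nodeNum int) float32 {
	if int32(nodeNum) > p.largeClusterThreshold {
		return p.secondaryEvictionLimiterQPS
	}
	return 0 // inferred from the comment, not shown in this hunk
}
```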
@ -1146,7 +1133,7 @@ func (nc *NodeController) ReducedQPSFunc(nodeNum int) float32 {
// - fullyDisrupted if there are no Ready Nodes,
// - partiallyDisrupted if at least nc.unhealthyZoneThreshold percent of Nodes are not Ready,
// - normal otherwise
func (nc *NodeController) ComputeZoneState(nodeReadyConditions []*v1.NodeCondition) (int, zoneState) {
func (nc *Controller) ComputeZoneState(nodeReadyConditions []*v1.NodeCondition) (int, ZoneState) {
readyNodes := 0
notReadyNodes := 0
for i := range nodeReadyConditions {
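The doc comment fully determines what ComputeZoneState has to do. Below is a self-contained sketch of that computation; the condition counting and the >= threshold comparison are filled in from the comment, and the real implementation may apply additional guards (for example a minimum number of not-Ready Nodes).
```go
package example

import v1 "k8s.io/api/core/v1"

type ZoneState string

const (
	stateNormal            ZoneState = "normal"
	stateFullDisruption    ZoneState = "fullDisruption"
	statePartialDisruption ZoneState = "partialDisruption"
)

// computeZoneState returns the number of not-Ready Nodes and the zone state,
// following the doc comment above (sketch only).
func computeZoneState(readyConditions []*v1.NodeCondition, unhealthyZoneThreshold float32) (int, ZoneState) {
	readyNodes, notReadyNodes := 0, 0
	for i := range readyConditions {
		if readyConditions[i] != nil && readyConditions[i].Status == v1.ConditionTrue {
			readyNodes++
		} else {
			notReadyNodes++
		}
	}
	total := readyNodes + notReadyNodes
	switch {
	case readyNodes == 0 && notReadyNodes > 0:
		return notReadyNodes, stateFullDisruption
	case total > 0 && float32(notReadyNodes)/float32(total) >= unhealthyZoneThreshold:
		return notReadyNodes, statePartialDisruption
	default:
		return notReadyNodes, stateNormal
	}
}
```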
@ -1168,7 +1155,7 @@ func (nc *NodeController) ComputeZoneState(nodeReadyConditions []*v1.NodeConditi
// maybeDeleteTerminatingPod non-gracefully deletes pods that are terminating
// that should not be gracefully terminated.
func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}) {
func (nc *Controller) maybeDeleteTerminatingPod(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)

View File

@ -61,12 +61,12 @@ const (
func alwaysReady() bool { return true }
type nodeController struct {
*NodeController
*Controller
nodeInformer coreinformers.NodeInformer
daemonSetInformer extensionsinformers.DaemonSetInformer
}
func NewNodeControllerFromClient(
func newNodeControllerFromClient(
cloud cloudprovider.Interface,
kubeClient clientset.Interface,
podEvictionTimeout time.Duration,
@ -597,7 +597,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
}
for _, item := range table {
nodeController, _ := NewNodeControllerFromClient(
nodeController, _ := newNodeControllerFromClient(
nil,
item.fakeNodeHandler,
evictionTimeout,
@ -643,8 +643,8 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
for _, zone := range zones {
if _, ok := nodeController.zonePodEvictor[zone]; ok {
nodeController.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
nodeUid, _ := value.UID.(string)
util.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetInformer.Lister())
nodeUID, _ := value.UID.(string)
util.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetInformer.Lister())
return true, 0
})
} else {
@ -763,7 +763,7 @@ func TestPodStatusChange(t *testing.T) {
}
for _, item := range table {
nodeController, _ := NewNodeControllerFromClient(nil, item.fakeNodeHandler,
nodeController, _ := newNodeControllerFromClient(nil, item.fakeNodeHandler,
evictionTimeout, testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, false)
nodeController.now = func() metav1.Time { return fakeNow }
@ -788,8 +788,8 @@ func TestPodStatusChange(t *testing.T) {
zones := testutil.GetZones(item.fakeNodeHandler)
for _, zone := range zones {
nodeController.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
nodeUid, _ := value.UID.(string)
util.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetStore)
nodeUID, _ := value.UID.(string)
util.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetStore)
return true, 0
})
}
@ -846,8 +846,8 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) {
nodeList []*v1.Node
podList []v1.Pod
updatedNodeStatuses []v1.NodeStatus
expectedInitialStates map[string]zoneState
expectedFollowingStates map[string]zoneState
expectedInitialStates map[string]ZoneState
expectedFollowingStates map[string]ZoneState
expectedEvictPods bool
description string
}{
@ -901,8 +901,8 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) {
unhealthyNodeNewStatus,
unhealthyNodeNewStatus,
},
expectedInitialStates: map[string]zoneState{testutil.CreateZoneID("region1", "zone1"): stateFullDisruption},
expectedFollowingStates: map[string]zoneState{testutil.CreateZoneID("region1", "zone1"): stateFullDisruption},
expectedInitialStates: map[string]ZoneState{testutil.CreateZoneID("region1", "zone1"): stateFullDisruption},
expectedFollowingStates: map[string]ZoneState{testutil.CreateZoneID("region1", "zone1"): stateFullDisruption},
expectedEvictPods: false,
description: "Network Disruption: Only zone is down - eviction shouldn't take place.",
},
@ -957,11 +957,11 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) {
unhealthyNodeNewStatus,
unhealthyNodeNewStatus,
},
expectedInitialStates: map[string]zoneState{
expectedInitialStates: map[string]ZoneState{
testutil.CreateZoneID("region1", "zone1"): stateFullDisruption,
testutil.CreateZoneID("region2", "zone2"): stateFullDisruption,
},
expectedFollowingStates: map[string]zoneState{
expectedFollowingStates: map[string]ZoneState{
testutil.CreateZoneID("region1", "zone1"): stateFullDisruption,
testutil.CreateZoneID("region2", "zone2"): stateFullDisruption,
},
@ -1018,11 +1018,11 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) {
unhealthyNodeNewStatus,
healthyNodeNewStatus,
},
expectedInitialStates: map[string]zoneState{
expectedInitialStates: map[string]ZoneState{
testutil.CreateZoneID("region1", "zone1"): stateFullDisruption,
testutil.CreateZoneID("region1", "zone2"): stateNormal,
},
expectedFollowingStates: map[string]zoneState{
expectedFollowingStates: map[string]ZoneState{
testutil.CreateZoneID("region1", "zone1"): stateFullDisruption,
testutil.CreateZoneID("region1", "zone2"): stateNormal,
},
@ -1079,10 +1079,10 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) {
unhealthyNodeNewStatus,
healthyNodeNewStatus,
},
expectedInitialStates: map[string]zoneState{
expectedInitialStates: map[string]ZoneState{
testutil.CreateZoneID("region1", "zone1"): stateFullDisruption,
},
expectedFollowingStates: map[string]zoneState{
expectedFollowingStates: map[string]ZoneState{
testutil.CreateZoneID("region1", "zone1"): stateFullDisruption,
},
expectedEvictPods: false,
@ -1139,11 +1139,11 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) {
unhealthyNodeNewStatus,
healthyNodeNewStatus,
},
expectedInitialStates: map[string]zoneState{
expectedInitialStates: map[string]ZoneState{
testutil.CreateZoneID("region1", "zone1"): stateFullDisruption,
testutil.CreateZoneID("region1", "zone2"): stateFullDisruption,
},
expectedFollowingStates: map[string]zoneState{
expectedFollowingStates: map[string]ZoneState{
testutil.CreateZoneID("region1", "zone1"): stateFullDisruption,
testutil.CreateZoneID("region1", "zone2"): stateNormal,
},
@ -1264,10 +1264,10 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) {
healthyNodeNewStatus,
healthyNodeNewStatus,
},
expectedInitialStates: map[string]zoneState{
expectedInitialStates: map[string]ZoneState{
testutil.CreateZoneID("region1", "zone1"): statePartialDisruption,
},
expectedFollowingStates: map[string]zoneState{
expectedFollowingStates: map[string]ZoneState{
testutil.CreateZoneID("region1", "zone1"): statePartialDisruption,
},
expectedEvictPods: true,
@ -1280,7 +1280,7 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) {
Existing: item.nodeList,
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: item.podList}),
}
nodeController, _ := NewNodeControllerFromClient(nil, fakeNodeHandler,
nodeController, _ := newNodeControllerFromClient(nil, fakeNodeHandler,
evictionTimeout, testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, false)
nodeController.now = func() metav1.Time { return fakeNow }
@ -1384,7 +1384,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) {
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0"), *testutil.NewPod("pod1", "node0")}}),
DeleteWaitChan: make(chan struct{}),
}
nodeController, _ := NewNodeControllerFromClient(nil, fnh, 10*time.Minute,
nodeController, _ := newNodeControllerFromClient(nil, fnh, 10*time.Minute,
testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
testNodeMonitorPeriod, nil, nil, 0, false, false)
@ -1654,7 +1654,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
}
for i, item := range table {
nodeController, _ := NewNodeControllerFromClient(nil, item.fakeNodeHandler, 5*time.Minute,
nodeController, _ := newNodeControllerFromClient(nil, item.fakeNodeHandler, 5*time.Minute,
testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, false)
nodeController.now = func() metav1.Time { return fakeNow }
@ -1888,7 +1888,7 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) {
}
for i, item := range table {
nodeController, _ := NewNodeControllerFromClient(nil, item.fakeNodeHandler, 5*time.Minute,
nodeController, _ := newNodeControllerFromClient(nil, item.fakeNodeHandler, 5*time.Minute,
testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, false)
nodeController.now = func() metav1.Time { return fakeNow }
@ -1999,7 +1999,7 @@ func TestSwapUnreachableNotReadyTaints(t *testing.T) {
originalTaint := UnreachableTaintTemplate
updatedTaint := NotReadyTaintTemplate
nodeController, _ := NewNodeControllerFromClient(nil, fakeNodeHandler,
nodeController, _ := newNodeControllerFromClient(nil, fakeNodeHandler,
evictionTimeout, testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, true)
nodeController.now = func() metav1.Time { return fakeNow }
@ -2092,7 +2092,7 @@ func TestTaintsNodeByCondition(t *testing.T) {
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}),
}
nodeController, _ := NewNodeControllerFromClient(nil, fakeNodeHandler, evictionTimeout,
nodeController, _ := newNodeControllerFromClient(nil, fakeNodeHandler, evictionTimeout,
testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, true)
nodeController.now = func() metav1.Time { return fakeNow }
@ -2270,7 +2270,7 @@ func TestNodeEventGeneration(t *testing.T) {
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}),
}
nodeController, _ := NewNodeControllerFromClient(nil, fakeNodeHandler, 5*time.Minute,
nodeController, _ := newNodeControllerFromClient(nil, fakeNodeHandler, 5*time.Minute,
testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
testNodeMonitorPeriod, nil, nil, 0, false, false)
@ -2384,7 +2384,7 @@ func TestCheckPod(t *testing.T) {
},
}
nc, _ := NewNodeControllerFromClient(nil, fake.NewSimpleClientset(), 0, 0, 0, 0, 0, 0, 0, 0, nil, nil, 0, false, false)
nc, _ := newNodeControllerFromClient(nil, fake.NewSimpleClientset(), 0, 0, 0, 0, 0, 0, 0, 0, nil, nil, 0, false, false)
nc.nodeInformer.Informer().GetStore().Add(&v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "new",