diff --git a/hack/.golint_failures b/hack/.golint_failures index 14dda74062a..0b3ee3f629b 100644 --- a/hack/.golint_failures +++ b/hack/.golint_failures @@ -174,7 +174,6 @@ pkg/controller/garbagecollector/metaonly pkg/controller/job pkg/controller/namespace pkg/controller/namespace/deletion -pkg/controller/node pkg/controller/podautoscaler pkg/controller/podautoscaler/metrics pkg/controller/podgc diff --git a/pkg/controller/node/BUILD b/pkg/controller/node/BUILD index 4f420b2aa01..400a67b930f 100644 --- a/pkg/controller/node/BUILD +++ b/pkg/controller/node/BUILD @@ -51,6 +51,7 @@ go_library( "//pkg/cloudprovider:go_default_library", "//pkg/controller:go_default_library", "//pkg/controller/node/ipam:go_default_library", + "//pkg/controller/node/ipam/sync:go_default_library", "//pkg/controller/node/scheduler:go_default_library", "//pkg/controller/node/util:go_default_library", "//pkg/util/metrics:go_default_library", @@ -65,7 +66,6 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", diff --git a/pkg/controller/node/ipam/BUILD b/pkg/controller/node/ipam/BUILD index 4c231fb0608..3f8e3d110c9 100644 --- a/pkg/controller/node/ipam/BUILD +++ b/pkg/controller/node/ipam/BUILD @@ -8,9 +8,15 @@ load( go_test( name = "go_default_test", - srcs = ["cidr_allocator_test.go"], + srcs = [ + "controller_test.go", + "range_allocator_test.go", + "timeout_test.go", + ], library = ":go_default_library", deps = [ + "//pkg/controller/node/ipam/cidrset:go_default_library", + "//pkg/controller/node/ipam/test:go_default_library", "//pkg/controller/testutil:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", @@ -22,28 +28,38 @@ go_test( go_library( name = "go_default_library", srcs = [ + "adapter.go", "cidr_allocator.go", "cloud_cidr_allocator.go", + "controller.go", + "doc.go", "range_allocator.go", + "timeout.go", ], deps = [ "//pkg/api:go_default_library", "//pkg/cloudprovider:go_default_library", "//pkg/cloudprovider/providers/gce:go_default_library", "//pkg/controller/node/ipam/cidrset:go_default_library", + "//pkg/controller/node/ipam/sync:go_default_library", "//pkg/controller/node/util:go_default_library", "//pkg/util/node:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/client-go/informers/core/v1:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", + "//vendor/k8s.io/client-go/tools/cache:go_default_library", 
"//vendor/k8s.io/client-go/tools/record:go_default_library", + "//vendor/k8s.io/metrics/pkg/client/clientset_generated/clientset/scheme:go_default_library", ], ) @@ -59,6 +75,8 @@ filegroup( srcs = [ ":package-srcs", "//pkg/controller/node/ipam/cidrset:all-srcs", + "//pkg/controller/node/ipam/sync:all-srcs", + "//pkg/controller/node/ipam/test:all-srcs", ], tags = ["automanaged"], ) diff --git a/pkg/controller/node/ipam/adapter.go b/pkg/controller/node/ipam/adapter.go new file mode 100644 index 00000000000..00a91535c8e --- /dev/null +++ b/pkg/controller/node/ipam/adapter.go @@ -0,0 +1,125 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipam + +import ( + "context" + "encoding/json" + "net" + + "github.com/golang/glog" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + clientset "k8s.io/client-go/kubernetes" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/record" + "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" + nodeutil "k8s.io/kubernetes/pkg/util/node" + "k8s.io/metrics/pkg/client/clientset_generated/clientset/scheme" +) + +type adapter struct { + k8s clientset.Interface + cloud *gce.GCECloud + + recorder record.EventRecorder +} + +func newAdapter(k8s clientset.Interface, cloud *gce.GCECloud) *adapter { + ret := &adapter{ + k8s: k8s, + cloud: cloud, + } + + broadcaster := record.NewBroadcaster() + broadcaster.StartLogging(glog.Infof) + ret.recorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloudCIDRAllocator"}) + glog.V(0).Infof("Sending events to api server.") + broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{ + Interface: v1core.New(k8s.Core().RESTClient()).Events(""), + }) + + return ret +} + +func (a *adapter) Alias(ctx context.Context, nodeName string) (*net.IPNet, error) { + cidrs, err := a.cloud.AliasRanges(types.NodeName(nodeName)) + if err != nil { + return nil, err + } + + switch len(cidrs) { + case 0: + return nil, nil + case 1: + break + default: + glog.Warningf("Node %q has more than one alias assigned (%v), defaulting to the first", nodeName, cidrs) + } + + _, cidrRange, err := net.ParseCIDR(cidrs[0]) + if err != nil { + return nil, err + } + + return cidrRange, nil +} + +func (a *adapter) AddAlias(ctx context.Context, nodeName string, cidrRange *net.IPNet) error { + return a.cloud.AddAliasToInstance(types.NodeName(nodeName), cidrRange) +} + +func (a *adapter) Node(ctx context.Context, name string) (*v1.Node, error) { + return a.k8s.Core().Nodes().Get(name, metav1.GetOptions{}) +} + +func (a *adapter) UpdateNodePodCIDR(ctx context.Context, node *v1.Node, cidrRange *net.IPNet) error { + patch := map[string]interface{}{ + "apiVersion": node.APIVersion, + "kind": node.Kind, + "metadata": map[string]interface{}{"name": node.Name}, + "spec": map[string]interface{}{"podCIDR": cidrRange.String()}, + } + bytes, err := json.Marshal(patch) + if err != nil { + return err + } + + _, err = 
a.k8s.Core().Nodes().Patch(node.Name, types.StrategicMergePatchType, bytes) + return err +} + +func (a *adapter) UpdateNodeNetworkUnavailable(nodeName string, unavailable bool) error { + condition := v1.ConditionFalse + if unavailable { + condition = v1.ConditionTrue + } + return nodeutil.SetNodeCondition(a.k8s, types.NodeName(nodeName), v1.NodeCondition{ + Type: v1.NodeNetworkUnavailable, + Status: condition, + Reason: "RouteCreated", + Message: "NodeController created an implicit route", + LastTransitionTime: metav1.Now(), + }) +} + +func (a *adapter) EmitNodeWarningEvent(nodeName, reason, fmt string, args ...interface{}) { + ref := &v1.ObjectReference{Kind: "Node", Name: nodeName} + a.recorder.Eventf(ref, v1.EventTypeNormal, reason, fmt, args...) +} diff --git a/pkg/controller/node/ipam/cidr_allocator.go b/pkg/controller/node/ipam/cidr_allocator.go index e6c7f47869c..ce1d20b9e10 100644 --- a/pkg/controller/node/ipam/cidr_allocator.go +++ b/pkg/controller/node/ipam/cidr_allocator.go @@ -17,9 +17,20 @@ limitations under the License. package ipam import ( + "fmt" "net" + "time" - v1 "k8s.io/api/core/v1" + "github.com/golang/glog" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/wait" + informers "k8s.io/client-go/informers/core/v1" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/kubernetes/pkg/cloudprovider" ) type nodeAndCIDR struct { @@ -37,10 +48,19 @@ const ( // CloudAllocatorType is the allocator that uses cloud platform // support to do node CIDR range allocations. CloudAllocatorType CIDRAllocatorType = "CloudAllocator" + // IPAMFromClusterAllocatorType uses the ipam controller sync'ing the node + // CIDR range allocations from the cluster to the cloud. + IPAMFromClusterAllocatorType = "IPAMFromCluster" + // IPAMFromCloudAllocatorType uses the ipam controller sync'ing the node + // CIDR range allocations from the cloud to the cluster. + IPAMFromCloudAllocatorType = "IPAMFromCloud" + + // The amount of time the nodecontroller polls on the list nodes endpoint. + apiserverStartupGracePeriod = 10 * time.Minute ) -// CIDRAllocator is an interface implemented by things that know how to -// allocate/occupy/recycle CIDR for nodes. +// CIDRAllocator is an interface implemented by things that know how +// to allocate/occupy/recycle CIDR for nodes. type CIDRAllocator interface { // AllocateOrOccupyCIDR looks at the given node, assigns it a valid // CIDR if it doesn't currently have one or mark the CIDR as used if @@ -48,4 +68,45 @@ type CIDRAllocator interface { AllocateOrOccupyCIDR(node *v1.Node) error // ReleaseCIDR releases the CIDR of the removed node ReleaseCIDR(node *v1.Node) error + // Register allocator with the nodeInformer for updates. + Register(nodeInformer informers.NodeInformer) +} + +// New creates a new CIDR range allocator. 
+func New(kubeClient clientset.Interface, cloud cloudprovider.Interface, allocatorType CIDRAllocatorType, clusterCIDR, serviceCIDR *net.IPNet, nodeCIDRMaskSize int) (CIDRAllocator, error) { + nodeList, err := listNodes(kubeClient) + if err != nil { + return nil, err + } + + switch allocatorType { + case RangeAllocatorType: + return NewCIDRRangeAllocator(kubeClient, clusterCIDR, serviceCIDR, nodeCIDRMaskSize, nodeList) + case CloudAllocatorType: + return NewCloudCIDRAllocator(kubeClient, cloud) + default: + return nil, fmt.Errorf("Invalid CIDR allocator type: %v", allocatorType) + } +} + +func listNodes(kubeClient clientset.Interface) (*v1.NodeList, error) { + var nodeList *v1.NodeList + // We must poll because apiserver might not be up. This error causes + // controller manager to restart. + if pollErr := wait.Poll(10*time.Second, apiserverStartupGracePeriod, func() (bool, error) { + var err error + nodeList, err = kubeClient.Core().Nodes().List(metav1.ListOptions{ + FieldSelector: fields.Everything().String(), + LabelSelector: labels.Everything().String(), + }) + if err != nil { + glog.Errorf("Failed to list all nodes: %v", err) + return false, nil + } + return true, nil + }); pollErr != nil { + return nil, fmt.Errorf("Failed to list all nodes in %v, cannot proceed without updating CIDR map", + apiserverStartupGracePeriod) + } + return nodeList, nil } diff --git a/pkg/controller/node/ipam/cidrset/cidr_set.go b/pkg/controller/node/ipam/cidrset/cidr_set.go index ce08dd8af3f..a83f73a293d 100644 --- a/pkg/controller/node/ipam/cidrset/cidr_set.go +++ b/pkg/controller/node/ipam/cidrset/cidr_set.go @@ -102,7 +102,8 @@ func (s *CidrSet) indexToCIDRBlock(index int) *net.IPNet { } } -// AllocateNext allocates the next free CIDR range. +// AllocateNext allocates the next free CIDR range. This will set the range +// as occupied and return the allocated range. func (s *CidrSet) AllocateNext() (*net.IPNet, error) { s.Lock() defer s.Unlock() @@ -186,7 +187,8 @@ func (s *CidrSet) Release(cidr *net.IPNet) error { return nil } -// Occupy marks the given CIDR range as used. +// Occupy marks the given CIDR range as used. Occupy does not check if the CIDR +// range was previously used. 
func (s *CidrSet) Occupy(cidr *net.IPNet) (err error) { begin, end, err := s.getBeginingAndEndIndices(cidr) if err != nil { @@ -203,21 +205,24 @@ func (s *CidrSet) Occupy(cidr *net.IPNet) (err error) { } func (s *CidrSet) getIndexForCIDR(cidr *net.IPNet) (int, error) { - var cidrIndex uint32 - if cidr.IP.To4() != nil { - cidrIndex = (binary.BigEndian.Uint32(s.clusterIP) ^ binary.BigEndian.Uint32(cidr.IP.To4())) >> uint32(32-s.subNetMaskSize) - if cidrIndex >= uint32(s.maxCIDRs) { - return 0, fmt.Errorf("CIDR: %v is out of the range of CIDR allocator", cidr) - } - } else if cidr.IP.To16() != nil { - cidrIndex64 := (binary.BigEndian.Uint64(s.clusterIP) ^ binary.BigEndian.Uint64(cidr.IP.To16())) >> uint64(64-s.subNetMaskSize) - - if cidrIndex64 >= uint64(s.maxCIDRs) { - return 0, fmt.Errorf("CIDR: %v is out of the range of CIDR allocator", cidr) - } - cidrIndex = uint32(cidrIndex64) - } else { - return 0, fmt.Errorf("invalid CIDR block: %v", cidr) - } - return int(cidrIndex), nil + return s.getIndexForIP(cidr.IP) +} + +func (s *CidrSet) getIndexForIP(ip net.IP) (int, error) { + if ip.To4() != nil { + cidrIndex := (binary.BigEndian.Uint32(s.clusterIP) ^ binary.BigEndian.Uint32(ip.To4())) >> uint32(32-s.subNetMaskSize) + if cidrIndex >= uint32(s.maxCIDRs) { + return 0, fmt.Errorf("CIDR: %v/%v is out of the range of CIDR allocator", ip, s.subNetMaskSize) + } + return int(cidrIndex), nil + } + if ip.To16() != nil { + cidrIndex := (binary.BigEndian.Uint64(s.clusterIP) ^ binary.BigEndian.Uint64(ip.To16())) >> uint64(64-s.subNetMaskSize) + if cidrIndex >= uint64(s.maxCIDRs) { + return 0, fmt.Errorf("CIDR: %v/%v is out of the range of CIDR allocator", ip, s.subNetMaskSize) + } + return int(cidrIndex), nil + } + + return 0, fmt.Errorf("invalid IP: %v", ip) } diff --git a/pkg/controller/node/ipam/cloud_cidr_allocator.go b/pkg/controller/node/ipam/cloud_cidr_allocator.go index 9dfb9543f83..da0d5e6fb4b 100644 --- a/pkg/controller/node/ipam/cloud_cidr_allocator.go +++ b/pkg/controller/node/ipam/cloud_cidr_allocator.go @@ -24,7 +24,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - + informers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/api/core/v1" @@ -142,3 +143,16 @@ func (ca *cloudCIDRAllocator) ReleaseCIDR(node *v1.Node) error { node.Name, node.Spec.PodCIDR) return nil } + +func (ca *cloudCIDRAllocator) Register(nodeInformer informers.NodeInformer) { + nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: util.CreateAddNodeHandler(ca.AllocateOrOccupyCIDR), + UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error { + if newNode.Spec.PodCIDR == "" { + return ca.AllocateOrOccupyCIDR(newNode) + } + return nil + }), + DeleteFunc: util.CreateDeleteNodeHandler(ca.ReleaseCIDR), + }) +} diff --git a/pkg/controller/node/ipam/controller.go b/pkg/controller/node/ipam/controller.go new file mode 100644 index 00000000000..f8a2375678f --- /dev/null +++ b/pkg/controller/node/ipam/controller.go @@ -0,0 +1,210 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipam + +import ( + "fmt" + "net" + "sync" + "time" + + "github.com/golang/glog" + + "k8s.io/api/core/v1" + informers "k8s.io/client-go/informers/core/v1" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/kubernetes/pkg/cloudprovider" + "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" + "k8s.io/kubernetes/pkg/controller/node/ipam/cidrset" + nodesync "k8s.io/kubernetes/pkg/controller/node/ipam/sync" + "k8s.io/kubernetes/pkg/controller/node/util" +) + +// Config for the IPAM controller. +type Config struct { + // Resync is the default timeout duration when there are no errors. + Resync time.Duration + // MaxBackoff is the maximum timeout when in an error backoff state. + MaxBackoff time.Duration + // InitialRetry is the initial retry interval when an error is reported. + InitialRetry time.Duration + // Mode to use to synchronize. + Mode nodesync.NodeSyncMode +} + +// Controller is the controller for synchronizing cluster and cloud node +// pod CIDR range assignments. +type Controller struct { + config *Config + adapter *adapter + + lock sync.Mutex + syncers map[string]*nodesync.NodeSync + + set *cidrset.CidrSet +} + +// NewController returns a new instance of the IPAM controller. +func NewController( + config *Config, + kubeClient clientset.Interface, + cloud cloudprovider.Interface, + clusterCIDR, serviceCIDR *net.IPNet, + nodeCIDRMaskSize int) (*Controller, error) { + + if !nodesync.IsValidMode(config.Mode) { + return nil, fmt.Errorf("invalid IPAM controller mode %q", config.Mode) + } + + gceCloud, ok := cloud.(*gce.GCECloud) + if !ok { + return nil, fmt.Errorf("cloud IPAM controller does not support %q provider", cloud.ProviderName()) + } + + c := &Controller{ + config: config, + adapter: newAdapter(kubeClient, gceCloud), + syncers: make(map[string]*nodesync.NodeSync), + set: cidrset.NewCIDRSet(clusterCIDR, nodeCIDRMaskSize), + } + + if err := occupyServiceCIDR(c.set, clusterCIDR, serviceCIDR); err != nil { + return nil, err + } + + return c, nil +} + +// Start initializes the Controller with the existing list of nodes and +// registers the informers for node changes. This will start synchronization +// of the node and cloud CIDR range allocations. +func (c *Controller) Start(nodeInformer informers.NodeInformer) error { + glog.V(0).Infof("Starting IPAM controller (config=%+v)", c.config) + + nodes, err := listNodes(c.adapter.k8s) + if err != nil { + return err + } + for _, node := range nodes.Items { + if node.Spec.PodCIDR != "" { + _, cidrRange, err := net.ParseCIDR(node.Spec.PodCIDR) + if err == nil { + c.set.Occupy(cidrRange) + glog.V(3).Infof("Occupying CIDR for node %q (%v)", node.Name, node.Spec.PodCIDR) + } else { + glog.Errorf("Node %q has an invalid CIDR (%q): %v", node.Name, node.Spec.PodCIDR, err) + } + } + + func() { + c.lock.Lock() + defer c.lock.Unlock() + + // XXX/bowei -- stagger the start of each sync cycle.
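For illustration, a minimal standalone sketch of the pattern Start sets up here: one long-lived sync goroutine per node, tracked in a mutex-guarded map so informer callbacks can route events to the right loop. All names in this sketch are illustrative stand-ins, not part of this change; the real per-node loop is the NodeSync type shown later in sync.go.

package example

import (
	"fmt"
	"sync"
)

// nodeWorker stands in for a nodesync.NodeSync: it consumes events from a
// channel until the channel is closed.
type nodeWorker struct{ events chan string }

// workerPool stands in for the Controller's syncers map plus its lock.
type workerPool struct {
	mu      sync.Mutex
	workers map[string]*nodeWorker
}

func newWorkerPool() *workerPool {
	return &workerPool{workers: make(map[string]*nodeWorker)}
}

// onAdd starts a per-node loop if one is not already running.
func (p *workerPool) onAdd(node string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if _, ok := p.workers[node]; ok {
		return // already running; the real controller forwards an update instead
	}
	w := &nodeWorker{events: make(chan string, 1)}
	p.workers[node] = w
	go func() {
		for ev := range w.events {
			fmt.Printf("sync %s: %s\n", node, ev)
		}
	}()
}

// onDelete stops the loop by closing its channel, mirroring NodeSync.Delete
// closing opChan, and drops the bookkeeping entry.
func (p *workerPool) onDelete(node string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if w, ok := p.workers[node]; ok {
		close(w.events)
		delete(p.workers, node)
	}
}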
+ syncer := c.newSyncer(node.Name) + c.syncers[node.Name] = syncer + go syncer.Loop(nil) + }() + } + + nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: util.CreateAddNodeHandler(c.onAdd), + UpdateFunc: util.CreateUpdateNodeHandler(c.onUpdate), + DeleteFunc: util.CreateDeleteNodeHandler(c.onDelete), + }) + + return nil +} + +// occupyServiceCIDR removes the service CIDR range from the cluster CIDR if it +// intersects. +func occupyServiceCIDR(set *cidrset.CidrSet, clusterCIDR, serviceCIDR *net.IPNet) error { + if clusterCIDR.Contains(serviceCIDR.IP) || serviceCIDR.Contains(clusterCIDR.IP) { + if err := set.Occupy(serviceCIDR); err != nil { + return err + } + } + return nil +} + +type nodeState struct { + t Timeout +} + +func (ns *nodeState) ReportResult(err error) { + ns.t.Update(err == nil) +} + +func (ns *nodeState) ResyncTimeout() time.Duration { + return ns.t.Next() +} + +func (c *Controller) newSyncer(name string) *nodesync.NodeSync { + ns := &nodeState{ + Timeout{ + Resync: c.config.Resync, + MaxBackoff: c.config.MaxBackoff, + InitialRetry: c.config.InitialRetry, + }, + } + return nodesync.New(ns, c.adapter, c.adapter, c.config.Mode, name, c.set) +} + +func (c *Controller) onAdd(node *v1.Node) error { + c.lock.Lock() + defer c.lock.Unlock() + + if syncer, ok := c.syncers[node.Name]; !ok { + syncer = c.newSyncer(node.Name) + c.syncers[node.Name] = syncer + go syncer.Loop(nil) + } else { + glog.Warningf("Add for node %q that already exists", node.Name) + syncer.Update(node) + } + + return nil +} + +func (c *Controller) onUpdate(_, node *v1.Node) error { + c.lock.Lock() + defer c.lock.Unlock() + + if sync, ok := c.syncers[node.Name]; ok { + sync.Update(node) + } else { + glog.Errorf("Received update for non-existent node %q", node.Name) + return fmt.Errorf("unknown node %q", node.Name) + } + + return nil +} + +func (c *Controller) onDelete(node *v1.Node) error { + c.lock.Lock() + defer c.lock.Unlock() + + if syncer, ok := c.syncers[node.Name]; ok { + syncer.Delete(node) + delete(c.syncers, node.Name) + } else { + glog.Warningf("Node %q was already deleted", node.Name) + } + + return nil +} diff --git a/pkg/controller/node/ipam/controller_test.go b/pkg/controller/node/ipam/controller_test.go new file mode 100644 index 00000000000..22baf2081b0 --- /dev/null +++ b/pkg/controller/node/ipam/controller_test.go @@ -0,0 +1,64 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+*/ + +package ipam + +import ( + "net" + "testing" + + "k8s.io/kubernetes/pkg/controller/node/ipam/cidrset" + "k8s.io/kubernetes/pkg/controller/node/ipam/test" +) + +func TestOccupyServiceCIDR(t *testing.T) { + const clusterCIDR = "10.1.0.0/16" + +TestCase: + for _, tc := range []struct { + serviceCIDR string + }{ + {"10.0.255.0/24"}, + {"10.1.0.0/24"}, + {"10.1.255.0/24"}, + {"10.2.0.0/24"}, + } { + serviceCIDR := test.MustParseCIDR(tc.serviceCIDR) + set := cidrset.NewCIDRSet(test.MustParseCIDR(clusterCIDR), 24) + if err := occupyServiceCIDR(set, test.MustParseCIDR(clusterCIDR), serviceCIDR); err != nil { + t.Errorf("test case %+v: occupyServiceCIDR() = %v, want nil", tc, err) + } + // Allocate until full. + var cidrs []*net.IPNet + for { + cidr, err := set.AllocateNext() + if err != nil { + if err == cidrset.ErrCIDRRangeNoCIDRsRemaining { + break + } + t.Errorf("set.AllocateNext() = %v, want %v", err, cidrset.ErrCIDRRangeNoCIDRsRemaining) + continue TestCase + } + cidrs = append(cidrs, cidr) + } + // No allocated CIDR range should intersect with serviceCIDR. + for _, c := range cidrs { + if c.Contains(serviceCIDR.IP) || serviceCIDR.Contains(c.IP) { + t.Errorf("test case %+v: allocated CIDR %v from service range", tc, c) + } + } + } +} diff --git a/pkg/controller/node/ipam/doc.go b/pkg/controller/node/ipam/doc.go new file mode 100644 index 00000000000..76e51570760 --- /dev/null +++ b/pkg/controller/node/ipam/doc.go @@ -0,0 +1,30 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package ipam provides different allocators for assigning IP ranges to nodes. +// We currently support several kinds of IPAM allocators (these are denoted by +// the CIDRAllocatorType): +// - RangeAllocator is an allocator that assigns PodCIDRs to nodes and works +// in conjunction with the RouteController to configure the network to get +// connectivity. +// - CloudAllocator is an allocator that synchronizes PodCIDRs from IP +// range assignments from the underlying cloud platform. +// - (Alpha only) IPAMFromCluster is an allocator that has similar +// functionality to the RangeAllocator but also synchronizes cluster-managed +// ranges into the cloud platform. +// - (Alpha only) IPAMFromCloud is the same as CloudAllocator (synchronizes +// from the cloud into the cluster).
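To make the mapping from these allocator types to the new constructors concrete, here is a hedged sketch that uses only the signatures added in this package (ipam.New, ipam.NewController, Controller.Start, CIDRAllocator.Register). The startAllocator helper and its timeout values are assumptions for illustration; the real wiring lives in node_controller.go and may differ.

package example

import (
	"net"
	"time"

	informers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/pkg/cloudprovider"
	"k8s.io/kubernetes/pkg/controller/node/ipam"
	nodesync "k8s.io/kubernetes/pkg/controller/node/ipam/sync"
)

// startAllocator is a hypothetical helper (not part of this change) showing
// how the four CIDRAllocatorType values map onto the constructors above.
func startAllocator(
	kubeClient clientset.Interface,
	cloud cloudprovider.Interface,
	nodeInformer informers.NodeInformer,
	allocatorType ipam.CIDRAllocatorType,
	clusterCIDR, serviceCIDR *net.IPNet,
	nodeCIDRMaskSize int) error {

	switch allocatorType {
	case ipam.IPAMFromClusterAllocatorType, ipam.IPAMFromCloudAllocatorType:
		// The alpha types run the ipam.Controller, which keeps cluster and
		// cloud assignments synchronized in the chosen direction.
		mode := nodesync.SyncFromCluster
		if allocatorType == ipam.IPAMFromCloudAllocatorType {
			mode = nodesync.SyncFromCloud
		}
		cfg := &ipam.Config{
			Resync:       30 * time.Second, // illustrative values, not defaults
			MaxBackoff:   10 * time.Second,
			InitialRetry: 250 * time.Millisecond,
			Mode:         mode,
		}
		c, err := ipam.NewController(cfg, kubeClient, cloud, clusterCIDR, serviceCIDR, nodeCIDRMaskSize)
		if err != nil {
			return err
		}
		return c.Start(nodeInformer)
	default:
		// RangeAllocatorType and CloudAllocatorType go through ipam.New and
		// register their own informer handlers.
		alloc, err := ipam.New(kubeClient, cloud, allocatorType, clusterCIDR, serviceCIDR, nodeCIDRMaskSize)
		if err != nil {
			return err
		}
		alloc.Register(nodeInformer)
		return nil
	}
}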
+package ipam diff --git a/pkg/controller/node/ipam/range_allocator.go b/pkg/controller/node/ipam/range_allocator.go index 53c6f22fc90..24a7b77ae48 100644 --- a/pkg/controller/node/ipam/range_allocator.go +++ b/pkg/controller/node/ipam/range_allocator.go @@ -28,9 +28,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + informers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/controller/node/ipam/cidrset" @@ -264,3 +266,35 @@ func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error { } return err } + +func (r *rangeAllocator) Register(nodeInformer informers.NodeInformer) { + nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: util.CreateAddNodeHandler(r.AllocateOrOccupyCIDR), + UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error { + // If the PodCIDR is not empty we either: + // - already processed a Node that already had a CIDR after NC restarted + // (cidr is marked as used), + // - already processed a Node successfully and allocated a CIDR for it + // (cidr is marked as used), + // - already processed a Node but we did see a "timeout" response and the + // request eventually got through; in this case we haven't released + // the allocated CIDR (cidr is still marked as used). + // There's a possible error here: + // - NC sees a new Node and assigns a CIDR X to it, + // - Update Node call fails with a timeout, + // - Node is updated by some other component, NC sees an update and + // assigns CIDR Y to the Node, + // - Both CIDR X and CIDR Y are marked as used in the local cache, + // even though Node sees only CIDR Y. + // The problem here is that in the in-memory cache we see CIDR X as marked, + // which prevents it from being assigned to any new node. The cluster + // state is correct. + // Restart of NC fixes the issue.
+ if newNode.Spec.PodCIDR == "" { + return r.AllocateOrOccupyCIDR(newNode) + } + return nil + }), + DeleteFunc: util.CreateDeleteNodeHandler(r.ReleaseCIDR), + }) +} diff --git a/pkg/controller/node/ipam/cidr_allocator_test.go b/pkg/controller/node/ipam/range_allocator_test.go similarity index 100% rename from pkg/controller/node/ipam/cidr_allocator_test.go rename to pkg/controller/node/ipam/range_allocator_test.go diff --git a/pkg/controller/node/ipam/sync/BUILD b/pkg/controller/node/ipam/sync/BUILD new file mode 100644 index 00000000000..30433dcaed1 --- /dev/null +++ b/pkg/controller/node/ipam/sync/BUILD @@ -0,0 +1,39 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["sync.go"], + visibility = ["//visibility:public"], + deps = [ + "//pkg/controller/node/ipam/cidrset:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["sync_test.go"], + library = ":go_default_library", + deps = [ + "//pkg/controller/node/ipam/cidrset:go_default_library", + "//pkg/controller/node/ipam/test:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/controller/node/ipam/sync/sync.go b/pkg/controller/node/ipam/sync/sync.go new file mode 100644 index 00000000000..60c9d59b6b1 --- /dev/null +++ b/pkg/controller/node/ipam/sync/sync.go @@ -0,0 +1,386 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sync + +import ( + "context" + "fmt" + "net" + "time" + + "github.com/golang/glog" + + "k8s.io/api/core/v1" + "k8s.io/kubernetes/pkg/controller/node/ipam/cidrset" +) + +const ( + // InvalidPodCIDR is the event recorded when a node is found with an + // invalid PodCIDR. + InvalidPodCIDR = "CloudCIDRAllocatorInvalidPodCIDR" + // InvalidModeEvent is the event recorded when the CIDR range cannot be + // sync'd due to the cluster running in the wrong mode. + InvalidModeEvent = "CloudCIDRAllocatorInvalidMode" + // MismatchEvent is the event recorded when the CIDR range allocated in the + // node spec does not match what has been allocated in the cloud. + MismatchEvent = "CloudCIDRAllocatorMismatch" +) + +// cloudAlias is the interface to the cloud platform APIs. +type cloudAlias interface { + // Alias returns the IP alias for the node. + Alias(ctx context.Context, nodeName string) (*net.IPNet, error) + // AddAlias adds an alias to the node. 
+ AddAlias(ctx context.Context, nodeName string, cidrRange *net.IPNet) error +} + +// kubeAPI is the interface to the Kubernetes APIs. +type kubeAPI interface { + // Node returns the spec for the Node object. + Node(ctx context.Context, name string) (*v1.Node, error) + // UpdateNodePodCIDR updates the PodCIDR in the Node spec. + UpdateNodePodCIDR(ctx context.Context, node *v1.Node, cidrRange *net.IPNet) error + // UpdateNodeNetworkUnavailable updates the network unavailable status for the node. + UpdateNodeNetworkUnavailable(nodeName string, unavailable bool) error + // EmitNodeEvent emits an event for the given node. + EmitNodeWarningEvent(nodeName, reason, fmt string, args ...interface{}) +} + +// controller is the interface to the controller. +type controller interface { + // ReportResult updates the controller with the result of the latest + // sync operation. + ReportResult(err error) + // ResyncTimeout returns the amount of time to wait before retrying + // a sync with a node. + ResyncTimeout() time.Duration +} + +// NodeSyncMode is the mode the cloud CIDR allocator runs in. +type NodeSyncMode string + +var ( + // SyncFromCloud is the mode that synchronizes the IP allocation from the cloud + // platform to the node. + SyncFromCloud NodeSyncMode = "SyncFromCloud" + // SyncFromCluster is the mode that synchronizes the IP allocation determined + // by the k8s controller to the cloud provider. + SyncFromCluster NodeSyncMode = "SyncFromCluster" +) + +// IsValidMode returns true if the given mode is valid. +func IsValidMode(m NodeSyncMode) bool { + switch m { + case SyncFromCloud: + case SyncFromCluster: + default: + return false + } + return true +} + +// NodeSync synchronizes the state for a single node in the cluster. +type NodeSync struct { + c controller + cloudAlias cloudAlias + kubeAPI kubeAPI + mode NodeSyncMode + nodeName string + opChan chan syncOp + set *cidrset.CidrSet +} + +// New returns a new syncer for a given node. +func New(c controller, cloudAlias cloudAlias, kubeAPI kubeAPI, mode NodeSyncMode, nodeName string, set *cidrset.CidrSet) *NodeSync { + return &NodeSync{ + c: c, + cloudAlias: cloudAlias, + kubeAPI: kubeAPI, + mode: mode, + nodeName: nodeName, + opChan: make(chan syncOp, 1), + set: set, + } +} + +// Loop runs the sync loop for a given node. done is an optional channel that +// is closed when the Loop() returns. +func (sync *NodeSync) Loop(done chan struct{}) { + glog.V(2).Infof("Starting sync loop for node %q", sync.nodeName) + + defer func() { + if done != nil { + close(done) + } + }() + + timeout := sync.c.ResyncTimeout() + delayTimer := time.NewTimer(timeout) + glog.V(4).Infof("Resync node %q in %v", sync.nodeName, timeout) + + for { + select { + case op, more := <-sync.opChan: + if !more { + glog.V(2).Infof("Stopping sync loop") + return + } + sync.c.ReportResult(op.run(sync)) + if !delayTimer.Stop() { + <-delayTimer.C + } + case <-delayTimer.C: + glog.V(4).Infof("Running resync for node %q", sync.nodeName) + sync.c.ReportResult((&updateOp{}).run(sync)) + } + + timeout := sync.c.ResyncTimeout() + delayTimer.Reset(timeout) + glog.V(4).Infof("Resync node %q in %v", sync.nodeName, timeout) + } +} + +// Update causes an update operation on the given node. If node is nil, then +// the syncer will fetch the node spec from the API server before syncing. +// +// This method is safe to call from multiple goroutines. 
+func (sync *NodeSync) Update(node *v1.Node) { + sync.opChan <- &updateOp{node} +} + +// Delete performs the sync operations necessary to remove the node from the +// IPAM state. +// +// This method is safe to call from multiple goroutines. +func (sync *NodeSync) Delete(node *v1.Node) { + sync.opChan <- &deleteOp{node} + close(sync.opChan) +} + +// syncOp is the interface for generic sync operation. +type syncOp interface { + // run the requested sync operation. + run(sync *NodeSync) error +} + +// updateOp handles creation and updates of a node. +type updateOp struct { + node *v1.Node +} + +func (op *updateOp) String() string { + if op.node == nil { + return fmt.Sprintf("updateOp(nil)") + } + return fmt.Sprintf("updateOp(%q,%v)", op.node.Name, op.node.Spec.PodCIDR) +} + +func (op *updateOp) run(sync *NodeSync) error { + glog.V(3).Infof("Running updateOp %+v", op) + + ctx := context.Background() + + if op.node == nil { + glog.V(3).Infof("Getting node spec for %q", sync.nodeName) + node, err := sync.kubeAPI.Node(ctx, sync.nodeName) + if err != nil { + glog.Errorf("Error getting node %q spec: %v", sync.nodeName, err) + return err + } + op.node = node + } + + aliasRange, err := sync.cloudAlias.Alias(ctx, sync.nodeName) + if err != nil { + glog.Errorf("Error getting cloud alias for node %q: %v", sync.nodeName, err) + return err + } + + switch { + case op.node.Spec.PodCIDR == "" && aliasRange == nil: + err = op.allocateRange(ctx, sync, op.node) + case op.node.Spec.PodCIDR == "" && aliasRange != nil: + err = op.updateNodeFromAlias(ctx, sync, op.node, aliasRange) + case op.node.Spec.PodCIDR != "" && aliasRange == nil: + err = op.updateAliasFromNode(ctx, sync, op.node) + case op.node.Spec.PodCIDR != "" && aliasRange != nil: + err = op.validateRange(ctx, sync, op.node, aliasRange) + } + + return err +} + +// validateRange checks that the allocated range and the alias range +// match. +func (op *updateOp) validateRange(ctx context.Context, sync *NodeSync, node *v1.Node, aliasRange *net.IPNet) error { + if node.Spec.PodCIDR != aliasRange.String() { + glog.Errorf("Inconsistency detected between node PodCIDR and node alias (%v != %v)", + node.Spec.PodCIDR, aliasRange) + sync.kubeAPI.EmitNodeWarningEvent(node.Name, MismatchEvent, + "Node.Spec.PodCIDR != cloud alias (%v != %v)", node.Spec.PodCIDR, aliasRange) + // User intervention is required in this case, as this is most likely due + // to the user mucking around with their VM aliases on the side. + } else { + glog.V(4).Infof("Node %q CIDR range %v matches the cloud assignment", node.Name, node.Spec.PodCIDR) + } + return nil +} + +// updateNodeFromAlias updates the node from the cloud allocated +// alias.
+func (op *updateOp) updateNodeFromAlias(ctx context.Context, sync *NodeSync, node *v1.Node, aliasRange *net.IPNet) error { + if sync.mode != SyncFromCloud { + sync.kubeAPI.EmitNodeWarningEvent(node.Name, InvalidModeEvent, + "Cannot sync from cloud in mode %q", sync.mode) + return fmt.Errorf("cannot sync from cloud in mode %q", sync.mode) + } + + glog.V(2).Infof("Updating node spec with alias range, node.PodCIDR = %v", aliasRange) + + if err := sync.set.Occupy(aliasRange); err != nil { + glog.Errorf("Error occupying range %v for node %v", aliasRange, sync.nodeName) + return err + } + + if err := sync.kubeAPI.UpdateNodePodCIDR(ctx, node, aliasRange); err != nil { + glog.Errorf("Could not update node %q PodCIDR to %v: %v", node.Name, aliasRange, err) + return err + } + + glog.V(2).Infof("Node %q PodCIDR set to %v", node.Name, aliasRange) + + if err := sync.kubeAPI.UpdateNodeNetworkUnavailable(node.Name, false); err != nil { + glog.Errorf("Error setting route status for node %q: %v", node.Name, err) + return err + } + + if err := sync.kubeAPI.UpdateNodeNetworkUnavailable(node.Name, false); err != nil { + glog.Errorf("Could not update node NetworkUnavailable status to false: %v", err) + return err + } + + glog.V(2).Infof("Updated node %q PodCIDR from cloud alias %v", node.Name, aliasRange) + + return nil +} + +// updateAliasFromNode updates the cloud alias given the node allocation. +func (op *updateOp) updateAliasFromNode(ctx context.Context, sync *NodeSync, node *v1.Node) error { + if sync.mode != SyncFromCluster { + sync.kubeAPI.EmitNodeWarningEvent( + node.Name, InvalidModeEvent, "Cannot sync to cloud in mode %q", sync.mode) + return fmt.Errorf("cannot sync to cloud in mode %q", sync.mode) + } + + _, aliasRange, err := net.ParseCIDR(node.Spec.PodCIDR) + if err != nil { + glog.Errorf("Could not parse PodCIDR (%q) for node %q: %v", + node.Spec.PodCIDR, node.Name, err) + return err + } + + if err := sync.set.Occupy(aliasRange); err != nil { + glog.Errorf("Error occupying range %v for node %v", aliasRange, sync.nodeName) + return err + } + + if err := sync.cloudAlias.AddAlias(ctx, node.Name, aliasRange); err != nil { + glog.Errorf("Could not add alias %v for node %q: %v", aliasRange, node.Name, err) + return err + } + + if err := sync.kubeAPI.UpdateNodeNetworkUnavailable(node.Name, false); err != nil { + glog.Errorf("Could not update node NetworkUnavailable status to false: %v", err) + return err + } + + glog.V(2).Infof("Updated node %q cloud alias with node spec, node.PodCIDR = %v", + node.Name, node.Spec.PodCIDR) + + return nil +} + +// allocateRange allocates a new range and updates both the cloud +// platform and the node allocation. +func (op *updateOp) allocateRange(ctx context.Context, sync *NodeSync, node *v1.Node) error { + if sync.mode != SyncFromCluster { + sync.kubeAPI.EmitNodeWarningEvent(node.Name, InvalidModeEvent, + "Cannot allocate CIDRs in mode %q", sync.mode) + return fmt.Errorf("controller cannot allocate CIDRS in mode %q", sync.mode) + } + + cidrRange, err := sync.set.AllocateNext() + if err != nil { + return err + } + // If addAlias returns a hard error, cidrRange will be leaked as there + // is no durable record of the range. The missing space will be + // recovered on the next restart of the controller. 
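A hedged illustration of that recovery path, using the cidrset package directly with made-up addresses: a range that was allocated only in memory and never recorded on a Node spec becomes allocatable again once a fresh CidrSet is rebuilt from the recorded specs, which is what Controller.Start does when it re-occupies each node's PodCIDR.

package example

import (
	"fmt"
	"net"

	"k8s.io/kubernetes/pkg/controller/node/ipam/cidrset"
)

// leakAndRecover sketches why a range lost to a failed AddAlias call is only
// reserved in memory: rebuilding the set from durable node specs frees it.
func leakAndRecover() error {
	_, clusterCIDR, err := net.ParseCIDR("10.1.0.0/16")
	if err != nil {
		return err
	}

	set := cidrset.NewCIDRSet(clusterCIDR, 24)

	// First allocation: AddAlias fails afterwards, so no node spec records it.
	leaked, err := set.AllocateNext()
	if err != nil {
		return err
	}
	// Second allocation: this one is successfully written to a node spec.
	recorded, err := set.AllocateNext()
	if err != nil {
		return err
	}

	// Simulate a controller restart: a new set is seeded only from the
	// durable records, i.e. the PodCIDRs found on node specs.
	fresh := cidrset.NewCIDRSet(clusterCIDR, 24)
	if err := fresh.Occupy(recorded); err != nil {
		return err
	}

	// The previously leaked range is available for allocation again.
	next, err := fresh.AllocateNext()
	if err != nil {
		return err
	}
	fmt.Printf("leaked %v is reusable after restart: %v\n", leaked, next.String() == leaked.String())
	return nil
}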
+ if err := sync.cloudAlias.AddAlias(ctx, node.Name, cidrRange); err != nil { + glog.Errorf("Could not add alias %v for node %q: %v", cidrRange, node.Name, err) + return err + } + + if err := sync.kubeAPI.UpdateNodePodCIDR(ctx, node, cidrRange); err != nil { + glog.Errorf("Could not update node %q PodCIDR to %v: %v", node.Name, cidrRange, err) + return err + } + + if err := sync.kubeAPI.UpdateNodeNetworkUnavailable(node.Name, false); err != nil { + glog.Errorf("Could not update node NetworkUnavailable status to false: %v", err) + return err + } + + glog.V(2).Infof("Allocated PodCIDR %v for node %q", cidrRange, node.Name) + + return nil +} + +// deleteOp handles deletion of a node. +type deleteOp struct { + node *v1.Node +} + +func (op *deleteOp) String() string { + if op.node == nil { + return fmt.Sprintf("deleteOp(nil)") + } + return fmt.Sprintf("deleteOp(%q,%v)", op.node.Name, op.node.Spec.PodCIDR) +} + +func (op *deleteOp) run(sync *NodeSync) error { + glog.V(3).Infof("Running deleteOp %+v", op) + if op.node.Spec.PodCIDR == "" { + glog.V(2).Infof("Node %q was deleted, node had no PodCIDR range assigned", op.node.Name) + return nil + } + + _, cidrRange, err := net.ParseCIDR(op.node.Spec.PodCIDR) + if err != nil { + glog.Errorf("Deleted node %q has an invalid podCIDR %q: %v", + op.node.Name, op.node.Spec.PodCIDR, err) + sync.kubeAPI.EmitNodeWarningEvent(op.node.Name, InvalidPodCIDR, + "Node %q has an invalid PodCIDR: %q", op.node.Name, op.node.Spec.PodCIDR) + return nil + } + + sync.set.Release(cidrRange) + glog.V(2).Infof("Node %q was deleted, releasing CIDR range %v", + op.node.Name, op.node.Spec.PodCIDR) + + return nil +} diff --git a/pkg/controller/node/ipam/sync/sync_test.go b/pkg/controller/node/ipam/sync/sync_test.go new file mode 100644 index 00000000000..15d3e1b5b35 --- /dev/null +++ b/pkg/controller/node/ipam/sync/sync_test.go @@ -0,0 +1,294 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package sync + +import ( + "context" + "fmt" + "net" + "reflect" + "testing" + "time" + + "github.com/golang/glog" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/controller/node/ipam/cidrset" + "k8s.io/kubernetes/pkg/controller/node/ipam/test" + + "k8s.io/api/core/v1" +) + +var ( + _, clusterCIDRRange, _ = net.ParseCIDR("10.1.0.0/16") +) + +type fakeEvent struct { + nodeName string + reason string +} + +type fakeAPIs struct { + aliasRange *net.IPNet + aliasErr error + addAliasErr error + nodeRet *v1.Node + nodeErr error + updateNodeErr error + resyncTimeout time.Duration + reportChan chan struct{} + + updateNodeNetworkUnavailableErr error + + calls []string + events []fakeEvent + results []error +} + +func (f *fakeAPIs) Alias(ctx context.Context, nodeName string) (*net.IPNet, error) { + f.calls = append(f.calls, fmt.Sprintf("alias %v", nodeName)) + return f.aliasRange, f.aliasErr +} + +func (f *fakeAPIs) AddAlias(ctx context.Context, nodeName string, cidrRange *net.IPNet) error { + f.calls = append(f.calls, fmt.Sprintf("addAlias %v %v", nodeName, cidrRange)) + return f.addAliasErr +} + +func (f *fakeAPIs) Node(ctx context.Context, name string) (*v1.Node, error) { + f.calls = append(f.calls, fmt.Sprintf("node %v", name)) + return f.nodeRet, f.nodeErr +} + +func (f *fakeAPIs) UpdateNodePodCIDR(ctx context.Context, node *v1.Node, cidrRange *net.IPNet) error { + f.calls = append(f.calls, fmt.Sprintf("updateNode %v", node)) + return f.updateNodeErr +} + +func (f *fakeAPIs) UpdateNodeNetworkUnavailable(nodeName string, unavailable bool) error { + f.calls = append(f.calls, fmt.Sprintf("updateNodeNetworkUnavailable %v %v", nodeName, unavailable)) + return f.updateNodeNetworkUnavailableErr +} + +func (f *fakeAPIs) EmitNodeWarningEvent(nodeName, reason, fmtStr string, args ...interface{}) { + f.events = append(f.events, fakeEvent{nodeName, reason}) +} + +func (f *fakeAPIs) ReportResult(err error) { + glog.V(2).Infof("ReportResult %v", err) + f.results = append(f.results, err) + if f.reportChan != nil { + f.reportChan <- struct{}{} + } +} + +func (f *fakeAPIs) ResyncTimeout() time.Duration { + if f.resyncTimeout == 0 { + return time.Second * 10000 + } + return f.resyncTimeout +} + +func (f *fakeAPIs) dumpTrace() { + for i, x := range f.calls { + glog.Infof("trace %v: %v", i, x) + } +} + +var nodeWithoutCIDRRange = &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "node1"}, +} + +var nodeWithCIDRRange = &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "node1"}, + Spec: v1.NodeSpec{PodCIDR: "10.1.1.0/24"}, +} + +func TestNodeSyncUpdate(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + desc string + mode NodeSyncMode + node *v1.Node + fake fakeAPIs + + events []fakeEvent + wantError bool + }{ + { + desc: "validate range ==", + mode: SyncFromCloud, + node: nodeWithCIDRRange, + fake: fakeAPIs{ + aliasRange: test.MustParseCIDR(nodeWithCIDRRange.Spec.PodCIDR), + }, + }, + { + desc: "validate range !=", + mode: SyncFromCloud, + node: nodeWithCIDRRange, + fake: fakeAPIs{aliasRange: test.MustParseCIDR("192.168.0.0/24")}, + events: []fakeEvent{{"node1", "CloudCIDRAllocatorMismatch"}}, + }, + { + desc: "update alias from node", + mode: SyncFromCloud, + node: nodeWithCIDRRange, + events: []fakeEvent{{"node1", "CloudCIDRAllocatorInvalidMode"}}, + wantError: true, + }, + { + desc: "update alias from node", + mode: SyncFromCluster, + node: nodeWithCIDRRange, + // XXX/bowei -- validation + }, + { + desc: "update node from alias", + mode: SyncFromCloud, + node: 
nodeWithoutCIDRRange, + fake: fakeAPIs{aliasRange: test.MustParseCIDR("10.1.2.3/16")}, + // XXX/bowei -- validation + }, + { + desc: "update node from alias", + mode: SyncFromCluster, + node: nodeWithoutCIDRRange, + fake: fakeAPIs{aliasRange: test.MustParseCIDR("10.1.2.3/16")}, + events: []fakeEvent{{"node1", "CloudCIDRAllocatorInvalidMode"}}, + wantError: true, + }, + { + desc: "allocate range", + mode: SyncFromCloud, + node: nodeWithoutCIDRRange, + events: []fakeEvent{{"node1", "CloudCIDRAllocatorInvalidMode"}}, + wantError: true, + }, + { + desc: "allocate range", + mode: SyncFromCluster, + node: nodeWithoutCIDRRange, + }, + { + desc: "update with node==nil", + mode: SyncFromCluster, + node: nil, + fake: fakeAPIs{ + nodeRet: nodeWithCIDRRange, + }, + wantError: false, + }, + } { + sync := New(&tc.fake, &tc.fake, &tc.fake, tc.mode, "node1", cidrset.NewCIDRSet(clusterCIDRRange, 24)) + doneChan := make(chan struct{}) + + // Do a single step of the loop. + go sync.Loop(doneChan) + sync.Update(tc.node) + close(sync.opChan) + <-doneChan + tc.fake.dumpTrace() + + if !reflect.DeepEqual(tc.fake.events, tc.events) { + t.Errorf("%v, %v; fake.events = %#v, want %#v", tc.desc, tc.mode, tc.fake.events, tc.events) + } + + var hasError bool + for _, r := range tc.fake.results { + hasError = hasError || (r != nil) + } + if hasError != tc.wantError { + t.Errorf("%v, %v; hasError = %t, errors = %v, want %t", + tc.desc, tc.mode, hasError, tc.fake.events, tc.wantError) + } + } +} + +func TestNodeSyncResync(t *testing.T) { + fake := &fakeAPIs{ + nodeRet: nodeWithCIDRRange, + resyncTimeout: time.Millisecond, + reportChan: make(chan struct{}), + } + sync := New(fake, fake, fake, SyncFromCluster, "node1", cidrset.NewCIDRSet(clusterCIDRRange, 24)) + doneChan := make(chan struct{}) + + go sync.Loop(doneChan) + <-fake.reportChan + close(sync.opChan) + // Unblock loop(). + go func() { + <-fake.reportChan + }() + <-doneChan + fake.dumpTrace() +} + +func TestNodeSyncDelete(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + desc string + mode NodeSyncMode + node *v1.Node + fake fakeAPIs + }{ + { + desc: "delete", + mode: SyncFromCluster, + node: nodeWithCIDRRange, + }, + { + desc: "delete without CIDR range", + mode: SyncFromCluster, + node: nodeWithoutCIDRRange, + }, + { + desc: "delete with invalid CIDR range", + mode: SyncFromCluster, + node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "node1"}, + Spec: v1.NodeSpec{PodCIDR: "invalid"}, + }, + }, + } { + sync := New(&tc.fake, &tc.fake, &tc.fake, tc.mode, "node1", cidrset.NewCIDRSet(clusterCIDRRange, 24)) + doneChan := make(chan struct{}) + + // Do a single step of the loop. 
+ go sync.Loop(doneChan) + sync.Delete(tc.node) + <-doneChan + tc.fake.dumpTrace() + + /* + if !reflect.DeepEqual(tc.fake.events, tc.events) { + t.Errorf("%v, %v; fake.events = %#v, want %#v", tc.desc, tc.mode, tc.fake.events, tc.events) + } + + var hasError bool + for _, r := range tc.fake.results { + hasError = hasError || (r != nil) + } + if hasError != tc.wantError { + t.Errorf("%v, %v; hasError = %t, errors = %v, want %t", + tc.desc, tc.mode, hasError, tc.fake.events, tc.wantError) + } + */ + } +} diff --git a/pkg/controller/node/ipam/test/BUILD b/pkg/controller/node/ipam/test/BUILD new file mode 100644 index 00000000000..262933dd629 --- /dev/null +++ b/pkg/controller/node/ipam/test/BUILD @@ -0,0 +1,21 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["utils.go"], + visibility = ["//visibility:public"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/controller/node/ipam/test/utils.go b/pkg/controller/node/ipam/test/utils.go new file mode 100644 index 00000000000..14fd6663086 --- /dev/null +++ b/pkg/controller/node/ipam/test/utils.go @@ -0,0 +1,31 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package test + +import ( + "net" +) + +// MustParseCIDR returns the CIDR range parsed from s or panics if the string +// cannot be parsed. +func MustParseCIDR(s string) *net.IPNet { + _, ret, err := net.ParseCIDR(s) + if err != nil { + panic(err) + } + return ret +} diff --git a/pkg/controller/node/ipam/timeout.go b/pkg/controller/node/ipam/timeout.go new file mode 100644 index 00000000000..f1160b4cf9a --- /dev/null +++ b/pkg/controller/node/ipam/timeout.go @@ -0,0 +1,67 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipam + +import ( + "time" +) + +// Timeout manages the resync loop timing for a given node sync operation. The +// timeout changes depending on whether or not there was an error reported for +// the operation. Consecutive errors will result in exponential backoff to a +// maxBackoff timeout. +type Timeout struct { + // Resync is the default timeout duration when there are no errors. + Resync time.Duration + // MaxBackoff is the maximum timeout when in a error backoff state. 
+ MaxBackoff time.Duration + // InitialRetry is the initial retry interval when an error is reported. + InitialRetry time.Duration + + // errs is the count of consecutive errors that have occurred. + errs int + // current is the current backoff timeout. + current time.Duration +} + +// Update the timeout with the current error state. +func (b *Timeout) Update(ok bool) { + if ok { + b.errs = 0 + b.current = b.Resync + return + } + + b.errs++ + if b.errs == 1 { + b.current = b.InitialRetry + return + } + + b.current *= 2 + if b.current >= b.MaxBackoff { + b.current = b.MaxBackoff + } +} + +// Next returns the next operation timeout given the disposition of err. +func (b *Timeout) Next() time.Duration { + if b.errs == 0 { + return b.Resync + } + return b.current +} diff --git a/pkg/controller/node/ipam/timeout_test.go b/pkg/controller/node/ipam/timeout_test.go new file mode 100644 index 00000000000..0da314e6aee --- /dev/null +++ b/pkg/controller/node/ipam/timeout_test.go @@ -0,0 +1,57 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipam + +import ( + "errors" + "testing" + "time" +) + +func TestTimeout(t *testing.T) { + time10s := 10 * time.Second + time5s := 5 * time.Second + timeout := &Timeout{ + Resync: time10s, + MaxBackoff: time5s, + InitialRetry: time.Second, + } + + for _, testStep := range []struct { + err error + want time.Duration + }{ + {nil, time10s}, + {nil, time10s}, + {errors.New("x"), time.Second}, + {errors.New("x"), 2 * time.Second}, + {errors.New("x"), 4 * time.Second}, + {errors.New("x"), 5 * time.Second}, + {errors.New("x"), 5 * time.Second}, + {nil, time10s}, + {nil, time10s}, + {errors.New("x"), time.Second}, + {errors.New("x"), 2 * time.Second}, + {nil, time10s}, + } { + timeout.Update(testStep.err == nil) + next := timeout.Next() + if next != testStep.want { + t.Errorf("timeout.next(%v) = %v, want %v", testStep.err, next, testStep.want) + } + } +} diff --git a/pkg/controller/node/metrics.go b/pkg/controller/node/metrics.go index e3b619b7447..31bba5b2332 100644 --- a/pkg/controller/node/metrics.go +++ b/pkg/controller/node/metrics.go @@ -23,42 +23,42 @@ import ( ) const ( - NodeControllerSubsystem = "node_collector" - ZoneHealthStatisticKey = "zone_health" - ZoneSizeKey = "zone_size" - ZoneNoUnhealthyNodesKey = "unhealthy_nodes_in_zone" - EvictionsNumberKey = "evictions_number" + nodeControllerSubsystem = "node_collector" + zoneHealthStatisticKey = "zone_health" + zoneSizeKey = "zone_size" + zoneNoUnhealthyNodesKey = "unhealthy_nodes_in_zone" + evictionsNumberKey = "evictions_number" ) var ( - ZoneHealth = prometheus.NewGaugeVec( + zoneHealth = prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Subsystem: NodeControllerSubsystem, - Name: ZoneHealthStatisticKey, + Subsystem: nodeControllerSubsystem, + Name: zoneHealthStatisticKey, Help: "Gauge measuring percentage of healthy nodes per zone.", }, []string{"zone"}, ) - ZoneSize = prometheus.NewGaugeVec( + zoneSize = prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Subsystem: 
NodeControllerSubsystem, - Name: ZoneSizeKey, + Subsystem: nodeControllerSubsystem, + Name: zoneSizeKey, Help: "Gauge measuring number of registered Nodes per zones.", }, []string{"zone"}, ) - UnhealthyNodes = prometheus.NewGaugeVec( + unhealthyNodes = prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Subsystem: NodeControllerSubsystem, - Name: ZoneNoUnhealthyNodesKey, + Subsystem: nodeControllerSubsystem, + Name: zoneNoUnhealthyNodesKey, Help: "Gauge measuring number of not Ready Nodes per zones.", }, []string{"zone"}, ) - EvictionsNumber = prometheus.NewCounterVec( + evictionsNumber = prometheus.NewCounterVec( prometheus.CounterOpts{ - Subsystem: NodeControllerSubsystem, - Name: EvictionsNumberKey, + Subsystem: nodeControllerSubsystem, + Name: evictionsNumberKey, Help: "Number of Node evictions that happened since current instance of NodeController started.", }, []string{"zone"}, @@ -67,11 +67,12 @@ var ( var registerMetrics sync.Once +// Register the metrics that are to be monitored. func Register() { registerMetrics.Do(func() { - prometheus.MustRegister(ZoneHealth) - prometheus.MustRegister(ZoneSize) - prometheus.MustRegister(UnhealthyNodes) - prometheus.MustRegister(EvictionsNumber) + prometheus.MustRegister(zoneHealth) + prometheus.MustRegister(zoneSize) + prometheus.MustRegister(unhealthyNodes) + prometheus.MustRegister(evictionsNumber) }) } diff --git a/pkg/controller/node/node_controller.go b/pkg/controller/node/node_controller.go index b29103042e4..d04799b3a7e 100644 --- a/pkg/controller/node/node_controller.go +++ b/pkg/controller/node/node_controller.go @@ -27,7 +27,6 @@ import ( apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -49,6 +48,7 @@ import ( "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/node/ipam" + nodesync "k8s.io/kubernetes/pkg/controller/node/ipam/sync" "k8s.io/kubernetes/pkg/controller/node/scheduler" "k8s.io/kubernetes/pkg/controller/node/util" "k8s.io/kubernetes/pkg/util/metrics" @@ -66,12 +66,13 @@ func init() { var ( gracefulDeletionVersion = utilversion.MustParseSemantic("v1.1.0") - + // UnreachableTaintTemplate is the taint for when a node becomes unreachable. UnreachableTaintTemplate = &v1.Taint{ Key: algorithm.TaintNodeUnreachable, Effect: v1.TaintEffectNoExecute, } - + // NotReadyTaintTemplate is the taint for when a node is not ready for + // executing pods. NotReadyTaintTemplate = &v1.Taint{ Key: algorithm.TaintNodeNotReady, Effect: v1.TaintEffectNoExecute, @@ -97,15 +98,26 @@ const ( apiserverStartupGracePeriod = 10 * time.Minute // The amount of time the nodecontroller should sleep between retrying NodeStatus updates retrySleepTime = 20 * time.Millisecond + + // ipamResyncInterval is the amount of time between when the cloud and node + // CIDR range assignments are synchronized. + ipamResyncInterval = 30 * time.Second + // ipamMaxBackoff is the maximum backoff for retrying synchronization of a + // given node in the error state. + ipamMaxBackoff = 10 * time.Second + // ipamInitialBackoff is the initial retry interval for retrying synchronization of a + // given node in the error state. + ipamInitialBackoff = 250 * time.Millisecond ) -type zoneState string +// ZoneState is the state of a given zone.
+type ZoneState string const ( - stateInitial = zoneState("Initial") - stateNormal = zoneState("Normal") - stateFullDisruption = zoneState("FullDisruption") - statePartialDisruption = zoneState("PartialDisruption") + stateInitial = ZoneState("Initial") + stateNormal = ZoneState("Normal") + stateFullDisruption = ZoneState("FullDisruption") + statePartialDisruption = ZoneState("PartialDisruption") ) type nodeStatusData struct { @@ -114,7 +126,8 @@ type nodeStatusData struct { status v1.NodeStatus } -type NodeController struct { +// Controller is the controller that manages node related cluster state. +type Controller struct { allocateNodeCIDRs bool allocatorType ipam.CIDRAllocatorType @@ -125,10 +138,10 @@ type NodeController struct { kubeClient clientset.Interface // Method for easy mocking in unittest. lookupIP func(host string) ([]net.IP, error) - // Value used if sync_nodes_status=False. NodeController will not proactively + // Value used if sync_nodes_status=False. Controller will not proactively // sync node status in this case, but will monitor node status updated from kubelet. If // it doesn't receive update for this amount of time, it will start posting "NodeReady== - // ConditionUnknown". The amount of time before which NodeController start evicting pods + // ConditionUnknown". The amount of time before which Controller start evicting pods // is controlled via flag 'pod-eviction-timeout'. // Note: be cautious when changing the constant, it must work with nodeStatusUpdateFrequency // in kubelet. There are several constraints: @@ -140,7 +153,7 @@ type NodeController struct { // 2. nodeMonitorGracePeriod can't be too large for user experience - larger value takes // longer for user to see up-to-date node status. nodeMonitorGracePeriod time.Duration - // Value controlling NodeController monitoring period, i.e. how often does NodeController + // Value controlling Controller monitoring period, i.e. how often does Controller // check node status posted from kubelet. This value should be lower than nodeMonitorGracePeriod. // TODO: Change node status monitor to watch based. nodeMonitorPeriod time.Duration @@ -170,28 +183,26 @@ type NodeController struct { daemonSetInformerSynced cache.InformerSynced podInformerSynced cache.InformerSynced - - cidrAllocator ipam.CIDRAllocator - - taintManager *scheduler.NoExecuteTaintManager + cidrAllocator ipam.CIDRAllocator + taintManager *scheduler.NoExecuteTaintManager forcefullyDeletePod func(*v1.Pod) error nodeExistsInCloudProvider func(types.NodeName) (bool, error) - computeZoneStateFunc func(nodeConditions []*v1.NodeCondition) (int, zoneState) + computeZoneStateFunc func(nodeConditions []*v1.NodeCondition) (int, ZoneState) enterPartialDisruptionFunc func(nodeNum int) float32 enterFullDisruptionFunc func(nodeNum int) float32 - zoneStates map[string]zoneState + zoneStates map[string]ZoneState evictionLimiterQPS float32 secondaryEvictionLimiterQPS float32 largeClusterThreshold int32 unhealthyZoneThreshold float32 - // if set to true NodeController will start TaintManager that will evict Pods from + // if set to true Controller will start TaintManager that will evict Pods from // tainted nodes, if they're not tolerated. runTaintManager bool - // if set to true NodeController will taint Nodes with 'TaintNodeNotReady' and 'TaintNodeUnreachable' + // if set to true Controller will taint Nodes with 'TaintNodeNotReady' and 'TaintNodeUnreachable' // taints instead of evicting Pods itself. 
useTaintBasedEvictions bool @@ -225,17 +236,21 @@ func NewNodeController( allocatorType ipam.CIDRAllocatorType, runTaintManager bool, useTaintBasedEvictions bool, - taintNodeByCondition bool, -) (*NodeController, error) { + taintNodeByCondition bool) (*Controller, error) { + + if kubeClient == nil { + glog.Fatalf("kubeClient is nil when starting Controller") + } + eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "controllermanager"}) eventBroadcaster.StartLogging(glog.Infof) - if kubeClient != nil { - glog.V(0).Infof("Sending events to api server.") - eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.Core().RESTClient()).Events("")}) - } else { - glog.Fatalf("kubeClient is nil when starting NodeController") - } + + glog.V(0).Infof("Sending events to api server.") + eventBroadcaster.StartRecordingToSink( + &v1core.EventSinkImpl{ + Interface: v1core.New(kubeClient.Core().RESTClient()).Events(""), + }) if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil { metrics.RegisterMetricAndTrackRateLimiterUsage("node_controller", kubeClient.Core().RESTClient().GetRateLimiter()) @@ -243,45 +258,49 @@ func NewNodeController( if allocateNodeCIDRs { if clusterCIDR == nil { - glog.Fatal("NodeController: Must specify clusterCIDR if allocateNodeCIDRs == true.") + glog.Fatal("Controller: Must specify clusterCIDR if allocateNodeCIDRs == true.") } mask := clusterCIDR.Mask if maskSize, _ := mask.Size(); maskSize > nodeCIDRMaskSize { - glog.Fatal("NodeController: Invalid clusterCIDR, mask size of clusterCIDR must be less than nodeCIDRMaskSize.") + glog.Fatal("Controller: Invalid clusterCIDR, mask size of clusterCIDR must be less than nodeCIDRMaskSize.") } } - nc := &NodeController{ - cloud: cloud, - knownNodeSet: make(map[string]*v1.Node), - kubeClient: kubeClient, - recorder: recorder, - podEvictionTimeout: podEvictionTimeout, - maximumGracePeriod: 5 * time.Minute, - zonePodEvictor: make(map[string]*scheduler.RateLimitedTimedQueue), - zoneNoExecuteTainer: make(map[string]*scheduler.RateLimitedTimedQueue), - nodeStatusMap: make(map[string]nodeStatusData), - nodeMonitorGracePeriod: nodeMonitorGracePeriod, - nodeMonitorPeriod: nodeMonitorPeriod, - nodeStartupGracePeriod: nodeStartupGracePeriod, - lookupIP: net.LookupIP, - now: metav1.Now, - clusterCIDR: clusterCIDR, - serviceCIDR: serviceCIDR, - allocateNodeCIDRs: allocateNodeCIDRs, - allocatorType: allocatorType, - forcefullyDeletePod: func(p *v1.Pod) error { return util.ForcefullyDeletePod(kubeClient, p) }, - nodeExistsInCloudProvider: func(nodeName types.NodeName) (bool, error) { return util.NodeExistsInCloudProvider(cloud, nodeName) }, + nc := &Controller{ + cloud: cloud, + knownNodeSet: make(map[string]*v1.Node), + kubeClient: kubeClient, + recorder: recorder, + podEvictionTimeout: podEvictionTimeout, + maximumGracePeriod: 5 * time.Minute, + zonePodEvictor: make(map[string]*scheduler.RateLimitedTimedQueue), + zoneNoExecuteTainer: make(map[string]*scheduler.RateLimitedTimedQueue), + nodeStatusMap: make(map[string]nodeStatusData), + nodeMonitorGracePeriod: nodeMonitorGracePeriod, + nodeMonitorPeriod: nodeMonitorPeriod, + nodeStartupGracePeriod: nodeStartupGracePeriod, + lookupIP: net.LookupIP, + now: metav1.Now, + clusterCIDR: clusterCIDR, + serviceCIDR: serviceCIDR, + allocateNodeCIDRs: allocateNodeCIDRs, + allocatorType: allocatorType, + forcefullyDeletePod: func(p *v1.Pod) error { + return 
util.ForcefullyDeletePod(kubeClient, p) + }, + nodeExistsInCloudProvider: func(nodeName types.NodeName) (bool, error) { + return util.NodeExistsInCloudProvider(cloud, nodeName) + }, evictionLimiterQPS: evictionLimiterQPS, secondaryEvictionLimiterQPS: secondaryEvictionLimiterQPS, largeClusterThreshold: largeClusterThreshold, unhealthyZoneThreshold: unhealthyZoneThreshold, - zoneStates: make(map[string]zoneState), + zoneStates: make(map[string]ZoneState), runTaintManager: runTaintManager, useTaintBasedEvictions: useTaintBasedEvictions && runTaintManager, } if useTaintBasedEvictions { - glog.Infof("NodeController is using taint based evictions.") + glog.Infof("Controller is using taint based evictions.") } nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc nc.enterFullDisruptionFunc = nc.HealthyQPSFunc @@ -326,67 +345,34 @@ func NewNodeController( nc.podInformerSynced = podInformer.Informer().HasSynced if nc.allocateNodeCIDRs { - var nodeList *v1.NodeList - var err error - // We must poll because apiserver might not be up. This error causes - // controller manager to restart. - if pollErr := wait.Poll(10*time.Second, apiserverStartupGracePeriod, func() (bool, error) { - nodeList, err = kubeClient.Core().Nodes().List(metav1.ListOptions{ - FieldSelector: fields.Everything().String(), - LabelSelector: labels.Everything().String(), - }) - if err != nil { - glog.Errorf("Failed to list all nodes: %v", err) - return false, nil + if nc.allocatorType == ipam.IPAMFromClusterAllocatorType || nc.allocatorType == ipam.IPAMFromCloudAllocatorType { + cfg := &ipam.Config{ + Resync: ipamResyncInterval, + MaxBackoff: ipamMaxBackoff, + InitialRetry: ipamInitialBackoff, } - return true, nil - }); pollErr != nil { - return nil, fmt.Errorf("Failed to list all nodes in %v, cannot proceed without updating CIDR map", apiserverStartupGracePeriod) + switch nc.allocatorType { + case ipam.IPAMFromClusterAllocatorType: + cfg.Mode = nodesync.SyncFromCluster + case ipam.IPAMFromCloudAllocatorType: + cfg.Mode = nodesync.SyncFromCloud + } + ipamc, err := ipam.NewController(cfg, kubeClient, cloud, clusterCIDR, serviceCIDR, nodeCIDRMaskSize) + if err != nil { + glog.Fatalf("Error creating ipam controller: %v", err) + } + if err := ipamc.Start(nodeInformer); err != nil { + glog.Fatalf("Error trying to Init(): %v", err) + } + } else { + var err error + nc.cidrAllocator, err = ipam.New( + kubeClient, cloud, nc.allocatorType, nc.clusterCIDR, nc.serviceCIDR, nodeCIDRMaskSize) + if err != nil { + return nil, err + } + nc.cidrAllocator.Register(nodeInformer) } - - switch nc.allocatorType { - case ipam.RangeAllocatorType: - nc.cidrAllocator, err = ipam.NewCIDRRangeAllocator( - kubeClient, clusterCIDR, serviceCIDR, nodeCIDRMaskSize, nodeList) - case ipam.CloudAllocatorType: - nc.cidrAllocator, err = ipam.NewCloudCIDRAllocator(kubeClient, cloud) - default: - return nil, fmt.Errorf("Invalid CIDR allocator type: %v", nc.allocatorType) - } - - if err != nil { - return nil, err - } - - nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: util.CreateAddNodeHandler(nc.cidrAllocator.AllocateOrOccupyCIDR), - UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error { - // If the PodCIDR is not empty we either: - // - already processed a Node that already had a CIDR after NC restarted - // (cidr is marked as used), - // - already processed a Node successfully and allocated a CIDR for it - // (cidr is marked as used), - // - already processed a Node but we did saw a "timeout" response and - // request 
eventually got through in this case we haven't released - // the allocated CIDR (cidr is still marked as used). - // There's a possible error here: - // - NC sees a new Node and assigns a CIDR X to it, - // - Update Node call fails with a timeout, - // - Node is updated by some other component, NC sees an update and - // assigns CIDR Y to the Node, - // - Both CIDR X and CIDR Y are marked as used in the local cache, - // even though Node sees only CIDR Y - // The problem here is that in in-memory cache we see CIDR X as marked, - // which prevents it from being assigned to any new node. The cluster - // state is correct. - // Restart of NC fixes the issue. - if newNode.Spec.PodCIDR == "" { - return nc.cidrAllocator.AllocateOrOccupyCIDR(newNode) - } - return nil - }), - DeleteFunc: util.CreateDeleteNodeHandler(nc.cidrAllocator.ReleaseCIDR), - }) } if nc.runTaintManager { @@ -427,7 +413,7 @@ func NewNodeController( return nc, nil } -func (nc *NodeController) doEvictionPass() { +func (nc *Controller) doEvictionPass() { nc.evictorLock.Lock() defer nc.evictorLock.Unlock() for k := range nc.zonePodEvictor { @@ -440,23 +426,23 @@ func (nc *NodeController) doEvictionPass() { glog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err) } else { zone := utilnode.GetZoneKey(node) - EvictionsNumber.WithLabelValues(zone).Inc() + evictionsNumber.WithLabelValues(zone).Inc() } - nodeUid, _ := value.UID.(string) - remaining, err := util.DeletePods(nc.kubeClient, nc.recorder, value.Value, nodeUid, nc.daemonSetStore) + nodeUID, _ := value.UID.(string) + remaining, err := util.DeletePods(nc.kubeClient, nc.recorder, value.Value, nodeUID, nc.daemonSetStore) if err != nil { utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err)) return false, 0 } if remaining { - glog.Infof("Pods awaiting deletion due to NodeController eviction") + glog.Infof("Pods awaiting deletion due to Controller eviction") } return true, 0 }) } } -func (nc *NodeController) doNoScheduleTaintingPass(node *v1.Node) error { +func (nc *Controller) doNoScheduleTaintingPass(node *v1.Node) error { // Map node's condition to Taints. taints := []v1.Taint{} for _, condition := range node.Status.Conditions { @@ -484,7 +470,7 @@ func (nc *NodeController) doNoScheduleTaintingPass(node *v1.Node) error { return nil } -func (nc *NodeController) doNoExecuteTaintingPass() { +func (nc *Controller) doNoExecuteTaintingPass() { nc.evictorLock.Lock() defer nc.evictorLock.Unlock() for k := range nc.zoneNoExecuteTainer { @@ -500,7 +486,7 @@ func (nc *NodeController) doNoExecuteTaintingPass() { return false, 50 * time.Millisecond } else { zone := utilnode.GetZoneKey(node) - EvictionsNumber.WithLabelValues(zone).Inc() + evictionsNumber.WithLabelValues(zone).Inc() } _, condition := v1node.GetNodeCondition(&node.Status, v1.NodeReady) // Because we want to mimic NodeStatus.Condition["Ready"] we make "unreachable" and "not ready" taints mutually exclusive. @@ -524,7 +510,7 @@ func (nc *NodeController) doNoExecuteTaintingPass() { } // Run starts an asynchronous loop that monitors the status of cluster nodes. -func (nc *NodeController) Run(stopCh <-chan struct{}) { +func (nc *Controller) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() glog.Infof("Starting node controller") @@ -560,7 +546,7 @@ func (nc *NodeController) Run(stopCh <-chan struct{}) { } // addPodEvictorForNewZone checks if new zone appeared, and if so add new evictor. 
-func (nc *NodeController) addPodEvictorForNewZone(node *v1.Node) { +func (nc *Controller) addPodEvictorForNewZone(node *v1.Node) { zone := utilnode.GetZoneKey(node) if _, found := nc.zoneStates[zone]; !found { nc.zoneStates[zone] = stateInitial @@ -575,14 +561,14 @@ func (nc *NodeController) addPodEvictorForNewZone(node *v1.Node) { } // Init the metric for the new zone. glog.Infof("Initializing eviction metric for zone: %v", zone) - EvictionsNumber.WithLabelValues(zone).Add(0) + evictionsNumber.WithLabelValues(zone).Add(0) } } // monitorNodeStatus verifies node status are constantly updated by kubelet, and if not, // post "NodeReady==ConditionUnknown". It also evicts all pods if node is not ready or // not reachable for a long period of time. -func (nc *NodeController) monitorNodeStatus() error { +func (nc *Controller) monitorNodeStatus() error { // We are listing nodes from local cache as we can tolerate some small delays // comparing to state from etcd and there is eventual consistency anyway. nodes, err := nc.nodeLister.List(labels.Everything()) @@ -596,8 +582,8 @@ func (nc *NodeController) monitorNodeStatus() error { } for i := range added { - glog.V(1).Infof("NodeController observed a new Node: %#v", added[i].Name) - util.RecordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", added[i].Name)) + glog.V(1).Infof("Controller observed a new Node: %#v", added[i].Name) + util.RecordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in Controller", added[i].Name)) nc.knownNodeSet[added[i].Name] = added[i] nc.addPodEvictorForNewZone(added[i]) if nc.useTaintBasedEvictions { @@ -608,8 +594,8 @@ func (nc *NodeController) monitorNodeStatus() error { } for i := range deleted { - glog.V(1).Infof("NodeController observed a Node deletion: %v", deleted[i].Name) - util.RecordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), v1.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", deleted[i].Name)) + glog.V(1).Infof("Controller observed a Node deletion: %v", deleted[i].Name) + util.RecordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), v1.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from Controller", deleted[i].Name)) delete(nc.knownNodeSet, deleted[i].Name) } @@ -632,7 +618,7 @@ func (nc *NodeController) monitorNodeStatus() error { } return false, nil }); err != nil { - glog.Errorf("Update status of Node %v from NodeController error : %v. "+ + glog.Errorf("Update status of Node %v from Controller error : %v. 
"+ "Skipping - no pods will be evicted.", node.Name, err) continue } @@ -752,14 +738,14 @@ func (nc *NodeController) monitorNodeStatus() error { return nil } -func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1.NodeCondition, nodes []*v1.Node) { - newZoneStates := map[string]zoneState{} +func (nc *Controller) handleDisruption(zoneToNodeConditions map[string][]*v1.NodeCondition, nodes []*v1.Node) { + newZoneStates := map[string]ZoneState{} allAreFullyDisrupted := true for k, v := range zoneToNodeConditions { - ZoneSize.WithLabelValues(k).Set(float64(len(v))) + zoneSize.WithLabelValues(k).Set(float64(len(v))) unhealthy, newState := nc.computeZoneStateFunc(v) - ZoneHealth.WithLabelValues(k).Set(float64(100*(len(v)-unhealthy)) / float64(len(v))) - UnhealthyNodes.WithLabelValues(k).Set(float64(unhealthy)) + zoneHealth.WithLabelValues(k).Set(float64(100*(len(v)-unhealthy)) / float64(len(v))) + unhealthyNodes.WithLabelValues(k).Set(float64(unhealthy)) if newState != stateFullDisruption { allAreFullyDisrupted = false } @@ -773,9 +759,9 @@ func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1 allWasFullyDisrupted := true for k, v := range nc.zoneStates { if _, have := zoneToNodeConditions[k]; !have { - ZoneSize.WithLabelValues(k).Set(0) - ZoneHealth.WithLabelValues(k).Set(100) - UnhealthyNodes.WithLabelValues(k).Set(0) + zoneSize.WithLabelValues(k).Set(0) + zoneHealth.WithLabelValues(k).Set(100) + unhealthyNodes.WithLabelValues(k).Set(0) delete(nc.zoneStates, k) continue } @@ -793,7 +779,7 @@ func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1 if !allAreFullyDisrupted || !allWasFullyDisrupted { // We're switching to full disruption mode if allAreFullyDisrupted { - glog.V(0).Info("NodeController detected that all Nodes are not-Ready. Entering master disruption mode.") + glog.V(0).Info("Controller detected that all Nodes are not-Ready. Entering master disruption mode.") for i := range nodes { if nc.useTaintBasedEvictions { _, err := nc.markNodeAsReachable(nodes[i]) @@ -820,7 +806,7 @@ func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1 } // We're exiting full disruption mode if allWasFullyDisrupted { - glog.V(0).Info("NodeController detected that some Nodes are Ready. Exiting master disruption mode.") + glog.V(0).Info("Controller detected that some Nodes are Ready. Exiting master disruption mode.") // When exiting disruption mode update probe timestamps on all Nodes. now := nc.now() for i := range nodes { @@ -843,14 +829,14 @@ func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1 if v == newState { continue } - glog.V(0).Infof("NodeController detected that zone %v is now in state %v.", k, newState) + glog.V(0).Infof("Controller detected that zone %v is now in state %v.", k, newState) nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newState) nc.zoneStates[k] = newState } } } -func (nc *NodeController) setLimiterInZone(zone string, zoneSize int, state zoneState) { +func (nc *Controller) setLimiterInZone(zone string, zoneSize int, state ZoneState) { switch state { case stateNormal: if nc.useTaintBasedEvictions { @@ -879,7 +865,7 @@ func (nc *NodeController) setLimiterInZone(zone string, zoneSize int, state zone // tryUpdateNodeStatus checks a given node's conditions and tries to update it. Returns grace period to // which given node is entitled, state of current and last observed Ready Condition, and an error if it occurred. 
-func (nc *NodeController) tryUpdateNodeStatus(node *v1.Node) (time.Duration, v1.NodeCondition, *v1.NodeCondition, error) { +func (nc *Controller) tryUpdateNodeStatus(node *v1.Node) (time.Duration, v1.NodeCondition, *v1.NodeCondition, error) { var err error var gracePeriod time.Duration var observedReadyCondition v1.NodeCondition @@ -909,7 +895,7 @@ func (nc *NodeController) tryUpdateNodeStatus(node *v1.Node) (time.Duration, v1. savedNodeStatus, found := nc.nodeStatusMap[node.Name] // There are following cases to check: // - both saved and new status have no Ready Condition set - we leave everything as it is, - // - saved status have no Ready Condition, but current one does - NodeController was restarted with Node data already present in etcd, + // - saved status have no Ready Condition, but current one does - Controller was restarted with Node data already present in etcd, // - saved status have some Ready Condition, but current one does not - it's an error, but we fill it up because that's probably a good thing to do, // - both saved and current statuses have Ready Conditions and they have the same LastProbeTime - nothing happened on that Node, it may be // unresponsive, so we leave it as it is, @@ -1036,14 +1022,13 @@ func (nc *NodeController) tryUpdateNodeStatus(node *v1.Node) (time.Duration, v1. if _, err = nc.kubeClient.Core().Nodes().UpdateStatus(node); err != nil { glog.Errorf("Error updating node %s: %v", node.Name, err) return gracePeriod, observedReadyCondition, currentReadyCondition, err - } else { - nc.nodeStatusMap[node.Name] = nodeStatusData{ - status: node.Status, - probeTimestamp: nc.nodeStatusMap[node.Name].probeTimestamp, - readyTransitionTimestamp: nc.now(), - } - return gracePeriod, observedReadyCondition, currentReadyCondition, nil } + nc.nodeStatusMap[node.Name] = nodeStatusData{ + status: node.Status, + probeTimestamp: nc.nodeStatusMap[node.Name].probeTimestamp, + readyTransitionTimestamp: nc.now(), + } + return gracePeriod, observedReadyCondition, currentReadyCondition, nil } } @@ -1054,7 +1039,7 @@ func (nc *NodeController) tryUpdateNodeStatus(node *v1.Node) (time.Duration, v1. // 1. added: the nodes that in 'allNodes', but not in 'knownNodeSet' // 2. deleted: the nodes that in 'knownNodeSet', but not in 'allNodes' // 3. newZoneRepresentatives: the nodes that in both 'knownNodeSet' and 'allNodes', but no zone states -func (nc *NodeController) classifyNodes(allNodes []*v1.Node) (added, deleted, newZoneRepresentatives []*v1.Node) { +func (nc *Controller) classifyNodes(allNodes []*v1.Node) (added, deleted, newZoneRepresentatives []*v1.Node) { for i := range allNodes { if _, has := nc.knownNodeSet[allNodes[i].Name]; !has { added = append(added, allNodes[i]) @@ -1086,7 +1071,7 @@ func (nc *NodeController) classifyNodes(allNodes []*v1.Node) (added, deleted, ne // cancelPodEviction removes any queued evictions, typically because the node is available again. It // returns true if an eviction was queued. -func (nc *NodeController) cancelPodEviction(node *v1.Node) bool { +func (nc *Controller) cancelPodEviction(node *v1.Node) bool { zone := utilnode.GetZoneKey(node) nc.evictorLock.Lock() defer nc.evictorLock.Unlock() @@ -1100,19 +1085,19 @@ func (nc *NodeController) cancelPodEviction(node *v1.Node) bool { // evictPods queues an eviction for the provided node name, and returns false if the node is already // queued for eviction. 
-func (nc *NodeController) evictPods(node *v1.Node) bool { +func (nc *Controller) evictPods(node *v1.Node) bool { nc.evictorLock.Lock() defer nc.evictorLock.Unlock() return nc.zonePodEvictor[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID)) } -func (nc *NodeController) markNodeForTainting(node *v1.Node) bool { +func (nc *Controller) markNodeForTainting(node *v1.Node) bool { nc.evictorLock.Lock() defer nc.evictorLock.Unlock() return nc.zoneNoExecuteTainer[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID)) } -func (nc *NodeController) markNodeAsReachable(node *v1.Node) (bool, error) { +func (nc *Controller) markNodeAsReachable(node *v1.Node) (bool, error) { nc.evictorLock.Lock() defer nc.evictorLock.Unlock() err := controller.RemoveTaintOffNode(nc.kubeClient, node.Name, node, UnreachableTaintTemplate) @@ -1128,13 +1113,15 @@ func (nc *NodeController) markNodeAsReachable(node *v1.Node) (bool, error) { return nc.zoneNoExecuteTainer[utilnode.GetZoneKey(node)].Remove(node.Name), nil } -// Default value for cluster eviction rate - we take nodeNum for consistency with ReducedQPSFunc. -func (nc *NodeController) HealthyQPSFunc(nodeNum int) float32 { +// HealthyQPSFunc returns the default value for cluster eviction rate - we take +// nodeNum for consistency with ReducedQPSFunc. +func (nc *Controller) HealthyQPSFunc(nodeNum int) float32 { return nc.evictionLimiterQPS } -// If the cluster is large make evictions slower, if they're small stop evictions altogether. -func (nc *NodeController) ReducedQPSFunc(nodeNum int) float32 { +// ReducedQPSFunc returns the QPS to use when the cluster is large: make +// evictions slower; if the cluster is small, stop evictions altogether. +func (nc *Controller) ReducedQPSFunc(nodeNum int) float32 { if int32(nodeNum) > nc.largeClusterThreshold { return nc.secondaryEvictionLimiterQPS } @@ -1146,7 +1133,7 @@ func (nc *NodeController) ReducedQPSFunc(nodeNum int) float32 { // - fullyDisrupted if there're no Ready Nodes, // - partiallyDisrupted if at least than nc.unhealthyZoneThreshold percent of Nodes are not Ready, // - normal otherwise -func (nc *NodeController) ComputeZoneState(nodeReadyConditions []*v1.NodeCondition) (int, zoneState) { +func (nc *Controller) ComputeZoneState(nodeReadyConditions []*v1.NodeCondition) (int, ZoneState) { readyNodes := 0 notReadyNodes := 0 for i := range nodeReadyConditions { @@ -1168,7 +1155,7 @@ func (nc *NodeController) ComputeZoneState(nodeReadyConditions []*v1.NodeConditi // maybeDeleteTerminatingPod non-gracefully deletes pods that are terminating // that should not be gracefully terminated. 
-func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}) { +func (nc *Controller) maybeDeleteTerminatingPod(obj interface{}) { pod, ok := obj.(*v1.Pod) if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/node/nodecontroller_test.go index 6b190dbbe8c..13e738b7521 100644 --- a/pkg/controller/node/nodecontroller_test.go +++ b/pkg/controller/node/nodecontroller_test.go @@ -61,12 +61,12 @@ const ( func alwaysReady() bool { return true } type nodeController struct { - *NodeController + *Controller nodeInformer coreinformers.NodeInformer daemonSetInformer extensionsinformers.DaemonSetInformer } -func NewNodeControllerFromClient( +func newNodeControllerFromClient( cloud cloudprovider.Interface, kubeClient clientset.Interface, podEvictionTimeout time.Duration, @@ -597,7 +597,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { } for _, item := range table { - nodeController, _ := NewNodeControllerFromClient( + nodeController, _ := newNodeControllerFromClient( nil, item.fakeNodeHandler, evictionTimeout, @@ -643,8 +643,8 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { for _, zone := range zones { if _, ok := nodeController.zonePodEvictor[zone]; ok { nodeController.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) { - nodeUid, _ := value.UID.(string) - util.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetInformer.Lister()) + nodeUID, _ := value.UID.(string) + util.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetInformer.Lister()) return true, 0 }) } else { @@ -763,7 +763,7 @@ func TestPodStatusChange(t *testing.T) { } for _, item := range table { - nodeController, _ := NewNodeControllerFromClient(nil, item.fakeNodeHandler, + nodeController, _ := newNodeControllerFromClient(nil, item.fakeNodeHandler, evictionTimeout, testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, false) nodeController.now = func() metav1.Time { return fakeNow } @@ -788,8 +788,8 @@ func TestPodStatusChange(t *testing.T) { zones := testutil.GetZones(item.fakeNodeHandler) for _, zone := range zones { nodeController.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) { - nodeUid, _ := value.UID.(string) - util.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetStore) + nodeUID, _ := value.UID.(string) + util.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetStore) return true, 0 }) } @@ -846,8 +846,8 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) { nodeList []*v1.Node podList []v1.Pod updatedNodeStatuses []v1.NodeStatus - expectedInitialStates map[string]zoneState - expectedFollowingStates map[string]zoneState + expectedInitialStates map[string]ZoneState + expectedFollowingStates map[string]ZoneState expectedEvictPods bool description string }{ @@ -901,8 +901,8 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) { unhealthyNodeNewStatus, unhealthyNodeNewStatus, }, - expectedInitialStates: map[string]zoneState{testutil.CreateZoneID("region1", "zone1"): stateFullDisruption}, - expectedFollowingStates: map[string]zoneState{testutil.CreateZoneID("region1", "zone1"): stateFullDisruption}, + 
expectedInitialStates: map[string]ZoneState{testutil.CreateZoneID("region1", "zone1"): stateFullDisruption}, + expectedFollowingStates: map[string]ZoneState{testutil.CreateZoneID("region1", "zone1"): stateFullDisruption}, expectedEvictPods: false, description: "Network Disruption: Only zone is down - eviction shouldn't take place.", }, @@ -957,11 +957,11 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) { unhealthyNodeNewStatus, unhealthyNodeNewStatus, }, - expectedInitialStates: map[string]zoneState{ + expectedInitialStates: map[string]ZoneState{ testutil.CreateZoneID("region1", "zone1"): stateFullDisruption, testutil.CreateZoneID("region2", "zone2"): stateFullDisruption, }, - expectedFollowingStates: map[string]zoneState{ + expectedFollowingStates: map[string]ZoneState{ testutil.CreateZoneID("region1", "zone1"): stateFullDisruption, testutil.CreateZoneID("region2", "zone2"): stateFullDisruption, }, @@ -1018,11 +1018,11 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) { unhealthyNodeNewStatus, healthyNodeNewStatus, }, - expectedInitialStates: map[string]zoneState{ + expectedInitialStates: map[string]ZoneState{ testutil.CreateZoneID("region1", "zone1"): stateFullDisruption, testutil.CreateZoneID("region1", "zone2"): stateNormal, }, - expectedFollowingStates: map[string]zoneState{ + expectedFollowingStates: map[string]ZoneState{ testutil.CreateZoneID("region1", "zone1"): stateFullDisruption, testutil.CreateZoneID("region1", "zone2"): stateNormal, }, @@ -1079,10 +1079,10 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) { unhealthyNodeNewStatus, healthyNodeNewStatus, }, - expectedInitialStates: map[string]zoneState{ + expectedInitialStates: map[string]ZoneState{ testutil.CreateZoneID("region1", "zone1"): stateFullDisruption, }, - expectedFollowingStates: map[string]zoneState{ + expectedFollowingStates: map[string]ZoneState{ testutil.CreateZoneID("region1", "zone1"): stateFullDisruption, }, expectedEvictPods: false, @@ -1139,11 +1139,11 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) { unhealthyNodeNewStatus, healthyNodeNewStatus, }, - expectedInitialStates: map[string]zoneState{ + expectedInitialStates: map[string]ZoneState{ testutil.CreateZoneID("region1", "zone1"): stateFullDisruption, testutil.CreateZoneID("region1", "zone2"): stateFullDisruption, }, - expectedFollowingStates: map[string]zoneState{ + expectedFollowingStates: map[string]ZoneState{ testutil.CreateZoneID("region1", "zone1"): stateFullDisruption, testutil.CreateZoneID("region1", "zone2"): stateNormal, }, @@ -1264,10 +1264,10 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) { healthyNodeNewStatus, healthyNodeNewStatus, }, - expectedInitialStates: map[string]zoneState{ + expectedInitialStates: map[string]ZoneState{ testutil.CreateZoneID("region1", "zone1"): statePartialDisruption, }, - expectedFollowingStates: map[string]zoneState{ + expectedFollowingStates: map[string]ZoneState{ testutil.CreateZoneID("region1", "zone1"): statePartialDisruption, }, expectedEvictPods: true, @@ -1280,7 +1280,7 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) { Existing: item.nodeList, Clientset: fake.NewSimpleClientset(&v1.PodList{Items: item.podList}), } - nodeController, _ := NewNodeControllerFromClient(nil, fakeNodeHandler, + nodeController, _ := newNodeControllerFromClient(nil, fakeNodeHandler, evictionTimeout, testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, testNodeMonitorGracePeriod, 
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, false) nodeController.now = func() metav1.Time { return fakeNow } @@ -1384,7 +1384,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) { Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0"), *testutil.NewPod("pod1", "node0")}}), DeleteWaitChan: make(chan struct{}), } - nodeController, _ := NewNodeControllerFromClient(nil, fnh, 10*time.Minute, + nodeController, _ := newNodeControllerFromClient(nil, fnh, 10*time.Minute, testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, false) @@ -1654,7 +1654,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) { } for i, item := range table { - nodeController, _ := NewNodeControllerFromClient(nil, item.fakeNodeHandler, 5*time.Minute, + nodeController, _ := newNodeControllerFromClient(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, false) nodeController.now = func() metav1.Time { return fakeNow } @@ -1888,7 +1888,7 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) { } for i, item := range table { - nodeController, _ := NewNodeControllerFromClient(nil, item.fakeNodeHandler, 5*time.Minute, + nodeController, _ := newNodeControllerFromClient(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, false) nodeController.now = func() metav1.Time { return fakeNow } @@ -1999,7 +1999,7 @@ func TestSwapUnreachableNotReadyTaints(t *testing.T) { originalTaint := UnreachableTaintTemplate updatedTaint := NotReadyTaintTemplate - nodeController, _ := NewNodeControllerFromClient(nil, fakeNodeHandler, + nodeController, _ := newNodeControllerFromClient(nil, fakeNodeHandler, evictionTimeout, testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, true) nodeController.now = func() metav1.Time { return fakeNow } @@ -2092,7 +2092,7 @@ func TestTaintsNodeByCondition(t *testing.T) { Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}), } - nodeController, _ := NewNodeControllerFromClient(nil, fakeNodeHandler, evictionTimeout, + nodeController, _ := newNodeControllerFromClient(nil, fakeNodeHandler, evictionTimeout, testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, true) nodeController.now = func() metav1.Time { return fakeNow } @@ -2270,7 +2270,7 @@ func TestNodeEventGeneration(t *testing.T) { Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}), } - nodeController, _ := NewNodeControllerFromClient(nil, fakeNodeHandler, 5*time.Minute, + nodeController, _ := newNodeControllerFromClient(nil, fakeNodeHandler, 5*time.Minute, testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, 
false, false) @@ -2384,7 +2384,7 @@ func TestCheckPod(t *testing.T) { }, } - nc, _ := NewNodeControllerFromClient(nil, fake.NewSimpleClientset(), 0, 0, 0, 0, 0, 0, 0, 0, nil, nil, 0, false, false) + nc, _ := newNodeControllerFromClient(nil, fake.NewSimpleClientset(), 0, 0, 0, 0, 0, 0, 0, 0, nil, nil, 0, false, false) nc.nodeInformer.Informer().GetStore().Add(&v1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: "new",