Merge pull request #124519 from dims/drop-all-the-providery-things-take-2

Remove gcp in-tree cloud provider and credential providers
Authored by Kubernetes Prow Robot on 2024-05-06 08:03:14 -07:00, committed by GitHub
387 changed files with 27 additions and 1071842 deletions


@@ -1,147 +0,0 @@
//go:build !providerless
// +build !providerless
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipam
import (
"context"
"encoding/json"
"fmt"
"net"
"k8s.io/klog/v2"
netutils "k8s.io/utils/net"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
nodeutil "k8s.io/component-helpers/node/util"
"k8s.io/legacy-cloud-providers/gce"
"k8s.io/metrics/pkg/client/clientset/versioned/scheme"
)
type adapter struct {
k8s clientset.Interface
cloud *gce.Cloud
broadcaster record.EventBroadcaster
recorder record.EventRecorder
}
func newAdapter(ctx context.Context, k8s clientset.Interface, cloud *gce.Cloud) *adapter {
broadcaster := record.NewBroadcaster(record.WithContext(ctx))
ret := &adapter{
k8s: k8s,
cloud: cloud,
broadcaster: broadcaster,
recorder: broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloudCIDRAllocator"}),
}
return ret
}
func (a *adapter) Run(ctx context.Context) {
defer utilruntime.HandleCrash()
// Start event processing pipeline.
a.broadcaster.StartStructuredLogging(3)
a.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: a.k8s.CoreV1().Events("")})
defer a.broadcaster.Shutdown()
<-ctx.Done()
}
func (a *adapter) Alias(ctx context.Context, node *v1.Node) (*net.IPNet, error) {
if node.Spec.ProviderID == "" {
return nil, fmt.Errorf("node %s doesn't have providerID", node.Name)
}
cidrs, err := a.cloud.AliasRangesByProviderID(node.Spec.ProviderID)
if err != nil {
return nil, err
}
switch len(cidrs) {
case 0:
return nil, nil
case 1:
break
default:
klog.FromContext(ctx).Info("Node has more than one alias assigned, defaulting to the first", "node", klog.KObj(node), "CIDRs", cidrs)
}
_, cidrRange, err := netutils.ParseCIDRSloppy(cidrs[0])
if err != nil {
return nil, err
}
return cidrRange, nil
}
func (a *adapter) AddAlias(ctx context.Context, node *v1.Node, cidrRange *net.IPNet) error {
if node.Spec.ProviderID == "" {
return fmt.Errorf("node %s doesn't have providerID", node.Name)
}
return a.cloud.AddAliasToInstanceByProviderID(node.Spec.ProviderID, cidrRange)
}
func (a *adapter) Node(ctx context.Context, name string) (*v1.Node, error) {
return a.k8s.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{})
}
func (a *adapter) UpdateNodePodCIDR(ctx context.Context, node *v1.Node, cidrRange *net.IPNet) error {
patch := map[string]interface{}{
"apiVersion": node.APIVersion,
"kind": node.Kind,
"metadata": map[string]interface{}{"name": node.Name},
"spec": map[string]interface{}{"podCIDR": cidrRange.String()},
}
bytes, err := json.Marshal(patch)
if err != nil {
return err
}
_, err = a.k8s.CoreV1().Nodes().Patch(context.TODO(), node.Name, types.StrategicMergePatchType, bytes, metav1.PatchOptions{})
return err
}
func (a *adapter) UpdateNodeNetworkUnavailable(nodeName string, unavailable bool) error {
condition := v1.ConditionFalse
if unavailable {
condition = v1.ConditionTrue
}
return nodeutil.SetNodeCondition(a.k8s, types.NodeName(nodeName), v1.NodeCondition{
Type: v1.NodeNetworkUnavailable,
Status: condition,
Reason: "RouteCreated",
Message: "NodeController created an implicit route",
LastTransitionTime: metav1.Now(),
})
}
func (a *adapter) EmitNodeWarningEvent(nodeName, reason, fmt string, args ...interface{}) {
ref := &v1.ObjectReference{Kind: "Node", Name: nodeName}
a.recorder.Eventf(ref, v1.EventTypeNormal, reason, fmt, args...)
}
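
For reference, the strategic merge patch built by UpdateNodePodCIDR above serializes to a small JSON document keyed by metadata.name. A minimal standalone sketch of that shape (the node name and CIDR here are made-up illustrations, not values taken from the change):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Same map layout the removed adapter.UpdateNodePodCIDR builds before
	// sending a StrategicMergePatchType patch to the Nodes API.
	patch := map[string]interface{}{
		"apiVersion": "v1",
		"kind":       "Node",
		"metadata":   map[string]interface{}{"name": "node-1"},
		"spec":       map[string]interface{}{"podCIDR": "10.244.1.0/24"},
	}
	data, err := json.Marshal(patch)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data))
	// Output: {"apiVersion":"v1","kind":"Node","metadata":{"name":"node-1"},"spec":{"podCIDR":"10.244.1.0/24"}}
}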


@@ -61,20 +61,8 @@ const (
// The no. of NodeSpec updates NC can process concurrently.
cidrUpdateWorkers = 30
// The max no. of NodeSpec updates that can be enqueued.
cidrUpdateQueueSize = 5000
// cidrUpdateRetries is the no. of times a NodeSpec update will be retried before dropping it.
cidrUpdateRetries = 3
// updateRetryTimeout is the time to wait before requeing a failed node for retry
updateRetryTimeout = 250 * time.Millisecond
// maxUpdateRetryTimeout is the maximum amount of time between timeouts.
maxUpdateRetryTimeout = 5 * time.Second
// updateMaxRetries is the max retries for a failed node
updateMaxRetries = 10
)
// nodePollInterval is used in listing node
@@ -116,8 +104,6 @@ func New(ctx context.Context, kubeClient clientset.Interface, cloud cloudprovide
switch allocatorType {
case RangeAllocatorType:
return NewCIDRRangeAllocator(ctx, kubeClient, nodeInformer, allocatorParams, nodeList)
case CloudAllocatorType:
return NewCloudCIDRAllocator(ctx, kubeClient, cloud, nodeInformer)
default:
return nil, fmt.Errorf("invalid CIDR allocator type: %v", allocatorType)
}


@@ -1,378 +0,0 @@
//go:build !providerless
// +build !providerless
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipam
import (
"context"
"fmt"
"math/rand"
"net"
"sync"
"time"
"github.com/google/go-cmp/cmp"
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
informers "k8s.io/client-go/informers/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
cloudprovider "k8s.io/cloud-provider"
nodeutil "k8s.io/component-helpers/node/util"
controllerutil "k8s.io/kubernetes/pkg/controller/util/node"
utiltaints "k8s.io/kubernetes/pkg/util/taints"
"k8s.io/legacy-cloud-providers/gce"
netutils "k8s.io/utils/net"
)
// nodeProcessingInfo tracks information related to current nodes in processing
type nodeProcessingInfo struct {
retries int
}
// cloudCIDRAllocator allocates node CIDRs according to IP address aliases
// assigned by the cloud provider. In this case, the allocation and
// deallocation is delegated to the external provider, and the controller
// merely takes the assignment and updates the node spec.
type cloudCIDRAllocator struct {
client clientset.Interface
cloud *gce.Cloud
// nodeLister is able to list/get nodes and is populated by the shared informer passed to
// NewCloudCIDRAllocator.
nodeLister corelisters.NodeLister
// nodesSynced returns true if the node shared informer has been synced at least once.
nodesSynced cache.InformerSynced
// Channel that is used to pass updating Nodes to the background.
// This increases the throughput of CIDR assignment by parallelization
// and not blocking on long operations (which shouldn't be done from
// event handlers anyway).
nodeUpdateChannel chan string
broadcaster record.EventBroadcaster
recorder record.EventRecorder
// Keep a set of nodes that are currently being processed to avoid races in CIDR allocation
lock sync.Mutex
nodesInProcessing map[string]*nodeProcessingInfo
}
var _ CIDRAllocator = (*cloudCIDRAllocator)(nil)
// NewCloudCIDRAllocator creates a new cloud CIDR allocator.
func NewCloudCIDRAllocator(ctx context.Context, client clientset.Interface, cloud cloudprovider.Interface, nodeInformer informers.NodeInformer) (CIDRAllocator, error) {
logger := klog.FromContext(ctx)
if client == nil {
logger.Error(nil, "kubeClient is nil when starting cloud CIDR allocator")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cidrAllocator"})
gceCloud, ok := cloud.(*gce.Cloud)
if !ok {
err := fmt.Errorf("cloudCIDRAllocator does not support %v provider", cloud.ProviderName())
return nil, err
}
ca := &cloudCIDRAllocator{
client: client,
cloud: gceCloud,
nodeLister: nodeInformer.Lister(),
nodesSynced: nodeInformer.Informer().HasSynced,
nodeUpdateChannel: make(chan string, cidrUpdateQueueSize),
broadcaster: eventBroadcaster,
recorder: recorder,
nodesInProcessing: map[string]*nodeProcessingInfo{},
}
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controllerutil.CreateAddNodeHandler(
func(node *v1.Node) error {
return ca.AllocateOrOccupyCIDR(ctx, node)
}),
UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
if newNode.Spec.PodCIDR == "" {
return ca.AllocateOrOccupyCIDR(ctx, newNode)
}
// Even if PodCIDR is assigned, but NetworkUnavailable condition is
// set to true, we need to process the node to set the condition.
networkUnavailableTaint := &v1.Taint{Key: v1.TaintNodeNetworkUnavailable, Effect: v1.TaintEffectNoSchedule}
_, cond := controllerutil.GetNodeCondition(&newNode.Status, v1.NodeNetworkUnavailable)
if cond == nil || cond.Status != v1.ConditionFalse || utiltaints.TaintExists(newNode.Spec.Taints, networkUnavailableTaint) {
return ca.AllocateOrOccupyCIDR(ctx, newNode)
}
return nil
}),
DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error {
return ca.ReleaseCIDR(logger, node)
}),
})
logger.Info("Using cloud CIDR allocator", "provider", cloud.ProviderName())
return ca, nil
}
func (ca *cloudCIDRAllocator) Run(ctx context.Context) {
defer utilruntime.HandleCrash()
// Start event processing pipeline.
ca.broadcaster.StartStructuredLogging(3)
logger := klog.FromContext(ctx)
logger.Info("Sending events to api server")
ca.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: ca.client.CoreV1().Events("")})
defer ca.broadcaster.Shutdown()
logger.Info("Starting cloud CIDR allocator")
defer logger.Info("Shutting down cloud CIDR allocator")
if !cache.WaitForNamedCacheSync("cidrallocator", ctx.Done(), ca.nodesSynced) {
return
}
for i := 0; i < cidrUpdateWorkers; i++ {
go ca.worker(ctx)
}
<-ctx.Done()
}
func (ca *cloudCIDRAllocator) worker(ctx context.Context) {
logger := klog.FromContext(ctx)
for {
select {
case workItem, ok := <-ca.nodeUpdateChannel:
if !ok {
logger.Info("Channel nodeCIDRUpdateChannel was unexpectedly closed")
return
}
if err := ca.updateCIDRAllocation(ctx, workItem); err == nil {
logger.V(3).Info("Updated CIDR", "workItem", workItem)
} else {
logger.Error(err, "Error updating CIDR", "workItem", workItem)
if canRetry, timeout := ca.retryParams(logger, workItem); canRetry {
logger.V(2).Info("Retrying update on next period", "workItem", workItem, "timeout", timeout)
time.AfterFunc(timeout, func() {
// Requeue the failed node for update again.
ca.nodeUpdateChannel <- workItem
})
continue
}
logger.Error(nil, "Exceeded retry count, dropping from queue", "workItem", workItem)
}
ca.removeNodeFromProcessing(workItem)
case <-ctx.Done():
return
}
}
}
func (ca *cloudCIDRAllocator) insertNodeToProcessing(nodeName string) bool {
ca.lock.Lock()
defer ca.lock.Unlock()
if _, found := ca.nodesInProcessing[nodeName]; found {
return false
}
ca.nodesInProcessing[nodeName] = &nodeProcessingInfo{}
return true
}
func (ca *cloudCIDRAllocator) retryParams(logger klog.Logger, nodeName string) (bool, time.Duration) {
ca.lock.Lock()
defer ca.lock.Unlock()
entry, ok := ca.nodesInProcessing[nodeName]
if !ok {
logger.Error(nil, "Cannot get retryParams for node as entry does not exist", "node", klog.KRef("", nodeName))
return false, 0
}
count := entry.retries + 1
if count > updateMaxRetries {
return false, 0
}
ca.nodesInProcessing[nodeName].retries = count
return true, nodeUpdateRetryTimeout(count)
}
func nodeUpdateRetryTimeout(count int) time.Duration {
timeout := updateRetryTimeout
for i := 0; i < count && timeout < maxUpdateRetryTimeout; i++ {
timeout *= 2
}
if timeout > maxUpdateRetryTimeout {
timeout = maxUpdateRetryTimeout
}
return time.Duration(timeout.Nanoseconds()/2 + rand.Int63n(timeout.Nanoseconds()))
}
func (ca *cloudCIDRAllocator) removeNodeFromProcessing(nodeName string) {
ca.lock.Lock()
defer ca.lock.Unlock()
delete(ca.nodesInProcessing, nodeName)
}
// WARNING: If you're adding any return calls or defer any more work from this
// function you have to make sure to update nodesInProcessing properly with the
// disposition of the node when the work is done.
func (ca *cloudCIDRAllocator) AllocateOrOccupyCIDR(ctx context.Context, node *v1.Node) error {
if node == nil {
return nil
}
logger := klog.FromContext(ctx)
if !ca.insertNodeToProcessing(node.Name) {
logger.V(2).Info("Node is already in a process of CIDR assignment", "node", klog.KObj(node))
return nil
}
logger.V(4).Info("Putting node into the work queue", "node", klog.KObj(node))
ca.nodeUpdateChannel <- node.Name
return nil
}
// updateCIDRAllocation assigns CIDR to Node and sends an update to the API server.
func (ca *cloudCIDRAllocator) updateCIDRAllocation(ctx context.Context, nodeName string) error {
logger := klog.FromContext(ctx)
node, err := ca.nodeLister.Get(nodeName)
if err != nil {
if errors.IsNotFound(err) {
return nil // node no longer available, skip processing
}
logger.Error(err, "Failed while getting the node for updating Node.Spec.PodCIDR", "node", klog.KRef("", nodeName))
return err
}
if node.Spec.ProviderID == "" {
return fmt.Errorf("node %s doesn't have providerID", nodeName)
}
cidrStrings, err := ca.cloud.AliasRangesByProviderID(node.Spec.ProviderID)
if err != nil {
controllerutil.RecordNodeStatusChange(logger, ca.recorder, node, "CIDRNotAvailable")
return fmt.Errorf("failed to get cidr(s) from provider: %v", err)
}
if len(cidrStrings) == 0 {
controllerutil.RecordNodeStatusChange(logger, ca.recorder, node, "CIDRNotAvailable")
return fmt.Errorf("failed to allocate cidr: Node %v has no CIDRs", node.Name)
}
// Can have at most 2 IPs (one for v4 and one for v6)
if len(cidrStrings) > 2 {
logger.Info("Got more than 2 ips, truncating to 2", "cidrStrings", cidrStrings)
cidrStrings = cidrStrings[:2]
}
cidrs, err := netutils.ParseCIDRs(cidrStrings)
if err != nil {
return fmt.Errorf("failed to parse strings %v as CIDRs: %v", cidrStrings, err)
}
needUpdate, err := needPodCIDRsUpdate(logger, node, cidrs)
if err != nil {
return fmt.Errorf("err: %v, CIDRS: %v", err, cidrStrings)
}
if needUpdate {
if node.Spec.PodCIDR != "" {
logger.Error(nil, "PodCIDR being reassigned", "node", klog.KObj(node), "podCIDRs", node.Spec.PodCIDRs, "cidrStrings", cidrStrings)
// We fall through and set the CIDR despite this error. This
// implements the same logic as implemented in the
// rangeAllocator.
//
// See https://github.com/kubernetes/kubernetes/pull/42147#discussion_r103357248
}
for i := 0; i < cidrUpdateRetries; i++ {
if err = nodeutil.PatchNodeCIDRs(ctx, ca.client, types.NodeName(node.Name), cidrStrings); err == nil {
logger.Info("Set the node PodCIDRs", "node", klog.KObj(node), "cidrStrings", cidrStrings)
break
}
}
}
if err != nil {
controllerutil.RecordNodeStatusChange(logger, ca.recorder, node, "CIDRAssignmentFailed")
logger.Error(err, "Failed to update the node PodCIDR after multiple attempts", "node", klog.KObj(node), "cidrStrings", cidrStrings)
return err
}
err = nodeutil.SetNodeCondition(ca.client, types.NodeName(node.Name), v1.NodeCondition{
Type: v1.NodeNetworkUnavailable,
Status: v1.ConditionFalse,
Reason: "RouteCreated",
Message: "NodeController create implicit route",
LastTransitionTime: metav1.Now(),
})
if err != nil {
logger.Error(err, "Error setting route status for the node", "node", klog.KObj(node))
}
return err
}
func needPodCIDRsUpdate(logger klog.Logger, node *v1.Node, podCIDRs []*net.IPNet) (bool, error) {
if node.Spec.PodCIDR == "" {
return true, nil
}
_, nodePodCIDR, err := netutils.ParseCIDRSloppy(node.Spec.PodCIDR)
if err != nil {
logger.Error(err, "Found invalid node.Spec.PodCIDR", "podCIDR", node.Spec.PodCIDR)
// We will try to overwrite with new CIDR(s)
return true, nil
}
nodePodCIDRs, err := netutils.ParseCIDRs(node.Spec.PodCIDRs)
if err != nil {
logger.Error(err, "Found invalid node.Spec.PodCIDRs", "podCIDRs", node.Spec.PodCIDRs)
// We will try to overwrite with new CIDR(s)
return true, nil
}
if len(podCIDRs) == 1 {
if cmp.Equal(nodePodCIDR, podCIDRs[0]) {
logger.V(4).Info("Node already has allocated CIDR. It matches the proposed one", "node", klog.KObj(node), "podCIDR", podCIDRs[0])
return false, nil
}
} else if len(nodePodCIDRs) == len(podCIDRs) {
if dualStack, _ := netutils.IsDualStackCIDRs(podCIDRs); !dualStack {
return false, fmt.Errorf("IPs are not dual stack")
}
for idx, cidr := range podCIDRs {
if !cmp.Equal(nodePodCIDRs[idx], cidr) {
return true, nil
}
}
logger.V(4).Info("Node already has allocated CIDRs. It matches the proposed one", "node", klog.KObj(node), "podCIDRs", podCIDRs)
return false, nil
}
return true, nil
}
func (ca *cloudCIDRAllocator) ReleaseCIDR(logger klog.Logger, node *v1.Node) error {
logger.V(2).Info("Node's PodCIDR will be released by external cloud provider (not managed by controller)",
"node", klog.KObj(node), "podCIDR", node.Spec.PodCIDR)
return nil
}
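
The worker's retry path above backs off exponentially with jitter: nodeUpdateRetryTimeout doubles the 250ms base once per retry up to the 5s cap, then returns a duration drawn uniformly from [timeout/2, 3*timeout/2). A standalone sketch of that calculation with the constants inlined (illustration only, not the production code):

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// retryTimeout mirrors the removed nodeUpdateRetryTimeout: exponential
// backoff from a 250ms base, capped at 5s, with +/-50% jitter.
func retryTimeout(count int) time.Duration {
	const (
		baseTimeout = 250 * time.Millisecond
		maxTimeout  = 5 * time.Second
	)
	timeout := baseTimeout
	for i := 0; i < count && timeout < maxTimeout; i++ {
		timeout *= 2
	}
	if timeout > maxTimeout {
		timeout = maxTimeout
	}
	// Uniform in [timeout/2, 3*timeout/2).
	return time.Duration(timeout.Nanoseconds()/2 + rand.Int63n(timeout.Nanoseconds()))
}

func main() {
	for _, count := range []int{0, 1, 2, 3, 10} {
		fmt.Printf("retry %d: %v\n", count, retryTimeout(count))
	}
}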


@@ -1,34 +0,0 @@
//go:build providerless
// +build providerless
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipam
import (
"context"
"errors"
informers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
cloudprovider "k8s.io/cloud-provider"
)
// NewCloudCIDRAllocator creates a new cloud CIDR allocator.
func NewCloudCIDRAllocator(ctx context.Context, client clientset.Interface, cloud cloudprovider.Interface, nodeInformer informers.NodeInformer) (CIDRAllocator, error) {
return nil, errors.New("legacy cloud provider support not built")
}
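
The two files above form the standard Go build-tag pair: the gce-backed implementation compiles by default (!providerless), while the error-returning stub compiles when the providerless tag is set, e.g. via go build -tags providerless. A minimal sketch of the same pattern, using a hypothetical package named feature:

// feature_full.go -- compiled by default
//go:build !providerless

package feature

// New returns the full implementation when provider code is built in.
func New() (string, error) { return "full implementation", nil }

// feature_stub.go -- compiled only with -tags providerless
//go:build providerless

package feature

import "errors"

// New keeps the symbol available but reports that provider support
// was not built, mirroring the stub above.
func New() (string, error) { return "", errors.New("legacy cloud provider support not built") }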


@@ -1,193 +0,0 @@
//go:build !providerless
// +build !providerless
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipam
import (
"fmt"
"testing"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/klog/v2/ktesting"
netutils "k8s.io/utils/net"
)
func hasNodeInProcessing(ca *cloudCIDRAllocator, name string) bool {
ca.lock.Lock()
defer ca.lock.Unlock()
_, found := ca.nodesInProcessing[name]
return found
}
func TestBoundedRetries(t *testing.T) {
clientSet := fake.NewSimpleClientset()
updateChan := make(chan string, 1) // need to buffer as we are using only one goroutine
sharedInformer := informers.NewSharedInformerFactory(clientSet, 1*time.Hour)
ca := &cloudCIDRAllocator{
client: clientSet,
nodeUpdateChannel: updateChan,
nodeLister: sharedInformer.Core().V1().Nodes().Lister(),
nodesSynced: sharedInformer.Core().V1().Nodes().Informer().HasSynced,
nodesInProcessing: map[string]*nodeProcessingInfo{},
}
_, ctx := ktesting.NewTestContext(t)
go ca.worker(ctx)
nodeName := "testNode"
if err := ca.AllocateOrOccupyCIDR(ctx, &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: nodeName,
},
}); err != nil {
t.Errorf("unexpected error in AllocateOrOccupyCIDR: %v", err)
}
for hasNodeInProcessing(ca, nodeName) {
// wait for node to finish processing (should terminate and not time out)
}
}
func withinExpectedRange(got time.Duration, expected time.Duration) bool {
return got >= expected/2 && got <= 3*expected/2
}
func TestNodeUpdateRetryTimeout(t *testing.T) {
for _, tc := range []struct {
count int
want time.Duration
}{
{count: 0, want: 250 * time.Millisecond},
{count: 1, want: 500 * time.Millisecond},
{count: 2, want: 1000 * time.Millisecond},
{count: 3, want: 2000 * time.Millisecond},
{count: 50, want: 5000 * time.Millisecond},
} {
t.Run(fmt.Sprintf("count %d", tc.count), func(t *testing.T) {
if got := nodeUpdateRetryTimeout(tc.count); !withinExpectedRange(got, tc.want) {
t.Errorf("nodeUpdateRetryTimeout(tc.count) = %v; want %v", got, tc.want)
}
})
}
}
func TestNeedPodCIDRsUpdate(t *testing.T) {
for _, tc := range []struct {
desc string
cidrs []string
nodePodCIDR string
nodePodCIDRs []string
want bool
wantErr bool
}{
{
desc: "want error - invalid cidr",
cidrs: []string{"10.10.10.0/24"},
nodePodCIDR: "10.10..0/24",
nodePodCIDRs: []string{"10.10..0/24"},
want: true,
},
{
desc: "want error - cidr len 2 but not dual stack",
cidrs: []string{"10.10.10.0/24", "10.10.11.0/24"},
nodePodCIDR: "10.10.10.0/24",
nodePodCIDRs: []string{"10.10.10.0/24", "2001:db8::/64"},
wantErr: true,
},
{
desc: "want false - matching v4 only cidr",
cidrs: []string{"10.10.10.0/24"},
nodePodCIDR: "10.10.10.0/24",
nodePodCIDRs: []string{"10.10.10.0/24"},
want: false,
},
{
desc: "want false - nil node.Spec.PodCIDR",
cidrs: []string{"10.10.10.0/24"},
want: true,
},
{
desc: "want true - non matching v4 only cidr",
cidrs: []string{"10.10.10.0/24"},
nodePodCIDR: "10.10.11.0/24",
nodePodCIDRs: []string{"10.10.11.0/24"},
want: true,
},
{
desc: "want false - matching v4 and v6 cidrs",
cidrs: []string{"10.10.10.0/24", "2001:db8::/64"},
nodePodCIDR: "10.10.10.0/24",
nodePodCIDRs: []string{"10.10.10.0/24", "2001:db8::/64"},
want: false,
},
{
desc: "want false - matching v4 and v6 cidrs, different strings but same CIDRs",
cidrs: []string{"10.10.10.0/24", "2001:db8::/64"},
nodePodCIDR: "10.10.10.0/24",
nodePodCIDRs: []string{"10.10.10.0/24", "2001:db8:0::/64"},
want: false,
},
{
desc: "want true - matching v4 and non matching v6 cidrs",
cidrs: []string{"10.10.10.0/24", "2001:db8::/64"},
nodePodCIDR: "10.10.10.0/24",
nodePodCIDRs: []string{"10.10.10.0/24", "2001:dba::/64"},
want: true,
},
{
desc: "want true - nil node.Spec.PodCIDRs",
cidrs: []string{"10.10.10.0/24", "2001:db8::/64"},
want: true,
},
{
desc: "want true - matching v6 and non matching v4 cidrs",
cidrs: []string{"10.10.10.0/24", "2001:db8::/64"},
nodePodCIDR: "10.10.1.0/24",
nodePodCIDRs: []string{"10.10.1.0/24", "2001:db8::/64"},
want: true,
},
{
desc: "want true - missing v6",
cidrs: []string{"10.10.10.0/24", "2001:db8::/64"},
nodePodCIDR: "10.10.10.0/24",
nodePodCIDRs: []string{"10.10.10.0/24"},
want: true,
},
} {
var node v1.Node
node.Spec.PodCIDR = tc.nodePodCIDR
node.Spec.PodCIDRs = tc.nodePodCIDRs
netCIDRs, err := netutils.ParseCIDRs(tc.cidrs)
if err != nil {
t.Errorf("failed to parse %v as CIDRs: %v", tc.cidrs, err)
}
logger, _ := ktesting.NewTestContext(t)
t.Run(tc.desc, func(t *testing.T) {
got, err := needPodCIDRsUpdate(logger, &node, netCIDRs)
if tc.wantErr == (err == nil) {
t.Errorf("err: %v, wantErr: %v", err, tc.wantErr)
}
if err == nil && got != tc.want {
t.Errorf("got: %v, want: %v", got, tc.want)
}
})
}
}


@@ -1,236 +0,0 @@
//go:build !providerless
// +build !providerless
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipam
import (
"context"
"fmt"
"net"
"sync"
"time"
"k8s.io/klog/v2"
netutils "k8s.io/utils/net"
v1 "k8s.io/api/core/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
informers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset"
nodesync "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/sync"
controllerutil "k8s.io/kubernetes/pkg/controller/util/node"
"k8s.io/legacy-cloud-providers/gce"
)
// Config for the IPAM controller.
type Config struct {
// Resync is the default timeout duration when there are no errors.
Resync time.Duration
// MaxBackoff is the maximum timeout when in a error backoff state.
MaxBackoff time.Duration
// InitialRetry is the initial retry interval when an error is reported.
InitialRetry time.Duration
// Mode to use to synchronize.
Mode nodesync.NodeSyncMode
}
// Controller is the controller for synchronizing cluster and cloud node
// pod CIDR range assignments.
type Controller struct {
config *Config
adapter *adapter
lock sync.Mutex
syncers map[string]*nodesync.NodeSync
set *cidrset.CidrSet
}
// NewController returns a new instance of the IPAM controller.
func NewController(
ctx context.Context,
config *Config,
kubeClient clientset.Interface,
cloud cloudprovider.Interface,
clusterCIDR, serviceCIDR *net.IPNet,
nodeCIDRMaskSize int) (*Controller, error) {
if !nodesync.IsValidMode(config.Mode) {
return nil, fmt.Errorf("invalid IPAM controller mode %q", config.Mode)
}
gceCloud, ok := cloud.(*gce.Cloud)
if !ok {
return nil, fmt.Errorf("cloud IPAM controller does not support %q provider", cloud.ProviderName())
}
set, err := cidrset.NewCIDRSet(clusterCIDR, nodeCIDRMaskSize)
if err != nil {
return nil, err
}
c := &Controller{
config: config,
adapter: newAdapter(ctx, kubeClient, gceCloud),
syncers: make(map[string]*nodesync.NodeSync),
set: set,
}
if err := occupyServiceCIDR(c.set, clusterCIDR, serviceCIDR); err != nil {
return nil, err
}
// Check whether there is a remaining CIDR after occupyServiceCIDR
cidr, err := c.set.AllocateNext()
switch err {
case cidrset.ErrCIDRRangeNoCIDRsRemaining:
return nil, fmt.Errorf("failed after occupy serviceCIDR: %v", err)
case nil:
err := c.set.Release(cidr)
return c, err
default:
return nil, fmt.Errorf("unexpected error when check remaining CIDR range: %v", err)
}
}
// Start initializes the Controller with the existing list of nodes and
// registers the informers for node changes. This will start synchronization
// of the node and cloud CIDR range allocations.
func (c *Controller) Start(logger klog.Logger, nodeInformer informers.NodeInformer) error {
logger.Info("Starting IPAM controller", "config", c.config)
ctx := klog.NewContext(context.TODO(), logger)
nodes, err := listNodes(ctx, c.adapter.k8s)
if err != nil {
return err
}
for _, node := range nodes.Items {
if node.Spec.PodCIDR != "" {
_, cidrRange, err := netutils.ParseCIDRSloppy(node.Spec.PodCIDR)
if err == nil {
c.set.Occupy(cidrRange)
logger.V(3).Info("Occupying CIDR for node", "CIDR", node.Spec.PodCIDR, "node", klog.KObj(&node))
} else {
logger.Error(err, "Node has an invalid CIDR", "node", klog.KObj(&node), "CIDR", node.Spec.PodCIDR)
}
}
func() {
c.lock.Lock()
defer c.lock.Unlock()
// XXX/bowei -- stagger the start of each sync cycle.
syncer := c.newSyncer(node.Name)
c.syncers[node.Name] = syncer
go syncer.Loop(logger, nil)
}()
}
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controllerutil.CreateAddNodeHandler(func(node *v1.Node) error {
return c.onAdd(logger, node)
}),
UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
return c.onUpdate(logger, newNode)
}),
DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error {
return c.onDelete(logger, node)
}),
})
return nil
}
func (c *Controller) Run(ctx context.Context) {
defer utilruntime.HandleCrash()
go c.adapter.Run(ctx)
<-ctx.Done()
}
type nodeState struct {
t Timeout
}
func (ns *nodeState) ReportResult(err error) {
ns.t.Update(err == nil)
}
func (ns *nodeState) ResyncTimeout() time.Duration {
return ns.t.Next()
}
func (c *Controller) newSyncer(name string) *nodesync.NodeSync {
ns := &nodeState{
Timeout{
Resync: c.config.Resync,
MaxBackoff: c.config.MaxBackoff,
InitialRetry: c.config.InitialRetry,
},
}
return nodesync.New(ns, c.adapter, c.adapter, c.config.Mode, name, c.set)
}
func (c *Controller) onAdd(logger klog.Logger, node *v1.Node) error {
c.lock.Lock()
defer c.lock.Unlock()
syncer, ok := c.syncers[node.Name]
if !ok {
syncer = c.newSyncer(node.Name)
c.syncers[node.Name] = syncer
go syncer.Loop(logger, nil)
} else {
logger.Info("Add for node that already exists", "node", klog.KObj(node))
}
syncer.Update(node)
return nil
}
func (c *Controller) onUpdate(logger klog.Logger, node *v1.Node) error {
c.lock.Lock()
defer c.lock.Unlock()
if sync, ok := c.syncers[node.Name]; ok {
sync.Update(node)
} else {
logger.Error(nil, "Received update for non-existent node", "node", klog.KObj(node))
return fmt.Errorf("unknown node %q", node.Name)
}
return nil
}
func (c *Controller) onDelete(logger klog.Logger, node *v1.Node) error {
c.lock.Lock()
defer c.lock.Unlock()
if syncer, ok := c.syncers[node.Name]; ok {
syncer.Delete(node)
delete(c.syncers, node.Name)
} else {
logger.Info("Node was already deleted", "node", klog.KObj(node))
}
return nil
}
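
One detail worth noting in NewController above: after occupying the service CIDR, it probes for remaining space by allocating one node CIDR and immediately releasing it, failing fast if the service range swallowed the whole cluster range. A compact restatement of that probe, assuming the cidrset package imported above:

package main

import (
	"fmt"
	"net"

	"k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset"
	netutils "k8s.io/utils/net"
)

// hasFreeNodeCIDR reports whether at least one node-sized CIDR can still be
// carved out of clusterCIDR.
func hasFreeNodeCIDR(clusterCIDR *net.IPNet, nodeMaskSize int) (bool, error) {
	set, err := cidrset.NewCIDRSet(clusterCIDR, nodeMaskSize)
	if err != nil {
		return false, err
	}
	cidr, err := set.AllocateNext()
	switch err {
	case cidrset.ErrCIDRRangeNoCIDRsRemaining:
		return false, nil
	case nil:
		// Give the probe allocation back so it stays available.
		return true, set.Release(cidr)
	default:
		return false, err
	}
}

func main() {
	_, clusterCIDR, _ := netutils.ParseCIDRSloppy("10.0.0.0/16")
	ok, err := hasFreeNodeCIDR(clusterCIDR, 24)
	fmt.Println(ok, err)
}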


@@ -1,75 +0,0 @@
//go:build !providerless
// +build !providerless
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodeipam
import (
"context"
"fmt"
"net"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller/nodeipam/ipam"
nodesync "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/sync"
)
func createLegacyIPAM(
ctx context.Context,
ic *Controller,
nodeInformer coreinformers.NodeInformer,
cloud cloudprovider.Interface,
kubeClient clientset.Interface,
clusterCIDRs []*net.IPNet,
serviceCIDR *net.IPNet,
nodeCIDRMaskSizes []int,
) (*ipam.Controller, error) {
cfg := &ipam.Config{
Resync: ipamResyncInterval,
MaxBackoff: ipamMaxBackoff,
InitialRetry: ipamInitialBackoff,
}
switch ic.allocatorType {
case ipam.IPAMFromClusterAllocatorType:
cfg.Mode = nodesync.SyncFromCluster
case ipam.IPAMFromCloudAllocatorType:
cfg.Mode = nodesync.SyncFromCloud
}
// we may end up here with no cidr at all in case of FromCloud/FromCluster
var cidr *net.IPNet
if len(clusterCIDRs) > 0 {
cidr = clusterCIDRs[0]
}
logger := klog.FromContext(ctx)
if len(clusterCIDRs) > 1 {
logger.Info("Multiple cidrs were configured with FromCluster or FromCloud. cidrs except first one were discarded")
}
ipamc, err := ipam.NewController(ctx, cfg, kubeClient, cloud, cidr, serviceCIDR, nodeCIDRMaskSizes[0])
if err != nil {
return nil, fmt.Errorf("error creating ipam controller: %w", err)
}
if err := ipamc.Start(logger, nodeInformer); err != nil {
return nil, fmt.Errorf("error trying to Init(): %w", err)
}
return ipamc, nil
}


@@ -19,9 +19,6 @@ package nodeipam
import (
"context"
"fmt"
"net"
"time"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
@@ -33,18 +30,7 @@ import (
controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller/nodeipam/ipam"
)
const (
// ipamResyncInterval is the amount of time between when the cloud and node
// CIDR range assignments are synchronized.
ipamResyncInterval = 30 * time.Second
// ipamMaxBackoff is the maximum backoff for retrying synchronization of a
// given node in the error state.
ipamMaxBackoff = 10 * time.Second
// ipamInitialBackoff is the initial retry interval for retrying synchronization of a
// given node in the error state.
ipamInitialBackoff = 250 * time.Millisecond
"net"
)
// ipamController is an interface abstracting an interface for


@@ -1,118 +0,0 @@
//go:build !providerless
// +build !providerless
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodeipam
import (
"context"
"errors"
"net"
"strings"
"testing"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/controller/nodeipam/ipam"
"k8s.io/kubernetes/pkg/controller/testutil"
"k8s.io/legacy-cloud-providers/gce"
netutils "k8s.io/utils/net"
)
func newTestNodeIpamController(ctx context.Context, clusterCIDR []*net.IPNet, serviceCIDR *net.IPNet, secondaryServiceCIDR *net.IPNet, nodeCIDRMaskSizes []int, allocatorType ipam.CIDRAllocatorType) (*Controller, error) {
clientSet := fake.NewSimpleClientset()
fakeNodeHandler := &testutil.FakeNodeHandler{
Existing: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "node0"}},
},
Clientset: fake.NewSimpleClientset(),
}
fakeClient := &fake.Clientset{}
fakeInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
fakeNodeInformer := fakeInformerFactory.Core().V1().Nodes()
for _, node := range fakeNodeHandler.Existing {
fakeNodeInformer.Informer().GetStore().Add(node)
}
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
return NewNodeIpamController(
ctx,
fakeNodeInformer, fakeGCE, clientSet,
clusterCIDR, serviceCIDR, secondaryServiceCIDR, nodeCIDRMaskSizes, allocatorType,
)
}
// TestNewNodeIpamControllerWithCIDRMasks tests if the controller can be
// created with combinations of network CIDRs and masks.
func TestNewNodeIpamControllerWithCIDRMasks(t *testing.T) {
emptyServiceCIDR := ""
for _, tc := range []struct {
desc string
clusterCIDR string
serviceCIDR string
secondaryServiceCIDR string
maskSize []int
allocatorType ipam.CIDRAllocatorType
expectedError error
}{
{"valid_range_allocator", "10.0.0.0/21", "10.1.0.0/21", emptyServiceCIDR, []int{24}, ipam.RangeAllocatorType, nil},
{"valid_range_allocator_dualstack", "10.0.0.0/21,2000::/48", "10.1.0.0/21", emptyServiceCIDR, []int{24, 64}, ipam.RangeAllocatorType, nil},
{"valid_range_allocator_dualstack_dualstackservice", "10.0.0.0/21,2000::/48", "10.1.0.0/21", "3000::/112", []int{24, 64}, ipam.RangeAllocatorType, nil},
{"valid_cloud_allocator", "10.0.0.0/21", "10.1.0.0/21", emptyServiceCIDR, []int{24}, ipam.CloudAllocatorType, nil},
{"valid_ipam_from_cluster", "10.0.0.0/21", "10.1.0.0/21", emptyServiceCIDR, []int{24}, ipam.IPAMFromClusterAllocatorType, nil},
{"valid_ipam_from_cloud", "10.0.0.0/21", "10.1.0.0/21", emptyServiceCIDR, []int{24}, ipam.IPAMFromCloudAllocatorType, nil},
{"valid_skip_cluster_CIDR_validation_for_cloud_allocator", "invalid", "10.1.0.0/21", emptyServiceCIDR, []int{24}, ipam.CloudAllocatorType, nil},
{"valid_CIDR_larger_than_mask_cloud_allocator", "10.0.0.0/16", "10.1.0.0/21", emptyServiceCIDR, []int{24}, ipam.CloudAllocatorType, nil},
{"invalid_cluster_CIDR", "", "10.1.0.0/21", emptyServiceCIDR, []int{24}, ipam.IPAMFromClusterAllocatorType, errors.New("Controller: Must specify --cluster-cidr if --allocate-node-cidrs is set")},
{"invalid_CIDR_smaller_than_mask_other_allocators", "10.0.0.0/26", "10.1.0.0/21", emptyServiceCIDR, []int{24}, ipam.IPAMFromCloudAllocatorType, errors.New("Controller: Invalid --cluster-cidr, mask size of cluster CIDR must be less than or equal to --node-cidr-mask-size configured for CIDR family")},
{"invalid_serviceCIDR_contains_clusterCIDR", "10.0.0.0/16", "10.0.0.0/8", emptyServiceCIDR, []int{24}, ipam.IPAMFromClusterAllocatorType, errors.New("error creating ipam controller: failed after occupy serviceCIDR: CIDR allocation failed; there are no remaining CIDRs left to allocate in the accepted range")},
{"invalid_CIDR_mask_size", "10.0.0.0/24,2000::/64", "10.1.0.0/21", emptyServiceCIDR, []int{24, 48}, ipam.IPAMFromClusterAllocatorType, errors.New("Controller: Invalid --cluster-cidr, mask size of cluster CIDR must be less than or equal to --node-cidr-mask-size configured for CIDR family")},
} {
test := tc
_, ctx := ktesting.NewTestContext(t)
t.Run(test.desc, func(t *testing.T) {
t.Parallel()
clusterCidrs, err := netutils.ParseCIDRs(strings.Split(test.clusterCIDR, ","))
if err != nil {
clusterCidrs = nil
}
_, serviceCIDRIpNet, err := netutils.ParseCIDRSloppy(test.serviceCIDR)
if err != nil {
serviceCIDRIpNet = nil
}
_, secondaryServiceCIDRIpNet, err := netutils.ParseCIDRSloppy(test.secondaryServiceCIDR)
if err != nil {
secondaryServiceCIDRIpNet = nil
}
_, err = newTestNodeIpamController(ctx, clusterCidrs, serviceCIDRIpNet, secondaryServiceCIDRIpNet, test.maskSize, test.allocatorType)
if test.expectedError == nil {
if err != nil {
t.Errorf("Test %s, unexpected error: %v", test.desc, err)
}
} else {
if err.Error() != test.expectedError.Error() {
t.Errorf("Test %s, got error: %v, expected error: %v", test.desc, err, test.expectedError)
}
}
})
}
}


@@ -1,6 +1,3 @@
//go:build providerless
// +build providerless
/*
Copyright 2019 The Kubernetes Authors.