diff --git a/pkg/controller/nodeipam/ipam/adapter.go b/pkg/controller/nodeipam/ipam/adapter.go index e3214be114e..82de5b24fa0 100644 --- a/pkg/controller/nodeipam/ipam/adapter.go +++ b/pkg/controller/nodeipam/ipam/adapter.go @@ -31,6 +31,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientset "k8s.io/client-go/kubernetes" v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/record" @@ -43,26 +44,34 @@ type adapter struct { k8s clientset.Interface cloud *gce.Cloud - recorder record.EventRecorder + broadcaster record.EventBroadcaster + recorder record.EventRecorder } func newAdapter(k8s clientset.Interface, cloud *gce.Cloud) *adapter { + broadcaster := record.NewBroadcaster() + ret := &adapter{ - k8s: k8s, - cloud: cloud, + k8s: k8s, + cloud: cloud, + broadcaster: broadcaster, + recorder: broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloudCIDRAllocator"}), } - broadcaster := record.NewBroadcaster() - broadcaster.StartStructuredLogging(0) - ret.recorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloudCIDRAllocator"}) - klog.V(0).Infof("Sending events to api server.") - broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{ - Interface: k8s.CoreV1().Events(""), - }) - return ret } +func (a *adapter) Run(stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + + // Start event processing pipeline. + a.broadcaster.StartStructuredLogging(0) + a.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: a.k8s.CoreV1().Events("")}) + defer a.broadcaster.Shutdown() + + <-stopCh +} + func (a *adapter) Alias(ctx context.Context, node *v1.Node) (*net.IPNet, error) { if node.Spec.ProviderID == "" { return nil, fmt.Errorf("node %s doesn't have providerID", node.Name) diff --git a/pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go b/pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go index 99187afdd89..a46de486e95 100644 --- a/pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go +++ b/pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go @@ -74,6 +74,7 @@ type cloudCIDRAllocator struct { // and not blocking on long operations (which shouldn't be done from // event handlers anyway). nodeUpdateChannel chan string + broadcaster record.EventBroadcaster recorder record.EventRecorder // Keep a set of nodes that are currectly being processed to avoid races in CIDR allocation @@ -91,9 +92,6 @@ func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Inter eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cidrAllocator"}) - eventBroadcaster.StartStructuredLogging(0) - klog.V(0).Infof("Sending events to api server.") - eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")}) gceCloud, ok := cloud.(*gce.Cloud) if !ok { @@ -107,6 +105,7 @@ func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Inter nodeLister: nodeInformer.Lister(), nodesSynced: nodeInformer.Informer().HasSynced, nodeUpdateChannel: make(chan string, cidrUpdateQueueSize), + broadcaster: eventBroadcaster, recorder: recorder, nodesInProcessing: map[string]*nodeProcessingInfo{}, } @@ -136,6 +135,12 @@ func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Inter func (ca *cloudCIDRAllocator) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() + // Start event processing pipeline. + ca.broadcaster.StartStructuredLogging(0) + klog.V(0).Infof("Sending events to api server.") + ca.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: ca.client.CoreV1().Events("")}) + defer ca.broadcaster.Shutdown() + klog.Infof("Starting cloud CIDR allocator") defer klog.Infof("Shutting down cloud CIDR allocator") diff --git a/pkg/controller/nodeipam/ipam/controller_legacyprovider.go b/pkg/controller/nodeipam/ipam/controller_legacyprovider.go index 2012f11af6d..d672245c478 100644 --- a/pkg/controller/nodeipam/ipam/controller_legacyprovider.go +++ b/pkg/controller/nodeipam/ipam/controller_legacyprovider.go @@ -29,6 +29,7 @@ import ( netutils "k8s.io/utils/net" v1 "k8s.io/api/core/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" informers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" @@ -150,6 +151,13 @@ func (c *Controller) Start(nodeInformer informers.NodeInformer) error { return nil } +func (c *Controller) Run(stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + + go c.adapter.Run(stopCh) + <-stopCh +} + // occupyServiceCIDR removes the service CIDR range from the cluster CIDR if it // intersects. func occupyServiceCIDR(set *cidrset.CidrSet, clusterCIDR, serviceCIDR *net.IPNet) error { diff --git a/pkg/controller/nodeipam/ipam/multi_cidr_range_allocator.go b/pkg/controller/nodeipam/ipam/multi_cidr_range_allocator.go index 3f9cd9d3c4e..0efc28543a6 100644 --- a/pkg/controller/nodeipam/ipam/multi_cidr_range_allocator.go +++ b/pkg/controller/nodeipam/ipam/multi_cidr_range_allocator.go @@ -91,6 +91,7 @@ type multiCIDRRangeAllocator struct { // Channel that is used to pass updating Nodes and their reserved CIDRs to the background. // This increases a throughput of CIDR assignment by not blocking on long operations. nodeCIDRUpdateChannel chan multiCIDRNodeReservedCIDRs + broadcaster record.EventBroadcaster recorder record.EventRecorder // queue is where incoming work is placed to de-dup and to allow "easy" // rate limited requeues on errors @@ -124,13 +125,6 @@ func NewMultiCIDRRangeAllocator( Component: "multiCIDRRangeAllocator", } recorder := eventBroadcaster.NewRecorder(scheme.Scheme, eventSource) - eventBroadcaster.StartStructuredLogging(0) - klog.V(0).Infof("Started sending events to API Server. (EventSource = %v)", eventSource) - - eventBroadcaster.StartRecordingToSink( - &v1core.EventSinkImpl{ - Interface: client.CoreV1().Events(""), - }) ra := &multiCIDRRangeAllocator{ client: client, @@ -139,6 +133,7 @@ func NewMultiCIDRRangeAllocator( clusterCIDRLister: clusterCIDRInformer.Lister(), clusterCIDRSynced: clusterCIDRInformer.Informer().HasSynced, nodeCIDRUpdateChannel: make(chan multiCIDRNodeReservedCIDRs, cidrUpdateQueueSize), + broadcaster: eventBroadcaster, recorder: recorder, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "multi_cidr_range_allocator"), lock: &sync.Mutex{}, @@ -243,6 +238,14 @@ func NewMultiCIDRRangeAllocator( func (r *multiCIDRRangeAllocator) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() + // Start event processing pipeline. + r.broadcaster.StartStructuredLogging(0) + klog.V(0).Infof("Started sending events to API Server.") + r.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: r.client.CoreV1().Events("")}) + defer r.broadcaster.Shutdown() + + defer r.queue.ShutDown() + klog.Infof("Starting Multi CIDR Range allocator") defer klog.Infof("Shutting down Multi CIDR Range allocator") diff --git a/pkg/controller/nodeipam/ipam/range_allocator.go b/pkg/controller/nodeipam/ipam/range_allocator.go index 54a0db9f51f..1b433549043 100644 --- a/pkg/controller/nodeipam/ipam/range_allocator.go +++ b/pkg/controller/nodeipam/ipam/range_allocator.go @@ -54,6 +54,7 @@ type rangeAllocator struct { // Channel that is used to pass updating Nodes and their reserved CIDRs to the background // This increases a throughput of CIDR assignment by not blocking on long operations. nodeCIDRUpdateChannel chan nodeReservedCIDRs + broadcaster record.EventBroadcaster recorder record.EventRecorder // Keep a set of nodes that are currently being processed to avoid races in CIDR allocation lock sync.Mutex @@ -72,9 +73,6 @@ func NewCIDRRangeAllocator(client clientset.Interface, nodeInformer informers.No eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cidrAllocator"}) - eventBroadcaster.StartStructuredLogging(0) - klog.V(0).Infof("Sending events to api server.") - eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")}) // create a cidrSet for each cidr we operate on // cidrSet are mapped to clusterCIDR by index @@ -94,6 +92,7 @@ func NewCIDRRangeAllocator(client clientset.Interface, nodeInformer informers.No nodeLister: nodeInformer.Lister(), nodesSynced: nodeInformer.Informer().HasSynced, nodeCIDRUpdateChannel: make(chan nodeReservedCIDRs, cidrUpdateQueueSize), + broadcaster: eventBroadcaster, recorder: recorder, nodesInProcessing: sets.NewString(), } @@ -163,6 +162,12 @@ func NewCIDRRangeAllocator(client clientset.Interface, nodeInformer informers.No func (r *rangeAllocator) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() + // Start event processing pipeline. + r.broadcaster.StartStructuredLogging(0) + klog.V(0).Infof("Sending events to api server.") + r.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: r.client.CoreV1().Events("")}) + defer r.broadcaster.Shutdown() + klog.Infof("Starting range CIDR allocator") defer klog.Infof("Shutting down range CIDR allocator") diff --git a/pkg/controller/nodeipam/legacyprovider.go b/pkg/controller/nodeipam/legacyprovider.go index accd4ae27f0..eb0dd67e458 100644 --- a/pkg/controller/nodeipam/legacyprovider.go +++ b/pkg/controller/nodeipam/legacyprovider.go @@ -31,7 +31,7 @@ import ( nodesync "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/sync" ) -func startLegacyIPAM( +func createLegacyIPAM( ic *Controller, nodeInformer coreinformers.NodeInformer, cloud cloudprovider.Interface, @@ -39,7 +39,7 @@ func startLegacyIPAM( clusterCIDRs []*net.IPNet, serviceCIDR *net.IPNet, nodeCIDRMaskSizes []int, -) { +) *ipam.Controller { cfg := &ipam.Config{ Resync: ipamResyncInterval, MaxBackoff: ipamMaxBackoff, @@ -67,4 +67,5 @@ func startLegacyIPAM( if err := ipamc.Start(nodeInformer); err != nil { klog.Fatalf("Error trying to Init(): %v", err) } + return ipamc } diff --git a/pkg/controller/nodeipam/node_ipam_controller.go b/pkg/controller/nodeipam/node_ipam_controller.go index cb0853ffeab..813aeab1a7f 100644 --- a/pkg/controller/nodeipam/node_ipam_controller.go +++ b/pkg/controller/nodeipam/node_ipam_controller.go @@ -46,6 +46,13 @@ const ( ipamInitialBackoff = 250 * time.Millisecond ) +// ipamController is an interface abstracting an interface for +// legacy mode. It is needed to ensure correct building for +// both provider-specific and providerless environments. +type ipamController interface { + Run(<-chan struct{}) +} + // Controller is the controller that manages node ipam state. type Controller struct { allocatorType ipam.CIDRAllocatorType @@ -62,6 +69,7 @@ type Controller struct { nodeLister corelisters.NodeLister nodeInformerSynced cache.InformerSynced + legacyIPAM ipamController cidrAllocator ipam.CIDRAllocator } @@ -112,7 +120,7 @@ func NewNodeIpamController( // TODO: Abstract this check into a generic controller manager should run method. if ic.allocatorType == ipam.IPAMFromClusterAllocatorType || ic.allocatorType == ipam.IPAMFromCloudAllocatorType { - startLegacyIPAM(ic, nodeInformer, cloud, kubeClient, clusterCIDRs, serviceCIDR, nodeCIDRMaskSizes) + ic.legacyIPAM = createLegacyIPAM(ic, nodeInformer, cloud, kubeClient, clusterCIDRs, serviceCIDR, nodeCIDRMaskSizes) } else { var err error @@ -151,7 +159,9 @@ func (nc *Controller) Run(stopCh <-chan struct{}) { return } - if nc.allocatorType != ipam.IPAMFromClusterAllocatorType && nc.allocatorType != ipam.IPAMFromCloudAllocatorType { + if nc.allocatorType == ipam.IPAMFromClusterAllocatorType || nc.allocatorType == ipam.IPAMFromCloudAllocatorType { + go nc.legacyIPAM.Run(stopCh) + } else { go nc.cidrAllocator.Run(stopCh) } diff --git a/pkg/controller/nodeipam/nolegacyprovider.go b/pkg/controller/nodeipam/nolegacyprovider.go index 8496ccd1325..8f15f4d1826 100644 --- a/pkg/controller/nodeipam/nolegacyprovider.go +++ b/pkg/controller/nodeipam/nolegacyprovider.go @@ -28,7 +28,14 @@ import ( "k8s.io/klog/v2" ) -func startLegacyIPAM( +type fakeController struct { +} + +func (f *fakeController) Run(stopCh <-chan struct{}) { + <-stopCh +} + +func createLegacyIPAM( ic *Controller, nodeInformer coreinformers.NodeInformer, cloud cloudprovider.Interface, @@ -36,6 +43,7 @@ func startLegacyIPAM( clusterCIDRs []*net.IPNet, serviceCIDR *net.IPNet, nodeCIDRMaskSizes []int, -) { +) *fakeController { klog.Fatal("Error trying to Init(): legacy cloud provider support disabled at build time") + return &fakeController{} } diff --git a/test/integration/framework/etcd.go b/test/integration/framework/etcd.go index 3a9844b843f..805fbeb76a4 100644 --- a/test/integration/framework/etcd.go +++ b/test/integration/framework/etcd.go @@ -196,9 +196,9 @@ func EtcdMain(tests func() int) { checkNumberOfGoroutines := func() (bool, error) { // Leave some room for goroutines we can not get rid of // like k8s.io/klog/v2.(*loggingT).flushDaemon() - // TODO(#108483): Reduce this number once we address the - // couple remaining issues. - if dg := runtime.NumGoroutine() - before; dg <= 9 { + // TODO(#108483): Figure out if we can reduce this + // further (ideally down to zero). + if dg := runtime.NumGoroutine() - before; dg <= 4 { return true, nil } // Allow goroutines to schedule and die off.