Merge pull request #113620 from wojtek-t/clean_shutdown_6

Clean shutdown of nodeipam controller
Authored by Kubernetes Prow Robot on 2022-11-04 05:14:25 -07:00, committed by GitHub
commit 892a29b93b
9 changed files with 82 additions and 33 deletions


@@ -31,6 +31,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	clientset "k8s.io/client-go/kubernetes"
 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/client-go/tools/record"
@@ -43,26 +44,34 @@ type adapter struct {
 	k8s   clientset.Interface
 	cloud *gce.Cloud
 
-	recorder record.EventRecorder
+	broadcaster record.EventBroadcaster
+	recorder    record.EventRecorder
 }
 
 func newAdapter(k8s clientset.Interface, cloud *gce.Cloud) *adapter {
+	broadcaster := record.NewBroadcaster()
 	ret := &adapter{
-		k8s:   k8s,
-		cloud: cloud,
+		k8s:         k8s,
+		cloud:       cloud,
+		broadcaster: broadcaster,
+		recorder:    broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloudCIDRAllocator"}),
 	}
 
-	broadcaster := record.NewBroadcaster()
-	broadcaster.StartStructuredLogging(0)
-	ret.recorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloudCIDRAllocator"})
-	klog.V(0).Infof("Sending events to api server.")
-	broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
-		Interface: k8s.CoreV1().Events(""),
-	})
-
 	return ret
 }
 
+func (a *adapter) Run(stopCh <-chan struct{}) {
+	defer utilruntime.HandleCrash()
+
+	// Start event processing pipeline.
+	a.broadcaster.StartStructuredLogging(0)
+	a.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: a.k8s.CoreV1().Events("")})
+	defer a.broadcaster.Shutdown()
+
+	<-stopCh
+}
+
 func (a *adapter) Alias(ctx context.Context, node *v1.Node) (*net.IPNet, error) {
 	if node.Spec.ProviderID == "" {
 		return nil, fmt.Errorf("node %s doesn't have providerID", node.Name)
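The hunk above is the core pattern of this PR: the event broadcaster becomes a struct field, the constructor only builds it, and Run() is what starts the event-processing goroutines and shuts them down again on exit, so no event goroutine outlives Run(). A minimal stand-alone sketch of that lifecycle (the broadcaster and controller types here are simplified stand-ins, not the Kubernetes ones):

package main

import (
	"fmt"
	"time"
)

// broadcaster plays the role of record.EventBroadcaster: it owns a background
// goroutine that has to be started and stopped explicitly.
type broadcaster struct{ events chan string }

func (b *broadcaster) start() {
	go func() {
		for e := range b.events {
			fmt.Println("event:", e)
		}
	}()
}

func (b *broadcaster) shutdown() { close(b.events) }

// controller mirrors the adapter: the broadcaster is created in the
// constructor, but its goroutine only runs between Run() entry and return.
type controller struct{ b *broadcaster }

func newController() *controller {
	return &controller{b: &broadcaster{events: make(chan string, 8)}}
}

func (c *controller) Run(stopCh <-chan struct{}) {
	c.b.start()          // start event processing pipeline
	defer c.b.shutdown() // clean shutdown: no goroutine outlives Run()

	c.b.events <- "controller started"
	<-stopCh
}

func main() {
	stopCh := make(chan struct{})
	go newController().Run(stopCh)
	time.Sleep(50 * time.Millisecond)
	close(stopCh) // Run returns; its deferred shutdown stops the broadcaster
	time.Sleep(50 * time.Millisecond)
}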


@@ -74,6 +74,7 @@ type cloudCIDRAllocator struct {
 	// and not blocking on long operations (which shouldn't be done from
 	// event handlers anyway).
 	nodeUpdateChannel chan string
+	broadcaster record.EventBroadcaster
 	recorder record.EventRecorder
 
 	// Keep a set of nodes that are currectly being processed to avoid races in CIDR allocation
@@ -91,9 +92,6 @@ func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Inter
 	eventBroadcaster := record.NewBroadcaster()
 	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cidrAllocator"})
-	eventBroadcaster.StartStructuredLogging(0)
-	klog.V(0).Infof("Sending events to api server.")
-	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
 
 	gceCloud, ok := cloud.(*gce.Cloud)
 	if !ok {
@@ -107,6 +105,7 @@ func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Inter
 		nodeLister: nodeInformer.Lister(),
 		nodesSynced: nodeInformer.Informer().HasSynced,
 		nodeUpdateChannel: make(chan string, cidrUpdateQueueSize),
+		broadcaster: eventBroadcaster,
 		recorder: recorder,
 		nodesInProcessing: map[string]*nodeProcessingInfo{},
 	}
@@ -136,6 +135,12 @@ func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Inter
 func (ca *cloudCIDRAllocator) Run(stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
 
+	// Start event processing pipeline.
+	ca.broadcaster.StartStructuredLogging(0)
+	klog.V(0).Infof("Sending events to api server.")
+	ca.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: ca.client.CoreV1().Events("")})
+	defer ca.broadcaster.Shutdown()
+
 	klog.Infof("Starting cloud CIDR allocator")
 	defer klog.Infof("Shutting down cloud CIDR allocator")


@@ -29,6 +29,7 @@ import (
 	netutils "k8s.io/utils/net"
 
 	v1 "k8s.io/api/core/v1"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	informers "k8s.io/client-go/informers/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
@@ -150,6 +151,13 @@ func (c *Controller) Start(nodeInformer informers.NodeInformer) error {
 	return nil
 }
 
+func (c *Controller) Run(stopCh <-chan struct{}) {
+	defer utilruntime.HandleCrash()
+
+	go c.adapter.Run(stopCh)
+	<-stopCh
+}
+
 // occupyServiceCIDR removes the service CIDR range from the cluster CIDR if it
 // intersects.
 func occupyServiceCIDR(set *cidrset.CidrSet, clusterCIDR, serviceCIDR *net.IPNet) error {


@@ -91,6 +91,7 @@ type multiCIDRRangeAllocator struct {
 	// Channel that is used to pass updating Nodes and their reserved CIDRs to the background.
 	// This increases a throughput of CIDR assignment by not blocking on long operations.
 	nodeCIDRUpdateChannel chan multiCIDRNodeReservedCIDRs
+	broadcaster record.EventBroadcaster
 	recorder record.EventRecorder
 
 	// queue is where incoming work is placed to de-dup and to allow "easy"
 	// rate limited requeues on errors
@@ -124,13 +125,6 @@ func NewMultiCIDRRangeAllocator(
 		Component: "multiCIDRRangeAllocator",
 	}
 	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, eventSource)
-	eventBroadcaster.StartStructuredLogging(0)
-	klog.V(0).Infof("Started sending events to API Server. (EventSource = %v)", eventSource)
-	eventBroadcaster.StartRecordingToSink(
-		&v1core.EventSinkImpl{
-			Interface: client.CoreV1().Events(""),
-		})
 
 	ra := &multiCIDRRangeAllocator{
 		client: client,
@@ -139,6 +133,7 @@ func NewMultiCIDRRangeAllocator(
 		clusterCIDRLister: clusterCIDRInformer.Lister(),
 		clusterCIDRSynced: clusterCIDRInformer.Informer().HasSynced,
 		nodeCIDRUpdateChannel: make(chan multiCIDRNodeReservedCIDRs, cidrUpdateQueueSize),
+		broadcaster: eventBroadcaster,
 		recorder: recorder,
 		queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "multi_cidr_range_allocator"),
 		lock: &sync.Mutex{},
@@ -243,6 +238,14 @@ func NewMultiCIDRRangeAllocator(
 func (r *multiCIDRRangeAllocator) Run(stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
 
+	// Start event processing pipeline.
+	r.broadcaster.StartStructuredLogging(0)
+	klog.V(0).Infof("Started sending events to API Server.")
+	r.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: r.client.CoreV1().Events("")})
+	defer r.broadcaster.Shutdown()
+
+	defer r.queue.ShutDown()
+
 	klog.Infof("Starting Multi CIDR Range allocator")
 	defer klog.Infof("Shutting down Multi CIDR Range allocator")
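Note the two deferred shutdowns in Run above: deferred calls execute in LIFO order, so when Run returns the workqueue is shut down before the broadcaster. A tiny sketch of that ordering (the printed names are illustrative only):

package main

import "fmt"

func run() {
	defer fmt.Println("3: broadcaster.Shutdown() (deferred first, runs last)")
	defer fmt.Println("2: queue.ShutDown() (deferred second, runs first)")
	fmt.Println("1: Run returning")
}

func main() { run() }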


@@ -54,6 +54,7 @@ type rangeAllocator struct {
 	// Channel that is used to pass updating Nodes and their reserved CIDRs to the background
 	// This increases a throughput of CIDR assignment by not blocking on long operations.
 	nodeCIDRUpdateChannel chan nodeReservedCIDRs
+	broadcaster record.EventBroadcaster
 	recorder record.EventRecorder
 
 	// Keep a set of nodes that are currently being processed to avoid races in CIDR allocation
 	lock sync.Mutex
@@ -72,9 +73,6 @@ func NewCIDRRangeAllocator(client clientset.Interface, nodeInformer informers.No
 	eventBroadcaster := record.NewBroadcaster()
 	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cidrAllocator"})
-	eventBroadcaster.StartStructuredLogging(0)
-	klog.V(0).Infof("Sending events to api server.")
-	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
 
 	// create a cidrSet for each cidr we operate on
 	// cidrSet are mapped to clusterCIDR by index
@@ -94,6 +92,7 @@ func NewCIDRRangeAllocator(client clientset.Interface, nodeInformer informers.No
 		nodeLister: nodeInformer.Lister(),
 		nodesSynced: nodeInformer.Informer().HasSynced,
 		nodeCIDRUpdateChannel: make(chan nodeReservedCIDRs, cidrUpdateQueueSize),
+		broadcaster: eventBroadcaster,
 		recorder: recorder,
 		nodesInProcessing: sets.NewString(),
 	}
@@ -163,6 +162,12 @@ func NewCIDRRangeAllocator(client clientset.Interface, nodeInformer informers.No
 func (r *rangeAllocator) Run(stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
 
+	// Start event processing pipeline.
+	r.broadcaster.StartStructuredLogging(0)
+	klog.V(0).Infof("Sending events to api server.")
+	r.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: r.client.CoreV1().Events("")})
+	defer r.broadcaster.Shutdown()
+
 	klog.Infof("Starting range CIDR allocator")
 	defer klog.Infof("Shutting down range CIDR allocator")


@@ -31,7 +31,7 @@ import (
 	nodesync "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/sync"
 )
 
-func startLegacyIPAM(
+func createLegacyIPAM(
 	ic *Controller,
 	nodeInformer coreinformers.NodeInformer,
 	cloud cloudprovider.Interface,
@@ -39,7 +39,7 @@ func startLegacyIPAM(
 	clusterCIDRs []*net.IPNet,
 	serviceCIDR *net.IPNet,
 	nodeCIDRMaskSizes []int,
-) {
+) *ipam.Controller {
 	cfg := &ipam.Config{
 		Resync: ipamResyncInterval,
 		MaxBackoff: ipamMaxBackoff,
@@ -67,4 +67,5 @@ func startLegacyIPAM(
 	if err := ipamc.Start(nodeInformer); err != nil {
 		klog.Fatalf("Error trying to Init(): %v", err)
 	}
+	return ipamc
 }


@@ -46,6 +46,13 @@ const (
 	ipamInitialBackoff = 250 * time.Millisecond
 )
 
+// ipamController is an interface abstracting an interface for
+// legacy mode. It is needed to ensure correct building for
+// both provider-specific and providerless environments.
+type ipamController interface {
+	Run(<-chan struct{})
+}
+
 // Controller is the controller that manages node ipam state.
 type Controller struct {
 	allocatorType ipam.CIDRAllocatorType
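The ipamController interface added above lets the Controller struct hold the legacy allocator without referencing provider-specific types: in builds with legacy cloud-provider support the real ipam.Controller satisfies it, while providerless builds supply the fakeController shown further down. A reduced sketch of the idea, with stand-in types rather than the actual Kubernetes ones:

package main

import "fmt"

// runner plays the role of ipamController: Run is all the caller needs.
type runner interface {
	Run(<-chan struct{})
}

// realIPAM stands in for ipam.Controller (legacy cloud-provider build).
type realIPAM struct{}

func (r *realIPAM) Run(stopCh <-chan struct{}) {
	fmt.Println("legacy IPAM running")
	<-stopCh
}

// fakeIPAM stands in for fakeController (providerless build); it just blocks
// until stop so both builds can be driven identically.
type fakeIPAM struct{}

func (f *fakeIPAM) Run(stopCh <-chan struct{}) { <-stopCh }

func main() {
	var legacy runner = &realIPAM{} // a providerless build would assign &fakeIPAM{}
	stopCh := make(chan struct{})
	close(stopCh) // pre-closed so the example returns immediately
	legacy.Run(stopCh)
}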
@@ -62,6 +69,7 @@ type Controller struct {
 	nodeLister corelisters.NodeLister
 	nodeInformerSynced cache.InformerSynced
 
+	legacyIPAM ipamController
 	cidrAllocator ipam.CIDRAllocator
 }
@@ -112,7 +120,7 @@ func NewNodeIpamController(
 	// TODO: Abstract this check into a generic controller manager should run method.
 	if ic.allocatorType == ipam.IPAMFromClusterAllocatorType || ic.allocatorType == ipam.IPAMFromCloudAllocatorType {
-		startLegacyIPAM(ic, nodeInformer, cloud, kubeClient, clusterCIDRs, serviceCIDR, nodeCIDRMaskSizes)
+		ic.legacyIPAM = createLegacyIPAM(ic, nodeInformer, cloud, kubeClient, clusterCIDRs, serviceCIDR, nodeCIDRMaskSizes)
 	} else {
 		var err error
@@ -151,7 +159,9 @@ func (nc *Controller) Run(stopCh <-chan struct{}) {
 		return
 	}
 
-	if nc.allocatorType != ipam.IPAMFromClusterAllocatorType && nc.allocatorType != ipam.IPAMFromCloudAllocatorType {
+	if nc.allocatorType == ipam.IPAMFromClusterAllocatorType || nc.allocatorType == ipam.IPAMFromCloudAllocatorType {
+		go nc.legacyIPAM.Run(stopCh)
+	} else {
 		go nc.cidrAllocator.Run(stopCh)
 	}
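Both branches start the selected allocator on its own goroutine and then block on the same stopCh. Closing a channel releases every goroutine waiting on it, which is what lets a single close() wind down the wrapper and the allocator together. A small sketch of that idiom (not Kubernetes code):

package main

import (
	"fmt"
	"sync"
)

func worker(name string, stopCh <-chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	<-stopCh // a closed channel unblocks every receiver at once
	fmt.Println(name, "stopped")
}

func main() {
	stopCh := make(chan struct{})
	var wg sync.WaitGroup
	for _, name := range []string{"legacy IPAM", "cidr allocator", "nodeipam wrapper"} {
		wg.Add(1)
		go worker(name, stopCh, &wg)
	}
	close(stopCh) // a single close stops all of them
	wg.Wait()
}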


@@ -28,7 +28,14 @@ import (
 	"k8s.io/klog/v2"
 )
 
-func startLegacyIPAM(
+type fakeController struct {
+}
+
+func (f *fakeController) Run(stopCh <-chan struct{}) {
+	<-stopCh
+}
+
+func createLegacyIPAM(
 	ic *Controller,
 	nodeInformer coreinformers.NodeInformer,
 	cloud cloudprovider.Interface,
@@ -36,6 +43,7 @@ func startLegacyIPAM(
 	clusterCIDRs []*net.IPNet,
 	serviceCIDR *net.IPNet,
 	nodeCIDRMaskSizes []int,
-) {
+) *fakeController {
 	klog.Fatal("Error trying to Init(): legacy cloud provider support disabled at build time")
+	return &fakeController{}
 }


@@ -196,9 +196,9 @@ func EtcdMain(tests func() int) {
 	checkNumberOfGoroutines := func() (bool, error) {
 		// Leave some room for goroutines we can not get rid of
 		// like k8s.io/klog/v2.(*loggingT).flushDaemon()
-		// TODO(#108483): Reduce this number once we address the
-		// couple remaining issues.
-		if dg := runtime.NumGoroutine() - before; dg <= 9 {
+		// TODO(#108483): Figure out if we can reduce this
+		// further (ideally down to zero).
+		if dg := runtime.NumGoroutine() - before; dg <= 4 {
 			return true, nil
 		}
 		// Allow goroutines to schedule and die off.
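The integration framework's leak check compares runtime.NumGoroutine() against the count taken before the tests and keeps polling until the difference falls under the allowed slack; with the nodeipam controllers now shutting down cleanly, the slack drops from 9 to 4. A self-contained sketch of that kind of check (the polling helper and threshold are illustrative, not the framework's exact code):

package main

import (
	"fmt"
	"runtime"
	"time"
)

// waitForGoroutines polls until at most maxExtra goroutines remain above the
// baseline, giving finished goroutines a chance to be scheduled and exit.
func waitForGoroutines(before, maxExtra int, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for {
		if dg := runtime.NumGoroutine() - before; dg <= maxExtra {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("leaked goroutines: %d above baseline", runtime.NumGoroutine()-before)
		}
		// Allow goroutines to schedule and die off.
		runtime.Gosched()
		time.Sleep(10 * time.Millisecond)
	}
}

func main() {
	before := runtime.NumGoroutine()

	stopCh := make(chan struct{})
	go func() { <-stopCh }() // exits once stopCh is closed
	close(stopCh)

	if err := waitForGoroutines(before, 0, time.Second); err != nil {
		fmt.Println("FAIL:", err)
		return
	}
	fmt.Println("no goroutine leak detected")
}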