mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 10:51:29 +00:00
Merge pull request #113559 from wojtek-t/clean_shutdown_5
Clean shutdown of few more integration tests
This commit is contained in:
commit
d802bd56ac
@ -222,6 +222,15 @@ func (o *Options) ApplyTo(c *schedulerappconfig.Config) error {
|
||||
c.ComponentConfig = *cfg
|
||||
}
|
||||
|
||||
// Build kubeconfig first to so that if it fails, it doesn't cause leaking
|
||||
// goroutines (started from initializing secure serving - which underneath
|
||||
// creates a queue which in its constructor starts a goroutine).
|
||||
kubeConfig, err := createKubeConfig(c.ComponentConfig.ClientConnection, o.Master)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.KubeConfig = kubeConfig
|
||||
|
||||
if err := o.SecureServing.ApplyTo(&c.SecureServing, &c.LoopbackClientConfig); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -271,14 +280,8 @@ func (o *Options) Config() (*schedulerappconfig.Config, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Prepare kube config.
|
||||
kubeConfig, err := createKubeConfig(c.ComponentConfig.ClientConnection, o.Master)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Prepare kube clients.
|
||||
client, eventClient, err := createClients(kubeConfig)
|
||||
client, eventClient, err := createClients(c.KubeConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -294,16 +297,15 @@ func (o *Options) Config() (*schedulerappconfig.Config, error) {
|
||||
schedulerName = c.ComponentConfig.Profiles[0].SchedulerName
|
||||
}
|
||||
coreRecorder := c.EventBroadcaster.DeprecatedNewLegacyRecorder(schedulerName)
|
||||
leaderElectionConfig, err = makeLeaderElectionConfig(c.ComponentConfig.LeaderElection, kubeConfig, coreRecorder)
|
||||
leaderElectionConfig, err = makeLeaderElectionConfig(c.ComponentConfig.LeaderElection, c.KubeConfig, coreRecorder)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
c.Client = client
|
||||
c.KubeConfig = kubeConfig
|
||||
c.InformerFactory = scheduler.NewInformerFactory(client, 0)
|
||||
dynClient := dynamic.NewForConfigOrDie(kubeConfig)
|
||||
dynClient := dynamic.NewForConfigOrDie(c.KubeConfig)
|
||||
c.DynInformerFactory = dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, 0, corev1.NamespaceAll, nil)
|
||||
c.LeaderElection = leaderElectionConfig
|
||||
|
||||
|
@ -62,8 +62,11 @@ var (
|
||||
// ControllerV2 is a controller for CronJobs.
|
||||
// Refactored Cronjob controller that uses DelayingQueue and informers
|
||||
type ControllerV2 struct {
|
||||
queue workqueue.RateLimitingInterface
|
||||
recorder record.EventRecorder
|
||||
queue workqueue.RateLimitingInterface
|
||||
|
||||
kubeClient clientset.Interface
|
||||
recorder record.EventRecorder
|
||||
broadcaster record.EventBroadcaster
|
||||
|
||||
jobControl jobControlInterface
|
||||
cronJobControl cjControlInterface
|
||||
@ -81,12 +84,12 @@ type ControllerV2 struct {
|
||||
// NewControllerV2 creates and initializes a new Controller.
|
||||
func NewControllerV2(jobInformer batchv1informers.JobInformer, cronJobsInformer batchv1informers.CronJobInformer, kubeClient clientset.Interface) (*ControllerV2, error) {
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
eventBroadcaster.StartStructuredLogging(0)
|
||||
eventBroadcaster.StartRecordingToSink(&covev1client.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
|
||||
|
||||
jm := &ControllerV2{
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cronjob"),
|
||||
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "cronjob-controller"}),
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cronjob"),
|
||||
kubeClient: kubeClient,
|
||||
broadcaster: eventBroadcaster,
|
||||
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "cronjob-controller"}),
|
||||
|
||||
jobControl: realJobControl{KubeClient: kubeClient},
|
||||
cronJobControl: &realCJControl{KubeClient: kubeClient},
|
||||
@ -123,6 +126,12 @@ func NewControllerV2(jobInformer batchv1informers.JobInformer, cronJobsInformer
|
||||
// Run starts the main goroutine responsible for watching and syncing jobs.
|
||||
func (jm *ControllerV2) Run(ctx context.Context, workers int) {
|
||||
defer utilruntime.HandleCrash()
|
||||
|
||||
// Start event processing pipeline.
|
||||
jm.broadcaster.StartStructuredLogging(0)
|
||||
jm.broadcaster.StartRecordingToSink(&covev1client.EventSinkImpl{Interface: jm.kubeClient.CoreV1().Events("")})
|
||||
defer jm.broadcaster.Shutdown()
|
||||
|
||||
defer jm.queue.ShutDown()
|
||||
|
||||
klog.InfoS("Starting cronjob controller v2")
|
||||
|
@ -55,6 +55,7 @@ type Controller struct {
|
||||
serviceCIDR *net.IPNet
|
||||
secondaryServiceCIDR *net.IPNet
|
||||
kubeClient clientset.Interface
|
||||
eventBroadcaster record.EventBroadcaster
|
||||
// Method for easy mocking in unittest.
|
||||
lookupIP func(host string) ([]net.IP, error)
|
||||
|
||||
@ -84,15 +85,6 @@ func NewNodeIpamController(
|
||||
klog.Fatalf("kubeClient is nil when starting Controller")
|
||||
}
|
||||
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
eventBroadcaster.StartStructuredLogging(0)
|
||||
|
||||
klog.Infof("Sending events to api server.")
|
||||
eventBroadcaster.StartRecordingToSink(
|
||||
&v1core.EventSinkImpl{
|
||||
Interface: kubeClient.CoreV1().Events(""),
|
||||
})
|
||||
|
||||
// Cloud CIDR allocator does not rely on clusterCIDR or nodeCIDRMaskSize for allocation.
|
||||
if allocatorType != ipam.CloudAllocatorType {
|
||||
if len(clusterCIDRs) == 0 {
|
||||
@ -110,6 +102,7 @@ func NewNodeIpamController(
|
||||
ic := &Controller{
|
||||
cloud: cloud,
|
||||
kubeClient: kubeClient,
|
||||
eventBroadcaster: record.NewBroadcaster(),
|
||||
lookupIP: net.LookupIP,
|
||||
clusterCIDRs: clusterCIDRs,
|
||||
serviceCIDR: serviceCIDR,
|
||||
@ -146,6 +139,11 @@ func NewNodeIpamController(
|
||||
func (nc *Controller) Run(stopCh <-chan struct{}) {
|
||||
defer utilruntime.HandleCrash()
|
||||
|
||||
// Start event processing pipeline.
|
||||
nc.eventBroadcaster.StartStructuredLogging(0)
|
||||
nc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: nc.kubeClient.CoreV1().Events("")})
|
||||
defer nc.eventBroadcaster.Shutdown()
|
||||
|
||||
klog.Infof("Starting ipam controller")
|
||||
defer klog.Infof("Shutting down ipam controller")
|
||||
|
||||
|
@ -138,6 +138,20 @@ func (o *CloudControllerManagerOptions) Flags(allControllers, disabledByDefaultC
|
||||
// ApplyTo fills up cloud controller manager config with options.
|
||||
func (o *CloudControllerManagerOptions) ApplyTo(c *config.Config, userAgent string) error {
|
||||
var err error
|
||||
|
||||
// Build kubeconfig first to so that if it fails, it doesn't cause leaking
|
||||
// goroutines (started from initializing secure serving - which underneath
|
||||
// creates a queue which in its constructor starts a goroutine).
|
||||
c.Kubeconfig, err = clientcmd.BuildConfigFromFlags(o.Master, o.Kubeconfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.Kubeconfig.DisableCompression = true
|
||||
c.Kubeconfig.ContentConfig.AcceptContentTypes = o.Generic.ClientConnection.AcceptContentTypes
|
||||
c.Kubeconfig.ContentConfig.ContentType = o.Generic.ClientConnection.ContentType
|
||||
c.Kubeconfig.QPS = o.Generic.ClientConnection.QPS
|
||||
c.Kubeconfig.Burst = int(o.Generic.ClientConnection.Burst)
|
||||
|
||||
if err = o.Generic.ApplyTo(&c.ComponentConfig.Generic); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -159,16 +173,6 @@ func (o *CloudControllerManagerOptions) ApplyTo(c *config.Config, userAgent stri
|
||||
}
|
||||
}
|
||||
|
||||
c.Kubeconfig, err = clientcmd.BuildConfigFromFlags(o.Master, o.Kubeconfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.Kubeconfig.DisableCompression = true
|
||||
c.Kubeconfig.ContentConfig.AcceptContentTypes = o.Generic.ClientConnection.AcceptContentTypes
|
||||
c.Kubeconfig.ContentConfig.ContentType = o.Generic.ClientConnection.ContentType
|
||||
c.Kubeconfig.QPS = o.Generic.ClientConnection.QPS
|
||||
c.Kubeconfig.Burst = int(o.Generic.ClientConnection.Burst)
|
||||
|
||||
c.Client, err = clientset.NewForConfig(restclient.AddUserAgent(c.Kubeconfig, userAgent))
|
||||
if err != nil {
|
||||
return err
|
||||
|
Loading…
Reference in New Issue
Block a user