mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-01 15:58:37 +00:00
resync status on apiservices for aggregator
This commit is contained in:
parent
33f873dbbe
commit
b7bc9b11dd
@ -221,7 +221,8 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
|
||||
return nil
|
||||
})
|
||||
s.GenericAPIServer.AddPostStartHook("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error {
|
||||
go availableController.Run(context.StopCh)
|
||||
// if we end up blocking for long periods of time, we may need to increase threadiness.
|
||||
go availableController.Run(5, context.StopCh)
|
||||
return nil
|
||||
})
|
||||
|
||||
|
@ -62,7 +62,7 @@ type AvailableConditionController struct {
|
||||
endpointsLister v1listers.EndpointsLister
|
||||
endpointsSynced cache.InformerSynced
|
||||
|
||||
proxyTransport *http.Transport
|
||||
discoveryClient *http.Client
|
||||
serviceResolver ServiceResolver
|
||||
|
||||
// To allow injection for testing.
|
||||
@ -87,16 +87,35 @@ func NewAvailableConditionController(
|
||||
servicesSynced: serviceInformer.Informer().HasSynced,
|
||||
endpointsLister: endpointsInformer.Lister(),
|
||||
endpointsSynced: endpointsInformer.Informer().HasSynced,
|
||||
proxyTransport: proxyTransport,
|
||||
serviceResolver: serviceResolver,
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AvailableConditionController"),
|
||||
}
|
||||
|
||||
apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.addAPIService,
|
||||
UpdateFunc: c.updateAPIService,
|
||||
DeleteFunc: c.deleteAPIService,
|
||||
})
|
||||
// construct an http client that will ignore TLS verification (if someone owns the network and messes with your status
|
||||
// that's not so bad) and sets a very short timeout.
|
||||
discoveryClient := &http.Client{
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
|
||||
},
|
||||
// the request should happen quickly.
|
||||
Timeout: 5 * time.Second,
|
||||
}
|
||||
if proxyTransport != nil {
|
||||
discoveryClient.Transport = proxyTransport
|
||||
}
|
||||
c.discoveryClient = discoveryClient
|
||||
|
||||
// resync on this one because it is low cardinality and rechecking the actual discovery
|
||||
// allows us to detect health in a more timely fashion when network connectivity to
|
||||
// nodes is snipped, but the network still attempts to route there. See
|
||||
// https://github.com/openshift/origin/issues/17159#issuecomment-341798063
|
||||
apiServiceInformer.Informer().AddEventHandlerWithResyncPeriod(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.addAPIService,
|
||||
UpdateFunc: c.updateAPIService,
|
||||
DeleteFunc: c.deleteAPIService,
|
||||
},
|
||||
30*time.Second)
|
||||
|
||||
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.addService,
|
||||
@ -195,22 +214,10 @@ func (c *AvailableConditionController) sync(key string) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// construct an http client that will ignore TLS verification (if someone owns the network and messes with your status
|
||||
// that's not so bad) and sets a very short timeout.
|
||||
httpClient := &http.Client{
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
|
||||
},
|
||||
// the request should happen quickly.
|
||||
Timeout: 5 * time.Second,
|
||||
}
|
||||
if c.proxyTransport != nil {
|
||||
httpClient.Transport = c.proxyTransport
|
||||
}
|
||||
|
||||
errCh := make(chan error)
|
||||
go func() {
|
||||
resp, err := httpClient.Get(discoveryURL.String())
|
||||
resp, err := c.discoveryClient.Get(discoveryURL.String())
|
||||
if resp != nil {
|
||||
resp.Body.Close()
|
||||
}
|
||||
@ -248,7 +255,7 @@ func (c *AvailableConditionController) sync(key string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) Run(stopCh <-chan struct{}) {
|
||||
func (c *AvailableConditionController) Run(threadiness int, stopCh <-chan struct{}) {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer c.queue.ShutDown()
|
||||
|
||||
@ -259,9 +266,9 @@ func (c *AvailableConditionController) Run(stopCh <-chan struct{}) {
|
||||
return
|
||||
}
|
||||
|
||||
// only start one worker thread since its a slow moving API and the aggregation server adding bits
|
||||
// aren't threadsafe
|
||||
go wait.Until(c.runWorker, time.Second, stopCh)
|
||||
for i := 0; i < threadiness; i++ {
|
||||
go wait.Until(c.runWorker, time.Second, stopCh)
|
||||
}
|
||||
|
||||
<-stopCh
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user