Merge pull request #55165 from deads2k/agg-01-resync

Automatic merge from submit-queue (batch tested with PRs 55403, 54660, 55165). If you want to cherry-pick this change to another branch, please follow the instructions here: https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md

resync status on apiservices for aggregator

Adds a fairly tight (30-second) resync on the apiservices to force re-detection of status. The checks aren't very expensive and there are relatively few apiservices, so taking a little resync pain here is cheaper than the fallout stale status causes for all clients.
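To make the mechanism concrete, here is a minimal sketch of how a tight informer resync period drives periodic re-checks. It assumes client-go's informer and workqueue packages; the wireResync helper and enqueue closure are hypothetical names for illustration, not code from this PR.

package example

import (
    "time"

    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
)

// wireResync is an illustrative helper (not part of this PR): it registers an event
// handler with a short resync period so every cached object is re-enqueued, and
// therefore has its status re-probed, roughly every 30 seconds.
func wireResync(informer cache.SharedIndexInformer) workqueue.RateLimitingInterface {
    queue := workqueue.NewNamedRateLimitingQueue(
        workqueue.DefaultControllerRateLimiter(), "AvailableConditionController")

    enqueue := func(obj interface{}) {
        key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
        if err != nil {
            utilruntime.HandleError(err)
            return
        }
        queue.Add(key)
    }

    // On each resync tick UpdateFunc fires for every cached object even when
    // old == new, which is what forces the periodic availability re-check.
    informer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
        AddFunc:    enqueue,
        UpdateFunc: func(oldObj, newObj interface{}) { enqueue(newObj) },
        DeleteFunc: enqueue,
    }, 30*time.Second)

    return queue
}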
Kubernetes Submit Queue 2017-11-09 10:15:17 -08:00 committed by GitHub
commit e873b36a44
2 changed files with 33 additions and 25 deletions


@@ -221,7 +221,8 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
         return nil
     })
     s.GenericAPIServer.AddPostStartHook("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error {
-        go availableController.Run(context.StopCh)
+        // if we end up blocking for long periods of time, we may need to increase threadiness.
+        go availableController.Run(5, context.StopCh)
         return nil
     })


@@ -62,7 +62,7 @@ type AvailableConditionController struct {
     endpointsLister v1listers.EndpointsLister
     endpointsSynced cache.InformerSynced

-    proxyTransport  *http.Transport
+    discoveryClient *http.Client
     serviceResolver ServiceResolver

     // To allow injection for testing.
@@ -87,16 +87,35 @@ func NewAvailableConditionController(
         servicesSynced:  serviceInformer.Informer().HasSynced,
         endpointsLister: endpointsInformer.Lister(),
         endpointsSynced: endpointsInformer.Informer().HasSynced,
-        proxyTransport:  proxyTransport,
         serviceResolver: serviceResolver,
         queue:           workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AvailableConditionController"),
     }

-    apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-        AddFunc:    c.addAPIService,
-        UpdateFunc: c.updateAPIService,
-        DeleteFunc: c.deleteAPIService,
-    })
+    // construct an http client that will ignore TLS verification (if someone owns the network and messes with your status
+    // that's not so bad) and sets a very short timeout.
+    discoveryClient := &http.Client{
+        Transport: &http.Transport{
+            TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
+        },
+        // the request should happen quickly.
+        Timeout: 5 * time.Second,
+    }
+    if proxyTransport != nil {
+        discoveryClient.Transport = proxyTransport
+    }
+    c.discoveryClient = discoveryClient
+
+    // resync on this one because it is low cardinality and rechecking the actual discovery
+    // allows us to detect health in a more timely fashion when network connectivity to
+    // nodes is snipped, but the network still attempts to route there. See
+    // https://github.com/openshift/origin/issues/17159#issuecomment-341798063
+    apiServiceInformer.Informer().AddEventHandlerWithResyncPeriod(
+        cache.ResourceEventHandlerFuncs{
+            AddFunc:    c.addAPIService,
+            UpdateFunc: c.updateAPIService,
+            DeleteFunc: c.deleteAPIService,
+        },
+        30*time.Second)

     serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
         AddFunc:    c.addService,
@@ -195,22 +214,10 @@ func (c *AvailableConditionController) sync(key string) error {
     if err != nil {
         return err
     }
-    // construct an http client that will ignore TLS verification (if someone owns the network and messes with your status
-    // that's not so bad) and sets a very short timeout.
-    httpClient := &http.Client{
-        Transport: &http.Transport{
-            TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
-        },
-        // the request should happen quickly.
-        Timeout: 5 * time.Second,
-    }
-    if c.proxyTransport != nil {
-        httpClient.Transport = c.proxyTransport
-    }

     errCh := make(chan error)
     go func() {
-        resp, err := httpClient.Get(discoveryURL.String())
+        resp, err := c.discoveryClient.Get(discoveryURL.String())
         if resp != nil {
             resp.Body.Close()
        }
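For orientation, the surrounding pattern looks roughly like the sketch below: the GET runs in a goroutine and reports back over errCh so the sync loop can bound how long it waits on a wedged backend. The checkDiscovery helper name and the 6-second deadline are illustrative assumptions, not taken from this file.

package example

import (
    "fmt"
    "net/http"
    "time"
)

// checkDiscovery sketches the probe-with-deadline pattern around errCh: issue the
// request in a goroutine, then wait for either its result or a timeout. The buffered
// channel lets the goroutine finish and exit even if the deadline fires first.
func checkDiscovery(client *http.Client, discoveryURL string) error {
    errCh := make(chan error, 1)
    go func() {
        resp, err := client.Get(discoveryURL)
        if resp != nil {
            resp.Body.Close()
        }
        errCh <- err
    }()

    select {
    case err := <-errCh:
        return err
    case <-time.After(6 * time.Second):
        // deadline chosen slightly above the client's own 5s timeout (assumption)
        return fmt.Errorf("timed out waiting for %v", discoveryURL)
    }
}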
@@ -248,7 +255,7 @@ func (c *AvailableConditionController) sync(key string) error {
     return err
 }

-func (c *AvailableConditionController) Run(stopCh <-chan struct{}) {
+func (c *AvailableConditionController) Run(threadiness int, stopCh <-chan struct{}) {
     defer utilruntime.HandleCrash()
     defer c.queue.ShutDown()
@@ -259,9 +266,9 @@ func (c *AvailableConditionController) Run(stopCh <-chan struct{}) {
         return
     }

-    // only start one worker thread since its a slow moving API and the aggregation server adding bits
-    // aren't threadsafe
-    go wait.Until(c.runWorker, time.Second, stopCh)
+    for i := 0; i < threadiness; i++ {
+        go wait.Until(c.runWorker, time.Second, stopCh)
+    }

     <-stopCh
 }
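For context on what each of those threadiness goroutines does, the conventional client-go worker loop is sketched below. runWorker in the diff presumably wraps something along these lines, but this worker helper is an illustrative reconstruction of the common pattern, not code copied from the controller.

package example

import (
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/client-go/util/workqueue"
)

// worker drains a rate-limited queue until it shuts down; in the usual controller
// pattern this is the loop each goroutine started by Run keeps executing.
func worker(queue workqueue.RateLimitingInterface, sync func(key string) error) {
    for {
        key, quit := queue.Get()
        if quit {
            return
        }
        func() {
            defer queue.Done(key)
            if err := sync(key.(string)); err != nil {
                // requeue with backoff so transient discovery failures get retried
                utilruntime.HandleError(err)
                queue.AddRateLimited(key)
                return
            }
            // on success, drop the rate limiter's backoff history for this key
            queue.Forget(key)
        }()
    }
}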