From b7bc9b11dd192afd9f9e155757a7e6f4b895408a Mon Sep 17 00:00:00 2001 From: David Eads Date: Mon, 6 Nov 2017 08:46:36 -0500 Subject: [PATCH] resync status on apiservices for aggregator --- .../pkg/apiserver/apiserver.go | 3 +- .../status/available_controller.go | 55 +++++++++++-------- 2 files changed, 33 insertions(+), 25 deletions(-) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index 5ed0e037b82..f9188609ea8 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -221,7 +221,8 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg return nil }) s.GenericAPIServer.AddPostStartHook("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error { - go availableController.Run(context.StopCh) + // if we end up blocking for long periods of time, we may need to increase threadiness. + go availableController.Run(5, context.StopCh) return nil }) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go index 76c566f6830..a9bbe08a1fd 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go @@ -62,7 +62,7 @@ type AvailableConditionController struct { endpointsLister v1listers.EndpointsLister endpointsSynced cache.InformerSynced - proxyTransport *http.Transport + discoveryClient *http.Client serviceResolver ServiceResolver // To allow injection for testing. @@ -87,16 +87,35 @@ func NewAvailableConditionController( servicesSynced: serviceInformer.Informer().HasSynced, endpointsLister: endpointsInformer.Lister(), endpointsSynced: endpointsInformer.Informer().HasSynced, - proxyTransport: proxyTransport, serviceResolver: serviceResolver, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AvailableConditionController"), } - apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: c.addAPIService, - UpdateFunc: c.updateAPIService, - DeleteFunc: c.deleteAPIService, - }) + // construct an http client that will ignore TLS verification (if someone owns the network and messes with your status + // that's not so bad) and sets a very short timeout. + discoveryClient := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + // the request should happen quickly. + Timeout: 5 * time.Second, + } + if proxyTransport != nil { + discoveryClient.Transport = proxyTransport + } + c.discoveryClient = discoveryClient + + // resync on this one because it is low cardinality and rechecking the actual discovery + // allows us to detect health in a more timely fashion when network connectivity to + // nodes is snipped, but the network still attempts to route there. See + // https://github.com/openshift/origin/issues/17159#issuecomment-341798063 + apiServiceInformer.Informer().AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.addAPIService, + UpdateFunc: c.updateAPIService, + DeleteFunc: c.deleteAPIService, + }, + 30*time.Second) serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.addService, @@ -195,22 +214,10 @@ func (c *AvailableConditionController) sync(key string) error { if err != nil { return err } - // construct an http client that will ignore TLS verification (if someone owns the network and messes with your status - // that's not so bad) and sets a very short timeout. - httpClient := &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - }, - // the request should happen quickly. - Timeout: 5 * time.Second, - } - if c.proxyTransport != nil { - httpClient.Transport = c.proxyTransport - } errCh := make(chan error) go func() { - resp, err := httpClient.Get(discoveryURL.String()) + resp, err := c.discoveryClient.Get(discoveryURL.String()) if resp != nil { resp.Body.Close() } @@ -248,7 +255,7 @@ func (c *AvailableConditionController) sync(key string) error { return err } -func (c *AvailableConditionController) Run(stopCh <-chan struct{}) { +func (c *AvailableConditionController) Run(threadiness int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() @@ -259,9 +266,9 @@ func (c *AvailableConditionController) Run(stopCh <-chan struct{}) { return } - // only start one worker thread since its a slow moving API and the aggregation server adding bits - // aren't threadsafe - go wait.Until(c.runWorker, time.Second, stopCh) + for i := 0; i < threadiness; i++ { + go wait.Until(c.runWorker, time.Second, stopCh) + } <-stopCh }