Merge pull request #47347 from deads2k/agg-34-check

Automatic merge from submit-queue (batch tested with PRs 53579, 47347). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

actually check for a live discovery endpoint before aggregating

Aggregation doesn't work without being able to hit a discovery endpoint.  This adds a simple status check for whether the discovery endpoint is reachable.  The actual status code doesn't matter, we just need to be able to make the connection.
This commit is contained in:
Kubernetes Submit Queue 2017-10-24 06:02:20 -07:00 committed by GitHub
commit de75c09db4
2 changed files with 66 additions and 1 deletions

View File

@ -207,6 +207,8 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
c.GenericConfig.SharedInformerFactory.Core().V1().Services(), c.GenericConfig.SharedInformerFactory.Core().V1().Services(),
c.GenericConfig.SharedInformerFactory.Core().V1().Endpoints(), c.GenericConfig.SharedInformerFactory.Core().V1().Endpoints(),
apiregistrationClient.Apiregistration(), apiregistrationClient.Apiregistration(),
c.ExtraConfig.ProxyTransport,
s.serviceResolver,
) )
s.GenericAPIServer.AddPostStartHook("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error { s.GenericAPIServer.AddPostStartHook("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {

View File

@ -17,7 +17,10 @@ limitations under the License.
package apiserver package apiserver
import ( import (
"crypto/tls"
"fmt" "fmt"
"net/http"
"net/url"
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
@ -42,6 +45,10 @@ import (
"k8s.io/kube-aggregator/pkg/controllers" "k8s.io/kube-aggregator/pkg/controllers"
) )
type ServiceResolver interface {
ResolveEndpoint(namespace, name string) (*url.URL, error)
}
type AvailableConditionController struct { type AvailableConditionController struct {
apiServiceClient apiregistrationclient.APIServicesGetter apiServiceClient apiregistrationclient.APIServicesGetter
@ -55,6 +62,9 @@ type AvailableConditionController struct {
endpointsLister v1listers.EndpointsLister endpointsLister v1listers.EndpointsLister
endpointsSynced cache.InformerSynced endpointsSynced cache.InformerSynced
proxyTransport *http.Transport
serviceResolver ServiceResolver
// To allow injection for testing. // To allow injection for testing.
syncFn func(key string) error syncFn func(key string) error
@ -66,6 +76,8 @@ func NewAvailableConditionController(
serviceInformer v1informers.ServiceInformer, serviceInformer v1informers.ServiceInformer,
endpointsInformer v1informers.EndpointsInformer, endpointsInformer v1informers.EndpointsInformer,
apiServiceClient apiregistrationclient.APIServicesGetter, apiServiceClient apiregistrationclient.APIServicesGetter,
proxyTransport *http.Transport,
serviceResolver ServiceResolver,
) *AvailableConditionController { ) *AvailableConditionController {
c := &AvailableConditionController{ c := &AvailableConditionController{
apiServiceClient: apiServiceClient, apiServiceClient: apiServiceClient,
@ -75,6 +87,8 @@ func NewAvailableConditionController(
servicesSynced: serviceInformer.Informer().HasSynced, servicesSynced: serviceInformer.Informer().HasSynced,
endpointsLister: endpointsInformer.Lister(), endpointsLister: endpointsInformer.Lister(),
endpointsSynced: endpointsInformer.Informer().HasSynced, endpointsSynced: endpointsInformer.Informer().HasSynced,
proxyTransport: proxyTransport,
serviceResolver: serviceResolver,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AvailableConditionController"), queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AvailableConditionController"),
} }
@ -175,8 +189,57 @@ func (c *AvailableConditionController) sync(key string) error {
return err return err
} }
} }
// actually try to hit the discovery endpoint when it isn't local and when we're routing as a service.
if apiService.Spec.Service != nil && c.serviceResolver != nil {
discoveryURL, err := c.serviceResolver.ResolveEndpoint(apiService.Spec.Service.Namespace, apiService.Spec.Service.Name)
if err != nil {
return err
}
// construct an http client that will ignore TLS verification (if someone owns the network and messes with your status
// that's not so bad) and sets a very short timeout.
httpClient := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
// the request should happen quickly.
Timeout: 5 * time.Second,
}
if c.proxyTransport != nil {
httpClient.Transport = c.proxyTransport
}
// TODO actually try to hit the discovery endpoint errCh := make(chan error)
go func() {
resp, err := httpClient.Get(discoveryURL.String())
if resp != nil {
resp.Body.Close()
}
errCh <- err
}()
select {
case err = <-errCh:
// we had trouble with slow dial and DNS responses causing us to wait too long.
// we added this as insurance
case <-time.After(6 * time.Second):
err = fmt.Errorf("timed out waiting for %v", discoveryURL)
}
if err != nil {
availableCondition.Status = apiregistration.ConditionFalse
availableCondition.Reason = "FailedDiscoveryCheck"
availableCondition.Message = fmt.Sprintf("no response from %v: %v", discoveryURL, err)
apiregistration.SetAPIServiceCondition(apiService, availableCondition)
_, updateErr := c.apiServiceClient.APIServices().UpdateStatus(apiService)
if updateErr != nil {
return updateErr
}
// force a requeue to make it very obvious that this will be retried at some point in the future
// along with other requeues done via service change, endpoint change, and resync
return err
}
}
availableCondition.Reason = "Passed" availableCondition.Reason = "Passed"
availableCondition.Message = "all checks passed" availableCondition.Message = "all checks passed"