diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index a135cd93ecb..d3dc5cf65ba 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -182,14 +182,19 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle("/apis/", apisHandler) apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().InternalVersion().APIServices(), s) - availableController := statuscontrollers.NewAvailableConditionController( + availableController, err := statuscontrollers.NewAvailableConditionController( informerFactory.Apiregistration().InternalVersion().APIServices(), c.GenericConfig.SharedInformerFactory.Core().V1().Services(), c.GenericConfig.SharedInformerFactory.Core().V1().Endpoints(), apiregistrationClient.Apiregistration(), c.ExtraConfig.ProxyTransport, + c.ExtraConfig.ProxyClientCert, + c.ExtraConfig.ProxyClientKey, s.serviceResolver, ) + if err != nil { + return nil, err + } s.GenericAPIServer.AddPostStartHookOrDie("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error { informerFactory.Start(context.StopCh) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/BUILD b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/BUILD index 1026e9bd86a..cfa96422e16 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/BUILD +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/BUILD @@ -26,7 +26,9 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", + "//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", + "//staging/src/k8s.io/client-go/transport:go_default_library", "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion:go_default_library", diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go index 75f1e47c5ba..4c3995e6b93 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go @@ -17,15 +17,12 @@ limitations under the License. package apiserver import ( - "crypto/tls" "fmt" "net/http" "net/url" "time" - "k8s.io/klog" - - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" @@ -36,9 +33,11 @@ import ( "k8s.io/apimachinery/pkg/util/wait" v1informers "k8s.io/client-go/informers/core/v1" v1listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/transport" "k8s.io/client-go/util/workqueue" - + "k8s.io/klog" "k8s.io/kube-aggregator/pkg/apis/apiregistration" apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion" informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion" @@ -81,8 +80,10 @@ func NewAvailableConditionController( endpointsInformer v1informers.EndpointsInformer, apiServiceClient apiregistrationclient.APIServicesGetter, proxyTransport *http.Transport, + proxyClientCert []byte, + proxyClientKey []byte, serviceResolver ServiceResolver, -) *AvailableConditionController { +) (*AvailableConditionController, error) { c := &AvailableConditionController{ apiServiceClient: apiServiceClient, apiServiceLister: apiServiceInformer.Lister(), @@ -100,19 +101,28 @@ func NewAvailableConditionController( "AvailableConditionController"), } + // if a particular transport was specified, use that otherwise build one // construct an http client that will ignore TLS verification (if someone owns the network and messes with your status - // that's not so bad) and sets a very short timeout. - discoveryClient := &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + // that's not so bad) and sets a very short timeout. This is a best effort GET that provides no additional information + restConfig := &rest.Config{ + TLSClientConfig: rest.TLSClientConfig{ + Insecure: true, + CertData: proxyClientCert, + KeyData: proxyClientKey, }, + } + if proxyTransport != nil && proxyTransport.DialContext != nil { + restConfig.Dial = proxyTransport.DialContext + } + transport, err := rest.TransportFor(restConfig) + if err != nil { + return nil, err + } + c.discoveryClient = &http.Client{ + Transport: transport, // the request should happen quickly. Timeout: 5 * time.Second, } - if proxyTransport != nil { - discoveryClient.Transport = proxyTransport - } - c.discoveryClient = discoveryClient // resync on this one because it is low cardinality and rechecking the actual discovery // allows us to detect health in a more timely fashion when network connectivity to @@ -140,7 +150,7 @@ func NewAvailableConditionController( c.syncFn = c.sync - return c + return c, nil } func (c *AvailableConditionController) sync(key string) error { @@ -254,17 +264,31 @@ func (c *AvailableConditionController) sync(key string) error { errCh := make(chan error) go func() { - resp, err := c.discoveryClient.Get(discoveryURL.String()) + newReq, err := http.NewRequest("GET", discoveryURL.String(), nil) + if err != nil { + errCh <- err + return + } + + // setting the system-masters identity ensures that we will always have access rights + transport.SetAuthProxyHeaders(newReq, "system:kube-aggregator", []string{"system:masters"}, nil) + resp, err := c.discoveryClient.Do(newReq) if resp != nil { resp.Body.Close() + // we should always been in the 200s or 300s + if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices { + errCh <- fmt.Errorf("bad status from %v: %v", discoveryURL, resp.StatusCode) + return + } } + errCh <- err }() select { case err = <-errCh: if err != nil { - results <- fmt.Errorf("no response from %v: %v", discoveryURL, err) + results <- fmt.Errorf("failing or missing response from %v: %v", discoveryURL, err) return } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go index ee5f36928df..e919d317816 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go @@ -18,11 +18,15 @@ package apiserver import ( "fmt" + "net/http" + "net/http/httptest" + "net/url" + "strings" "testing" "github.com/davecgh/go-spew/spew" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1listers "k8s.io/client-go/listers/core/v1" clienttesting "k8s.io/client-go/testing" @@ -99,10 +103,12 @@ func TestSync(t *testing.T) { tests := []struct { name string - apiServiceName string - apiServices []*apiregistration.APIService - services []*v1.Service - endpoints []*v1.Endpoints + apiServiceName string + apiServices []*apiregistration.APIService + services []*v1.Service + endpoints []*v1.Endpoints + forceDiscoveryFail bool + expectedAvailability apiregistration.APIServiceCondition }{ { @@ -200,66 +206,97 @@ func TestSync(t *testing.T) { Message: `all checks passed`, }, }, + { + name: "remote-bad-return", + apiServiceName: "remote.group", + apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")}, + services: []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)}, + endpoints: []*v1.Endpoints{newEndpointsWithAddress("foo", "bar", testServicePort, testServicePortName)}, + forceDiscoveryFail: true, + expectedAvailability: apiregistration.APIServiceCondition{ + Type: apiregistration.Available, + Status: apiregistration.ConditionFalse, + Reason: "FailedDiscoveryCheck", + Message: `failing or missing response from`, + }, + }, } for _, tc := range tests { - fakeClient := fake.NewSimpleClientset() - apiServiceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) - serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) - endpointsIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) - for _, obj := range tc.apiServices { - apiServiceIndexer.Add(obj) - } - for _, obj := range tc.services { - serviceIndexer.Add(obj) - } - for _, obj := range tc.endpoints { - endpointsIndexer.Add(obj) - } + t.Run(tc.name, func(t *testing.T) { + fakeClient := fake.NewSimpleClientset() + apiServiceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + endpointsIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + for _, obj := range tc.apiServices { + apiServiceIndexer.Add(obj) + } + for _, obj := range tc.services { + serviceIndexer.Add(obj) + } + for _, obj := range tc.endpoints { + endpointsIndexer.Add(obj) + } - c := AvailableConditionController{ - apiServiceClient: fakeClient.Apiregistration(), - apiServiceLister: listers.NewAPIServiceLister(apiServiceIndexer), - serviceLister: v1listers.NewServiceLister(serviceIndexer), - endpointsLister: v1listers.NewEndpointsLister(endpointsIndexer), - } - c.sync(tc.apiServiceName) + testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if !tc.forceDiscoveryFail { + w.WriteHeader(http.StatusOK) + } + w.WriteHeader(http.StatusForbidden) + })) + defer testServer.Close() - // ought to have one action writing status - if e, a := 1, len(fakeClient.Actions()); e != a { - t.Errorf("%v expected %v, got %v", tc.name, e, fakeClient.Actions()) - continue - } + c := AvailableConditionController{ + apiServiceClient: fakeClient.Apiregistration(), + apiServiceLister: listers.NewAPIServiceLister(apiServiceIndexer), + serviceLister: v1listers.NewServiceLister(serviceIndexer), + endpointsLister: v1listers.NewEndpointsLister(endpointsIndexer), + discoveryClient: testServer.Client(), + serviceResolver: &fakeServiceResolver{url: testServer.URL}, + } + c.sync(tc.apiServiceName) - action, ok := fakeClient.Actions()[0].(clienttesting.UpdateAction) - if !ok { - t.Errorf("%v got %v", tc.name, ok) - continue - } + // ought to have one action writing status + if e, a := 1, len(fakeClient.Actions()); e != a { + t.Fatalf("%v expected %v, got %v", tc.name, e, fakeClient.Actions()) + } - if e, a := 1, len(action.GetObject().(*apiregistration.APIService).Status.Conditions); e != a { - t.Errorf("%v expected %v, got %v", tc.name, e, action.GetObject()) - continue - } - condition := action.GetObject().(*apiregistration.APIService).Status.Conditions[0] - if e, a := tc.expectedAvailability.Type, condition.Type; e != a { - t.Errorf("%v expected %v, got %#v", tc.name, e, condition) - } - if e, a := tc.expectedAvailability.Status, condition.Status; e != a { - t.Errorf("%v expected %v, got %#v", tc.name, e, condition) - } - if e, a := tc.expectedAvailability.Reason, condition.Reason; e != a { - t.Errorf("%v expected %v, got %#v", tc.name, e, condition) - } - if e, a := tc.expectedAvailability.Message, condition.Message; e != a { - t.Errorf("%v expected %v, got %#v", tc.name, e, condition) - } - if condition.LastTransitionTime.IsZero() { - t.Error("expected lastTransitionTime to be non-zero") - } + action, ok := fakeClient.Actions()[0].(clienttesting.UpdateAction) + if !ok { + t.Fatalf("%v got %v", tc.name, ok) + } + + if e, a := 1, len(action.GetObject().(*apiregistration.APIService).Status.Conditions); e != a { + t.Fatalf("%v expected %v, got %v", tc.name, e, action.GetObject()) + } + condition := action.GetObject().(*apiregistration.APIService).Status.Conditions[0] + if e, a := tc.expectedAvailability.Type, condition.Type; e != a { + t.Errorf("%v expected %v, got %#v", tc.name, e, condition) + } + if e, a := tc.expectedAvailability.Status, condition.Status; e != a { + t.Errorf("%v expected %v, got %#v", tc.name, e, condition) + } + if e, a := tc.expectedAvailability.Reason, condition.Reason; e != a { + t.Errorf("%v expected %v, got %#v", tc.name, e, condition) + } + if e, a := tc.expectedAvailability.Message, condition.Message; !strings.HasPrefix(a, e) { + t.Errorf("%v expected %v, got %#v", tc.name, e, condition) + } + if condition.LastTransitionTime.IsZero() { + t.Error("expected lastTransitionTime to be non-zero") + } + }) } } +type fakeServiceResolver struct { + url string +} + +func (f *fakeServiceResolver) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) { + return url.Parse(f.url) +} + func TestUpdateAPIServiceStatus(t *testing.T) { foo := &apiregistration.APIService{Status: apiregistration.APIServiceStatus{Conditions: []apiregistration.APIServiceCondition{{Type: "foo"}}}} bar := &apiregistration.APIService{Status: apiregistration.APIServiceStatus{Conditions: []apiregistration.APIServiceCondition{{Type: "bar"}}}}