From 0b400cf6aa281d9cb0a60100a208a1fa113d81c8 Mon Sep 17 00:00:00 2001 From: Di Jin Date: Thu, 1 Sep 2022 15:25:26 -0700 Subject: [PATCH 1/2] Add an option for aggregator --- cmd/kube-apiserver/app/aggregator.go | 9 ++- cmd/kube-apiserver/app/options/options.go | 9 ++- .../app/options/options_test.go | 5 +- .../pkg/util/proxy/upgradeaware.go | 27 +++++++ .../pkg/util/proxy/upgradeaware_test.go | 77 +++++++++++++++++++ .../pkg/apiserver/apiserver.go | 7 ++ .../pkg/apiserver/handler_proxy.go | 6 ++ 7 files changed, 132 insertions(+), 8 deletions(-) diff --git a/cmd/kube-apiserver/app/aggregator.go b/cmd/kube-apiserver/app/aggregator.go index 1ccf17581da..7faf3a63a8a 100644 --- a/cmd/kube-apiserver/app/aggregator.go +++ b/cmd/kube-apiserver/app/aggregator.go @@ -111,10 +111,11 @@ func createAggregatorConfig( SharedInformerFactory: externalInformers, }, ExtraConfig: aggregatorapiserver.ExtraConfig{ - ProxyClientCertFile: commandOptions.ProxyClientCertFile, - ProxyClientKeyFile: commandOptions.ProxyClientKeyFile, - ServiceResolver: serviceResolver, - ProxyTransport: proxyTransport, + ProxyClientCertFile: commandOptions.ProxyClientCertFile, + ProxyClientKeyFile: commandOptions.ProxyClientKeyFile, + ServiceResolver: serviceResolver, + ProxyTransport: proxyTransport, + RejectForwardingRedirects: commandOptions.AggregatorRejectForwardingRedirects, }, } diff --git a/cmd/kube-apiserver/app/options/options.go b/cmd/kube-apiserver/app/options/options.go index 97f82fabd31..f84c6465c33 100644 --- a/cmd/kube-apiserver/app/options/options.go +++ b/cmd/kube-apiserver/app/options/options.go @@ -76,7 +76,8 @@ type ServerRunOptions struct { ProxyClientCertFile string ProxyClientKeyFile string - EnableAggregatorRouting bool + EnableAggregatorRouting bool + AggregatorRejectForwardingRedirects bool MasterCount int EndpointReconcilerType string @@ -132,7 +133,8 @@ func NewServerRunOptions() *ServerRunOptions { }, HTTPTimeout: time.Duration(5) * time.Second, }, - ServiceNodePortRange: kubeoptions.DefaultServiceNodePortRange, + ServiceNodePortRange: kubeoptions.DefaultServiceNodePortRange, + AggregatorRejectForwardingRedirects: true, } // Overwrite the default for storage data format. @@ -244,6 +246,9 @@ func (s *ServerRunOptions) Flags() (fss cliflag.NamedFlagSets) { fs.BoolVar(&s.EnableAggregatorRouting, "enable-aggregator-routing", s.EnableAggregatorRouting, "Turns on aggregator routing requests to endpoints IP rather than cluster IP.") + fs.BoolVar(&s.AggregatorRejectForwardingRedirects, "aggregator-reject-forwarding-redirect", s.AggregatorRejectForwardingRedirects, + "Aggregator reject forwarding redirect response back to client.") + fs.StringVar(&s.ServiceAccountSigningKeyFile, "service-account-signing-key-file", s.ServiceAccountSigningKeyFile, ""+ "Path to the file that contains the current private key of the service account token issuer. The issuer will sign issued ID tokens with this private key.") diff --git a/cmd/kube-apiserver/app/options/options_test.go b/cmd/kube-apiserver/app/options/options_test.go index c5c7b355d9b..137490ed5f5 100644 --- a/cmd/kube-apiserver/app/options/options_test.go +++ b/cmd/kube-apiserver/app/options/options_test.go @@ -318,8 +318,9 @@ func TestAddFlags(t *testing.T) { Traces: &apiserveroptions.TracingOptions{ ConfigFile: "/var/run/kubernetes/tracing_config.yaml", }, - IdentityLeaseDurationSeconds: 3600, - IdentityLeaseRenewIntervalSeconds: 10, + IdentityLeaseDurationSeconds: 3600, + IdentityLeaseRenewIntervalSeconds: 10, + AggregatorRejectForwardingRedirects: true, } if !reflect.DeepEqual(expected, s) { diff --git a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go index f56c17ca3cd..a3a14241cc6 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go @@ -83,6 +83,8 @@ type UpgradeAwareHandler struct { MaxBytesPerSec int64 // Responder is passed errors that occur while setting up proxying. Responder ErrorResponder + // Reject to forward redirect response + RejectForwardingRedirects bool } const defaultFlushInterval = 200 * time.Millisecond @@ -257,6 +259,31 @@ func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request proxy.Transport = h.Transport proxy.FlushInterval = h.FlushInterval proxy.ErrorLog = log.New(noSuppressPanicError{}, "", log.LstdFlags) + if h.RejectForwardingRedirects { + oldModifyResponse := proxy.ModifyResponse + proxy.ModifyResponse = func(response *http.Response) error { + code := response.StatusCode + if code >= 300 && code <= 399 { + // close the original response + response.Body.Close() + msg := "the backend attempted to redirect this request, which is not permitted" + // replace the response + *response = http.Response{ + StatusCode: http.StatusBadGateway, + Status: fmt.Sprintf("%d %s", response.StatusCode, http.StatusText(response.StatusCode)), + Body: io.NopCloser(strings.NewReader(msg)), + ContentLength: int64(len(msg)), + } + } else { + if oldModifyResponse != nil { + if err := oldModifyResponse(response); err != nil { + return err + } + } + } + return nil + } + } if h.Responder != nil { // if an optional error interceptor/responder was provided wire it // the custom responder might be used for providing a unified error reporting diff --git a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware_test.go b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware_test.go index f7fcff7c080..6a3a21d8b58 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware_test.go @@ -704,6 +704,83 @@ func TestProxyUpgradeErrorResponse(t *testing.T) { } } +func TestRejectForwardingRedirectsOption(t *testing.T) { + originalBody := []byte(`some data`) + testCases := []struct { + name string + rejectForwardingRedirects bool + serverStatusCode int + expectStatusCode int + expectBody []byte + }{ + { + name: "reject redirection enabled in proxy, backend server sending 200 response", + rejectForwardingRedirects: true, + serverStatusCode: 200, + expectStatusCode: 200, + expectBody: originalBody, + }, + { + name: "reject redirection enabled in proxy, backend server sending 301 response", + rejectForwardingRedirects: true, + serverStatusCode: 301, + expectStatusCode: 502, + expectBody: []byte(`the backend attempted to redirect this request, which is not permitted`), + }, + { + name: "reject redirection disabled in proxy, backend server sending 200 response", + rejectForwardingRedirects: false, + serverStatusCode: 200, + expectStatusCode: 200, + expectBody: originalBody, + }, + { + name: "reject redirection disabled in proxy, backend server sending 301 response", + rejectForwardingRedirects: false, + serverStatusCode: 301, + expectStatusCode: 301, + expectBody: originalBody, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Set up a backend server + backendServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(tc.serverStatusCode) + w.Write(originalBody) + })) + defer backendServer.Close() + backendServerURL, _ := url.Parse(backendServer.URL) + + // Set up a proxy pointing to the backend + proxyHandler := NewUpgradeAwareHandler(backendServerURL, nil, false, false, &fakeResponder{t: t}) + proxyHandler.RejectForwardingRedirects = tc.rejectForwardingRedirects + proxy := httptest.NewServer(proxyHandler) + defer proxy.Close() + proxyURL, _ := url.Parse(proxy.URL) + + conn, err := net.Dial("tcp", proxyURL.Host) + require.NoError(t, err) + bufferedReader := bufio.NewReader(conn) + + req, _ := http.NewRequest("GET", proxyURL.String(), nil) + require.NoError(t, req.Write(conn)) + // Verify we get the correct response and message body content + resp, err := http.ReadResponse(bufferedReader, nil) + require.NoError(t, err) + assert.Equal(t, tc.expectStatusCode, resp.StatusCode) + data, err := ioutil.ReadAll(resp.Body) + require.NoError(t, err) + assert.Equal(t, tc.expectBody, data) + assert.Equal(t, int64(len(tc.expectBody)), resp.ContentLength) + resp.Body.Close() + + // clean up + conn.Close() + }) + } +} + func TestDefaultProxyTransport(t *testing.T) { tests := []struct { name, diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index 116f657ce77..d60f8df9f66 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -86,6 +86,8 @@ type ExtraConfig struct { // Mechanism by which the Aggregator will resolve services. Required. ServiceResolver ServiceResolver + + RejectForwardingRedirects bool } // Config represents the configuration needed to create an APIAggregator. @@ -155,6 +157,9 @@ type APIAggregator struct { // egressSelector selects the proper egress dialer to communicate with the custom apiserver // overwrites proxyTransport dialer if not nil egressSelector *egressselector.EgressSelector + + // rejectForwardingRedirects is whether to allow to forward redirect response + rejectForwardingRedirects bool } // Complete fills in any fields not set that are required to have valid data. It's mutating the receiver. @@ -212,6 +217,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg openAPIV3Config: c.GenericConfig.OpenAPIV3Config, egressSelector: c.GenericConfig.EgressSelector, proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil }, + rejectForwardingRedirects: c.ExtraConfig.RejectForwardingRedirects, } // used later to filter the served resource by those that have expired. @@ -442,6 +448,7 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error { proxyTransport: s.proxyTransport, serviceResolver: s.serviceResolver, egressSelector: s.egressSelector, + rejectForwardingRedirects: s.rejectForwardingRedirects, } proxyHandler.updateAPIService(apiService) if s.openAPIAggregationController != nil { diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go index 3a880b6b2cf..e1282f2ab13 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go @@ -68,6 +68,9 @@ type proxyHandler struct { // egressSelector selects the proper egress dialer to communicate with the custom apiserver // overwrites proxyTransport dialer if not nil egressSelector *egressselector.EgressSelector + + // reject to forward redirect response + rejectForwardingRedirects bool } type proxyHandlingInfo struct { @@ -172,6 +175,9 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { } handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, upgrade, &responder{w: w}) + if r.rejectForwardingRedirects { + handler.RejectForwardingRedirects = true + } utilflowcontrol.RequestDelegated(req.Context()) handler.ServeHTTP(w, newReq) } From 6d78a25374d064546bd902b99d4dc1d3f7d5c7c2 Mon Sep 17 00:00:00 2001 From: Di Jin Date: Tue, 6 Sep 2022 19:28:14 -0700 Subject: [PATCH 2/2] Add integration test Add integration test to ensure aggregator is not forwarding redirect response --- test/integration/examples/apiserver_test.go | 116 ++++++++++++++++++++ 1 file changed, 116 insertions(+) diff --git a/test/integration/examples/apiserver_test.go b/test/integration/examples/apiserver_test.go index 9669ef52de8..1cbba275844 100644 --- a/test/integration/examples/apiserver_test.go +++ b/test/integration/examples/apiserver_test.go @@ -22,11 +22,13 @@ import ( "fmt" "net" "net/http" + "net/http/httptest" "net/url" "os" "path" "reflect" "sort" + "strings" "testing" "time" @@ -373,6 +375,120 @@ func waitForWardleRunning(ctx context.Context, t *testing.T, wardleToKASKubeConf return directWardleClientConfig, nil } +func TestAggregatedAPIServerRejectRedirectResponse(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + t.Cleanup(cancel) + + backendServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + if strings.HasSuffix(r.URL.Path, "redirectTarget") { + t.Errorf("backend called unexpectedly") + } + })) + defer backendServer.Close() + + redirectedURL := backendServer.URL + redirectServer := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if strings.HasSuffix(r.URL.Path, "tryRedirect") { + http.Redirect(w, r, redirectedURL+"/redirectTarget", http.StatusMovedPermanently) + } else { + http.Redirect(w, r, redirectedURL, http.StatusMovedPermanently) + } + })) + defer redirectServer.Close() + + // endpoints cannot have loopback IPs so we need to override the resolver itself + t.Cleanup(app.SetServiceResolverForTests(staticURLServiceResolver(fmt.Sprintf("https://%s", redirectServer.Listener.Addr().String())))) + + // start the server after resolver is overwritten + redirectServer.StartTLS() + + testServer := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: false}, nil, framework.SharedEtcd()) + defer testServer.TearDownFn() + kubeClientConfig := rest.CopyConfig(testServer.ClientConfig) + // force json because everything speaks it + kubeClientConfig.ContentType = "" + kubeClientConfig.AcceptContentTypes = "" + kubeClient := client.NewForConfigOrDie(kubeClientConfig) + aggregatorClient := aggregatorclient.NewForConfigOrDie(kubeClientConfig) + + // create the bare minimum resources required to be able to get the API service into an available state + _, err := kubeClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kube-redirect", + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + _, err = kubeClient.CoreV1().Services("kube-redirect").Create(ctx, &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "api", + }, + Spec: corev1.ServiceSpec{ + ExternalName: "needs-to-be-non-empty", + Type: corev1.ServiceTypeExternalName, + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + + _, err = aggregatorClient.ApiregistrationV1().APIServices().Create(ctx, &apiregistrationv1.APIService{ + ObjectMeta: metav1.ObjectMeta{Name: "v1alpha1.reject.redirect.example.com"}, + Spec: apiregistrationv1.APIServiceSpec{ + Service: &apiregistrationv1.ServiceReference{ + Namespace: "kube-redirect", + Name: "api", + }, + Group: "reject.redirect.example.com", + Version: "v1alpha1", + GroupPriorityMinimum: 200, + VersionPriority: 200, + InsecureSkipTLSVerify: true, + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + + // wait for the API service to be available + err = wait.Poll(time.Second, wait.ForeverTestTimeout, func() (done bool, err error) { + apiService, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1alpha1.reject.redirect.example.com", metav1.GetOptions{}) + if err != nil { + return false, err + } + var available bool + for _, condition := range apiService.Status.Conditions { + if condition.Type == apiregistrationv1.Available && condition.Status == apiregistrationv1.ConditionTrue { + available = true + break + } + } + if !available { + t.Log("api service is not available", apiService.Status.Conditions) + return false, nil + } + return available, nil + }) + if err != nil { + t.Errorf("%v", err) + } + + // get raw response to check the original error and msg + expectedMsg := "the backend attempted to redirect this request, which is not permitted" + // add specific request path suffix to discriminate between request from client and generic pings from the aggregator + url := url.URL{ + Path: "/apis/reject.redirect.example.com/v1alpha1/tryRedirect", + } + bytes, err := kubeClient.RESTClient().Get().AbsPath(url.String()).DoRaw(context.TODO()) + if err == nil { + t.Errorf("expect server to reject redirect response, but forwarded") + } else if !strings.Contains(string(bytes), expectedMsg) { + t.Errorf("expect response contains %s, got %s", expectedMsg, string(bytes)) + } +} + func writeKubeConfigForWardleServerToKASConnection(t *testing.T, kubeClientConfig *rest.Config) string { // write a kubeconfig out for starting other API servers with delegated auth. remember, no in-cluster config // the loopback client config uses a loopback cert with different SNI. We need to use the "real"