Merge pull request #112193 from jindijamie/master

Add an option for aggregator
This commit is contained in:
Kubernetes Prow Robot 2022-09-08 17:21:24 -07:00 committed by GitHub
commit a7936658ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 248 additions and 8 deletions

View File

@ -115,6 +115,7 @@ func createAggregatorConfig(
ProxyClientKeyFile: commandOptions.ProxyClientKeyFile, ProxyClientKeyFile: commandOptions.ProxyClientKeyFile,
ServiceResolver: serviceResolver, ServiceResolver: serviceResolver,
ProxyTransport: proxyTransport, ProxyTransport: proxyTransport,
RejectForwardingRedirects: commandOptions.AggregatorRejectForwardingRedirects,
}, },
} }

View File

@ -77,6 +77,7 @@ type ServerRunOptions struct {
ProxyClientKeyFile string ProxyClientKeyFile string
EnableAggregatorRouting bool EnableAggregatorRouting bool
AggregatorRejectForwardingRedirects bool
MasterCount int MasterCount int
EndpointReconcilerType string EndpointReconcilerType string
@ -133,6 +134,7 @@ func NewServerRunOptions() *ServerRunOptions {
HTTPTimeout: time.Duration(5) * time.Second, HTTPTimeout: time.Duration(5) * time.Second,
}, },
ServiceNodePortRange: kubeoptions.DefaultServiceNodePortRange, ServiceNodePortRange: kubeoptions.DefaultServiceNodePortRange,
AggregatorRejectForwardingRedirects: true,
} }
// Overwrite the default for storage data format. // Overwrite the default for storage data format.
@ -244,6 +246,9 @@ func (s *ServerRunOptions) Flags() (fss cliflag.NamedFlagSets) {
fs.BoolVar(&s.EnableAggregatorRouting, "enable-aggregator-routing", s.EnableAggregatorRouting, fs.BoolVar(&s.EnableAggregatorRouting, "enable-aggregator-routing", s.EnableAggregatorRouting,
"Turns on aggregator routing requests to endpoints IP rather than cluster IP.") "Turns on aggregator routing requests to endpoints IP rather than cluster IP.")
fs.BoolVar(&s.AggregatorRejectForwardingRedirects, "aggregator-reject-forwarding-redirect", s.AggregatorRejectForwardingRedirects,
"Aggregator reject forwarding redirect response back to client.")
fs.StringVar(&s.ServiceAccountSigningKeyFile, "service-account-signing-key-file", s.ServiceAccountSigningKeyFile, ""+ fs.StringVar(&s.ServiceAccountSigningKeyFile, "service-account-signing-key-file", s.ServiceAccountSigningKeyFile, ""+
"Path to the file that contains the current private key of the service account token issuer. The issuer will sign issued ID tokens with this private key.") "Path to the file that contains the current private key of the service account token issuer. The issuer will sign issued ID tokens with this private key.")

View File

@ -320,6 +320,7 @@ func TestAddFlags(t *testing.T) {
}, },
IdentityLeaseDurationSeconds: 3600, IdentityLeaseDurationSeconds: 3600,
IdentityLeaseRenewIntervalSeconds: 10, IdentityLeaseRenewIntervalSeconds: 10,
AggregatorRejectForwardingRedirects: true,
} }
if !reflect.DeepEqual(expected, s) { if !reflect.DeepEqual(expected, s) {

View File

@ -83,6 +83,8 @@ type UpgradeAwareHandler struct {
MaxBytesPerSec int64 MaxBytesPerSec int64
// Responder is passed errors that occur while setting up proxying. // Responder is passed errors that occur while setting up proxying.
Responder ErrorResponder Responder ErrorResponder
// Reject to forward redirect response
RejectForwardingRedirects bool
} }
const defaultFlushInterval = 200 * time.Millisecond const defaultFlushInterval = 200 * time.Millisecond
@ -257,6 +259,31 @@ func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
proxy.Transport = h.Transport proxy.Transport = h.Transport
proxy.FlushInterval = h.FlushInterval proxy.FlushInterval = h.FlushInterval
proxy.ErrorLog = log.New(noSuppressPanicError{}, "", log.LstdFlags) proxy.ErrorLog = log.New(noSuppressPanicError{}, "", log.LstdFlags)
if h.RejectForwardingRedirects {
oldModifyResponse := proxy.ModifyResponse
proxy.ModifyResponse = func(response *http.Response) error {
code := response.StatusCode
if code >= 300 && code <= 399 {
// close the original response
response.Body.Close()
msg := "the backend attempted to redirect this request, which is not permitted"
// replace the response
*response = http.Response{
StatusCode: http.StatusBadGateway,
Status: fmt.Sprintf("%d %s", response.StatusCode, http.StatusText(response.StatusCode)),
Body: io.NopCloser(strings.NewReader(msg)),
ContentLength: int64(len(msg)),
}
} else {
if oldModifyResponse != nil {
if err := oldModifyResponse(response); err != nil {
return err
}
}
}
return nil
}
}
if h.Responder != nil { if h.Responder != nil {
// if an optional error interceptor/responder was provided wire it // if an optional error interceptor/responder was provided wire it
// the custom responder might be used for providing a unified error reporting // the custom responder might be used for providing a unified error reporting

View File

@ -704,6 +704,83 @@ func TestProxyUpgradeErrorResponse(t *testing.T) {
} }
} }
func TestRejectForwardingRedirectsOption(t *testing.T) {
originalBody := []byte(`some data`)
testCases := []struct {
name string
rejectForwardingRedirects bool
serverStatusCode int
expectStatusCode int
expectBody []byte
}{
{
name: "reject redirection enabled in proxy, backend server sending 200 response",
rejectForwardingRedirects: true,
serverStatusCode: 200,
expectStatusCode: 200,
expectBody: originalBody,
},
{
name: "reject redirection enabled in proxy, backend server sending 301 response",
rejectForwardingRedirects: true,
serverStatusCode: 301,
expectStatusCode: 502,
expectBody: []byte(`the backend attempted to redirect this request, which is not permitted`),
},
{
name: "reject redirection disabled in proxy, backend server sending 200 response",
rejectForwardingRedirects: false,
serverStatusCode: 200,
expectStatusCode: 200,
expectBody: originalBody,
},
{
name: "reject redirection disabled in proxy, backend server sending 301 response",
rejectForwardingRedirects: false,
serverStatusCode: 301,
expectStatusCode: 301,
expectBody: originalBody,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Set up a backend server
backendServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(tc.serverStatusCode)
w.Write(originalBody)
}))
defer backendServer.Close()
backendServerURL, _ := url.Parse(backendServer.URL)
// Set up a proxy pointing to the backend
proxyHandler := NewUpgradeAwareHandler(backendServerURL, nil, false, false, &fakeResponder{t: t})
proxyHandler.RejectForwardingRedirects = tc.rejectForwardingRedirects
proxy := httptest.NewServer(proxyHandler)
defer proxy.Close()
proxyURL, _ := url.Parse(proxy.URL)
conn, err := net.Dial("tcp", proxyURL.Host)
require.NoError(t, err)
bufferedReader := bufio.NewReader(conn)
req, _ := http.NewRequest("GET", proxyURL.String(), nil)
require.NoError(t, req.Write(conn))
// Verify we get the correct response and message body content
resp, err := http.ReadResponse(bufferedReader, nil)
require.NoError(t, err)
assert.Equal(t, tc.expectStatusCode, resp.StatusCode)
data, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
assert.Equal(t, tc.expectBody, data)
assert.Equal(t, int64(len(tc.expectBody)), resp.ContentLength)
resp.Body.Close()
// clean up
conn.Close()
})
}
}
func TestDefaultProxyTransport(t *testing.T) { func TestDefaultProxyTransport(t *testing.T) {
tests := []struct { tests := []struct {
name, name,

View File

@ -86,6 +86,8 @@ type ExtraConfig struct {
// Mechanism by which the Aggregator will resolve services. Required. // Mechanism by which the Aggregator will resolve services. Required.
ServiceResolver ServiceResolver ServiceResolver ServiceResolver
RejectForwardingRedirects bool
} }
// Config represents the configuration needed to create an APIAggregator. // Config represents the configuration needed to create an APIAggregator.
@ -155,6 +157,9 @@ type APIAggregator struct {
// egressSelector selects the proper egress dialer to communicate with the custom apiserver // egressSelector selects the proper egress dialer to communicate with the custom apiserver
// overwrites proxyTransport dialer if not nil // overwrites proxyTransport dialer if not nil
egressSelector *egressselector.EgressSelector egressSelector *egressselector.EgressSelector
// rejectForwardingRedirects is whether to allow to forward redirect response
rejectForwardingRedirects bool
} }
// Complete fills in any fields not set that are required to have valid data. It's mutating the receiver. // Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
@ -212,6 +217,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
openAPIV3Config: c.GenericConfig.OpenAPIV3Config, openAPIV3Config: c.GenericConfig.OpenAPIV3Config,
egressSelector: c.GenericConfig.EgressSelector, egressSelector: c.GenericConfig.EgressSelector,
proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil }, proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
rejectForwardingRedirects: c.ExtraConfig.RejectForwardingRedirects,
} }
// used later to filter the served resource by those that have expired. // used later to filter the served resource by those that have expired.
@ -442,6 +448,7 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
proxyTransport: s.proxyTransport, proxyTransport: s.proxyTransport,
serviceResolver: s.serviceResolver, serviceResolver: s.serviceResolver,
egressSelector: s.egressSelector, egressSelector: s.egressSelector,
rejectForwardingRedirects: s.rejectForwardingRedirects,
} }
proxyHandler.updateAPIService(apiService) proxyHandler.updateAPIService(apiService)
if s.openAPIAggregationController != nil { if s.openAPIAggregationController != nil {

View File

@ -68,6 +68,9 @@ type proxyHandler struct {
// egressSelector selects the proper egress dialer to communicate with the custom apiserver // egressSelector selects the proper egress dialer to communicate with the custom apiserver
// overwrites proxyTransport dialer if not nil // overwrites proxyTransport dialer if not nil
egressSelector *egressselector.EgressSelector egressSelector *egressselector.EgressSelector
// reject to forward redirect response
rejectForwardingRedirects bool
} }
type proxyHandlingInfo struct { type proxyHandlingInfo struct {
@ -172,6 +175,9 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
} }
handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, upgrade, &responder{w: w}) handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, upgrade, &responder{w: w})
if r.rejectForwardingRedirects {
handler.RejectForwardingRedirects = true
}
utilflowcontrol.RequestDelegated(req.Context()) utilflowcontrol.RequestDelegated(req.Context())
handler.ServeHTTP(w, newReq) handler.ServeHTTP(w, newReq)
} }

View File

@ -22,11 +22,13 @@ import (
"fmt" "fmt"
"net" "net"
"net/http" "net/http"
"net/http/httptest"
"net/url" "net/url"
"os" "os"
"path" "path"
"reflect" "reflect"
"sort" "sort"
"strings"
"testing" "testing"
"time" "time"
@ -373,6 +375,120 @@ func waitForWardleRunning(ctx context.Context, t *testing.T, wardleToKASKubeConf
return directWardleClientConfig, nil return directWardleClientConfig, nil
} }
func TestAggregatedAPIServerRejectRedirectResponse(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
t.Cleanup(cancel)
backendServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
if strings.HasSuffix(r.URL.Path, "redirectTarget") {
t.Errorf("backend called unexpectedly")
}
}))
defer backendServer.Close()
redirectedURL := backendServer.URL
redirectServer := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.HasSuffix(r.URL.Path, "tryRedirect") {
http.Redirect(w, r, redirectedURL+"/redirectTarget", http.StatusMovedPermanently)
} else {
http.Redirect(w, r, redirectedURL, http.StatusMovedPermanently)
}
}))
defer redirectServer.Close()
// endpoints cannot have loopback IPs so we need to override the resolver itself
t.Cleanup(app.SetServiceResolverForTests(staticURLServiceResolver(fmt.Sprintf("https://%s", redirectServer.Listener.Addr().String()))))
// start the server after resolver is overwritten
redirectServer.StartTLS()
testServer := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: false}, nil, framework.SharedEtcd())
defer testServer.TearDownFn()
kubeClientConfig := rest.CopyConfig(testServer.ClientConfig)
// force json because everything speaks it
kubeClientConfig.ContentType = ""
kubeClientConfig.AcceptContentTypes = ""
kubeClient := client.NewForConfigOrDie(kubeClientConfig)
aggregatorClient := aggregatorclient.NewForConfigOrDie(kubeClientConfig)
// create the bare minimum resources required to be able to get the API service into an available state
_, err := kubeClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "kube-redirect",
},
}, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
_, err = kubeClient.CoreV1().Services("kube-redirect").Create(ctx, &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "api",
},
Spec: corev1.ServiceSpec{
ExternalName: "needs-to-be-non-empty",
Type: corev1.ServiceTypeExternalName,
},
}, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
_, err = aggregatorClient.ApiregistrationV1().APIServices().Create(ctx, &apiregistrationv1.APIService{
ObjectMeta: metav1.ObjectMeta{Name: "v1alpha1.reject.redirect.example.com"},
Spec: apiregistrationv1.APIServiceSpec{
Service: &apiregistrationv1.ServiceReference{
Namespace: "kube-redirect",
Name: "api",
},
Group: "reject.redirect.example.com",
Version: "v1alpha1",
GroupPriorityMinimum: 200,
VersionPriority: 200,
InsecureSkipTLSVerify: true,
},
}, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
// wait for the API service to be available
err = wait.Poll(time.Second, wait.ForeverTestTimeout, func() (done bool, err error) {
apiService, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1alpha1.reject.redirect.example.com", metav1.GetOptions{})
if err != nil {
return false, err
}
var available bool
for _, condition := range apiService.Status.Conditions {
if condition.Type == apiregistrationv1.Available && condition.Status == apiregistrationv1.ConditionTrue {
available = true
break
}
}
if !available {
t.Log("api service is not available", apiService.Status.Conditions)
return false, nil
}
return available, nil
})
if err != nil {
t.Errorf("%v", err)
}
// get raw response to check the original error and msg
expectedMsg := "the backend attempted to redirect this request, which is not permitted"
// add specific request path suffix to discriminate between request from client and generic pings from the aggregator
url := url.URL{
Path: "/apis/reject.redirect.example.com/v1alpha1/tryRedirect",
}
bytes, err := kubeClient.RESTClient().Get().AbsPath(url.String()).DoRaw(context.TODO())
if err == nil {
t.Errorf("expect server to reject redirect response, but forwarded")
} else if !strings.Contains(string(bytes), expectedMsg) {
t.Errorf("expect response contains %s, got %s", expectedMsg, string(bytes))
}
}
func writeKubeConfigForWardleServerToKASConnection(t *testing.T, kubeClientConfig *rest.Config) string { func writeKubeConfigForWardleServerToKASConnection(t *testing.T, kubeClientConfig *rest.Config) string {
// write a kubeconfig out for starting other API servers with delegated auth. remember, no in-cluster config // write a kubeconfig out for starting other API servers with delegated auth. remember, no in-cluster config
// the loopback client config uses a loopback cert with different SNI. We need to use the "real" // the loopback client config uses a loopback cert with different SNI. We need to use the "real"