Add an option for aggregator

This commit is contained in:
Di Jin 2022-09-01 15:25:26 -07:00
parent 6e9845f766
commit 0b400cf6aa
7 changed files with 132 additions and 8 deletions

View File

@ -111,10 +111,11 @@ func createAggregatorConfig(
SharedInformerFactory: externalInformers, SharedInformerFactory: externalInformers,
}, },
ExtraConfig: aggregatorapiserver.ExtraConfig{ ExtraConfig: aggregatorapiserver.ExtraConfig{
ProxyClientCertFile: commandOptions.ProxyClientCertFile, ProxyClientCertFile: commandOptions.ProxyClientCertFile,
ProxyClientKeyFile: commandOptions.ProxyClientKeyFile, ProxyClientKeyFile: commandOptions.ProxyClientKeyFile,
ServiceResolver: serviceResolver, ServiceResolver: serviceResolver,
ProxyTransport: proxyTransport, ProxyTransport: proxyTransport,
RejectForwardingRedirects: commandOptions.AggregatorRejectForwardingRedirects,
}, },
} }

View File

@ -76,7 +76,8 @@ type ServerRunOptions struct {
ProxyClientCertFile string ProxyClientCertFile string
ProxyClientKeyFile string ProxyClientKeyFile string
EnableAggregatorRouting bool EnableAggregatorRouting bool
AggregatorRejectForwardingRedirects bool
MasterCount int MasterCount int
EndpointReconcilerType string EndpointReconcilerType string
@ -132,7 +133,8 @@ func NewServerRunOptions() *ServerRunOptions {
}, },
HTTPTimeout: time.Duration(5) * time.Second, HTTPTimeout: time.Duration(5) * time.Second,
}, },
ServiceNodePortRange: kubeoptions.DefaultServiceNodePortRange, ServiceNodePortRange: kubeoptions.DefaultServiceNodePortRange,
AggregatorRejectForwardingRedirects: true,
} }
// Overwrite the default for storage data format. // Overwrite the default for storage data format.
@ -244,6 +246,9 @@ func (s *ServerRunOptions) Flags() (fss cliflag.NamedFlagSets) {
fs.BoolVar(&s.EnableAggregatorRouting, "enable-aggregator-routing", s.EnableAggregatorRouting, fs.BoolVar(&s.EnableAggregatorRouting, "enable-aggregator-routing", s.EnableAggregatorRouting,
"Turns on aggregator routing requests to endpoints IP rather than cluster IP.") "Turns on aggregator routing requests to endpoints IP rather than cluster IP.")
fs.BoolVar(&s.AggregatorRejectForwardingRedirects, "aggregator-reject-forwarding-redirect", s.AggregatorRejectForwardingRedirects,
"Aggregator reject forwarding redirect response back to client.")
fs.StringVar(&s.ServiceAccountSigningKeyFile, "service-account-signing-key-file", s.ServiceAccountSigningKeyFile, ""+ fs.StringVar(&s.ServiceAccountSigningKeyFile, "service-account-signing-key-file", s.ServiceAccountSigningKeyFile, ""+
"Path to the file that contains the current private key of the service account token issuer. The issuer will sign issued ID tokens with this private key.") "Path to the file that contains the current private key of the service account token issuer. The issuer will sign issued ID tokens with this private key.")

View File

@ -318,8 +318,9 @@ func TestAddFlags(t *testing.T) {
Traces: &apiserveroptions.TracingOptions{ Traces: &apiserveroptions.TracingOptions{
ConfigFile: "/var/run/kubernetes/tracing_config.yaml", ConfigFile: "/var/run/kubernetes/tracing_config.yaml",
}, },
IdentityLeaseDurationSeconds: 3600, IdentityLeaseDurationSeconds: 3600,
IdentityLeaseRenewIntervalSeconds: 10, IdentityLeaseRenewIntervalSeconds: 10,
AggregatorRejectForwardingRedirects: true,
} }
if !reflect.DeepEqual(expected, s) { if !reflect.DeepEqual(expected, s) {

View File

@ -83,6 +83,8 @@ type UpgradeAwareHandler struct {
MaxBytesPerSec int64 MaxBytesPerSec int64
// Responder is passed errors that occur while setting up proxying. // Responder is passed errors that occur while setting up proxying.
Responder ErrorResponder Responder ErrorResponder
// Reject to forward redirect response
RejectForwardingRedirects bool
} }
const defaultFlushInterval = 200 * time.Millisecond const defaultFlushInterval = 200 * time.Millisecond
@ -257,6 +259,31 @@ func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
proxy.Transport = h.Transport proxy.Transport = h.Transport
proxy.FlushInterval = h.FlushInterval proxy.FlushInterval = h.FlushInterval
proxy.ErrorLog = log.New(noSuppressPanicError{}, "", log.LstdFlags) proxy.ErrorLog = log.New(noSuppressPanicError{}, "", log.LstdFlags)
if h.RejectForwardingRedirects {
oldModifyResponse := proxy.ModifyResponse
proxy.ModifyResponse = func(response *http.Response) error {
code := response.StatusCode
if code >= 300 && code <= 399 {
// close the original response
response.Body.Close()
msg := "the backend attempted to redirect this request, which is not permitted"
// replace the response
*response = http.Response{
StatusCode: http.StatusBadGateway,
Status: fmt.Sprintf("%d %s", response.StatusCode, http.StatusText(response.StatusCode)),
Body: io.NopCloser(strings.NewReader(msg)),
ContentLength: int64(len(msg)),
}
} else {
if oldModifyResponse != nil {
if err := oldModifyResponse(response); err != nil {
return err
}
}
}
return nil
}
}
if h.Responder != nil { if h.Responder != nil {
// if an optional error interceptor/responder was provided wire it // if an optional error interceptor/responder was provided wire it
// the custom responder might be used for providing a unified error reporting // the custom responder might be used for providing a unified error reporting

View File

@ -704,6 +704,83 @@ func TestProxyUpgradeErrorResponse(t *testing.T) {
} }
} }
func TestRejectForwardingRedirectsOption(t *testing.T) {
originalBody := []byte(`some data`)
testCases := []struct {
name string
rejectForwardingRedirects bool
serverStatusCode int
expectStatusCode int
expectBody []byte
}{
{
name: "reject redirection enabled in proxy, backend server sending 200 response",
rejectForwardingRedirects: true,
serverStatusCode: 200,
expectStatusCode: 200,
expectBody: originalBody,
},
{
name: "reject redirection enabled in proxy, backend server sending 301 response",
rejectForwardingRedirects: true,
serverStatusCode: 301,
expectStatusCode: 502,
expectBody: []byte(`the backend attempted to redirect this request, which is not permitted`),
},
{
name: "reject redirection disabled in proxy, backend server sending 200 response",
rejectForwardingRedirects: false,
serverStatusCode: 200,
expectStatusCode: 200,
expectBody: originalBody,
},
{
name: "reject redirection disabled in proxy, backend server sending 301 response",
rejectForwardingRedirects: false,
serverStatusCode: 301,
expectStatusCode: 301,
expectBody: originalBody,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Set up a backend server
backendServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(tc.serverStatusCode)
w.Write(originalBody)
}))
defer backendServer.Close()
backendServerURL, _ := url.Parse(backendServer.URL)
// Set up a proxy pointing to the backend
proxyHandler := NewUpgradeAwareHandler(backendServerURL, nil, false, false, &fakeResponder{t: t})
proxyHandler.RejectForwardingRedirects = tc.rejectForwardingRedirects
proxy := httptest.NewServer(proxyHandler)
defer proxy.Close()
proxyURL, _ := url.Parse(proxy.URL)
conn, err := net.Dial("tcp", proxyURL.Host)
require.NoError(t, err)
bufferedReader := bufio.NewReader(conn)
req, _ := http.NewRequest("GET", proxyURL.String(), nil)
require.NoError(t, req.Write(conn))
// Verify we get the correct response and message body content
resp, err := http.ReadResponse(bufferedReader, nil)
require.NoError(t, err)
assert.Equal(t, tc.expectStatusCode, resp.StatusCode)
data, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
assert.Equal(t, tc.expectBody, data)
assert.Equal(t, int64(len(tc.expectBody)), resp.ContentLength)
resp.Body.Close()
// clean up
conn.Close()
})
}
}
func TestDefaultProxyTransport(t *testing.T) { func TestDefaultProxyTransport(t *testing.T) {
tests := []struct { tests := []struct {
name, name,

View File

@ -86,6 +86,8 @@ type ExtraConfig struct {
// Mechanism by which the Aggregator will resolve services. Required. // Mechanism by which the Aggregator will resolve services. Required.
ServiceResolver ServiceResolver ServiceResolver ServiceResolver
RejectForwardingRedirects bool
} }
// Config represents the configuration needed to create an APIAggregator. // Config represents the configuration needed to create an APIAggregator.
@ -155,6 +157,9 @@ type APIAggregator struct {
// egressSelector selects the proper egress dialer to communicate with the custom apiserver // egressSelector selects the proper egress dialer to communicate with the custom apiserver
// overwrites proxyTransport dialer if not nil // overwrites proxyTransport dialer if not nil
egressSelector *egressselector.EgressSelector egressSelector *egressselector.EgressSelector
// rejectForwardingRedirects is whether to allow to forward redirect response
rejectForwardingRedirects bool
} }
// Complete fills in any fields not set that are required to have valid data. It's mutating the receiver. // Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
@ -212,6 +217,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
openAPIV3Config: c.GenericConfig.OpenAPIV3Config, openAPIV3Config: c.GenericConfig.OpenAPIV3Config,
egressSelector: c.GenericConfig.EgressSelector, egressSelector: c.GenericConfig.EgressSelector,
proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil }, proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
rejectForwardingRedirects: c.ExtraConfig.RejectForwardingRedirects,
} }
// used later to filter the served resource by those that have expired. // used later to filter the served resource by those that have expired.
@ -442,6 +448,7 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
proxyTransport: s.proxyTransport, proxyTransport: s.proxyTransport,
serviceResolver: s.serviceResolver, serviceResolver: s.serviceResolver,
egressSelector: s.egressSelector, egressSelector: s.egressSelector,
rejectForwardingRedirects: s.rejectForwardingRedirects,
} }
proxyHandler.updateAPIService(apiService) proxyHandler.updateAPIService(apiService)
if s.openAPIAggregationController != nil { if s.openAPIAggregationController != nil {

View File

@ -68,6 +68,9 @@ type proxyHandler struct {
// egressSelector selects the proper egress dialer to communicate with the custom apiserver // egressSelector selects the proper egress dialer to communicate with the custom apiserver
// overwrites proxyTransport dialer if not nil // overwrites proxyTransport dialer if not nil
egressSelector *egressselector.EgressSelector egressSelector *egressselector.EgressSelector
// reject to forward redirect response
rejectForwardingRedirects bool
} }
type proxyHandlingInfo struct { type proxyHandlingInfo struct {
@ -172,6 +175,9 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
} }
handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, upgrade, &responder{w: w}) handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, upgrade, &responder{w: w})
if r.rejectForwardingRedirects {
handler.RejectForwardingRedirects = true
}
utilflowcontrol.RequestDelegated(req.Context()) utilflowcontrol.RequestDelegated(req.Context())
handler.ServeHTTP(w, newReq) handler.ServeHTTP(w, newReq)
} }