From 601b7d33a9cf0b724cdabb5de81b0bf2821f0fca Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Wed, 28 Aug 2019 13:27:38 -0400 Subject: [PATCH 1/3] Make webhook benchmarks parallel --- .../pkg/admission/plugin/webhook/mutating/plugin_test.go | 8 +++++--- .../admission/plugin/webhook/validating/plugin_test.go | 8 +++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/plugin_test.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/plugin_test.go index ff48879ec3b..a7f5bd1d96f 100644 --- a/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/plugin_test.go +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/plugin_test.go @@ -96,9 +96,11 @@ func BenchmarkAdmit(b *testing.B) { } b.ResetTimer() - for i := 0; i < b.N; i++ { - wh.Admit(context.TODO(), attr, objectInterfaces) - } + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + wh.Admit(context.TODO(), attr, objectInterfaces) + } + }) }) } } diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/validating/plugin_test.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/validating/plugin_test.go index fc7a604e17a..a975962c679 100644 --- a/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/validating/plugin_test.go +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/validating/plugin_test.go @@ -85,9 +85,11 @@ func BenchmarkValidate(b *testing.B) { attr := webhooktesting.NewAttribute(ns, nil, tt.IsDryRun) b.ResetTimer() - for i := 0; i < b.N; i++ { - wh.Validate(context.TODO(), attr, objectInterfaces) - } + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + wh.Validate(context.TODO(), attr, objectInterfaces) + } + }) }) } } From aef05c8dca2c1a9967ebd9a2f67a0bf7fb16f079 Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Wed, 28 Aug 2019 09:55:37 -0400 Subject: [PATCH 2/3] Plumb NextProtos to TLS client config, honor http/2 client preference --- pkg/kubelet/certificate/transport.go | 1 + pkg/kubelet/client/kubelet_client.go | 13 ++--- .../k8s.io/apimachinery/pkg/util/net/http.go | 20 ++++++- .../apimachinery/pkg/util/net/http_test.go | 53 +++++++++++++++++++ staging/src/k8s.io/client-go/rest/config.go | 9 ++++ .../src/k8s.io/client-go/rest/config_test.go | 11 ++-- .../src/k8s.io/client-go/rest/transport.go | 1 + .../client-go/rest/zz_generated.deepcopy.go | 5 ++ .../src/k8s.io/client-go/transport/cache.go | 3 ++ .../k8s.io/client-go/transport/cache_test.go | 2 + .../src/k8s.io/client-go/transport/config.go | 6 +++ .../k8s.io/client-go/transport/transport.go | 3 +- 12 files changed, 114 insertions(+), 13 deletions(-) diff --git a/pkg/kubelet/certificate/transport.go b/pkg/kubelet/certificate/transport.go index 442f4a8a379..261c4a53608 100644 --- a/pkg/kubelet/certificate/transport.go +++ b/pkg/kubelet/certificate/transport.go @@ -153,6 +153,7 @@ func addCertRotation(stopCh <-chan struct{}, period time.Duration, clientConfig clientConfig.CAData = nil clientConfig.CAFile = "" clientConfig.Insecure = false + clientConfig.NextProtos = nil return nil } diff --git a/pkg/kubelet/client/kubelet_client.go b/pkg/kubelet/client/kubelet_client.go index a1cacc990ab..3cb0cf96072 100644 --- a/pkg/kubelet/client/kubelet_client.go +++ b/pkg/kubelet/client/kubelet_client.go @@ -109,12 +109,13 @@ func MakeTransport(config *KubeletClientConfig) (http.RoundTripper, error) { func (c *KubeletClientConfig) transportConfig() *transport.Config { cfg := &transport.Config{ TLS: transport.TLSConfig{ - CAFile: c.CAFile, - CAData: c.CAData, - CertFile: c.CertFile, - CertData: c.CertData, - KeyFile: c.KeyFile, - KeyData: c.KeyData, + CAFile: c.CAFile, + CAData: c.CAData, + CertFile: c.CertFile, + CertData: c.CertData, + KeyFile: c.KeyFile, + KeyData: c.KeyData, + NextProtos: c.NextProtos, }, BearerToken: c.BearerToken, } diff --git a/staging/src/k8s.io/apimachinery/pkg/util/net/http.go b/staging/src/k8s.io/apimachinery/pkg/util/net/http.go index 078f00d9b97..f9540c63bb2 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/net/http.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/net/http.go @@ -101,6 +101,9 @@ func SetOldTransportDefaults(t *http.Transport) *http.Transport { if t.TLSHandshakeTimeout == 0 { t.TLSHandshakeTimeout = defaultTransport.TLSHandshakeTimeout } + if t.IdleConnTimeout == 0 { + t.IdleConnTimeout = defaultTransport.IdleConnTimeout + } return t } @@ -111,7 +114,7 @@ func SetTransportDefaults(t *http.Transport) *http.Transport { // Allow clients to disable http2 if needed. if s := os.Getenv("DISABLE_HTTP2"); len(s) > 0 { klog.Infof("HTTP2 has been explicitly disabled") - } else { + } else if allowsHTTP2(t) { if err := http2.ConfigureTransport(t); err != nil { klog.Warningf("Transport failed http2 configuration: %v", err) } @@ -119,6 +122,21 @@ func SetTransportDefaults(t *http.Transport) *http.Transport { return t } +func allowsHTTP2(t *http.Transport) bool { + if t.TLSClientConfig == nil || len(t.TLSClientConfig.NextProtos) == 0 { + // the transport expressed no NextProto preference, allow + return true + } + for _, p := range t.TLSClientConfig.NextProtos { + if p == http2.NextProtoTLS { + // the transport explicitly allowed http/2 + return true + } + } + // the transport explicitly set NextProtos and excluded http/2 + return false +} + type RoundTripperWrapper interface { http.RoundTripper WrappedRoundTripper() http.RoundTripper diff --git a/staging/src/k8s.io/apimachinery/pkg/util/net/http_test.go b/staging/src/k8s.io/apimachinery/pkg/util/net/http_test.go index ffe8f17ef7d..4e4e317b9a4 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/net/http_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/net/http_test.go @@ -439,3 +439,56 @@ func TestConnectWithRedirects(t *testing.T) { }) } } + +func TestAllowsHTTP2(t *testing.T) { + testcases := []struct { + Name string + Transport *http.Transport + ExpectAllows bool + }{ + { + Name: "empty", + Transport: &http.Transport{}, + ExpectAllows: true, + }, + { + Name: "empty tlsconfig", + Transport: &http.Transport{TLSClientConfig: &tls.Config{}}, + ExpectAllows: true, + }, + { + Name: "zero-length NextProtos", + Transport: &http.Transport{TLSClientConfig: &tls.Config{NextProtos: []string{}}}, + ExpectAllows: true, + }, + { + Name: "includes h2 in NextProtos after", + Transport: &http.Transport{TLSClientConfig: &tls.Config{NextProtos: []string{"http/1.1", "h2"}}}, + ExpectAllows: true, + }, + { + Name: "includes h2 in NextProtos before", + Transport: &http.Transport{TLSClientConfig: &tls.Config{NextProtos: []string{"h2", "http/1.1"}}}, + ExpectAllows: true, + }, + { + Name: "includes h2 in NextProtos between", + Transport: &http.Transport{TLSClientConfig: &tls.Config{NextProtos: []string{"http/1.1", "h2", "h3"}}}, + ExpectAllows: true, + }, + { + Name: "excludes h2 in NextProtos", + Transport: &http.Transport{TLSClientConfig: &tls.Config{NextProtos: []string{"http/1.1"}}}, + ExpectAllows: false, + }, + } + + for _, tc := range testcases { + t.Run(tc.Name, func(t *testing.T) { + allows := allowsHTTP2(tc.Transport) + if allows != tc.ExpectAllows { + t.Errorf("expected %v, got %v", tc.ExpectAllows, allows) + } + }) + } +} diff --git a/staging/src/k8s.io/client-go/rest/config.go b/staging/src/k8s.io/client-go/rest/config.go index e25994721d1..fb81fb7b1cf 100644 --- a/staging/src/k8s.io/client-go/rest/config.go +++ b/staging/src/k8s.io/client-go/rest/config.go @@ -211,6 +211,12 @@ type TLSClientConfig struct { // CAData holds PEM-encoded bytes (typically read from a root certificates bundle). // CAData takes precedence over CAFile CAData []byte + + // NextProtos is a list of supported application level protocols, in order of preference. + // Used to populate tls.Config.NextProtos. + // To indicate to the server http/1.1 is preferred over http/2, set to ["http/1.1", "h2"] (though the server is free to ignore that preference). + // To use only http/1.1, set to ["http/1.1"]. + NextProtos []string } var _ fmt.Stringer = TLSClientConfig{} @@ -236,6 +242,7 @@ func (c TLSClientConfig) String() string { CertData: c.CertData, KeyData: c.KeyData, CAData: c.CAData, + NextProtos: c.NextProtos, } // Explicitly mark non-empty credential fields as redacted. if len(cc.CertData) != 0 { @@ -503,6 +510,7 @@ func AnonymousClientConfig(config *Config) *Config { ServerName: config.ServerName, CAFile: config.TLSClientConfig.CAFile, CAData: config.TLSClientConfig.CAData, + NextProtos: config.TLSClientConfig.NextProtos, }, RateLimiter: config.RateLimiter, UserAgent: config.UserAgent, @@ -541,6 +549,7 @@ func CopyConfig(config *Config) *Config { CertData: config.TLSClientConfig.CertData, KeyData: config.TLSClientConfig.KeyData, CAData: config.TLSClientConfig.CAData, + NextProtos: config.TLSClientConfig.NextProtos, }, UserAgent: config.UserAgent, DisableCompression: config.DisableCompression, diff --git a/staging/src/k8s.io/client-go/rest/config_test.go b/staging/src/k8s.io/client-go/rest/config_test.go index f4ba5f00e90..df5778ce5c5 100644 --- a/staging/src/k8s.io/client-go/rest/config_test.go +++ b/staging/src/k8s.io/client-go/rest/config_test.go @@ -493,10 +493,11 @@ func TestConfigSprint(t *testing.T) { Env: []clientcmdapi.ExecEnvVar{{Name: "secret", Value: "s3cr3t"}}, }, TLSClientConfig: TLSClientConfig{ - CertFile: "a.crt", - KeyFile: "a.key", - CertData: []byte("fake cert"), - KeyData: []byte("fake key"), + CertFile: "a.crt", + KeyFile: "a.key", + CertData: []byte("fake cert"), + KeyData: []byte("fake key"), + NextProtos: []string{"h2", "http/1.1"}, }, UserAgent: "gobot", Transport: &fakeRoundTripper{}, @@ -508,7 +509,7 @@ func TestConfigSprint(t *testing.T) { Dial: fakeDialFunc, } want := fmt.Sprintf( - `&rest.Config{Host:"localhost:8080", APIPath:"v1", ContentConfig:rest.ContentConfig{AcceptContentTypes:"application/json", ContentType:"application/json", GroupVersion:(*schema.GroupVersion)(nil), NegotiatedSerializer:runtime.NegotiatedSerializer(nil)}, Username:"gopher", Password:"--- REDACTED ---", BearerToken:"--- REDACTED ---", BearerTokenFile:"", Impersonate:rest.ImpersonationConfig{UserName:"gopher2", Groups:[]string(nil), Extra:map[string][]string(nil)}, AuthProvider:api.AuthProviderConfig{Name: "gopher", Config: map[string]string{--- REDACTED ---}}, AuthConfigPersister:rest.AuthProviderConfigPersister(--- REDACTED ---), ExecProvider:api.AuthProviderConfig{Command: "sudo", Args: []string{"--- REDACTED ---"}, Env: []ExecEnvVar{--- REDACTED ---}, APIVersion: ""}, TLSClientConfig:rest.sanitizedTLSClientConfig{Insecure:false, ServerName:"", CertFile:"a.crt", KeyFile:"a.key", CAFile:"", CertData:[]uint8{0x2d, 0x2d, 0x2d, 0x20, 0x54, 0x52, 0x55, 0x4e, 0x43, 0x41, 0x54, 0x45, 0x44, 0x20, 0x2d, 0x2d, 0x2d}, KeyData:[]uint8{0x2d, 0x2d, 0x2d, 0x20, 0x52, 0x45, 0x44, 0x41, 0x43, 0x54, 0x45, 0x44, 0x20, 0x2d, 0x2d, 0x2d}, CAData:[]uint8(nil)}, UserAgent:"gobot", DisableCompression:false, Transport:(*rest.fakeRoundTripper)(%p), WrapTransport:(transport.WrapperFunc)(%p), QPS:1, Burst:2, RateLimiter:(*rest.fakeLimiter)(%p), Timeout:3000000000, Dial:(func(context.Context, string, string) (net.Conn, error))(%p)}`, + `&rest.Config{Host:"localhost:8080", APIPath:"v1", ContentConfig:rest.ContentConfig{AcceptContentTypes:"application/json", ContentType:"application/json", GroupVersion:(*schema.GroupVersion)(nil), NegotiatedSerializer:runtime.NegotiatedSerializer(nil)}, Username:"gopher", Password:"--- REDACTED ---", BearerToken:"--- REDACTED ---", BearerTokenFile:"", Impersonate:rest.ImpersonationConfig{UserName:"gopher2", Groups:[]string(nil), Extra:map[string][]string(nil)}, AuthProvider:api.AuthProviderConfig{Name: "gopher", Config: map[string]string{--- REDACTED ---}}, AuthConfigPersister:rest.AuthProviderConfigPersister(--- REDACTED ---), ExecProvider:api.AuthProviderConfig{Command: "sudo", Args: []string{"--- REDACTED ---"}, Env: []ExecEnvVar{--- REDACTED ---}, APIVersion: ""}, TLSClientConfig:rest.sanitizedTLSClientConfig{Insecure:false, ServerName:"", CertFile:"a.crt", KeyFile:"a.key", CAFile:"", CertData:[]uint8{0x2d, 0x2d, 0x2d, 0x20, 0x54, 0x52, 0x55, 0x4e, 0x43, 0x41, 0x54, 0x45, 0x44, 0x20, 0x2d, 0x2d, 0x2d}, KeyData:[]uint8{0x2d, 0x2d, 0x2d, 0x20, 0x52, 0x45, 0x44, 0x41, 0x43, 0x54, 0x45, 0x44, 0x20, 0x2d, 0x2d, 0x2d}, CAData:[]uint8(nil), NextProtos:[]string{"h2", "http/1.1"}}, UserAgent:"gobot", DisableCompression:false, Transport:(*rest.fakeRoundTripper)(%p), WrapTransport:(transport.WrapperFunc)(%p), QPS:1, Burst:2, RateLimiter:(*rest.fakeLimiter)(%p), Timeout:3000000000, Dial:(func(context.Context, string, string) (net.Conn, error))(%p)}`, c.Transport, fakeWrapperFunc, c.RateLimiter, fakeDialFunc, ) diff --git a/staging/src/k8s.io/client-go/rest/transport.go b/staging/src/k8s.io/client-go/rest/transport.go index 81b9dfde33a..0800e4ec747 100644 --- a/staging/src/k8s.io/client-go/rest/transport.go +++ b/staging/src/k8s.io/client-go/rest/transport.go @@ -74,6 +74,7 @@ func (c *Config) TransportConfig() (*transport.Config, error) { CertData: c.CertData, KeyFile: c.KeyFile, KeyData: c.KeyData, + NextProtos: c.NextProtos, }, Username: c.Username, Password: c.Password, diff --git a/staging/src/k8s.io/client-go/rest/zz_generated.deepcopy.go b/staging/src/k8s.io/client-go/rest/zz_generated.deepcopy.go index c1ab45f337e..da4a1624e81 100644 --- a/staging/src/k8s.io/client-go/rest/zz_generated.deepcopy.go +++ b/staging/src/k8s.io/client-go/rest/zz_generated.deepcopy.go @@ -38,6 +38,11 @@ func (in *TLSClientConfig) DeepCopyInto(out *TLSClientConfig) { *out = make([]byte, len(*in)) copy(*out, *in) } + if in.NextProtos != nil { + in, out := &in.NextProtos, &out.NextProtos + *out = make([]string, len(*in)) + copy(*out, *in) + } return } diff --git a/staging/src/k8s.io/client-go/transport/cache.go b/staging/src/k8s.io/client-go/transport/cache.go index 12c669037c4..980d36ae128 100644 --- a/staging/src/k8s.io/client-go/transport/cache.go +++ b/staging/src/k8s.io/client-go/transport/cache.go @@ -20,6 +20,7 @@ import ( "fmt" "net" "net/http" + "strings" "sync" "time" @@ -45,6 +46,7 @@ type tlsCacheKey struct { keyData string getCert string serverName string + nextProtos string dial string disableCompression bool } @@ -114,6 +116,7 @@ func tlsConfigKey(c *Config) (tlsCacheKey, error) { keyData: string(c.TLS.KeyData), getCert: fmt.Sprintf("%p", c.TLS.GetCert), serverName: c.TLS.ServerName, + nextProtos: strings.Join(c.TLS.NextProtos, ","), dial: fmt.Sprintf("%p", c.Dial), disableCompression: c.DisableCompression, }, nil diff --git a/staging/src/k8s.io/client-go/transport/cache_test.go b/staging/src/k8s.io/client-go/transport/cache_test.go index 9b740cdeca8..8b9779e68d8 100644 --- a/staging/src/k8s.io/client-go/transport/cache_test.go +++ b/staging/src/k8s.io/client-go/transport/cache_test.go @@ -126,6 +126,8 @@ func TestTLSConfigKey(t *testing.T) { GetCert: getCert, }, }, + "http2, http1.1": {TLS: TLSConfig{NextProtos: []string{"h2", "http/1.1"}}}, + "http1.1-only": {TLS: TLSConfig{NextProtos: []string{"http/1.1"}}}, } for nameA, valueA := range uniqueConfigurations { for nameB, valueB := range uniqueConfigurations { diff --git a/staging/src/k8s.io/client-go/transport/config.go b/staging/src/k8s.io/client-go/transport/config.go index 8a73589e177..9e18d11d38a 100644 --- a/staging/src/k8s.io/client-go/transport/config.go +++ b/staging/src/k8s.io/client-go/transport/config.go @@ -126,5 +126,11 @@ type TLSConfig struct { CertData []byte // Bytes of the PEM-encoded client certificate. Supercedes CertFile. KeyData []byte // Bytes of the PEM-encoded client key. Supercedes KeyFile. + // NextProtos is a list of supported application level protocols, in order of preference. + // Used to populate tls.Config.NextProtos. + // To indicate to the server http/1.1 is preferred over http/2, set to ["http/1.1", "h2"] (though the server is free to ignore that preference). + // To use only http/1.1, set to ["http/1.1"]. + NextProtos []string + GetCert func() (*tls.Certificate, error) // Callback that returns a TLS client certificate. CertData, CertFile, KeyData and KeyFile supercede this field. } diff --git a/staging/src/k8s.io/client-go/transport/transport.go b/staging/src/k8s.io/client-go/transport/transport.go index 1815c11ff32..cd8de982859 100644 --- a/staging/src/k8s.io/client-go/transport/transport.go +++ b/staging/src/k8s.io/client-go/transport/transport.go @@ -56,7 +56,7 @@ func New(config *Config) (http.RoundTripper, error) { // TLSConfigFor returns a tls.Config that will provide the transport level security defined // by the provided Config. Will return nil if no transport level security is requested. func TLSConfigFor(c *Config) (*tls.Config, error) { - if !(c.HasCA() || c.HasCertAuth() || c.HasCertCallback() || c.TLS.Insecure || len(c.TLS.ServerName) > 0) { + if !(c.HasCA() || c.HasCertAuth() || c.HasCertCallback() || c.TLS.Insecure || len(c.TLS.ServerName) > 0 || len(c.TLS.NextProtos) > 0) { return nil, nil } if c.HasCA() && c.TLS.Insecure { @@ -73,6 +73,7 @@ func TLSConfigFor(c *Config) (*tls.Config, error) { MinVersion: tls.VersionTLS12, InsecureSkipVerify: c.TLS.Insecure, ServerName: c.TLS.ServerName, + NextProtos: c.TLS.NextProtos, } if c.HasCA() { From ddc697866afd1e58cd2ee504277b405052546202 Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Wed, 28 Aug 2019 13:37:50 -0400 Subject: [PATCH 3/3] Use http/1.1 in apiserver->webhook clients --- .../apiserver/pkg/util/webhook/client.go | 5 + .../apiserver/admissionwebhook/BUILD | 1 + .../admissionwebhook/load_balance_test.go | 314 ++++++++++++++++++ 3 files changed, 320 insertions(+) create mode 100644 test/integration/apiserver/admissionwebhook/load_balance_test.go diff --git a/staging/src/k8s.io/apiserver/pkg/util/webhook/client.go b/staging/src/k8s.io/apiserver/pkg/util/webhook/client.go index fd6a83c1c01..32d8fe28960 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/webhook/client.go +++ b/staging/src/k8s.io/apiserver/pkg/util/webhook/client.go @@ -136,6 +136,11 @@ func (cm *ClientManager) HookClient(cc ClientConfig) (*rest.RESTClient, error) { } cfg.TLSClientConfig.CAData = append(cfg.TLSClientConfig.CAData, cc.CABundle...) + // Use http/1.1 instead of http/2. + // This is a workaround for http/2-enabled clients not load-balancing concurrent requests to multiple backends. + // See http://issue.k8s.io/75791 for details. + cfg.NextProtos = []string{"http/1.1"} + cfg.ContentConfig.NegotiatedSerializer = cm.negotiatedSerializer cfg.ContentConfig.ContentType = runtime.ContentTypeJSON client, err := rest.UnversionedRESTClientFor(cfg) diff --git a/test/integration/apiserver/admissionwebhook/BUILD b/test/integration/apiserver/admissionwebhook/BUILD index d7a18714f19..206837f6451 100644 --- a/test/integration/apiserver/admissionwebhook/BUILD +++ b/test/integration/apiserver/admissionwebhook/BUILD @@ -5,6 +5,7 @@ go_test( srcs = [ "admission_test.go", "broken_webhook_test.go", + "load_balance_test.go", "main_test.go", "reinvocation_test.go", "timeout_test.go", diff --git a/test/integration/apiserver/admissionwebhook/load_balance_test.go b/test/integration/apiserver/admissionwebhook/load_balance_test.go new file mode 100644 index 00000000000..da988a98c80 --- /dev/null +++ b/test/integration/apiserver/admissionwebhook/load_balance_test.go @@ -0,0 +1,314 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package admissionwebhook + +import ( + "crypto/tls" + "crypto/x509" + "encoding/json" + "fmt" + "io/ioutil" + "net" + "net/http" + "sync" + "sync/atomic" + "testing" + "time" + + "k8s.io/api/admission/v1beta1" + admissionv1beta1 "k8s.io/api/admissionregistration/v1beta1" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" + "k8s.io/kubernetes/test/integration/framework" +) + +const ( + testLoadBalanceClientUsername = "webhook-balance-integration-client" +) + +// TestWebhookLoadBalance ensures that the admission webhook opens multiple connections to backends to satisfy concurrent requests +func TestWebhookLoadBalance(t *testing.T) { + + roots := x509.NewCertPool() + if !roots.AppendCertsFromPEM(localhostCert) { + t.Fatal("Failed to append Cert from PEM") + } + cert, err := tls.X509KeyPair(localhostCert, localhostKey) + if err != nil { + t.Fatalf("Failed to build cert with error: %+v", err) + } + + localListener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + if localListener, err = net.Listen("tcp6", "[::1]:0"); err != nil { + t.Fatal(err) + } + } + trackingListener := &connectionTrackingListener{delegate: localListener} + + recorder := &connectionRecorder{} + handler := newLoadBalanceWebhookHandler(recorder) + httpServer := &http.Server{ + Handler: handler, + TLSConfig: &tls.Config{ + RootCAs: roots, + Certificates: []tls.Certificate{cert}, + }, + } + go func() { + httpServer.ServeTLS(trackingListener, "", "") + }() + defer httpServer.Close() + + webhookURL := "https://" + localListener.Addr().String() + + s := kubeapiservertesting.StartTestServerOrDie(t, kubeapiservertesting.NewDefaultTestServerOptions(), []string{ + "--disable-admission-plugins=ServiceAccount", + }, framework.SharedEtcd()) + defer s.TearDownFn() + + // Configure a client with a distinct user name so that it is easy to distinguish requests + // made by the client from requests made by controllers. We use this to filter out requests + // before recording them to ensure we don't accidentally mistake requests from controllers + // as requests made by the client. + clientConfig := rest.CopyConfig(s.ClientConfig) + clientConfig.QPS = 100 + clientConfig.Burst = 200 + clientConfig.Impersonate.UserName = testLoadBalanceClientUsername + clientConfig.Impersonate.Groups = []string{"system:masters", "system:authenticated"} + client, err := clientset.NewForConfig(clientConfig) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + _, err = client.CoreV1().Pods("default").Create(loadBalanceMarkerFixture) + if err != nil { + t.Fatal(err) + } + + upCh := recorder.Reset() + ns := "load-balance" + _, err = client.CoreV1().Namespaces().Create(&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ns}}) + if err != nil { + t.Fatal(err) + } + + fail := admissionv1beta1.Fail + mutatingCfg, err := client.AdmissionregistrationV1beta1().MutatingWebhookConfigurations().Create(&admissionv1beta1.MutatingWebhookConfiguration{ + ObjectMeta: metav1.ObjectMeta{Name: "admission.integration.test"}, + Webhooks: []admissionv1beta1.MutatingWebhook{{ + Name: "admission.integration.test", + ClientConfig: admissionv1beta1.WebhookClientConfig{ + URL: &webhookURL, + CABundle: localhostCert, + }, + Rules: []admissionv1beta1.RuleWithOperations{{ + Operations: []admissionv1beta1.OperationType{admissionv1beta1.OperationAll}, + Rule: admissionv1beta1.Rule{APIGroups: []string{""}, APIVersions: []string{"v1"}, Resources: []string{"pods"}}, + }}, + FailurePolicy: &fail, + AdmissionReviewVersions: []string{"v1beta1"}, + }}, + }) + if err != nil { + t.Fatal(err) + } + defer func() { + err := client.AdmissionregistrationV1beta1().MutatingWebhookConfigurations().Delete(mutatingCfg.GetName(), &metav1.DeleteOptions{}) + if err != nil { + t.Fatal(err) + } + }() + + // wait until new webhook is called the first time + if err := wait.PollImmediate(time.Millisecond*5, wait.ForeverTestTimeout, func() (bool, error) { + _, err = client.CoreV1().Pods("default").Patch(loadBalanceMarkerFixture.Name, types.JSONPatchType, []byte("[]")) + select { + case <-upCh: + return true, nil + default: + t.Logf("Waiting for webhook to become effective, getting marker object: %v", err) + return false, nil + } + }); err != nil { + t.Fatal(err) + } + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns, + GenerateName: "loadbalance-", + }, + Spec: corev1.PodSpec{ + Containers: []v1.Container{{ + Name: "fake-name", + Image: "fakeimage", + }}, + }, + } + + // Submit 10 parallel requests + wg := &sync.WaitGroup{} + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _, err := client.CoreV1().Pods(ns).Create(pod) + if err != nil { + t.Error(err) + } + }() + } + wg.Wait() + + if actual := atomic.LoadInt64(&trackingListener.connections); actual < 10 { + t.Errorf("expected at least 10 connections, got %d", actual) + } + trackingListener.Reset() + + // Submit 10 more parallel requests + wg = &sync.WaitGroup{} + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _, err := client.CoreV1().Pods(ns).Create(pod) + if err != nil { + t.Error(err) + } + }() + } + wg.Wait() + + if actual := atomic.LoadInt64(&trackingListener.connections); actual > 0 { + t.Errorf("expected no additional connections (reusing kept-alive connections), got %d", actual) + } +} + +type connectionRecorder struct { + mu sync.Mutex + upCh chan struct{} + upOnce sync.Once +} + +// Reset zeros out all counts and returns a channel that is closed when the first admission of the +// marker object is received. +func (i *connectionRecorder) Reset() chan struct{} { + i.mu.Lock() + defer i.mu.Unlock() + i.upCh = make(chan struct{}) + i.upOnce = sync.Once{} + return i.upCh +} + +func (i *connectionRecorder) MarkerReceived() { + i.mu.Lock() + defer i.mu.Unlock() + i.upOnce.Do(func() { + close(i.upCh) + }) +} + +func newLoadBalanceWebhookHandler(recorder *connectionRecorder) http.Handler { + allow := func(w http.ResponseWriter) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(&v1beta1.AdmissionReview{ + Response: &v1beta1.AdmissionResponse{ + Allowed: true, + }, + }) + } + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Println(r.Proto) + defer r.Body.Close() + data, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), 400) + } + review := v1beta1.AdmissionReview{} + if err := json.Unmarshal(data, &review); err != nil { + http.Error(w, err.Error(), 400) + } + if review.Request.UserInfo.Username != testLoadBalanceClientUsername { + // skip requests not originating from this integration test's client + allow(w) + return + } + + if len(review.Request.Object.Raw) == 0 { + http.Error(w, err.Error(), 400) + } + pod := &corev1.Pod{} + if err := json.Unmarshal(review.Request.Object.Raw, pod); err != nil { + http.Error(w, err.Error(), 400) + } + + // When resetting between tests, a marker object is patched until this webhook + // observes it, at which point it is considered ready. + if pod.Namespace == loadBalanceMarkerFixture.Namespace && pod.Name == loadBalanceMarkerFixture.Name { + recorder.MarkerReceived() + allow(w) + return + } + + // simulate a loaded backend + time.Sleep(2 * time.Second) + allow(w) + }) +} + +var loadBalanceMarkerFixture = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "marker", + }, + Spec: corev1.PodSpec{ + Containers: []v1.Container{{ + Name: "fake-name", + Image: "fakeimage", + }}, + }, +} + +type connectionTrackingListener struct { + connections int64 + delegate net.Listener +} + +func (c *connectionTrackingListener) Reset() { + atomic.StoreInt64(&c.connections, 0) +} + +func (c *connectionTrackingListener) Accept() (net.Conn, error) { + conn, err := c.delegate.Accept() + if err == nil { + atomic.AddInt64(&c.connections, 1) + } + return conn, err +} +func (c *connectionTrackingListener) Close() error { + return c.delegate.Close() +} +func (c *connectionTrackingListener) Addr() net.Addr { + return c.delegate.Addr() +}