mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 12:15:52 +00:00
Merge pull request #82090 from liggitt/webhook-http2
Use http/1.1 for apiserver->webhook clients
This commit is contained in:
commit
f442b6ef32
@ -153,6 +153,7 @@ func addCertRotation(stopCh <-chan struct{}, period time.Duration, clientConfig
|
||||
clientConfig.CAData = nil
|
||||
clientConfig.CAFile = ""
|
||||
clientConfig.Insecure = false
|
||||
clientConfig.NextProtos = nil
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -109,12 +109,13 @@ func MakeTransport(config *KubeletClientConfig) (http.RoundTripper, error) {
|
||||
func (c *KubeletClientConfig) transportConfig() *transport.Config {
|
||||
cfg := &transport.Config{
|
||||
TLS: transport.TLSConfig{
|
||||
CAFile: c.CAFile,
|
||||
CAData: c.CAData,
|
||||
CertFile: c.CertFile,
|
||||
CertData: c.CertData,
|
||||
KeyFile: c.KeyFile,
|
||||
KeyData: c.KeyData,
|
||||
CAFile: c.CAFile,
|
||||
CAData: c.CAData,
|
||||
CertFile: c.CertFile,
|
||||
CertData: c.CertData,
|
||||
KeyFile: c.KeyFile,
|
||||
KeyData: c.KeyData,
|
||||
NextProtos: c.NextProtos,
|
||||
},
|
||||
BearerToken: c.BearerToken,
|
||||
}
|
||||
|
@ -101,6 +101,9 @@ func SetOldTransportDefaults(t *http.Transport) *http.Transport {
|
||||
if t.TLSHandshakeTimeout == 0 {
|
||||
t.TLSHandshakeTimeout = defaultTransport.TLSHandshakeTimeout
|
||||
}
|
||||
if t.IdleConnTimeout == 0 {
|
||||
t.IdleConnTimeout = defaultTransport.IdleConnTimeout
|
||||
}
|
||||
return t
|
||||
}
|
||||
|
||||
@ -111,7 +114,7 @@ func SetTransportDefaults(t *http.Transport) *http.Transport {
|
||||
// Allow clients to disable http2 if needed.
|
||||
if s := os.Getenv("DISABLE_HTTP2"); len(s) > 0 {
|
||||
klog.Infof("HTTP2 has been explicitly disabled")
|
||||
} else {
|
||||
} else if allowsHTTP2(t) {
|
||||
if err := http2.ConfigureTransport(t); err != nil {
|
||||
klog.Warningf("Transport failed http2 configuration: %v", err)
|
||||
}
|
||||
@ -119,6 +122,21 @@ func SetTransportDefaults(t *http.Transport) *http.Transport {
|
||||
return t
|
||||
}
|
||||
|
||||
func allowsHTTP2(t *http.Transport) bool {
|
||||
if t.TLSClientConfig == nil || len(t.TLSClientConfig.NextProtos) == 0 {
|
||||
// the transport expressed no NextProto preference, allow
|
||||
return true
|
||||
}
|
||||
for _, p := range t.TLSClientConfig.NextProtos {
|
||||
if p == http2.NextProtoTLS {
|
||||
// the transport explicitly allowed http/2
|
||||
return true
|
||||
}
|
||||
}
|
||||
// the transport explicitly set NextProtos and excluded http/2
|
||||
return false
|
||||
}
|
||||
|
||||
type RoundTripperWrapper interface {
|
||||
http.RoundTripper
|
||||
WrappedRoundTripper() http.RoundTripper
|
||||
|
@ -439,3 +439,56 @@ func TestConnectWithRedirects(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestAllowsHTTP2(t *testing.T) {
|
||||
testcases := []struct {
|
||||
Name string
|
||||
Transport *http.Transport
|
||||
ExpectAllows bool
|
||||
}{
|
||||
{
|
||||
Name: "empty",
|
||||
Transport: &http.Transport{},
|
||||
ExpectAllows: true,
|
||||
},
|
||||
{
|
||||
Name: "empty tlsconfig",
|
||||
Transport: &http.Transport{TLSClientConfig: &tls.Config{}},
|
||||
ExpectAllows: true,
|
||||
},
|
||||
{
|
||||
Name: "zero-length NextProtos",
|
||||
Transport: &http.Transport{TLSClientConfig: &tls.Config{NextProtos: []string{}}},
|
||||
ExpectAllows: true,
|
||||
},
|
||||
{
|
||||
Name: "includes h2 in NextProtos after",
|
||||
Transport: &http.Transport{TLSClientConfig: &tls.Config{NextProtos: []string{"http/1.1", "h2"}}},
|
||||
ExpectAllows: true,
|
||||
},
|
||||
{
|
||||
Name: "includes h2 in NextProtos before",
|
||||
Transport: &http.Transport{TLSClientConfig: &tls.Config{NextProtos: []string{"h2", "http/1.1"}}},
|
||||
ExpectAllows: true,
|
||||
},
|
||||
{
|
||||
Name: "includes h2 in NextProtos between",
|
||||
Transport: &http.Transport{TLSClientConfig: &tls.Config{NextProtos: []string{"http/1.1", "h2", "h3"}}},
|
||||
ExpectAllows: true,
|
||||
},
|
||||
{
|
||||
Name: "excludes h2 in NextProtos",
|
||||
Transport: &http.Transport{TLSClientConfig: &tls.Config{NextProtos: []string{"http/1.1"}}},
|
||||
ExpectAllows: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testcases {
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
allows := allowsHTTP2(tc.Transport)
|
||||
if allows != tc.ExpectAllows {
|
||||
t.Errorf("expected %v, got %v", tc.ExpectAllows, allows)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -96,9 +96,11 @@ func BenchmarkAdmit(b *testing.B) {
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
wh.Admit(context.TODO(), attr, objectInterfaces)
|
||||
}
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
wh.Admit(context.TODO(), attr, objectInterfaces)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -85,9 +85,11 @@ func BenchmarkValidate(b *testing.B) {
|
||||
attr := webhooktesting.NewAttribute(ns, nil, tt.IsDryRun)
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
wh.Validate(context.TODO(), attr, objectInterfaces)
|
||||
}
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
wh.Validate(context.TODO(), attr, objectInterfaces)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -136,6 +136,11 @@ func (cm *ClientManager) HookClient(cc ClientConfig) (*rest.RESTClient, error) {
|
||||
}
|
||||
cfg.TLSClientConfig.CAData = append(cfg.TLSClientConfig.CAData, cc.CABundle...)
|
||||
|
||||
// Use http/1.1 instead of http/2.
|
||||
// This is a workaround for http/2-enabled clients not load-balancing concurrent requests to multiple backends.
|
||||
// See http://issue.k8s.io/75791 for details.
|
||||
cfg.NextProtos = []string{"http/1.1"}
|
||||
|
||||
cfg.ContentConfig.NegotiatedSerializer = cm.negotiatedSerializer
|
||||
cfg.ContentConfig.ContentType = runtime.ContentTypeJSON
|
||||
client, err := rest.UnversionedRESTClientFor(cfg)
|
||||
|
@ -211,6 +211,12 @@ type TLSClientConfig struct {
|
||||
// CAData holds PEM-encoded bytes (typically read from a root certificates bundle).
|
||||
// CAData takes precedence over CAFile
|
||||
CAData []byte
|
||||
|
||||
// NextProtos is a list of supported application level protocols, in order of preference.
|
||||
// Used to populate tls.Config.NextProtos.
|
||||
// To indicate to the server http/1.1 is preferred over http/2, set to ["http/1.1", "h2"] (though the server is free to ignore that preference).
|
||||
// To use only http/1.1, set to ["http/1.1"].
|
||||
NextProtos []string
|
||||
}
|
||||
|
||||
var _ fmt.Stringer = TLSClientConfig{}
|
||||
@ -236,6 +242,7 @@ func (c TLSClientConfig) String() string {
|
||||
CertData: c.CertData,
|
||||
KeyData: c.KeyData,
|
||||
CAData: c.CAData,
|
||||
NextProtos: c.NextProtos,
|
||||
}
|
||||
// Explicitly mark non-empty credential fields as redacted.
|
||||
if len(cc.CertData) != 0 {
|
||||
@ -503,6 +510,7 @@ func AnonymousClientConfig(config *Config) *Config {
|
||||
ServerName: config.ServerName,
|
||||
CAFile: config.TLSClientConfig.CAFile,
|
||||
CAData: config.TLSClientConfig.CAData,
|
||||
NextProtos: config.TLSClientConfig.NextProtos,
|
||||
},
|
||||
RateLimiter: config.RateLimiter,
|
||||
UserAgent: config.UserAgent,
|
||||
@ -541,6 +549,7 @@ func CopyConfig(config *Config) *Config {
|
||||
CertData: config.TLSClientConfig.CertData,
|
||||
KeyData: config.TLSClientConfig.KeyData,
|
||||
CAData: config.TLSClientConfig.CAData,
|
||||
NextProtos: config.TLSClientConfig.NextProtos,
|
||||
},
|
||||
UserAgent: config.UserAgent,
|
||||
DisableCompression: config.DisableCompression,
|
||||
|
@ -493,10 +493,11 @@ func TestConfigSprint(t *testing.T) {
|
||||
Env: []clientcmdapi.ExecEnvVar{{Name: "secret", Value: "s3cr3t"}},
|
||||
},
|
||||
TLSClientConfig: TLSClientConfig{
|
||||
CertFile: "a.crt",
|
||||
KeyFile: "a.key",
|
||||
CertData: []byte("fake cert"),
|
||||
KeyData: []byte("fake key"),
|
||||
CertFile: "a.crt",
|
||||
KeyFile: "a.key",
|
||||
CertData: []byte("fake cert"),
|
||||
KeyData: []byte("fake key"),
|
||||
NextProtos: []string{"h2", "http/1.1"},
|
||||
},
|
||||
UserAgent: "gobot",
|
||||
Transport: &fakeRoundTripper{},
|
||||
@ -508,7 +509,7 @@ func TestConfigSprint(t *testing.T) {
|
||||
Dial: fakeDialFunc,
|
||||
}
|
||||
want := fmt.Sprintf(
|
||||
`&rest.Config{Host:"localhost:8080", APIPath:"v1", ContentConfig:rest.ContentConfig{AcceptContentTypes:"application/json", ContentType:"application/json", GroupVersion:(*schema.GroupVersion)(nil), NegotiatedSerializer:runtime.NegotiatedSerializer(nil)}, Username:"gopher", Password:"--- REDACTED ---", BearerToken:"--- REDACTED ---", BearerTokenFile:"", Impersonate:rest.ImpersonationConfig{UserName:"gopher2", Groups:[]string(nil), Extra:map[string][]string(nil)}, AuthProvider:api.AuthProviderConfig{Name: "gopher", Config: map[string]string{--- REDACTED ---}}, AuthConfigPersister:rest.AuthProviderConfigPersister(--- REDACTED ---), ExecProvider:api.AuthProviderConfig{Command: "sudo", Args: []string{"--- REDACTED ---"}, Env: []ExecEnvVar{--- REDACTED ---}, APIVersion: ""}, TLSClientConfig:rest.sanitizedTLSClientConfig{Insecure:false, ServerName:"", CertFile:"a.crt", KeyFile:"a.key", CAFile:"", CertData:[]uint8{0x2d, 0x2d, 0x2d, 0x20, 0x54, 0x52, 0x55, 0x4e, 0x43, 0x41, 0x54, 0x45, 0x44, 0x20, 0x2d, 0x2d, 0x2d}, KeyData:[]uint8{0x2d, 0x2d, 0x2d, 0x20, 0x52, 0x45, 0x44, 0x41, 0x43, 0x54, 0x45, 0x44, 0x20, 0x2d, 0x2d, 0x2d}, CAData:[]uint8(nil)}, UserAgent:"gobot", DisableCompression:false, Transport:(*rest.fakeRoundTripper)(%p), WrapTransport:(transport.WrapperFunc)(%p), QPS:1, Burst:2, RateLimiter:(*rest.fakeLimiter)(%p), Timeout:3000000000, Dial:(func(context.Context, string, string) (net.Conn, error))(%p)}`,
|
||||
`&rest.Config{Host:"localhost:8080", APIPath:"v1", ContentConfig:rest.ContentConfig{AcceptContentTypes:"application/json", ContentType:"application/json", GroupVersion:(*schema.GroupVersion)(nil), NegotiatedSerializer:runtime.NegotiatedSerializer(nil)}, Username:"gopher", Password:"--- REDACTED ---", BearerToken:"--- REDACTED ---", BearerTokenFile:"", Impersonate:rest.ImpersonationConfig{UserName:"gopher2", Groups:[]string(nil), Extra:map[string][]string(nil)}, AuthProvider:api.AuthProviderConfig{Name: "gopher", Config: map[string]string{--- REDACTED ---}}, AuthConfigPersister:rest.AuthProviderConfigPersister(--- REDACTED ---), ExecProvider:api.AuthProviderConfig{Command: "sudo", Args: []string{"--- REDACTED ---"}, Env: []ExecEnvVar{--- REDACTED ---}, APIVersion: ""}, TLSClientConfig:rest.sanitizedTLSClientConfig{Insecure:false, ServerName:"", CertFile:"a.crt", KeyFile:"a.key", CAFile:"", CertData:[]uint8{0x2d, 0x2d, 0x2d, 0x20, 0x54, 0x52, 0x55, 0x4e, 0x43, 0x41, 0x54, 0x45, 0x44, 0x20, 0x2d, 0x2d, 0x2d}, KeyData:[]uint8{0x2d, 0x2d, 0x2d, 0x20, 0x52, 0x45, 0x44, 0x41, 0x43, 0x54, 0x45, 0x44, 0x20, 0x2d, 0x2d, 0x2d}, CAData:[]uint8(nil), NextProtos:[]string{"h2", "http/1.1"}}, UserAgent:"gobot", DisableCompression:false, Transport:(*rest.fakeRoundTripper)(%p), WrapTransport:(transport.WrapperFunc)(%p), QPS:1, Burst:2, RateLimiter:(*rest.fakeLimiter)(%p), Timeout:3000000000, Dial:(func(context.Context, string, string) (net.Conn, error))(%p)}`,
|
||||
c.Transport, fakeWrapperFunc, c.RateLimiter, fakeDialFunc,
|
||||
)
|
||||
|
||||
|
@ -74,6 +74,7 @@ func (c *Config) TransportConfig() (*transport.Config, error) {
|
||||
CertData: c.CertData,
|
||||
KeyFile: c.KeyFile,
|
||||
KeyData: c.KeyData,
|
||||
NextProtos: c.NextProtos,
|
||||
},
|
||||
Username: c.Username,
|
||||
Password: c.Password,
|
||||
|
@ -38,6 +38,11 @@ func (in *TLSClientConfig) DeepCopyInto(out *TLSClientConfig) {
|
||||
*out = make([]byte, len(*in))
|
||||
copy(*out, *in)
|
||||
}
|
||||
if in.NextProtos != nil {
|
||||
in, out := &in.NextProtos, &out.NextProtos
|
||||
*out = make([]string, len(*in))
|
||||
copy(*out, *in)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -45,6 +46,7 @@ type tlsCacheKey struct {
|
||||
keyData string
|
||||
getCert string
|
||||
serverName string
|
||||
nextProtos string
|
||||
dial string
|
||||
disableCompression bool
|
||||
}
|
||||
@ -114,6 +116,7 @@ func tlsConfigKey(c *Config) (tlsCacheKey, error) {
|
||||
keyData: string(c.TLS.KeyData),
|
||||
getCert: fmt.Sprintf("%p", c.TLS.GetCert),
|
||||
serverName: c.TLS.ServerName,
|
||||
nextProtos: strings.Join(c.TLS.NextProtos, ","),
|
||||
dial: fmt.Sprintf("%p", c.Dial),
|
||||
disableCompression: c.DisableCompression,
|
||||
}, nil
|
||||
|
@ -126,6 +126,8 @@ func TestTLSConfigKey(t *testing.T) {
|
||||
GetCert: getCert,
|
||||
},
|
||||
},
|
||||
"http2, http1.1": {TLS: TLSConfig{NextProtos: []string{"h2", "http/1.1"}}},
|
||||
"http1.1-only": {TLS: TLSConfig{NextProtos: []string{"http/1.1"}}},
|
||||
}
|
||||
for nameA, valueA := range uniqueConfigurations {
|
||||
for nameB, valueB := range uniqueConfigurations {
|
||||
|
@ -126,5 +126,11 @@ type TLSConfig struct {
|
||||
CertData []byte // Bytes of the PEM-encoded client certificate. Supercedes CertFile.
|
||||
KeyData []byte // Bytes of the PEM-encoded client key. Supercedes KeyFile.
|
||||
|
||||
// NextProtos is a list of supported application level protocols, in order of preference.
|
||||
// Used to populate tls.Config.NextProtos.
|
||||
// To indicate to the server http/1.1 is preferred over http/2, set to ["http/1.1", "h2"] (though the server is free to ignore that preference).
|
||||
// To use only http/1.1, set to ["http/1.1"].
|
||||
NextProtos []string
|
||||
|
||||
GetCert func() (*tls.Certificate, error) // Callback that returns a TLS client certificate. CertData, CertFile, KeyData and KeyFile supercede this field.
|
||||
}
|
||||
|
@ -56,7 +56,7 @@ func New(config *Config) (http.RoundTripper, error) {
|
||||
// TLSConfigFor returns a tls.Config that will provide the transport level security defined
|
||||
// by the provided Config. Will return nil if no transport level security is requested.
|
||||
func TLSConfigFor(c *Config) (*tls.Config, error) {
|
||||
if !(c.HasCA() || c.HasCertAuth() || c.HasCertCallback() || c.TLS.Insecure || len(c.TLS.ServerName) > 0) {
|
||||
if !(c.HasCA() || c.HasCertAuth() || c.HasCertCallback() || c.TLS.Insecure || len(c.TLS.ServerName) > 0 || len(c.TLS.NextProtos) > 0) {
|
||||
return nil, nil
|
||||
}
|
||||
if c.HasCA() && c.TLS.Insecure {
|
||||
@ -73,6 +73,7 @@ func TLSConfigFor(c *Config) (*tls.Config, error) {
|
||||
MinVersion: tls.VersionTLS12,
|
||||
InsecureSkipVerify: c.TLS.Insecure,
|
||||
ServerName: c.TLS.ServerName,
|
||||
NextProtos: c.TLS.NextProtos,
|
||||
}
|
||||
|
||||
if c.HasCA() {
|
||||
|
@ -5,6 +5,7 @@ go_test(
|
||||
srcs = [
|
||||
"admission_test.go",
|
||||
"broken_webhook_test.go",
|
||||
"load_balance_test.go",
|
||||
"main_test.go",
|
||||
"reinvocation_test.go",
|
||||
"timeout_test.go",
|
||||
|
314
test/integration/apiserver/admissionwebhook/load_balance_test.go
Normal file
314
test/integration/apiserver/admissionwebhook/load_balance_test.go
Normal file
@ -0,0 +1,314 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package admissionwebhook
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/api/admission/v1beta1"
|
||||
admissionv1beta1 "k8s.io/api/admissionregistration/v1beta1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
)
|
||||
|
||||
const (
|
||||
testLoadBalanceClientUsername = "webhook-balance-integration-client"
|
||||
)
|
||||
|
||||
// TestWebhookLoadBalance ensures that the admission webhook opens multiple connections to backends to satisfy concurrent requests
|
||||
func TestWebhookLoadBalance(t *testing.T) {
|
||||
|
||||
roots := x509.NewCertPool()
|
||||
if !roots.AppendCertsFromPEM(localhostCert) {
|
||||
t.Fatal("Failed to append Cert from PEM")
|
||||
}
|
||||
cert, err := tls.X509KeyPair(localhostCert, localhostKey)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to build cert with error: %+v", err)
|
||||
}
|
||||
|
||||
localListener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
if localListener, err = net.Listen("tcp6", "[::1]:0"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
trackingListener := &connectionTrackingListener{delegate: localListener}
|
||||
|
||||
recorder := &connectionRecorder{}
|
||||
handler := newLoadBalanceWebhookHandler(recorder)
|
||||
httpServer := &http.Server{
|
||||
Handler: handler,
|
||||
TLSConfig: &tls.Config{
|
||||
RootCAs: roots,
|
||||
Certificates: []tls.Certificate{cert},
|
||||
},
|
||||
}
|
||||
go func() {
|
||||
httpServer.ServeTLS(trackingListener, "", "")
|
||||
}()
|
||||
defer httpServer.Close()
|
||||
|
||||
webhookURL := "https://" + localListener.Addr().String()
|
||||
|
||||
s := kubeapiservertesting.StartTestServerOrDie(t, kubeapiservertesting.NewDefaultTestServerOptions(), []string{
|
||||
"--disable-admission-plugins=ServiceAccount",
|
||||
}, framework.SharedEtcd())
|
||||
defer s.TearDownFn()
|
||||
|
||||
// Configure a client with a distinct user name so that it is easy to distinguish requests
|
||||
// made by the client from requests made by controllers. We use this to filter out requests
|
||||
// before recording them to ensure we don't accidentally mistake requests from controllers
|
||||
// as requests made by the client.
|
||||
clientConfig := rest.CopyConfig(s.ClientConfig)
|
||||
clientConfig.QPS = 100
|
||||
clientConfig.Burst = 200
|
||||
clientConfig.Impersonate.UserName = testLoadBalanceClientUsername
|
||||
clientConfig.Impersonate.Groups = []string{"system:masters", "system:authenticated"}
|
||||
client, err := clientset.NewForConfig(clientConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
_, err = client.CoreV1().Pods("default").Create(loadBalanceMarkerFixture)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
upCh := recorder.Reset()
|
||||
ns := "load-balance"
|
||||
_, err = client.CoreV1().Namespaces().Create(&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ns}})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
fail := admissionv1beta1.Fail
|
||||
mutatingCfg, err := client.AdmissionregistrationV1beta1().MutatingWebhookConfigurations().Create(&admissionv1beta1.MutatingWebhookConfiguration{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "admission.integration.test"},
|
||||
Webhooks: []admissionv1beta1.MutatingWebhook{{
|
||||
Name: "admission.integration.test",
|
||||
ClientConfig: admissionv1beta1.WebhookClientConfig{
|
||||
URL: &webhookURL,
|
||||
CABundle: localhostCert,
|
||||
},
|
||||
Rules: []admissionv1beta1.RuleWithOperations{{
|
||||
Operations: []admissionv1beta1.OperationType{admissionv1beta1.OperationAll},
|
||||
Rule: admissionv1beta1.Rule{APIGroups: []string{""}, APIVersions: []string{"v1"}, Resources: []string{"pods"}},
|
||||
}},
|
||||
FailurePolicy: &fail,
|
||||
AdmissionReviewVersions: []string{"v1beta1"},
|
||||
}},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer func() {
|
||||
err := client.AdmissionregistrationV1beta1().MutatingWebhookConfigurations().Delete(mutatingCfg.GetName(), &metav1.DeleteOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
|
||||
// wait until new webhook is called the first time
|
||||
if err := wait.PollImmediate(time.Millisecond*5, wait.ForeverTestTimeout, func() (bool, error) {
|
||||
_, err = client.CoreV1().Pods("default").Patch(loadBalanceMarkerFixture.Name, types.JSONPatchType, []byte("[]"))
|
||||
select {
|
||||
case <-upCh:
|
||||
return true, nil
|
||||
default:
|
||||
t.Logf("Waiting for webhook to become effective, getting marker object: %v", err)
|
||||
return false, nil
|
||||
}
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
pod := &corev1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: ns,
|
||||
GenerateName: "loadbalance-",
|
||||
},
|
||||
Spec: corev1.PodSpec{
|
||||
Containers: []v1.Container{{
|
||||
Name: "fake-name",
|
||||
Image: "fakeimage",
|
||||
}},
|
||||
},
|
||||
}
|
||||
|
||||
// Submit 10 parallel requests
|
||||
wg := &sync.WaitGroup{}
|
||||
for i := 0; i < 10; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
_, err := client.CoreV1().Pods(ns).Create(pod)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
if actual := atomic.LoadInt64(&trackingListener.connections); actual < 10 {
|
||||
t.Errorf("expected at least 10 connections, got %d", actual)
|
||||
}
|
||||
trackingListener.Reset()
|
||||
|
||||
// Submit 10 more parallel requests
|
||||
wg = &sync.WaitGroup{}
|
||||
for i := 0; i < 10; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
_, err := client.CoreV1().Pods(ns).Create(pod)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
if actual := atomic.LoadInt64(&trackingListener.connections); actual > 0 {
|
||||
t.Errorf("expected no additional connections (reusing kept-alive connections), got %d", actual)
|
||||
}
|
||||
}
|
||||
|
||||
type connectionRecorder struct {
|
||||
mu sync.Mutex
|
||||
upCh chan struct{}
|
||||
upOnce sync.Once
|
||||
}
|
||||
|
||||
// Reset zeros out all counts and returns a channel that is closed when the first admission of the
|
||||
// marker object is received.
|
||||
func (i *connectionRecorder) Reset() chan struct{} {
|
||||
i.mu.Lock()
|
||||
defer i.mu.Unlock()
|
||||
i.upCh = make(chan struct{})
|
||||
i.upOnce = sync.Once{}
|
||||
return i.upCh
|
||||
}
|
||||
|
||||
func (i *connectionRecorder) MarkerReceived() {
|
||||
i.mu.Lock()
|
||||
defer i.mu.Unlock()
|
||||
i.upOnce.Do(func() {
|
||||
close(i.upCh)
|
||||
})
|
||||
}
|
||||
|
||||
func newLoadBalanceWebhookHandler(recorder *connectionRecorder) http.Handler {
|
||||
allow := func(w http.ResponseWriter) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(&v1beta1.AdmissionReview{
|
||||
Response: &v1beta1.AdmissionResponse{
|
||||
Allowed: true,
|
||||
},
|
||||
})
|
||||
}
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
fmt.Println(r.Proto)
|
||||
defer r.Body.Close()
|
||||
data, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), 400)
|
||||
}
|
||||
review := v1beta1.AdmissionReview{}
|
||||
if err := json.Unmarshal(data, &review); err != nil {
|
||||
http.Error(w, err.Error(), 400)
|
||||
}
|
||||
if review.Request.UserInfo.Username != testLoadBalanceClientUsername {
|
||||
// skip requests not originating from this integration test's client
|
||||
allow(w)
|
||||
return
|
||||
}
|
||||
|
||||
if len(review.Request.Object.Raw) == 0 {
|
||||
http.Error(w, err.Error(), 400)
|
||||
}
|
||||
pod := &corev1.Pod{}
|
||||
if err := json.Unmarshal(review.Request.Object.Raw, pod); err != nil {
|
||||
http.Error(w, err.Error(), 400)
|
||||
}
|
||||
|
||||
// When resetting between tests, a marker object is patched until this webhook
|
||||
// observes it, at which point it is considered ready.
|
||||
if pod.Namespace == loadBalanceMarkerFixture.Namespace && pod.Name == loadBalanceMarkerFixture.Name {
|
||||
recorder.MarkerReceived()
|
||||
allow(w)
|
||||
return
|
||||
}
|
||||
|
||||
// simulate a loaded backend
|
||||
time.Sleep(2 * time.Second)
|
||||
allow(w)
|
||||
})
|
||||
}
|
||||
|
||||
var loadBalanceMarkerFixture = &corev1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: "default",
|
||||
Name: "marker",
|
||||
},
|
||||
Spec: corev1.PodSpec{
|
||||
Containers: []v1.Container{{
|
||||
Name: "fake-name",
|
||||
Image: "fakeimage",
|
||||
}},
|
||||
},
|
||||
}
|
||||
|
||||
type connectionTrackingListener struct {
|
||||
connections int64
|
||||
delegate net.Listener
|
||||
}
|
||||
|
||||
func (c *connectionTrackingListener) Reset() {
|
||||
atomic.StoreInt64(&c.connections, 0)
|
||||
}
|
||||
|
||||
func (c *connectionTrackingListener) Accept() (net.Conn, error) {
|
||||
conn, err := c.delegate.Accept()
|
||||
if err == nil {
|
||||
atomic.AddInt64(&c.connections, 1)
|
||||
}
|
||||
return conn, err
|
||||
}
|
||||
func (c *connectionTrackingListener) Close() error {
|
||||
return c.delegate.Close()
|
||||
}
|
||||
func (c *connectionTrackingListener) Addr() net.Addr {
|
||||
return c.delegate.Addr()
|
||||
}
|
Loading…
Reference in New Issue
Block a user