Merge pull request #62649 from liggitt/loopback-routing

Automatic merge from submit-queue (batch tested with PRs 50899, 62649). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Ensure webhook service routing resolves kubernetes.default.svc correctly

Going through the normal endpoint resolve path isn't correct in multi-master scenarios

The auth wrapper is pulling from LoopbackClientConfig, the service resolver should do the same

```release-note
Fixes the kubernetes.default.svc loopback service resolution to use a loopback configuration.
```
This commit is contained in:
Kubernetes Submit Queue 2018-04-20 15:34:12 -07:00 committed by GitHub
commit 9c25da64f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 352 additions and 10 deletions

View File

@ -507,6 +507,13 @@ func BuildGenericConfig(
versionedInformers.Core().V1().Services().Lister(),
)
}
// resolve kubernetes.default.svc locally
localHost, err := url.Parse(genericConfig.LoopbackClientConfig.Host)
if err != nil {
lastErr = err
return
}
serviceResolver = aggregatorapiserver.NewLoopbackServiceResolver(serviceResolver, localHost)
genericConfig.Authentication.Authenticator, genericConfig.OpenAPIConfig.SecurityDefinitions, err = BuildAuthenticator(s, clientgoExternalClient, sharedInformers)
if err != nil {

View File

@ -113,7 +113,12 @@ func (cm *ClientManager) HookClient(h *v1beta1.Webhook) (*rest.RESTClient, error
}
complete := func(cfg *rest.Config) (*rest.RESTClient, error) {
cfg.TLSClientConfig.CAData = h.ClientConfig.CABundle
// Combine CAData from the config with any existing CA bundle provided
if len(cfg.TLSClientConfig.CAData) > 0 {
cfg.TLSClientConfig.CAData = append(cfg.TLSClientConfig.CAData, '\n')
}
cfg.TLSClientConfig.CAData = append(cfg.TLSClientConfig.CAData, h.ClientConfig.CABundle...)
cfg.ContentConfig.NegotiatedSerializer = cm.negotiatedSerializer
cfg.ContentConfig.ContentType = runtime.ContentTypeJSON
client, err := rest.UnversionedRESTClientFor(cfg)
@ -135,7 +140,10 @@ func (cm *ClientManager) HookClient(h *v1beta1.Webhook) (*rest.RESTClient, error
if svc.Path != nil {
cfg.APIPath = *svc.Path
}
cfg.TLSClientConfig.ServerName = serverName
// Set the server name if not already set
if len(cfg.TLSClientConfig.ServerName) == 0 {
cfg.TLSClientConfig.ServerName = serverName
}
delegateDialer := cfg.Dial
if delegateDialer == nil {

View File

@ -44,6 +44,7 @@ type tlsCacheKey struct {
certData string
keyData string
serverName string
dial string
}
func (t tlsCacheKey) String() string {
@ -51,7 +52,7 @@ func (t tlsCacheKey) String() string {
if len(t.keyData) > 0 {
keyText = "<redacted>"
}
return fmt.Sprintf("insecure:%v, caData:%#v, certData:%#v, keyData:%s, serverName:%s", t.insecure, t.caData, t.certData, keyText, t.serverName)
return fmt.Sprintf("insecure:%v, caData:%#v, certData:%#v, keyData:%s, serverName:%s, dial:%s", t.insecure, t.caData, t.certData, keyText, t.serverName, t.dial)
}
func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) {
@ -75,7 +76,7 @@ func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) {
return nil, err
}
// The options didn't require a custom TLS config
if tlsConfig == nil {
if tlsConfig == nil && config.Dial == nil {
return http.DefaultTransport, nil
}
@ -109,5 +110,6 @@ func tlsConfigKey(c *Config) (tlsCacheKey, error) {
certData: string(c.TLS.CertData),
keyData: string(c.TLS.KeyData),
serverName: c.TLS.ServerName,
dial: fmt.Sprintf("%p", c.Dial),
}, nil
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package transport
import (
"net"
"net/http"
"testing"
)
@ -53,6 +54,8 @@ func TestTLSConfigKey(t *testing.T) {
// Make sure config fields that affect the tls config affect the cache key
uniqueConfigurations := map[string]*Config{
"no tls": {},
"dialer": {Dial: net.Dial},
"dialer2": {Dial: func(network, address string) (net.Conn, error) { return nil, nil }},
"insecure": {TLS: TLSConfig{Insecure: true}},
"cadata 1": {TLS: TLSConfig{CAData: []byte{1}}},
"cadata 2": {TLS: TLSConfig{CAData: []byte{2}}},
@ -104,11 +107,6 @@ func TestTLSConfigKey(t *testing.T) {
}
for nameA, valueA := range uniqueConfigurations {
for nameB, valueB := range uniqueConfigurations {
// Don't compare to ourselves
if nameA == nameB {
continue
}
keyA, err := tlsConfigKey(valueA)
if err != nil {
t.Errorf("Unexpected error for %q: %v", nameA, err)
@ -119,6 +117,15 @@ func TestTLSConfigKey(t *testing.T) {
t.Errorf("Unexpected error for %q: %v", nameB, err)
continue
}
// Make sure we get the same key on the same config
if nameA == nameB {
if keyA != keyB {
t.Errorf("Expected identical cache keys for %q and %q, got:\n\t%s\n\t%s", nameA, nameB, keyA, keyB)
}
continue
}
if keyA == keyB {
t.Errorf("Expected unique cache keys for %q and %q, got:\n\t%s\n\t%s", nameA, nameB, keyA, keyB)
continue

View File

@ -52,7 +52,7 @@ func New(config *Config) (http.RoundTripper, error) {
// TLSConfigFor returns a tls.Config that will provide the transport level security defined
// by the provided Config. Will return nil if no transport level security is requested.
func TLSConfigFor(c *Config) (*tls.Config, error) {
if !(c.HasCA() || c.HasCertAuth() || c.TLS.Insecure) {
if !(c.HasCA() || c.HasCertAuth() || c.TLS.Insecure || len(c.TLS.ServerName) > 0) {
return nil, nil
}
if c.HasCA() && c.TLS.Insecure {

View File

@ -101,6 +101,13 @@ func TestNew(t *testing.T) {
Config: &Config{},
},
"server name": {
TLS: true,
Config: &Config{TLS: TLSConfig{
ServerName: "foo",
}},
},
"ca transport": {
TLS: true,
Config: &Config{

View File

@ -61,3 +61,23 @@ type aggregatorClusterRouting struct {
func (r *aggregatorClusterRouting) ResolveEndpoint(namespace, name string) (*url.URL, error) {
return proxy.ResolveCluster(r.services, namespace, name)
}
// NewLoopbackServiceResolver returns a ServiceResolver that routes the kubernetes/default service to loopback.
func NewLoopbackServiceResolver(delegate ServiceResolver, host *url.URL) ServiceResolver {
return &loopbackResolver{
delegate: delegate,
host: host,
}
}
type loopbackResolver struct {
delegate ServiceResolver
host *url.URL
}
func (r *loopbackResolver) ResolveEndpoint(namespace, name string) (*url.URL, error) {
if namespace == "default" && name == "kubernetes" {
return r.host, nil
}
return r.delegate.ResolveEndpoint(namespace, name)
}

View File

@ -11,15 +11,23 @@ go_test(
srcs = [
"apiserver_test.go",
"main_test.go",
"setup_test.go",
"webhook_test.go",
],
tags = ["integration"],
deps = [
"//cmd/kube-apiserver/app:go_default_library",
"//cmd/kube-apiserver/app/options:go_default_library",
"//pkg/master:go_default_library",
"//pkg/master/reconcilers:go_default_library",
"//test/integration/framework:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/api/admissionregistration/v1beta1:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/options:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",

View File

@ -0,0 +1,165 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"io/ioutil"
"net"
"net/http"
"os"
"path"
"testing"
"time"
"k8s.io/apimachinery/pkg/util/wait"
genericapiserver "k8s.io/apiserver/pkg/server"
genericapiserveroptions "k8s.io/apiserver/pkg/server/options"
client "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/cert"
"k8s.io/kubernetes/cmd/kube-apiserver/app"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/test/integration/framework"
)
type TestServerSetup struct {
ModifyServerRunOptions func(*options.ServerRunOptions)
ModifyServerConfig func(*master.Config)
}
// startTestServer runs a kube-apiserver, optionally calling out to the setup.ModifyServerRunOptions and setup.ModifyServerConfig functions
func startTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup) (client.Interface, *rest.Config) {
certDir, _ := ioutil.TempDir("", "test-integration-"+t.Name())
go func() {
<-stopCh
os.RemoveAll(certDir)
}()
_, defaultServiceClusterIPRange, _ := net.ParseCIDR("10.0.0.0/24")
proxySigningKey, err := cert.NewPrivateKey()
if err != nil {
t.Fatal(err)
}
proxySigningCert, err := cert.NewSelfSignedCACert(cert.Config{CommonName: "front-proxy-ca"}, proxySigningKey)
if err != nil {
t.Fatal(err)
}
proxyCACertFile, _ := ioutil.TempFile(certDir, "proxy-ca.crt")
if err := ioutil.WriteFile(proxyCACertFile.Name(), cert.EncodeCertPEM(proxySigningCert), 0644); err != nil {
t.Fatal(err)
}
clientSigningKey, err := cert.NewPrivateKey()
if err != nil {
t.Fatal(err)
}
clientSigningCert, err := cert.NewSelfSignedCACert(cert.Config{CommonName: "client-ca"}, clientSigningKey)
if err != nil {
t.Fatal(err)
}
clientCACertFile, _ := ioutil.TempFile(certDir, "client-ca.crt")
if err := ioutil.WriteFile(clientCACertFile.Name(), cert.EncodeCertPEM(clientSigningCert), 0644); err != nil {
t.Fatal(err)
}
listener, _, err := genericapiserveroptions.CreateListener("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
kubeAPIServerOptions := options.NewServerRunOptions()
kubeAPIServerOptions.SecureServing.Listener = listener
kubeAPIServerOptions.SecureServing.BindAddress = net.ParseIP("127.0.0.1")
kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir
kubeAPIServerOptions.InsecureServing.BindPort = 0
kubeAPIServerOptions.Etcd.StorageConfig.ServerList = []string{framework.GetEtcdURL()}
kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange
kubeAPIServerOptions.Authentication.RequestHeader.UsernameHeaders = []string{"X-Remote-User"}
kubeAPIServerOptions.Authentication.RequestHeader.GroupHeaders = []string{"X-Remote-Group"}
kubeAPIServerOptions.Authentication.RequestHeader.ExtraHeaderPrefixes = []string{"X-Remote-Extra-"}
kubeAPIServerOptions.Authentication.RequestHeader.AllowedNames = []string{"kube-aggregator"}
kubeAPIServerOptions.Authentication.RequestHeader.ClientCAFile = proxyCACertFile.Name()
kubeAPIServerOptions.Authentication.ClientCert.ClientCA = clientCACertFile.Name()
kubeAPIServerOptions.Authorization.Modes = []string{"Node", "RBAC"}
if setup.ModifyServerRunOptions != nil {
setup.ModifyServerRunOptions(kubeAPIServerOptions)
}
completedOptions, err := app.Complete(kubeAPIServerOptions)
if err != nil {
t.Fatal(err)
}
tunneler, proxyTransport, err := app.CreateNodeDialer(completedOptions)
if err != nil {
t.Fatal(err)
}
kubeAPIServerConfig, sharedInformers, versionedInformers, _, _, _, admissionPostStartHook, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport)
if err != nil {
t.Fatal(err)
}
if setup.ModifyServerConfig != nil {
setup.ModifyServerConfig(kubeAPIServerConfig)
}
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.NewEmptyDelegate(), sharedInformers, versionedInformers, admissionPostStartHook)
if err != nil {
t.Fatal(err)
}
go func() {
if err := kubeAPIServer.GenericAPIServer.PrepareRun().Run(stopCh); err != nil {
t.Fatal(err)
}
}()
// Adjust the loopback config for external use (external server name and CA)
kubeAPIServerClientConfig := rest.CopyConfig(kubeAPIServerConfig.GenericConfig.LoopbackClientConfig)
kubeAPIServerClientConfig.CAFile = path.Join(certDir, "apiserver.crt")
kubeAPIServerClientConfig.CAData = nil
kubeAPIServerClientConfig.ServerName = ""
// wait for health
err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) {
healthzConfig := rest.CopyConfig(kubeAPIServerClientConfig)
healthzConfig.ContentType = ""
healthzConfig.AcceptContentTypes = ""
kubeClient, err := client.NewForConfig(healthzConfig)
if err != nil {
// this happens because we race the API server start
t.Log(err)
return false, nil
}
healthStatus := 0
kubeClient.Discovery().RESTClient().Get().AbsPath("/healthz").Do().StatusCode(&healthStatus)
if healthStatus != http.StatusOK {
return false, nil
}
return true, nil
})
if err != nil {
t.Fatal(err)
}
kubeAPIServerClient, err := client.NewForConfig(kubeAPIServerClientConfig)
if err != nil {
t.Fatal(err)
}
return kubeAPIServerClient, kubeAPIServerClientConfig
}

View File

@ -0,0 +1,118 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"sync/atomic"
"testing"
"time"
admissionv1beta1 "k8s.io/api/admissionregistration/v1beta1"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/pkg/master/reconcilers"
)
func TestWebhookLoopback(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
webhookPath := "/webhook-test"
called := int32(0)
client, _ := startTestServer(t, stopCh, TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
},
ModifyServerConfig: func(config *master.Config) {
// Avoid resolveable kubernetes service
config.ExtraConfig.EndpointReconcilerType = reconcilers.NoneEndpointReconcilerType
// Hook into audit to watch requests
config.GenericConfig.AuditBackend = auditSinkFunc(func(events ...*auditinternal.Event) {})
config.GenericConfig.AuditPolicyChecker = auditChecker(func(attrs authorizer.Attributes) (auditinternal.Level, []auditinternal.Stage) {
if attrs.GetPath() == webhookPath {
if attrs.GetUser().GetName() != "system:apiserver" {
t.Errorf("expected user %q, got %q", "system:apiserver", attrs.GetUser().GetName())
}
atomic.AddInt32(&called, 1)
}
return auditinternal.LevelNone, nil
})
},
})
fail := admissionv1beta1.Fail
_, err := client.AdmissionregistrationV1beta1().MutatingWebhookConfigurations().Create(&admissionv1beta1.MutatingWebhookConfiguration{
ObjectMeta: metav1.ObjectMeta{Name: "webhooktest.example.com"},
Webhooks: []admissionv1beta1.Webhook{{
Name: "webhooktest.example.com",
ClientConfig: admissionv1beta1.WebhookClientConfig{
Service: &admissionv1beta1.ServiceReference{Namespace: "default", Name: "kubernetes", Path: &webhookPath},
},
Rules: []admissionv1beta1.RuleWithOperations{{
Operations: []admissionv1beta1.OperationType{admissionv1beta1.OperationAll},
Rule: admissionv1beta1.Rule{APIGroups: []string{""}, APIVersions: []string{"v1"}, Resources: []string{"configmaps"}},
}},
FailurePolicy: &fail,
}},
})
if err != nil {
t.Fatal(err)
}
err = wait.PollImmediate(100*time.Millisecond, 30*time.Second, func() (done bool, err error) {
_, err = client.CoreV1().ConfigMaps("default").Create(&v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Name: "webhook-test"},
Data: map[string]string{"invalid key": "value"},
})
if err == nil {
t.Fatal("Unexpected success")
}
if called > 0 {
return true, nil
}
t.Logf("%v", err)
t.Logf("webhook not called yet, continuing...")
return false, nil
})
if err != nil {
t.Fatal(err)
}
}
type auditChecker func(authorizer.Attributes) (auditinternal.Level, []auditinternal.Stage)
func (f auditChecker) LevelAndStages(attrs authorizer.Attributes) (auditinternal.Level, []auditinternal.Stage) {
return f(attrs)
}
type auditSinkFunc func(events ...*auditinternal.Event)
func (f auditSinkFunc) ProcessEvents(events ...*auditinternal.Event) {
f(events...)
}
func (auditSinkFunc) Run(stopCh <-chan struct{}) error {
return nil
}
func (auditSinkFunc) Shutdown() {
}