dynamic reload cluster authentication info for aggregated API servers

This commit is contained in:
David Eads 2019-11-04 13:46:28 -05:00
parent 758f2ce44f
commit 3aede35b3b
10 changed files with 606 additions and 396 deletions

View File

@ -20,6 +20,8 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/util/cert:go_default_library",
"//test/utils:go_default_library",
"//vendor/github.com/spf13/pflag:go_default_library",
],
)

View File

@ -34,8 +34,10 @@ import (
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/util/cert"
"k8s.io/kubernetes/cmd/kube-apiserver/app"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
testutil "k8s.io/kubernetes/test/utils"
)
// TearDownFunc is to be called to tear down a test server.
@ -114,7 +116,6 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
for _, f := range s.Flags().FlagSets {
fs.AddFlagSet(f)
}
s.InsecureServing.BindPort = 0
s.SecureServing.Listener, s.SecureServing.BindPort, err = createLocalhostListenerOnFreePort()
@ -122,6 +123,34 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
return result, fmt.Errorf("failed to create listener: %v", err)
}
s.SecureServing.ServerCert.CertDirectory = result.TmpDir
// create optional certificates for aggregation and client-cert auth
proxySigningKey, err := testutil.NewPrivateKey()
if err != nil {
return result, err
}
proxySigningCert, err := cert.NewSelfSignedCACert(cert.Config{CommonName: "front-proxy-ca"}, proxySigningKey)
if err != nil {
return result, err
}
proxyCACertFile := path.Join(s.SecureServing.ServerCert.CertDirectory, "proxy-ca.crt")
if err := ioutil.WriteFile(proxyCACertFile, testutil.EncodeCertPEM(proxySigningCert), 0644); err != nil {
return result, err
}
s.Authentication.RequestHeader.ClientCAFile = proxyCACertFile
clientSigningKey, err := testutil.NewPrivateKey()
if err != nil {
return result, err
}
clientSigningCert, err := cert.NewSelfSignedCACert(cert.Config{CommonName: "client-ca"}, clientSigningKey)
if err != nil {
return result, err
}
clientCACertFile := path.Join(s.SecureServing.ServerCert.CertDirectory, "client-ca.crt")
if err := ioutil.WriteFile(clientCACertFile, testutil.EncodeCertPEM(clientSigningCert), 0644); err != nil {
return result, err
}
s.Authentication.ClientCert.ClientCA = clientCACertFile
s.SecureServing.ExternalAddress = s.SecureServing.Listener.Addr().(*net.TCPAddr).IP // use listener addr although it is a loopback device
_, thisFile, _, ok := runtime.Caller(0)
@ -134,7 +163,9 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
s.Etcd.StorageConfig = *storageConfig
s.APIEnablement.RuntimeConfig.Set("api/all=true")
fs.Parse(customFlags)
if err := fs.Parse(customFlags); err != nil {
return result, err
}
completedOptions, err := app.Complete(s)
if err != nil {
return result, fmt.Errorf("failed to set default ServerRunOptions: %v", err)
@ -205,7 +236,7 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
}
// from here the caller must call tearDown
result.ClientConfig = server.GenericAPIServer.LoopbackClientConfig
result.ClientConfig = restclient.CopyConfig(server.GenericAPIServer.LoopbackClientConfig)
result.ClientConfig.QPS = 1000
result.ClientConfig.Burst = 10000
result.ServerOpts = s

View File

@ -5,6 +5,7 @@ go_library(
srcs = [
"cert_key.go",
"client_ca.go",
"configmap_cafile_content.go",
"dynamic_cafile_content.go",
"dynamic_serving_content.go",
"dynamic_sni_content.go",
@ -19,10 +20,16 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/events:go_default_library",
"//staging/src/k8s.io/client-go/util/cert:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",

View File

@ -0,0 +1,277 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamiccertificates
import (
"bytes"
"crypto/x509"
"fmt"
"sync/atomic"
"time"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
)
// ConfigMapCAController provies a CAContentProvider that can dynamically react to configmap changes
// It also fulfills the authenticator interface to provide verifyoptions
type ConfigMapCAController struct {
name string
configmapLister corev1listers.ConfigMapLister
configmapNamespace string
configmapName string
configmapKey string
// configMapInformer is tracked so that we can start these on Run
configMapInformer cache.SharedIndexInformer
// caBundle is a caBundleAndVerifier that contains the last read, non-zero length content of the file
caBundle atomic.Value
listeners []Listener
queue workqueue.RateLimitingInterface
// preRunCaches are the caches to sync before starting the work of this control loop
preRunCaches []cache.InformerSynced
}
var _ Notifier = &ConfigMapCAController{}
var _ CAContentProvider = &ConfigMapCAController{}
var _ ControllerRunner = &ConfigMapCAController{}
// NewDynamicCAFromConfigMapController returns a CAContentProvider based on a configmap that automatically reloads content.
// It is near-realtime via an informer.
func NewDynamicCAFromConfigMapController(purpose, namespace, name, key string, kubeClient kubernetes.Interface) (*ConfigMapCAController, error) {
if len(purpose) == 0 {
return nil, fmt.Errorf("missing purpose for ca bundle")
}
if len(namespace) == 0 {
return nil, fmt.Errorf("missing namespace for ca bundle")
}
if len(name) == 0 {
return nil, fmt.Errorf("missing name for ca bundle")
}
if len(key) == 0 {
return nil, fmt.Errorf("missing key for ca bundle")
}
caContentName := fmt.Sprintf("%s::%s::%s::%s", purpose, namespace, name, key)
// we construct our own informer because we need such a small subset of the information available. Just one namespace.
uncastConfigmapInformer := corev1informers.NewFilteredConfigMapInformer(kubeClient, namespace, 12*time.Hour, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, func(listOptions *v1.ListOptions) {
listOptions.FieldSelector = fields.OneTermEqualSelector("metadata.name", name).String()
})
configmapLister := corev1listers.NewConfigMapLister(uncastConfigmapInformer.GetIndexer())
c := &ConfigMapCAController{
name: caContentName,
configmapNamespace: namespace,
configmapName: name,
configmapKey: key,
configmapLister: configmapLister,
configMapInformer: uncastConfigmapInformer,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("DynamicConfigMapCABundle-%s", purpose)),
preRunCaches: []cache.InformerSynced{uncastConfigmapInformer.HasSynced},
}
if err := c.loadCABundle(); err != nil {
// don't fail, but do print out a message
klog.Warningf("unable to load initial CA bundle for: %q due to: %s", c.name, err)
}
uncastConfigmapInformer.AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
if cast, ok := obj.(*corev1.ConfigMap); ok {
return cast.Name == c.configmapName && cast.Namespace == c.configmapNamespace
}
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
if cast, ok := tombstone.Obj.(*corev1.ConfigMap); ok {
return cast.Name == c.configmapName && cast.Namespace == c.configmapNamespace
}
}
return true // always return true just in case. The checks are fairly cheap
},
Handler: cache.ResourceEventHandlerFuncs{
// we have a filter, so any time we're called, we may as well queue. We only ever check one configmap
// so we don't have to be choosy about our key.
AddFunc: func(obj interface{}) {
c.queue.Add(c.keyFn())
},
UpdateFunc: func(oldObj, newObj interface{}) {
c.queue.Add(c.keyFn())
},
DeleteFunc: func(obj interface{}) {
c.queue.Add(c.keyFn())
},
},
})
return c, nil
}
func (c *ConfigMapCAController) keyFn() string {
// this format matches DeletionHandlingMetaNamespaceKeyFunc for our single key
return c.configmapNamespace + "/" + c.configmapName
}
// AddListener adds a listener to be notified when the CA content changes.
func (c *ConfigMapCAController) AddListener(listener Listener) {
c.listeners = append(c.listeners, listener)
}
// loadCABundle determines the next set of content for the file.
func (c *ConfigMapCAController) loadCABundle() error {
configMap, err := c.configmapLister.ConfigMaps(c.configmapNamespace).Get(c.configmapName)
if err != nil {
return err
}
caBundle := configMap.Data[c.configmapKey]
if len(caBundle) == 0 {
return fmt.Errorf("missing content for CA bundle %q", c.Name())
}
// check to see if we have a change. If the values are the same, do nothing.
if !c.hasCAChanged([]byte(caBundle)) {
return nil
}
caBundleAndVerifier, err := newCABundleAndVerifier(c.Name(), []byte(caBundle))
if err != nil {
return err
}
c.caBundle.Store(caBundleAndVerifier)
for _, listener := range c.listeners {
listener.Enqueue()
}
return nil
}
// hasCAChanged returns true if the caBundle is different than the current.
func (c *ConfigMapCAController) hasCAChanged(caBundle []byte) bool {
uncastExisting := c.caBundle.Load()
if uncastExisting == nil {
return true
}
// check to see if we have a change. If the values are the same, do nothing.
existing, ok := uncastExisting.(*caBundleAndVerifier)
if !ok {
return true
}
if !bytes.Equal(existing.caBundle, caBundle) {
return true
}
return false
}
// RunOnce runs a single sync loop
func (c *ConfigMapCAController) RunOnce() error {
// Ignore the error when running once because when using a dynamically loaded ca file, because we think it's better to have nothing for
// a brief time than completely crash. If crashing is necessary, higher order logic like a healthcheck and cause failures.
_ = c.loadCABundle()
return nil
}
// Run starts the kube-apiserver and blocks until stopCh is closed.
func (c *ConfigMapCAController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
klog.Infof("Starting %s", c.name)
defer klog.Infof("Shutting down %s", c.name)
// we have a personal informer that is narrowly scoped, start it.
go c.configMapInformer.Run(stopCh)
// wait for your secondary caches to fill before starting your work
if !cache.WaitForNamedCacheSync(c.name, stopCh, c.preRunCaches...) {
return
}
// doesn't matter what workers say, only start one.
go wait.Until(c.runWorker, time.Second, stopCh)
// start timer that rechecks every minute, just in case. this also serves to prime the controller quickly.
_ = wait.PollImmediateUntil(FileRefreshDuration, func() (bool, error) {
c.queue.Add(workItemKey)
return false, nil
}, stopCh)
<-stopCh
}
func (c *ConfigMapCAController) runWorker() {
for c.processNextWorkItem() {
}
}
func (c *ConfigMapCAController) processNextWorkItem() bool {
dsKey, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(dsKey)
err := c.loadCABundle()
if err == nil {
c.queue.Forget(dsKey)
return true
}
utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
c.queue.AddRateLimited(dsKey)
return true
}
// Name is just an identifier
func (c *ConfigMapCAController) Name() string {
return c.name
}
// CurrentCABundleContent provides ca bundle byte content
func (c *ConfigMapCAController) CurrentCABundleContent() []byte {
uncastObj := c.caBundle.Load()
if uncastObj == nil {
return nil // this can happen if we've been unable load data from the apiserver for some reason
}
return c.caBundle.Load().(*caBundleAndVerifier).caBundle
}
// VerifyOptions provides verifyoptions compatible with authenticators
func (c *ConfigMapCAController) VerifyOptions() (x509.VerifyOptions, bool) {
uncastObj := c.caBundle.Load()
if uncastObj == nil {
// This can happen if we've been unable load data from the apiserver for some reason.
// In this case, we should not accept any connections on the basis of this ca bundle.
return x509.VerifyOptions{}, false
}
return uncastObj.(*caBundleAndVerifier).verifyOptions, true
}

View File

@ -19,7 +19,6 @@ package options
import (
"encoding/json"
"fmt"
"io/ioutil"
"strings"
"time"
@ -27,7 +26,6 @@ import (
"github.com/spf13/pflag"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/authentication/authenticatorfactory"
@ -251,38 +249,63 @@ func (s *DelegatingAuthenticationOptions) ApplyTo(authenticationInfo *server.Aut
cfg.TokenAccessReviewClient = client.AuthenticationV1().TokenReviews()
}
// look into configmaps/external-apiserver-authentication for missing authn info
if !s.SkipInClusterLookup {
err := s.lookupMissingConfigInCluster(client)
// get the clientCA information
clientCAFileSpecified := len(s.ClientCert.ClientCA) > 0
var clientCAProvider dynamiccertificates.CAContentProvider
if clientCAFileSpecified {
clientCAProvider, err = s.ClientCert.GetClientCAContentProvider()
if err != nil {
if s.TolerateInClusterLookupFailure {
klog.Warningf("Error looking up in-cluster authentication configuration: %v", err)
klog.Warningf("Continuing without authentication configuration. This may treat all requests as anonymous.")
klog.Warningf("To require authentication configuration lookup to succeed, set --authentication-tolerate-lookup-failure=false")
} else {
return err
return fmt.Errorf("unable to load client CA file %q: %v", s.ClientCert.ClientCA, err)
}
cfg.ClientCertificateCAContentProvider = clientCAProvider
if err = authenticationInfo.ApplyClientCert(cfg.ClientCertificateCAContentProvider, servingInfo); err != nil {
return fmt.Errorf("unable to assign client CA file: %v", err)
}
} else if !s.SkipInClusterLookup {
if client == nil {
klog.Warningf("No authentication-kubeconfig provided in order to lookup client-ca-file in configmap/%s in %s, so client certificate authentication won't work.", authenticationConfigMapName, authenticationConfigMapNamespace)
} else {
clientCAProvider, err = dynamiccertificates.NewDynamicCAFromConfigMapController("client-ca", authenticationConfigMapNamespace, authenticationConfigMapName, "client-ca-file", client)
if err != nil {
return fmt.Errorf("unable to load configmap based client CA file: %v", err)
}
cfg.ClientCertificateCAContentProvider = clientCAProvider
if err = authenticationInfo.ApplyClientCert(cfg.ClientCertificateCAContentProvider, servingInfo); err != nil {
return fmt.Errorf("unable to assign configmap based client CA file: %v", err)
}
}
}
requestHeaderCAFileSpecified := len(s.RequestHeader.ClientCAFile) > 0
var requestHeaderConfig *authenticatorfactory.RequestHeaderConfig
if requestHeaderCAFileSpecified {
requestHeaderConfig, err = s.RequestHeader.ToAuthenticationRequestHeaderConfig()
if err != nil {
return fmt.Errorf("unable to create request header authentication config: %v", err)
}
} else if !s.SkipInClusterLookup {
if client == nil {
klog.Warningf("No authentication-kubeconfig provided in order to lookup requestheader-client-ca-file in configmap/%s in %s, so request-header client certificate authentication won't work.", authenticationConfigMapName, authenticationConfigMapNamespace)
} else {
requestHeaderConfig, err = s.createRequestHeaderConfig(client)
if err != nil {
if s.TolerateInClusterLookupFailure {
klog.Warningf("Error looking up in-cluster authentication configuration: %v", err)
klog.Warningf("Continuing without authentication configuration. This may treat all requests as anonymous.")
klog.Warningf("To require authentication configuration lookup to succeed, set --authentication-tolerate-lookup-failure=false")
} else {
return fmt.Errorf("unable to load configmap based request-header-client-ca-file: %v", err)
}
}
}
}
// configure AuthenticationInfo config
cfg.ClientCertificateCAContentProvider, err = s.ClientCert.GetClientCAContentProvider()
if err != nil {
return fmt.Errorf("unable to load client CA file: %v", err)
}
if cfg.ClientCertificateCAContentProvider != nil {
if err = authenticationInfo.ApplyClientCert(cfg.ClientCertificateCAContentProvider, servingInfo); err != nil {
return fmt.Errorf("unable to load client CA file: %v", err)
}
}
cfg.RequestHeaderConfig, err = s.RequestHeader.ToAuthenticationRequestHeaderConfig()
if err != nil {
return fmt.Errorf("unable to create request header authentication config: %v", err)
}
if cfg.RequestHeaderConfig != nil {
if requestHeaderConfig != nil {
cfg.RequestHeaderConfig = requestHeaderConfig
if err = authenticationInfo.ApplyClientCert(cfg.RequestHeaderConfig.CAContentProvider, servingInfo); err != nil {
return fmt.Errorf("unable to load client CA file: %v", err)
return fmt.Errorf("unable to load request-header-client-ca-file: %v", err)
}
}
@ -310,97 +333,26 @@ const (
authenticationRoleName = "extension-apiserver-authentication-reader"
)
func (s *DelegatingAuthenticationOptions) lookupMissingConfigInCluster(client kubernetes.Interface) error {
if len(s.ClientCert.ClientCA) > 0 && len(s.RequestHeader.ClientCAFile) > 0 {
return nil
}
if client == nil {
if len(s.ClientCert.ClientCA) == 0 {
klog.Warningf("No authentication-kubeconfig provided in order to lookup client-ca-file in configmap/%s in %s, so client certificate authentication won't work.", authenticationConfigMapName, authenticationConfigMapNamespace)
}
if len(s.RequestHeader.ClientCAFile) == 0 {
klog.Warningf("No authentication-kubeconfig provided in order to lookup requestheader-client-ca-file in configmap/%s in %s, so request-header client certificate authentication won't work.", authenticationConfigMapName, authenticationConfigMapNamespace)
}
return nil
func (s *DelegatingAuthenticationOptions) createRequestHeaderConfig(client kubernetes.Interface) (*authenticatorfactory.RequestHeaderConfig, error) {
requestHeaderCAProvider, err := dynamiccertificates.NewDynamicCAFromConfigMapController("client-ca", authenticationConfigMapNamespace, authenticationConfigMapName, "requestheader-client-ca-file", client)
if err != nil {
return nil, fmt.Errorf("unable to create request header authentication config: %v", err)
}
authConfigMap, err := client.CoreV1().ConfigMaps(authenticationConfigMapNamespace).Get(authenticationConfigMapName, metav1.GetOptions{})
switch {
case errors.IsNotFound(err):
// ignore, authConfigMap is nil now
return nil, nil
case errors.IsForbidden(err):
klog.Warningf("Unable to get configmap/%s in %s. Usually fixed by "+
"'kubectl create rolebinding -n %s ROLEBINDING_NAME --role=%s --serviceaccount=YOUR_NS:YOUR_SA'",
authenticationConfigMapName, authenticationConfigMapNamespace, authenticationConfigMapNamespace, authenticationRoleName)
return err
return nil, err
case err != nil:
return err
}
if len(s.ClientCert.ClientCA) == 0 {
if authConfigMap != nil {
opt, err := inClusterClientCA(authConfigMap)
if err != nil {
return err
}
if opt != nil {
s.ClientCert = *opt
}
}
if len(s.ClientCert.ClientCA) == 0 {
klog.Warningf("Cluster doesn't provide client-ca-file in configmap/%s in %s, so client certificate authentication won't work.", authenticationConfigMapName, authenticationConfigMapNamespace)
}
}
if len(s.RequestHeader.ClientCAFile) == 0 {
if authConfigMap != nil {
opt, err := inClusterRequestHeader(authConfigMap)
if err != nil {
return err
}
if opt != nil {
s.RequestHeader = *opt
}
}
if len(s.RequestHeader.ClientCAFile) == 0 {
klog.Warningf("Cluster doesn't provide requestheader-client-ca-file in configmap/%s in %s, so request-header client certificate authentication won't work.", authenticationConfigMapName, authenticationConfigMapNamespace)
}
}
return nil
}
func inClusterClientCA(authConfigMap *v1.ConfigMap) (*ClientCertAuthenticationOptions, error) {
clientCA, ok := authConfigMap.Data["client-ca-file"]
if !ok {
// not having a client-ca is fine, return nil
return nil, nil
}
clientCAProvider, err := dynamiccertificates.NewStaticCAContent("client-ca-file", []byte(clientCA))
if err != nil {
return nil, err
}
return &ClientCertAuthenticationOptions{
ClientCA: "",
CAContentProvider: clientCAProvider,
}, nil
}
func inClusterRequestHeader(authConfigMap *v1.ConfigMap) (*RequestHeaderAuthenticationOptions, error) {
requestHeaderCA, ok := authConfigMap.Data["requestheader-client-ca-file"]
if !ok {
// not having a requestheader-client-ca is fine, return nil
return nil, nil
}
f, err := ioutil.TempFile("", "requestheader-client-ca-file")
if err != nil {
return nil, err
}
if err := ioutil.WriteFile(f.Name(), []byte(requestHeaderCA), 0600); err != nil {
return nil, err
}
usernameHeaders, err := deserializeStrings(authConfigMap.Data["requestheader-username-headers"])
if err != nil {
return nil, err
@ -418,12 +370,12 @@ func inClusterRequestHeader(authConfigMap *v1.ConfigMap) (*RequestHeaderAuthenti
return nil, err
}
return &RequestHeaderAuthenticationOptions{
UsernameHeaders: usernameHeaders,
GroupHeaders: groupHeaders,
ExtraHeaderPrefixes: extraHeaderPrefixes,
ClientCAFile: f.Name(),
AllowedNames: allowedNames,
return &authenticatorfactory.RequestHeaderConfig{
CAContentProvider: requestHeaderCAProvider,
UsernameHeaders: headerrequest.StaticStringSlice(usernameHeaders),
GroupHeaders: headerrequest.StaticStringSlice(groupHeaders),
ExtraHeaderPrefixes: headerrequest.StaticStringSlice(extraHeaderPrefixes),
AllowedClientNames: headerrequest.StaticStringSlice(allowedNames),
}, nil
}

View File

@ -81,17 +81,19 @@ func (s *SecureServingInfo) tlsConfig(stopCh <-chan struct{}) (*tls.Config, erro
}
// start controllers if possible
if controller, ok := s.ClientCA.(dynamiccertificates.ControllerRunner); ok {
// runonce to be sure that we have a value.
// runonce to try to prime data. If this fails, it's ok because we fail closed.
// Files are required to be populated already, so this is for convenience.
if err := controller.RunOnce(); err != nil {
return nil, err
klog.Warningf("Initial population of client CA failed: %v", err)
}
go controller.Run(1, stopCh)
}
if controller, ok := s.Cert.(dynamiccertificates.ControllerRunner); ok {
// runonce to be sure that we have a value.
// runonce to try to prime data. If this fails, it's ok because we fail closed.
// Files are required to be populated already, so this is for convenience.
if err := controller.RunOnce(); err != nil {
return nil, err
klog.Warningf("Initial population of default serving certificate failed: %v", err)
}
go controller.Run(1, stopCh)
@ -102,18 +104,20 @@ func (s *SecureServingInfo) tlsConfig(stopCh <-chan struct{}) (*tls.Config, erro
}
if controller, ok := sniCert.(dynamiccertificates.ControllerRunner); ok {
// runonce to be sure that we have a value.
// runonce to try to prime data. If this fails, it's ok because we fail closed.
// Files are required to be populated already, so this is for convenience.
if err := controller.RunOnce(); err != nil {
return nil, err
klog.Warningf("Initial population of SNI serving certificate failed: %v", err)
}
go controller.Run(1, stopCh)
}
}
// runonce to be sure that we have a value.
// runonce to try to prime data. If this fails, it's ok because we fail closed.
// Files are required to be populated already, so this is for convenience.
if err := dynamicCertificateController.RunOnce(); err != nil {
return nil, err
klog.Warningf("Initial population of dynamic certificates failed: %v", err)
}
go dynamicCertificateController.Run(1, stopCh)

View File

@ -57,8 +57,8 @@ func GetClientCANamesForURL(kubeConfigURL string) ([]string, error) {
return GetClientCANames(apiserverURL.Host)
}
// GetServingCertificates returns the x509 certs used by a server. The serverName is optional for specifying a different
// name to get SNI certificates. apiHost is "host:port"
// GetServingCertificates returns the x509 certs used by a server as certificates and pem encoded bytes.
// The serverName is optional for specifying a different name to get SNI certificates. apiHost is "host:port"
func GetServingCertificates(apiHost, serverName string) ([]*x509.Certificate, [][]byte, error) {
tlsConfig := &tls.Config{
InsecureSkipVerify: true, // this is insecure so that we always get connected

View File

@ -15,8 +15,8 @@ go_test(
],
tags = ["integration"],
deps = [
"//cmd/kube-apiserver/app:go_default_library",
"//cmd/kube-apiserver/app/options:go_default_library",
"//cmd/kube-apiserver/app/testing:go_default_library",
"//pkg/master:go_default_library",
"//pkg/master/reconcilers:go_default_library",
"//staging/src/k8s.io/api/admissionregistration/v1beta1:go_default_library",
@ -27,7 +27,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/options:go_default_library",
"//staging/src/k8s.io/client-go/discovery:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
@ -35,15 +35,12 @@ go_test(
"//staging/src/k8s.io/client-go/tools/clientcmd:go_default_library",
"//staging/src/k8s.io/client-go/tools/clientcmd/api:go_default_library",
"//staging/src/k8s.io/client-go/util/cert:go_default_library",
"//staging/src/k8s.io/client-go/util/keyutil:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/cmd/server:go_default_library",
"//staging/src/k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1:go_default_library",
"//staging/src/k8s.io/sample-apiserver/pkg/apis/wardle/v1beta1:go_default_library",
"//staging/src/k8s.io/sample-apiserver/pkg/cmd/server:go_default_library",
"//test/integration/framework:go_default_library",
"//test/utils:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
)

View File

@ -17,7 +17,6 @@ limitations under the License.
package apiserver
import (
"crypto/x509"
"encoding/json"
"fmt"
"io/ioutil"
@ -25,7 +24,7 @@ import (
"net/http"
"os"
"path"
"sync/atomic"
"reflect"
"testing"
"time"
@ -35,293 +34,82 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
genericapiserveroptions "k8s.io/apiserver/pkg/server/options"
discovery "k8s.io/client-go/discovery"
"k8s.io/client-go/discovery"
client "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/client-go/util/cert"
"k8s.io/client-go/util/keyutil"
apiregistrationv1beta1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1"
aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
kubeaggregatorserver "k8s.io/kube-aggregator/pkg/cmd/server"
"k8s.io/kubernetes/cmd/kube-apiserver/app"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
kastesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/test/integration/framework"
testutil "k8s.io/kubernetes/test/utils"
wardlev1alpha1 "k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1"
wardlev1beta1 "k8s.io/sample-apiserver/pkg/apis/wardle/v1beta1"
sampleserver "k8s.io/sample-apiserver/pkg/cmd/server"
)
func TestAggregatedAPIServer(t *testing.T) {
// makes the kube-apiserver very responsive. it's normally a minute
dynamiccertificates.FileRefreshDuration = 1 * time.Second
stopCh := make(chan struct{})
defer close(stopCh)
certDir, _ := ioutil.TempDir("", "test-integration-apiserver")
defer os.RemoveAll(certDir)
_, defaultServiceClusterIPRange, _ := net.ParseCIDR("10.0.0.0/24")
proxySigningKey, err := testutil.NewPrivateKey()
if err != nil {
t.Fatal(err)
}
proxySigningCert, err := cert.NewSelfSignedCACert(cert.Config{CommonName: "front-proxy-ca"}, proxySigningKey)
if err != nil {
t.Fatal(err)
}
proxyCACertFile, _ := ioutil.TempFile(certDir, "proxy-ca.crt")
if err := ioutil.WriteFile(proxyCACertFile.Name(), testutil.EncodeCertPEM(proxySigningCert), 0644); err != nil {
t.Fatal(err)
}
clientSigningKey, err := testutil.NewPrivateKey()
if err != nil {
t.Fatal(err)
}
clientSigningCert, err := cert.NewSelfSignedCACert(cert.Config{CommonName: "client-ca"}, clientSigningKey)
if err != nil {
t.Fatal(err)
}
clientCACertFile, _ := ioutil.TempFile(certDir, "client-ca.crt")
if err := ioutil.WriteFile(clientCACertFile.Name(), testutil.EncodeCertPEM(clientSigningCert), 0644); err != nil {
t.Fatal(err)
}
kubeClientConfigValue := atomic.Value{}
go func() {
listener, _, err := genericapiserveroptions.CreateListener("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
kubeAPIServerOptions := options.NewServerRunOptions()
kubeAPIServerOptions.SecureServing.Listener = listener
kubeAPIServerOptions.SecureServing.BindAddress = net.ParseIP("127.0.0.1")
kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir
kubeAPIServerOptions.InsecureServing.BindPort = 0
kubeAPIServerOptions.Etcd.StorageConfig.Transport.ServerList = []string{framework.GetEtcdURL()}
kubeAPIServerOptions.ServiceClusterIPRanges = defaultServiceClusterIPRange.String()
kubeAPIServerOptions.Authentication.RequestHeader.UsernameHeaders = []string{"X-Remote-User"}
kubeAPIServerOptions.Authentication.RequestHeader.GroupHeaders = []string{"X-Remote-Group"}
kubeAPIServerOptions.Authentication.RequestHeader.ExtraHeaderPrefixes = []string{"X-Remote-Extra-"}
kubeAPIServerOptions.Authentication.RequestHeader.AllowedNames = []string{"kube-aggregator"}
kubeAPIServerOptions.Authentication.RequestHeader.ClientCAFile = proxyCACertFile.Name()
kubeAPIServerOptions.Authentication.ClientCert.ClientCA = clientCACertFile.Name()
kubeAPIServerOptions.Authorization.Modes = []string{"RBAC"}
completedOptions, err := app.Complete(kubeAPIServerOptions)
if err != nil {
t.Fatal(err)
}
tunneler, proxyTransport, err := app.CreateNodeDialer(completedOptions)
if err != nil {
t.Fatal(err)
}
kubeAPIServerConfig, _, _, _, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport)
if err != nil {
t.Fatal(err)
}
// Adjust the loopback config for external use (external server name and CA)
kubeAPIServerClientConfig := rest.CopyConfig(kubeAPIServerConfig.GenericConfig.LoopbackClientConfig)
kubeAPIServerClientConfig.CAFile = path.Join(certDir, "apiserver.crt")
kubeAPIServerClientConfig.CAData = nil
kubeAPIServerClientConfig.ServerName = ""
kubeClientConfigValue.Store(kubeAPIServerClientConfig)
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.NewEmptyDelegate())
if err != nil {
t.Fatal(err)
}
if err := kubeAPIServer.GenericAPIServer.PrepareRun().Run(wait.NeverStop); err != nil {
t.Fatal(err)
}
}()
// just use json because everyone speaks it
err = wait.PollImmediate(time.Second, time.Minute, func() (done bool, err error) {
obj := kubeClientConfigValue.Load()
if obj == nil {
return false, nil
}
kubeClientConfig := kubeClientConfigValue.Load().(*rest.Config)
kubeClientConfig.ContentType = ""
kubeClientConfig.AcceptContentTypes = ""
kubeClient, err := client.NewForConfig(kubeClientConfig)
if err != nil {
// this happens because we race the API server start
t.Log(err)
return false, nil
}
healthStatus := 0
kubeClient.Discovery().RESTClient().Get().AbsPath("/healthz").Do().StatusCode(&healthStatus)
if healthStatus != http.StatusOK {
return false, nil
}
return true, nil
})
if err != nil {
t.Fatal(err)
}
// after this point we won't be mutating, so the race detector will be fine
kubeClientConfig := kubeClientConfigValue.Load().(*rest.Config)
// write a kubeconfig out for starting other API servers with delegated auth. remember, no in-cluster config
adminKubeConfig := createKubeConfig(kubeClientConfig)
kubeconfigFile, _ := ioutil.TempFile("", "")
defer os.Remove(kubeconfigFile.Name())
clientcmd.WriteToFile(*adminKubeConfig, kubeconfigFile.Name())
wardleCertDir, _ := ioutil.TempDir("", "test-integration-wardle-server")
defer os.RemoveAll(wardleCertDir)
wardlePort := new(int32)
testServer := kastesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
defer testServer.TearDownFn()
kubeClientConfig := rest.CopyConfig(testServer.ClientConfig)
// force json because everything speaks it
kubeClientConfig.ContentType = ""
kubeClientConfig.AcceptContentTypes = ""
kubeClient := client.NewForConfigOrDie(kubeClientConfig)
aggregatorClient := aggregatorclient.NewForConfigOrDie(kubeClientConfig)
// start the wardle server to prove we can aggregate it
wardleToKASKubeConfigFile := writeKubeConfigForWardleServerToKASConnection(t, rest.CopyConfig(kubeClientConfig))
defer os.Remove(wardleToKASKubeConfigFile)
wardleCertDir, _ := ioutil.TempDir("", "test-integration-wardle-server")
defer os.RemoveAll(wardleCertDir)
listener, wardlePort, err := genericapiserveroptions.CreateListener("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
go func() {
listener, port, err := genericapiserveroptions.CreateListener("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
atomic.StoreInt32(wardlePort, int32(port))
o := sampleserver.NewWardleServerOptions(os.Stdout, os.Stderr)
o.RecommendedOptions.SecureServing.Listener = listener
o.RecommendedOptions.SecureServing.BindAddress = net.ParseIP("127.0.0.1")
wardleCmd := sampleserver.NewCommandStartWardleServer(o, stopCh)
wardleCmd.SetArgs([]string{
"--requestheader-username-headers=X-Remote-User",
"--requestheader-group-headers=X-Remote-Group",
"--requestheader-extra-headers-prefix=X-Remote-Extra-",
"--requestheader-client-ca-file=" + proxyCACertFile.Name(),
"--requestheader-allowed-names=kube-aggregator",
"--authentication-kubeconfig", kubeconfigFile.Name(),
"--authorization-kubeconfig", kubeconfigFile.Name(),
"--authentication-kubeconfig", wardleToKASKubeConfigFile,
"--authorization-kubeconfig", wardleToKASKubeConfigFile,
"--etcd-servers", framework.GetEtcdURL(),
"--cert-dir", wardleCertDir,
"--kubeconfig", kubeconfigFile.Name(),
"--kubeconfig", wardleToKASKubeConfigFile,
})
if err := wardleCmd.Execute(); err != nil {
t.Fatal(err)
}
}()
wardleClientConfig := rest.AnonymousClientConfig(kubeClientConfig)
wardleClientConfig.CAFile = path.Join(wardleCertDir, "apiserver.crt")
wardleClientConfig.CAData = nil
wardleClientConfig.ServerName = ""
wardleClientConfig.BearerToken = kubeClientConfig.BearerToken
var wardleClient client.Interface
err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) {
wardleClientConfig.Host = fmt.Sprintf("https://127.0.0.1:%d", atomic.LoadInt32(wardlePort))
wardleClient, err = client.NewForConfig(wardleClientConfig)
if err != nil {
// this happens because we race the API server start
t.Log(err)
return false, nil
}
healthStatus := 0
wardleClient.Discovery().RESTClient().Get().AbsPath("/healthz").Do().StatusCode(&healthStatus)
if healthStatus != http.StatusOK {
return false, nil
}
return true, nil
})
if err != nil {
t.Fatal(err)
}
// start the aggregator
aggregatorCertDir, _ := ioutil.TempDir("", "test-integration-aggregator")
defer os.RemoveAll(aggregatorCertDir)
proxyClientKey, err := testutil.NewPrivateKey()
if err != nil {
t.Fatal(err)
}
proxyClientCert, err := testutil.NewSignedCert(
&cert.Config{
CommonName: "kube-aggregator",
Usages: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth},
},
proxyClientKey, proxySigningCert, proxySigningKey,
)
proxyClientCertFile, _ := ioutil.TempFile(aggregatorCertDir, "proxy-client.crt")
proxyClientKeyFile, _ := ioutil.TempFile(aggregatorCertDir, "proxy-client.key")
if err := ioutil.WriteFile(proxyClientCertFile.Name(), testutil.EncodeCertPEM(proxyClientCert), 0600); err != nil {
t.Fatal(err)
}
proxyClientKeyPEM, err := keyutil.MarshalPrivateKeyToPEM(proxyClientKey)
if err != nil {
t.Fatal(err)
}
if err := ioutil.WriteFile(proxyClientKeyFile.Name(), proxyClientKeyPEM, 0644); err != nil {
t.Fatal(err)
}
aggregatorPort := new(int32)
go func() {
listener, port, err := genericapiserveroptions.CreateListener("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
atomic.StoreInt32(aggregatorPort, int32(port))
o := kubeaggregatorserver.NewDefaultOptions(os.Stdout, os.Stderr)
o.RecommendedOptions.SecureServing.Listener = listener
o.RecommendedOptions.SecureServing.BindAddress = net.ParseIP("127.0.0.1")
aggregatorCmd := kubeaggregatorserver.NewCommandStartAggregator(o, stopCh)
aggregatorCmd.SetArgs([]string{
"--requestheader-username-headers", "",
"--proxy-client-cert-file", proxyClientCertFile.Name(),
"--proxy-client-key-file", proxyClientKeyFile.Name(),
"--kubeconfig", kubeconfigFile.Name(),
"--authentication-kubeconfig", kubeconfigFile.Name(),
"--authorization-kubeconfig", kubeconfigFile.Name(),
"--etcd-servers", framework.GetEtcdURL(),
"--cert-dir", aggregatorCertDir,
})
if err := aggregatorCmd.Execute(); err != nil {
t.Fatal(err)
}
}()
aggregatorClientConfig := rest.AnonymousClientConfig(kubeClientConfig)
aggregatorClientConfig.CAFile = path.Join(aggregatorCertDir, "apiserver.crt")
aggregatorClientConfig.CAData = nil
aggregatorClientConfig.ServerName = ""
aggregatorClientConfig.BearerToken = kubeClientConfig.BearerToken
var aggregatorDiscoveryClient client.Interface
err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) {
aggregatorClientConfig.Host = fmt.Sprintf("https://127.0.0.1:%d", atomic.LoadInt32(aggregatorPort))
aggregatorDiscoveryClient, err = client.NewForConfig(aggregatorClientConfig)
if err != nil {
// this happens if we race the API server for writing the cert
return false, nil
}
healthStatus := 0
aggregatorDiscoveryClient.Discovery().RESTClient().Get().AbsPath("/healthz").Do().StatusCode(&healthStatus)
if healthStatus != http.StatusOK {
return false, nil
}
return true, nil
})
directWardleClientConfig, err := waitForWardleRunning(t, kubeClientConfig, wardleCertDir, wardlePort)
if err != nil {
t.Fatal(err)
}
// now we're finally ready to test. These are what's run by default now
wardleClient, err := client.NewForConfig(directWardleClientConfig)
if err != nil {
t.Fatal(err)
}
testAPIGroupList(t, wardleClient.Discovery().RESTClient())
testAPIGroup(t, wardleClient.Discovery().RESTClient())
testAPIResourceList(t, wardleClient.Discovery().RESTClient())
wardleCA, err := ioutil.ReadFile(wardleClientConfig.CAFile)
wardleCA, err := ioutil.ReadFile(directWardleClientConfig.CAFile)
if err != nil {
t.Fatal(err)
}
aggregatorClient := aggregatorclient.NewForConfigOrDie(aggregatorClientConfig)
_, err = aggregatorClient.ApiregistrationV1beta1().APIServices().Create(&apiregistrationv1beta1.APIService{
ObjectMeta: metav1.ObjectMeta{Name: "v1alpha1.wardle.example.com"},
Spec: apiregistrationv1beta1.APIServiceSpec{
@ -342,39 +130,154 @@ func TestAggregatedAPIServer(t *testing.T) {
// wait for the unavailable API service to be processed with updated status
err = wait.Poll(100*time.Millisecond, 5*time.Second, func() (done bool, err error) {
_, err = aggregatorDiscoveryClient.Discovery().ServerResources()
_, err = kubeClient.Discovery().ServerResources()
hasExpectedError := checkWardleUnavailableDiscoveryError(t, err)
return hasExpectedError, nil
})
if err != nil {
t.Fatal(err)
}
// TODO figure out how to turn on enough of services and dns to run more
_, err = aggregatorClient.ApiregistrationV1beta1().APIServices().Create(&apiregistrationv1beta1.APIService{
ObjectMeta: metav1.ObjectMeta{Name: "v1."},
Spec: apiregistrationv1beta1.APIServiceSpec{
// register this as a local service so it doesn't try to lookup the default kubernetes service
// which will have an unroutable IP address since it's fake.
Group: "",
Version: "v1",
GroupPriorityMinimum: 100,
VersionPriority: 100,
},
})
// Now we want to verify that the client CA bundles properly reflect the values for the cluster-authentication
firstKubeCANames, err := cert.GetClientCANamesForURL(kubeClientConfig.Host)
if err != nil {
t.Fatal(err)
}
// this is ugly, but sleep just a little bit so that the watch is probably observed. Since nothing will actually be added to discovery
// (the service is missing), we don't have an external signal.
time.Sleep(100 * time.Millisecond)
_, err = aggregatorDiscoveryClient.Discovery().ServerResources()
hasExpectedError := checkWardleUnavailableDiscoveryError(t, err)
if !hasExpectedError {
t.Fatalf("Discovery call didn't return expected error: %v", err)
t.Log(firstKubeCANames)
firstWardleCANames, err := cert.GetClientCANamesForURL(directWardleClientConfig.Host)
if err != nil {
t.Fatal(err)
}
t.Log(firstWardleCANames)
if !reflect.DeepEqual(firstKubeCANames, firstWardleCANames) {
t.Fatal("names don't match")
}
// TODO figure out how to turn on enough of services and dns to run more
// now we update the client-ca nd request-header-client-ca-file and the kas will consume it, update the configmap
// and then the wardle server will detect and update too.
if err := ioutil.WriteFile(path.Join(testServer.TmpDir, "client-ca.crt"), differentClientCA, 0644); err != nil {
t.Fatal(err)
}
if err := ioutil.WriteFile(path.Join(testServer.TmpDir, "proxy-ca.crt"), differentFrontProxyCA, 0644); err != nil {
t.Fatal(err)
}
// wait for it to be picked up. there's a test in certreload_test.go that ensure this works
time.Sleep(4 * time.Second)
// Now we want to verify that the client CA bundles properly updated to reflect the new values written for the kube-apiserver
secondKubeCANames, err := cert.GetClientCANamesForURL(kubeClientConfig.Host)
if err != nil {
t.Fatal(err)
}
t.Log(secondKubeCANames)
for i := range firstKubeCANames {
if firstKubeCANames[i] == secondKubeCANames[i] {
t.Errorf("ca bundles should change")
}
}
secondWardleCANames, err := cert.GetClientCANamesForURL(directWardleClientConfig.Host)
if err != nil {
t.Fatal(err)
}
t.Log(secondWardleCANames)
// second wardle should contain all the certs, first and last
numMatches := 0
for _, needle := range firstKubeCANames {
for _, haystack := range secondWardleCANames {
if needle == haystack {
numMatches++
break
}
}
}
for _, needle := range secondKubeCANames {
for _, haystack := range secondWardleCANames {
if needle == haystack {
numMatches++
break
}
}
}
if numMatches != 4 {
t.Fatal("names don't match")
}
}
func waitForWardleRunning(t *testing.T, wardleToKASKubeConfig *rest.Config, wardleCertDir string, wardlePort int) (*rest.Config, error) {
directWardleClientConfig := rest.AnonymousClientConfig(rest.CopyConfig(wardleToKASKubeConfig))
directWardleClientConfig.CAFile = path.Join(wardleCertDir, "apiserver.crt")
directWardleClientConfig.CAData = nil
directWardleClientConfig.ServerName = ""
directWardleClientConfig.BearerToken = wardleToKASKubeConfig.BearerToken
var wardleClient client.Interface
lastHealthContent := []byte{}
var lastHealthErr error
err := wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) {
if _, err := os.Stat(directWardleClientConfig.CAFile); os.IsNotExist(err) { // wait until the file trust is created
lastHealthErr = err
return false, nil
}
directWardleClientConfig.Host = fmt.Sprintf("https://127.0.0.1:%d", wardlePort)
wardleClient, err = client.NewForConfig(directWardleClientConfig)
if err != nil {
// this happens because we race the API server start
t.Log(err)
return false, nil
}
healthStatus := 0
result := wardleClient.Discovery().RESTClient().Get().AbsPath("/healthz").Do().StatusCode(&healthStatus)
lastHealthContent, lastHealthErr = result.Raw()
if healthStatus != http.StatusOK {
return false, nil
}
return true, nil
})
if err != nil {
t.Log(string(lastHealthContent))
t.Log(lastHealthErr)
return nil, err
}
return directWardleClientConfig, nil
}
func writeKubeConfigForWardleServerToKASConnection(t *testing.T, kubeClientConfig *rest.Config) string {
// write a kubeconfig out for starting other API servers with delegated auth. remember, no in-cluster config
// the loopback client config uses a loopback cert with different SNI. We need to use the "real"
// cert, so we'll hope we aren't hacked during a unit test and instead load it from the server we started.
wardleToKASKubeClientConfig := rest.CopyConfig(kubeClientConfig)
servingCerts, _, err := cert.GetServingCertificatesForURL(wardleToKASKubeClientConfig.Host, "")
if err != nil {
t.Fatal(err)
}
encodedServing, err := cert.EncodeCertificates(servingCerts...)
if err != nil {
t.Fatal(err)
}
wardleToKASKubeClientConfig.CAData = encodedServing
for _, v := range servingCerts {
t.Logf("Client: Server public key is %v\n", dynamiccertificates.GetHumanCertDetail(v))
}
certs, err := cert.ParseCertsPEM(wardleToKASKubeClientConfig.CAData)
if err != nil {
t.Fatal(err)
}
for _, curr := range certs {
t.Logf("CA bundle %v\n", dynamiccertificates.GetHumanCertDetail(curr))
}
adminKubeConfig := createKubeConfig(wardleToKASKubeClientConfig)
wardleToKASKubeConfigFile, _ := ioutil.TempFile("", "")
if err := clientcmd.WriteToFile(*adminKubeConfig, wardleToKASKubeConfigFile.Name()); err != nil {
t.Fatal(err)
}
return wardleToKASKubeConfigFile.Name()
}
func checkWardleUnavailableDiscoveryError(t *testing.T, err error) bool {
@ -510,3 +413,41 @@ func testAPIResourceList(t *testing.T, client rest.Interface) {
assert.Equal(t, "flunders", apiResourceList.APIResources[1].Name)
assert.True(t, apiResourceList.APIResources[1].Namespaced)
}
var (
// I have no idea what these certs are, they just need to be different
differentClientCA = []byte(`-----BEGIN CERTIFICATE-----
MIIDQDCCAiigAwIBAgIJANWw74P5KJk2MA0GCSqGSIb3DQEBCwUAMDQxMjAwBgNV
BAMMKWdlbmVyaWNfd2ViaG9va19hZG1pc3Npb25fcGx1Z2luX3Rlc3RzX2NhMCAX
DTE3MTExNjAwMDUzOVoYDzIyOTEwOTAxMDAwNTM5WjAjMSEwHwYDVQQDExh3ZWJo
b29rLXRlc3QuZGVmYXVsdC5zdmMwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK
AoIBAQDXd/nQ89a5H8ifEsigmMd01Ib6NVR3bkJjtkvYnTbdfYEBj7UzqOQtHoLa
dIVmefny5uIHvj93WD8WDVPB3jX2JHrXkDTXd/6o6jIXHcsUfFTVLp6/bZ+Anqe0
r/7hAPkzA2A7APyTWM3ZbEeo1afXogXhOJ1u/wz0DflgcB21gNho4kKTONXO3NHD
XLpspFqSkxfEfKVDJaYAoMnYZJtFNsa2OvsmLnhYF8bjeT3i07lfwrhUZvP+7Gsp
7UgUwc06WuNHjfx1s5e6ySzH0QioMD1rjYneqOvk0pKrMIhuAEWXqq7jlXcDtx1E
j+wnYbVqqVYheHZ8BCJoVAAQGs9/AgMBAAGjZDBiMAkGA1UdEwQCMAAwCwYDVR0P
BAQDAgXgMB0GA1UdJQQWMBQGCCsGAQUFBwMCBggrBgEFBQcDATApBgNVHREEIjAg
hwR/AAABghh3ZWJob29rLXRlc3QuZGVmYXVsdC5zdmMwDQYJKoZIhvcNAQELBQAD
ggEBAD/GKSPNyQuAOw/jsYZesb+RMedbkzs18sSwlxAJQMUrrXwlVdHrA8q5WhE6
ABLqU1b8lQ8AWun07R8k5tqTmNvCARrAPRUqls/ryER+3Y9YEcxEaTc3jKNZFLbc
T6YtcnkdhxsiO136wtiuatpYL91RgCmuSpR8+7jEHhuFU01iaASu7ypFrUzrKHTF
bKwiLRQi1cMzVcLErq5CDEKiKhUkoDucyARFszrGt9vNIl/YCcBOkcNvM3c05Hn3
M++C29JwS3Hwbubg6WO3wjFjoEhpCwU6qRYUz3MRp4tHO4kxKXx+oQnUiFnR7vW0
YkNtGc1RUDHwecCTFpJtPb7Yu/E=
-----END CERTIFICATE-----
`)
differentFrontProxyCA = []byte(`-----BEGIN CERTIFICATE-----
MIIBqDCCAU2gAwIBAgIUfbqeieihh/oERbfvRm38XvS/xHAwCgYIKoZIzj0EAwIw
GjEYMBYGA1UEAxMPSW50ZXJtZWRpYXRlLUNBMCAXDTE2MTAxMTA1MDYwMFoYDzIx
MTYwOTE3MDUwNjAwWjAUMRIwEAYDVQQDEwlNeSBDbGllbnQwWTATBgcqhkjOPQIB
BggqhkjOPQMBBwNCAARv6N4R/sjMR65iMFGNLN1GC/vd7WhDW6J4X/iAjkRLLnNb
KbRG/AtOUZ+7upJ3BWIRKYbOabbQGQe2BbKFiap4o3UwczAOBgNVHQ8BAf8EBAMC
BaAwEwYDVR0lBAwwCgYIKwYBBQUHAwIwDAYDVR0TAQH/BAIwADAdBgNVHQ4EFgQU
K/pZOWpNcYai6eHFpmJEeFpeQlEwHwYDVR0jBBgwFoAUX6nQlxjfWnP6aM1meO/Q
a6b3a9kwCgYIKoZIzj0EAwIDSQAwRgIhAIWTKw/sjJITqeuNzJDAKU4xo1zL+xJ5
MnVCuBwfwDXCAiEAw/1TA+CjPq9JC5ek1ifR0FybTURjeQqYkKpve1dveps=
-----END CERTIFICATE-----
`)
)

1
vendor/modules.txt vendored
View File

@ -1709,7 +1709,6 @@ k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1b
k8s.io/kube-aggregator/pkg/client/informers/externalversions/internalinterfaces
k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1
k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1beta1
k8s.io/kube-aggregator/pkg/cmd/server
k8s.io/kube-aggregator/pkg/controllers
k8s.io/kube-aggregator/pkg/controllers/autoregister
k8s.io/kube-aggregator/pkg/controllers/openapi