plumb the proxyTransport to the webhook admission plugin;

set the ServerName in the config for webhook admission plugin.
This commit is contained in:
Chao Xu 2017-08-10 11:25:41 -07:00
parent 5d995e3f7b
commit 186a0684d5
7 changed files with 62 additions and 15 deletions

View File

@ -251,7 +251,7 @@ func CreateNodeDialer(s *options.ServerRunOptions) (tunneler.Tunneler, *http.Tra
} }
// CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them // CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them
func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunneler.Tunneler, proxyTransport http.RoundTripper) (*master.Config, informers.SharedInformerFactory, clientgoinformers.SharedInformerFactory, *kubeserver.InsecureServingInfo, aggregatorapiserver.ServiceResolver, error) { func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunneler.Tunneler, proxyTransport *http.Transport) (*master.Config, informers.SharedInformerFactory, clientgoinformers.SharedInformerFactory, *kubeserver.InsecureServingInfo, aggregatorapiserver.ServiceResolver, error) {
// set defaults in the options before trying to create the generic config // set defaults in the options before trying to create the generic config
if err := defaultOptions(s); err != nil { if err := defaultOptions(s); err != nil {
return nil, nil, nil, nil, nil, err return nil, nil, nil, nil, nil, err
@ -269,8 +269,7 @@ func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunnele
return nil, nil, nil, nil, nil, err return nil, nil, nil, nil, nil, err
} }
} }
genericConfig, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, err := BuildGenericConfig(s, proxyTransport)
genericConfig, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, err := BuildGenericConfig(s)
if err != nil { if err != nil {
return nil, nil, nil, nil, nil, err return nil, nil, nil, nil, nil, err
} }
@ -354,7 +353,7 @@ func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunnele
} }
// BuildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it // BuildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it
func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, informers.SharedInformerFactory, clientgoinformers.SharedInformerFactory, *kubeserver.InsecureServingInfo, aggregatorapiserver.ServiceResolver, error) { func BuildGenericConfig(s *options.ServerRunOptions, proxyTransport *http.Transport) (*genericapiserver.Config, informers.SharedInformerFactory, clientgoinformers.SharedInformerFactory, *kubeserver.InsecureServingInfo, aggregatorapiserver.ServiceResolver, error) {
genericConfig := genericapiserver.NewConfig(api.Codecs) genericConfig := genericapiserver.NewConfig(api.Codecs)
if err := s.GenericServerRunOptions.ApplyTo(genericConfig); err != nil { if err := s.GenericServerRunOptions.ApplyTo(genericConfig); err != nil {
return nil, nil, nil, nil, nil, err return nil, nil, nil, nil, nil, err
@ -460,6 +459,7 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config,
sharedInformers, sharedInformers,
genericConfig.Authorizer, genericConfig.Authorizer,
serviceResolver, serviceResolver,
proxyTransport,
) )
if err != nil { if err != nil {
return nil, nil, nil, nil, nil, fmt.Errorf("failed to create admission plugin initializer: %v", err) return nil, nil, nil, nil, nil, fmt.Errorf("failed to create admission plugin initializer: %v", err)
@ -476,7 +476,7 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config,
} }
// BuildAdmissionPluginInitializer constructs the admission plugin initializer // BuildAdmissionPluginInitializer constructs the admission plugin initializer
func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client internalclientset.Interface, externalClient clientset.Interface, sharedInformers informers.SharedInformerFactory, apiAuthorizer authorizer.Authorizer, serviceResolver aggregatorapiserver.ServiceResolver) (admission.PluginInitializer, error) { func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client internalclientset.Interface, externalClient clientset.Interface, sharedInformers informers.SharedInformerFactory, apiAuthorizer authorizer.Authorizer, serviceResolver aggregatorapiserver.ServiceResolver, proxyTransport *http.Transport) (admission.PluginInitializer, error) {
var cloudConfig []byte var cloudConfig []byte
if s.CloudProvider.CloudConfigFile != "" { if s.CloudProvider.CloudConfigFile != "" {
@ -510,6 +510,7 @@ func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client interna
} }
pluginInitializer = pluginInitializer.SetServiceResolver(serviceResolver) pluginInitializer = pluginInitializer.SetServiceResolver(serviceResolver)
pluginInitializer = pluginInitializer.SetProxyTransport(proxyTransport)
return pluginInitializer, nil return pluginInitializer, nil
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package admission package admission
import ( import (
"net/http"
"net/url" "net/url"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
@ -88,6 +89,12 @@ type ServiceResolver interface {
ResolveEndpoint(namespace, name string) (*url.URL, error) ResolveEndpoint(namespace, name string) (*url.URL, error)
} }
// WantsProxyTransport defines a fuction that accepts a proxy transport for admission
// plugins that need to make calls to pods.
type WantsProxyTransport interface {
SetProxyTransport(proxyTransport *http.Transport)
}
type PluginInitializer struct { type PluginInitializer struct {
internalClient internalclientset.Interface internalClient internalclientset.Interface
externalClient clientset.Interface externalClient clientset.Interface
@ -101,6 +108,7 @@ type PluginInitializer struct {
// for proving we are apiserver in call-outs // for proving we are apiserver in call-outs
clientCert []byte clientCert []byte
clientKey []byte clientKey []byte
proxyTransport *http.Transport
} }
var _ admission.PluginInitializer = &PluginInitializer{} var _ admission.PluginInitializer = &PluginInitializer{}
@ -142,6 +150,12 @@ func (i *PluginInitializer) SetClientCert(cert, key []byte) *PluginInitializer {
return i return i
} }
// SetProxyTransport sets the proxyTransport which is needed by some plugins.
func (i *PluginInitializer) SetProxyTransport(proxyTransport *http.Transport) *PluginInitializer {
i.proxyTransport = proxyTransport
return i
}
// Initialize checks the initialization interfaces implemented by each plugin // Initialize checks the initialization interfaces implemented by each plugin
// and provide the appropriate initialization data // and provide the appropriate initialization data
func (i *PluginInitializer) Initialize(plugin admission.Interface) { func (i *PluginInitializer) Initialize(plugin admission.Interface) {
@ -186,4 +200,8 @@ func (i *PluginInitializer) Initialize(plugin admission.Interface) {
} }
wants.SetClientCert(i.clientCert, i.clientKey) wants.SetClientCert(i.clientCert, i.clientKey)
} }
if wants, ok := plugin.(WantsProxyTransport); ok {
wants.SetProxyTransport(i.proxyTransport)
}
} }

View File

@ -21,6 +21,8 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"net"
"net/http"
"sync" "sync"
"time" "time"
@ -106,6 +108,7 @@ type GenericAdmissionWebhook struct {
negotiatedSerializer runtime.NegotiatedSerializer negotiatedSerializer runtime.NegotiatedSerializer
clientCert []byte clientCert []byte
clientKey []byte clientKey []byte
proxyTransport *http.Transport
} }
var ( var (
@ -114,6 +117,10 @@ var (
_ = admissioninit.WantsExternalKubeClientSet(&GenericAdmissionWebhook{}) _ = admissioninit.WantsExternalKubeClientSet(&GenericAdmissionWebhook{})
) )
func (a *GenericAdmissionWebhook) SetProxyTransport(pt *http.Transport) {
a.proxyTransport = pt
}
func (a *GenericAdmissionWebhook) SetServiceResolver(sr admissioninit.ServiceResolver) { func (a *GenericAdmissionWebhook) SetServiceResolver(sr admissioninit.ServiceResolver) {
a.serviceResolver = sr a.serviceResolver = sr
} }
@ -242,11 +249,17 @@ func (a *GenericAdmissionWebhook) hookClient(h *v1alpha1.ExternalAdmissionHook)
return nil, err return nil, err
} }
var dial func(network, addr string) (net.Conn, error)
if a.proxyTransport != nil && a.proxyTransport.Dial != nil {
dial = a.proxyTransport.Dial
}
// TODO: cache these instead of constructing one each time // TODO: cache these instead of constructing one each time
cfg := &rest.Config{ cfg := &rest.Config{
Host: u.Host, Host: u.Host,
APIPath: u.Path, APIPath: u.Path,
TLSClientConfig: rest.TLSClientConfig{ TLSClientConfig: rest.TLSClientConfig{
ServerName: h.ClientConfig.Service.Name + "." + h.ClientConfig.Service.Namespace + ".svc",
CAData: h.ClientConfig.CABundle, CAData: h.ClientConfig.CABundle,
CertData: a.clientCert, CertData: a.clientCert,
KeyData: a.clientKey, KeyData: a.clientKey,
@ -256,6 +269,7 @@ func (a *GenericAdmissionWebhook) hookClient(h *v1alpha1.ExternalAdmissionHook)
ContentConfig: rest.ContentConfig{ ContentConfig: rest.ContentConfig{
NegotiatedSerializer: a.negotiatedSerializer, NegotiatedSerializer: a.negotiatedSerializer,
}, },
Dial: dial,
} }
return rest.UnversionedRESTClientFor(cfg) return rest.UnversionedRESTClientFor(cfg)
} }

View File

@ -114,6 +114,9 @@ type Config struct {
// The maximum length of time to wait before giving up on a server request. A value of zero means no timeout. // The maximum length of time to wait before giving up on a server request. A value of zero means no timeout.
Timeout time.Duration Timeout time.Duration
// Dial specifies the dial function for creating unencrypted TCP connections.
Dial func(network, addr string) (net.Conn, error)
// Version forces a specific version to be used (if registered) // Version forces a specific version to be used (if registered)
// Do we need this? // Do we need this?
// Version string // Version string

View File

@ -96,5 +96,6 @@ func (c *Config) TransportConfig() (*transport.Config, error) {
Groups: c.Impersonate.Groups, Groups: c.Impersonate.Groups,
Extra: c.Impersonate.Extra, Extra: c.Impersonate.Extra,
}, },
Dial: c.Dial,
}, nil }, nil
} }

View File

@ -63,16 +63,20 @@ func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) {
return http.DefaultTransport, nil return http.DefaultTransport, nil
} }
dial := config.Dial
if dial == nil {
dial = (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial
}
// Cache a single transport for these options // Cache a single transport for these options
c.transports[key] = utilnet.SetTransportDefaults(&http.Transport{ c.transports[key] = utilnet.SetTransportDefaults(&http.Transport{
Proxy: http.ProxyFromEnvironment, Proxy: http.ProxyFromEnvironment,
TLSHandshakeTimeout: 10 * time.Second, TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: tlsConfig, TLSClientConfig: tlsConfig,
MaxIdleConnsPerHost: idleConnsPerHost, MaxIdleConnsPerHost: idleConnsPerHost,
Dial: (&net.Dialer{ Dial: dial,
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
}) })
return c.transports[key], nil return c.transports[key], nil
} }

View File

@ -16,7 +16,10 @@ limitations under the License.
package transport package transport
import "net/http" import (
"net"
"net/http"
)
// Config holds various options for establishing a transport. // Config holds various options for establishing a transport.
type Config struct { type Config struct {
@ -52,6 +55,9 @@ type Config struct {
// config may layer other RoundTrippers on top of the returned // config may layer other RoundTrippers on top of the returned
// RoundTripper. // RoundTripper.
WrapTransport func(rt http.RoundTripper) http.RoundTripper WrapTransport func(rt http.RoundTripper) http.RoundTripper
// Dial specifies the dial function for creating unencrypted TCP connections.
Dial func(network, addr string) (net.Conn, error)
} }
// ImpersonationConfig has all the available impersonation options // ImpersonationConfig has all the available impersonation options