mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-08 11:38:15 +00:00
Merge pull request #117258 from enj/enj/i/aggregator_transport_cache
kube-aggregator: correctly use client-go TLS cache with custom dialer
This commit is contained in:
commit
bcacf92741
@ -35,6 +35,7 @@ import (
|
|||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
"k8s.io/client-go/pkg/version"
|
"k8s.io/client-go/pkg/version"
|
||||||
|
"k8s.io/client-go/transport"
|
||||||
openapicommon "k8s.io/kube-openapi/pkg/common"
|
openapicommon "k8s.io/kube-openapi/pkg/common"
|
||||||
|
|
||||||
"k8s.io/apiserver/pkg/server/dynamiccertificates"
|
"k8s.io/apiserver/pkg/server/dynamiccertificates"
|
||||||
@ -129,7 +130,7 @@ type APIAggregator struct {
|
|||||||
|
|
||||||
// proxyCurrentCertKeyContent holds he client cert used to identify this proxy. Backing APIServices use this to confirm the proxy's identity
|
// proxyCurrentCertKeyContent holds he client cert used to identify this proxy. Backing APIServices use this to confirm the proxy's identity
|
||||||
proxyCurrentCertKeyContent certKeyFunc
|
proxyCurrentCertKeyContent certKeyFunc
|
||||||
proxyTransport *http.Transport
|
proxyTransportDial *transport.DialHolder
|
||||||
|
|
||||||
// proxyHandlers are the proxy handlers that are currently registered, keyed by apiservice.name
|
// proxyHandlers are the proxy handlers that are currently registered, keyed by apiservice.name
|
||||||
proxyHandlers map[string]*proxyHandler
|
proxyHandlers map[string]*proxyHandler
|
||||||
@ -160,10 +161,6 @@ type APIAggregator struct {
|
|||||||
// when discovery with resources are requested
|
// when discovery with resources are requested
|
||||||
discoveryAggregationController DiscoveryAggregationController
|
discoveryAggregationController DiscoveryAggregationController
|
||||||
|
|
||||||
// egressSelector selects the proper egress dialer to communicate with the custom apiserver
|
|
||||||
// overwrites proxyTransport dialer if not nil
|
|
||||||
egressSelector *egressselector.EgressSelector
|
|
||||||
|
|
||||||
// rejectForwardingRedirects is whether to allow to forward redirect response
|
// rejectForwardingRedirects is whether to allow to forward redirect response
|
||||||
rejectForwardingRedirects bool
|
rejectForwardingRedirects bool
|
||||||
}
|
}
|
||||||
@ -210,10 +207,23 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var proxyTransportDial *transport.DialHolder
|
||||||
|
if c.GenericConfig.EgressSelector != nil {
|
||||||
|
egressDialer, err := c.GenericConfig.EgressSelector.Lookup(egressselector.Cluster.AsNetworkContext())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if egressDialer != nil {
|
||||||
|
proxyTransportDial = &transport.DialHolder{Dial: egressDialer}
|
||||||
|
}
|
||||||
|
} else if c.ExtraConfig.ProxyTransport != nil && c.ExtraConfig.ProxyTransport.DialContext != nil {
|
||||||
|
proxyTransportDial = &transport.DialHolder{Dial: c.ExtraConfig.ProxyTransport.DialContext}
|
||||||
|
}
|
||||||
|
|
||||||
s := &APIAggregator{
|
s := &APIAggregator{
|
||||||
GenericAPIServer: genericServer,
|
GenericAPIServer: genericServer,
|
||||||
delegateHandler: delegationTarget.UnprotectedHandler(),
|
delegateHandler: delegationTarget.UnprotectedHandler(),
|
||||||
proxyTransport: c.ExtraConfig.ProxyTransport,
|
proxyTransportDial: proxyTransportDial,
|
||||||
proxyHandlers: map[string]*proxyHandler{},
|
proxyHandlers: map[string]*proxyHandler{},
|
||||||
handledGroups: sets.String{},
|
handledGroups: sets.String{},
|
||||||
lister: informerFactory.Apiregistration().V1().APIServices().Lister(),
|
lister: informerFactory.Apiregistration().V1().APIServices().Lister(),
|
||||||
@ -221,7 +231,6 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
|
|||||||
serviceResolver: c.ExtraConfig.ServiceResolver,
|
serviceResolver: c.ExtraConfig.ServiceResolver,
|
||||||
openAPIConfig: c.GenericConfig.OpenAPIConfig,
|
openAPIConfig: c.GenericConfig.OpenAPIConfig,
|
||||||
openAPIV3Config: c.GenericConfig.OpenAPIV3Config,
|
openAPIV3Config: c.GenericConfig.OpenAPIV3Config,
|
||||||
egressSelector: c.GenericConfig.EgressSelector,
|
|
||||||
proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
|
proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
|
||||||
rejectForwardingRedirects: c.ExtraConfig.RejectForwardingRedirects,
|
rejectForwardingRedirects: c.ExtraConfig.RejectForwardingRedirects,
|
||||||
}
|
}
|
||||||
@ -295,10 +304,9 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
|
|||||||
c.GenericConfig.SharedInformerFactory.Core().V1().Services(),
|
c.GenericConfig.SharedInformerFactory.Core().V1().Services(),
|
||||||
c.GenericConfig.SharedInformerFactory.Core().V1().Endpoints(),
|
c.GenericConfig.SharedInformerFactory.Core().V1().Endpoints(),
|
||||||
apiregistrationClient.ApiregistrationV1(),
|
apiregistrationClient.ApiregistrationV1(),
|
||||||
c.ExtraConfig.ProxyTransport,
|
proxyTransportDial,
|
||||||
(func() ([]byte, []byte))(s.proxyCurrentCertKeyContent),
|
(func() ([]byte, []byte))(s.proxyCurrentCertKeyContent),
|
||||||
s.serviceResolver,
|
s.serviceResolver,
|
||||||
c.GenericConfig.EgressSelector,
|
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -463,7 +471,7 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
|
|||||||
// Forward calls to discovery manager to update discovery document
|
// Forward calls to discovery manager to update discovery document
|
||||||
if s.discoveryAggregationController != nil {
|
if s.discoveryAggregationController != nil {
|
||||||
handlerCopy := *proxyHandler
|
handlerCopy := *proxyHandler
|
||||||
handlerCopy.setServiceAvailable(true)
|
handlerCopy.setServiceAvailable()
|
||||||
s.discoveryAggregationController.AddAPIService(apiService, &handlerCopy)
|
s.discoveryAggregationController.AddAPIService(apiService, &handlerCopy)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -479,9 +487,8 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
|
|||||||
proxyHandler := &proxyHandler{
|
proxyHandler := &proxyHandler{
|
||||||
localDelegate: s.delegateHandler,
|
localDelegate: s.delegateHandler,
|
||||||
proxyCurrentCertKeyContent: s.proxyCurrentCertKeyContent,
|
proxyCurrentCertKeyContent: s.proxyCurrentCertKeyContent,
|
||||||
proxyTransport: s.proxyTransport,
|
proxyTransportDial: s.proxyTransportDial,
|
||||||
serviceResolver: s.serviceResolver,
|
serviceResolver: s.serviceResolver,
|
||||||
egressSelector: s.egressSelector,
|
|
||||||
rejectForwardingRedirects: s.rejectForwardingRedirects,
|
rejectForwardingRedirects: s.rejectForwardingRedirects,
|
||||||
}
|
}
|
||||||
proxyHandler.updateAPIService(apiService)
|
proxyHandler.updateAPIService(apiService)
|
||||||
|
@ -33,10 +33,8 @@ import (
|
|||||||
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
|
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
|
||||||
endpointmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
|
endpointmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
|
||||||
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||||
"k8s.io/apiserver/pkg/server/egressselector"
|
|
||||||
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
||||||
"k8s.io/apiserver/pkg/util/x509metrics"
|
"k8s.io/apiserver/pkg/util/x509metrics"
|
||||||
restclient "k8s.io/client-go/rest"
|
|
||||||
"k8s.io/client-go/transport"
|
"k8s.io/client-go/transport"
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
apiregistrationv1api "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
|
apiregistrationv1api "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
|
||||||
@ -59,17 +57,13 @@ type proxyHandler struct {
|
|||||||
|
|
||||||
// proxyCurrentCertKeyContent holds the client cert used to identify this proxy. Backing APIServices use this to confirm the proxy's identity
|
// proxyCurrentCertKeyContent holds the client cert used to identify this proxy. Backing APIServices use this to confirm the proxy's identity
|
||||||
proxyCurrentCertKeyContent certKeyFunc
|
proxyCurrentCertKeyContent certKeyFunc
|
||||||
proxyTransport *http.Transport
|
proxyTransportDial *transport.DialHolder
|
||||||
|
|
||||||
// Endpoints based routing to map from cluster IP to routable IP
|
// Endpoints based routing to map from cluster IP to routable IP
|
||||||
serviceResolver ServiceResolver
|
serviceResolver ServiceResolver
|
||||||
|
|
||||||
handlingInfo atomic.Value
|
handlingInfo atomic.Value
|
||||||
|
|
||||||
// egressSelector selects the proper egress dialer to communicate with the custom apiserver
|
|
||||||
// overwrites proxyTransport dialer if not nil
|
|
||||||
egressSelector *egressselector.EgressSelector
|
|
||||||
|
|
||||||
// reject to forward redirect response
|
// reject to forward redirect response
|
||||||
rejectForwardingRedirects bool
|
rejectForwardingRedirects bool
|
||||||
}
|
}
|
||||||
@ -80,8 +74,8 @@ type proxyHandlingInfo struct {
|
|||||||
|
|
||||||
// name is the name of the APIService
|
// name is the name of the APIService
|
||||||
name string
|
name string
|
||||||
// restConfig holds the information for building a roundtripper
|
// transportConfig holds the information for building a roundtripper
|
||||||
restConfig *restclient.Config
|
transportConfig *transport.Config
|
||||||
// transportBuildingError is an error produced while building the transport. If this
|
// transportBuildingError is an error produced while building the transport. If this
|
||||||
// is non-nil, it will be reported to clients.
|
// is non-nil, it will be reported to clients.
|
||||||
transportBuildingError error
|
transportBuildingError error
|
||||||
@ -233,7 +227,7 @@ func (r *responder) Error(_ http.ResponseWriter, _ *http.Request, err error) {
|
|||||||
|
|
||||||
// Sets serviceAvailable value on proxyHandler
|
// Sets serviceAvailable value on proxyHandler
|
||||||
// not thread safe
|
// not thread safe
|
||||||
func (r *proxyHandler) setServiceAvailable(value bool) {
|
func (r *proxyHandler) setServiceAvailable() {
|
||||||
info := r.handlingInfo.Load().(proxyHandlingInfo)
|
info := r.handlingInfo.Load().(proxyHandlingInfo)
|
||||||
info.serviceAvailable = true
|
info.serviceAvailable = true
|
||||||
r.handlingInfo.Store(info)
|
r.handlingInfo.Store(info)
|
||||||
@ -247,41 +241,30 @@ func (r *proxyHandler) updateAPIService(apiService *apiregistrationv1api.APIServ
|
|||||||
|
|
||||||
proxyClientCert, proxyClientKey := r.proxyCurrentCertKeyContent()
|
proxyClientCert, proxyClientKey := r.proxyCurrentCertKeyContent()
|
||||||
|
|
||||||
clientConfig := &restclient.Config{
|
transportConfig := &transport.Config{
|
||||||
TLSClientConfig: restclient.TLSClientConfig{
|
TLS: transport.TLSConfig{
|
||||||
Insecure: apiService.Spec.InsecureSkipTLSVerify,
|
Insecure: apiService.Spec.InsecureSkipTLSVerify,
|
||||||
ServerName: apiService.Spec.Service.Name + "." + apiService.Spec.Service.Namespace + ".svc",
|
ServerName: apiService.Spec.Service.Name + "." + apiService.Spec.Service.Namespace + ".svc",
|
||||||
CertData: proxyClientCert,
|
CertData: proxyClientCert,
|
||||||
KeyData: proxyClientKey,
|
KeyData: proxyClientKey,
|
||||||
CAData: apiService.Spec.CABundle,
|
CAData: apiService.Spec.CABundle,
|
||||||
},
|
},
|
||||||
|
DialHolder: r.proxyTransportDial,
|
||||||
}
|
}
|
||||||
clientConfig.Wrap(x509metrics.NewDeprecatedCertificateRoundTripperWrapperConstructor(
|
transportConfig.Wrap(x509metrics.NewDeprecatedCertificateRoundTripperWrapperConstructor(
|
||||||
x509MissingSANCounter,
|
x509MissingSANCounter,
|
||||||
x509InsecureSHA1Counter,
|
x509InsecureSHA1Counter,
|
||||||
))
|
))
|
||||||
|
|
||||||
newInfo := proxyHandlingInfo{
|
newInfo := proxyHandlingInfo{
|
||||||
name: apiService.Name,
|
name: apiService.Name,
|
||||||
restConfig: clientConfig,
|
transportConfig: transportConfig,
|
||||||
serviceName: apiService.Spec.Service.Name,
|
serviceName: apiService.Spec.Service.Name,
|
||||||
serviceNamespace: apiService.Spec.Service.Namespace,
|
serviceNamespace: apiService.Spec.Service.Namespace,
|
||||||
servicePort: *apiService.Spec.Service.Port,
|
servicePort: *apiService.Spec.Service.Port,
|
||||||
serviceAvailable: apiregistrationv1apihelper.IsAPIServiceConditionTrue(apiService, apiregistrationv1api.Available),
|
serviceAvailable: apiregistrationv1apihelper.IsAPIServiceConditionTrue(apiService, apiregistrationv1api.Available),
|
||||||
}
|
}
|
||||||
if r.egressSelector != nil {
|
newInfo.proxyRoundTripper, newInfo.transportBuildingError = transport.New(newInfo.transportConfig)
|
||||||
networkContext := egressselector.Cluster.AsNetworkContext()
|
|
||||||
var egressDialer utilnet.DialFunc
|
|
||||||
egressDialer, err := r.egressSelector.Lookup(networkContext)
|
|
||||||
if err != nil {
|
|
||||||
klog.Warning(err.Error())
|
|
||||||
} else {
|
|
||||||
newInfo.restConfig.Dial = egressDialer
|
|
||||||
}
|
|
||||||
} else if r.proxyTransport != nil && r.proxyTransport.DialContext != nil {
|
|
||||||
newInfo.restConfig.Dial = r.proxyTransport.DialContext
|
|
||||||
}
|
|
||||||
newInfo.proxyRoundTripper, newInfo.transportBuildingError = restclient.TransportFor(newInfo.restConfig)
|
|
||||||
if newInfo.transportBuildingError != nil {
|
if newInfo.transportBuildingError != nil {
|
||||||
klog.Warning(newInfo.transportBuildingError.Error())
|
klog.Warning(newInfo.transportBuildingError.Error())
|
||||||
}
|
}
|
||||||
|
@ -36,6 +36,7 @@ import (
|
|||||||
|
|
||||||
"k8s.io/apiserver/pkg/audit"
|
"k8s.io/apiserver/pkg/audit"
|
||||||
"k8s.io/apiserver/pkg/server/dynamiccertificates"
|
"k8s.io/apiserver/pkg/server/dynamiccertificates"
|
||||||
|
"k8s.io/client-go/transport"
|
||||||
|
|
||||||
"golang.org/x/net/websocket"
|
"golang.org/x/net/websocket"
|
||||||
|
|
||||||
@ -325,7 +326,6 @@ func TestProxyHandler(t *testing.T) {
|
|||||||
handler := &proxyHandler{
|
handler := &proxyHandler{
|
||||||
localDelegate: http.NewServeMux(),
|
localDelegate: http.NewServeMux(),
|
||||||
serviceResolver: serviceResolver,
|
serviceResolver: serviceResolver,
|
||||||
proxyTransport: &http.Transport{},
|
|
||||||
proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() },
|
proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() },
|
||||||
}
|
}
|
||||||
server := httptest.NewServer(contextHandler(handler, tc.user))
|
server := httptest.NewServer(contextHandler(handler, tc.user))
|
||||||
@ -551,7 +551,6 @@ func TestProxyUpgrade(t *testing.T) {
|
|||||||
serverURL, _ := url.Parse(backendServer.URL)
|
serverURL, _ := url.Parse(backendServer.URL)
|
||||||
proxyHandler := &proxyHandler{
|
proxyHandler := &proxyHandler{
|
||||||
serviceResolver: &mockedRouter{destinationHost: serverURL.Host},
|
serviceResolver: &mockedRouter{destinationHost: serverURL.Host},
|
||||||
proxyTransport: &http.Transport{},
|
|
||||||
proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() },
|
proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() },
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -559,7 +558,14 @@ func TestProxyUpgrade(t *testing.T) {
|
|||||||
var selector *egressselector.EgressSelector
|
var selector *egressselector.EgressSelector
|
||||||
if tc.NewEgressSelector != nil {
|
if tc.NewEgressSelector != nil {
|
||||||
dialer, selector = tc.NewEgressSelector()
|
dialer, selector = tc.NewEgressSelector()
|
||||||
proxyHandler.egressSelector = selector
|
|
||||||
|
egressDialer, err := selector.Lookup(egressselector.Cluster.AsNetworkContext())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if egressDialer != nil {
|
||||||
|
proxyHandler.proxyTransportDial = &transport.DialHolder{Dial: egressDialer}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
proxyHandler.updateAPIService(tc.APIService)
|
proxyHandler.updateAPIService(tc.APIService)
|
||||||
|
@ -19,7 +19,6 @@ package apiserver
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"reflect"
|
"reflect"
|
||||||
@ -33,13 +32,10 @@ import (
|
|||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apiserver/pkg/server/egressselector"
|
|
||||||
v1informers "k8s.io/client-go/informers/core/v1"
|
v1informers "k8s.io/client-go/informers/core/v1"
|
||||||
v1listers "k8s.io/client-go/listers/core/v1"
|
v1listers "k8s.io/client-go/listers/core/v1"
|
||||||
"k8s.io/client-go/rest"
|
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
"k8s.io/client-go/transport"
|
"k8s.io/client-go/transport"
|
||||||
"k8s.io/client-go/util/workqueue"
|
"k8s.io/client-go/util/workqueue"
|
||||||
@ -77,8 +73,8 @@ type AvailableConditionController struct {
|
|||||||
endpointsLister v1listers.EndpointsLister
|
endpointsLister v1listers.EndpointsLister
|
||||||
endpointsSynced cache.InformerSynced
|
endpointsSynced cache.InformerSynced
|
||||||
|
|
||||||
// dialContext specifies the dial function for creating unencrypted TCP connections.
|
// proxyTransportDial specifies the dial function for creating unencrypted TCP connections.
|
||||||
dialContext func(ctx context.Context, network, address string) (net.Conn, error)
|
proxyTransportDial *transport.DialHolder
|
||||||
proxyCurrentCertKeyContent certKeyFunc
|
proxyCurrentCertKeyContent certKeyFunc
|
||||||
serviceResolver ServiceResolver
|
serviceResolver ServiceResolver
|
||||||
|
|
||||||
@ -91,67 +87,19 @@ type AvailableConditionController struct {
|
|||||||
// this lock protects operations on the above cache
|
// this lock protects operations on the above cache
|
||||||
cacheLock sync.RWMutex
|
cacheLock sync.RWMutex
|
||||||
|
|
||||||
// TLS config with customized dialer cannot be cached by the client-go
|
|
||||||
// tlsTransportCache. Use a local cache here to reduce the chance of
|
|
||||||
// the controller spamming idle connections with short-lived transports.
|
|
||||||
// NOTE: the cache works because we assume that the transports constructed
|
|
||||||
// by the controller only vary on the dynamic cert/key.
|
|
||||||
tlsCache *tlsTransportCache
|
|
||||||
|
|
||||||
// metrics registered into legacy registry
|
// metrics registered into legacy registry
|
||||||
metrics *availabilityMetrics
|
metrics *availabilityMetrics
|
||||||
}
|
}
|
||||||
|
|
||||||
type tlsTransportCache struct {
|
|
||||||
mu sync.Mutex
|
|
||||||
transports map[tlsCacheKey]http.RoundTripper
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *tlsTransportCache) get(config *rest.Config) (http.RoundTripper, error) {
|
|
||||||
// If the available controller doesn't customzie the dialer (and we know from
|
|
||||||
// the code that the controller doesn't customzie other functions i.e. Proxy
|
|
||||||
// and GetCert (ExecProvider)), the config is cacheable by the client-go TLS
|
|
||||||
// transport cache. Let's skip the local cache and depend on the client-go cache.
|
|
||||||
if config.Dial == nil {
|
|
||||||
return rest.TransportFor(config)
|
|
||||||
}
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
// See if we already have a custom transport for this config
|
|
||||||
key := tlsConfigKey(config)
|
|
||||||
if t, ok := c.transports[key]; ok {
|
|
||||||
return t, nil
|
|
||||||
}
|
|
||||||
restTransport, err := rest.TransportFor(config)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
c.transports[key] = restTransport
|
|
||||||
return restTransport, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type tlsCacheKey struct {
|
|
||||||
certData string
|
|
||||||
keyData string `datapolicy:"secret-key"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func tlsConfigKey(c *rest.Config) tlsCacheKey {
|
|
||||||
return tlsCacheKey{
|
|
||||||
certData: string(c.TLSClientConfig.CertData),
|
|
||||||
keyData: string(c.TLSClientConfig.KeyData),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewAvailableConditionController returns a new AvailableConditionController.
|
// NewAvailableConditionController returns a new AvailableConditionController.
|
||||||
func NewAvailableConditionController(
|
func NewAvailableConditionController(
|
||||||
apiServiceInformer informers.APIServiceInformer,
|
apiServiceInformer informers.APIServiceInformer,
|
||||||
serviceInformer v1informers.ServiceInformer,
|
serviceInformer v1informers.ServiceInformer,
|
||||||
endpointsInformer v1informers.EndpointsInformer,
|
endpointsInformer v1informers.EndpointsInformer,
|
||||||
apiServiceClient apiregistrationclient.APIServicesGetter,
|
apiServiceClient apiregistrationclient.APIServicesGetter,
|
||||||
proxyTransport *http.Transport,
|
proxyTransportDial *transport.DialHolder,
|
||||||
proxyCurrentCertKeyContent certKeyFunc,
|
proxyCurrentCertKeyContent certKeyFunc,
|
||||||
serviceResolver ServiceResolver,
|
serviceResolver ServiceResolver,
|
||||||
egressSelector *egressselector.EgressSelector,
|
|
||||||
) (*AvailableConditionController, error) {
|
) (*AvailableConditionController, error) {
|
||||||
c := &AvailableConditionController{
|
c := &AvailableConditionController{
|
||||||
apiServiceClient: apiServiceClient,
|
apiServiceClient: apiServiceClient,
|
||||||
@ -165,23 +113,11 @@ func NewAvailableConditionController(
|
|||||||
// the maximum disruption time to a minimum, but it does prevent hot loops.
|
// the maximum disruption time to a minimum, but it does prevent hot loops.
|
||||||
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second),
|
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second),
|
||||||
"AvailableConditionController"),
|
"AvailableConditionController"),
|
||||||
|
proxyTransportDial: proxyTransportDial,
|
||||||
proxyCurrentCertKeyContent: proxyCurrentCertKeyContent,
|
proxyCurrentCertKeyContent: proxyCurrentCertKeyContent,
|
||||||
tlsCache: &tlsTransportCache{transports: make(map[tlsCacheKey]http.RoundTripper)},
|
|
||||||
metrics: newAvailabilityMetrics(),
|
metrics: newAvailabilityMetrics(),
|
||||||
}
|
}
|
||||||
|
|
||||||
if egressSelector != nil {
|
|
||||||
networkContext := egressselector.Cluster.AsNetworkContext()
|
|
||||||
var egressDialer utilnet.DialFunc
|
|
||||||
egressDialer, err := egressSelector.Lookup(networkContext)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
c.dialContext = egressDialer
|
|
||||||
} else if proxyTransport != nil && proxyTransport.DialContext != nil {
|
|
||||||
c.dialContext = proxyTransport.DialContext
|
|
||||||
}
|
|
||||||
|
|
||||||
// resync on this one because it is low cardinality and rechecking the actual discovery
|
// resync on this one because it is low cardinality and rechecking the actual discovery
|
||||||
// allows us to detect health in a more timely fashion when network connectivity to
|
// allows us to detect health in a more timely fashion when network connectivity to
|
||||||
// nodes is snipped, but the network still attempts to route there. See
|
// nodes is snipped, but the network still attempts to route there. See
|
||||||
@ -236,27 +172,20 @@ func (c *AvailableConditionController) sync(key string) error {
|
|||||||
// if a particular transport was specified, use that otherwise build one
|
// if a particular transport was specified, use that otherwise build one
|
||||||
// construct an http client that will ignore TLS verification (if someone owns the network and messes with your status
|
// construct an http client that will ignore TLS verification (if someone owns the network and messes with your status
|
||||||
// that's not so bad) and sets a very short timeout. This is a best effort GET that provides no additional information
|
// that's not so bad) and sets a very short timeout. This is a best effort GET that provides no additional information
|
||||||
restConfig := &rest.Config{
|
transportConfig := &transport.Config{
|
||||||
TLSClientConfig: rest.TLSClientConfig{
|
TLS: transport.TLSConfig{
|
||||||
Insecure: true,
|
Insecure: true,
|
||||||
},
|
},
|
||||||
|
DialHolder: c.proxyTransportDial,
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.proxyCurrentCertKeyContent != nil {
|
if c.proxyCurrentCertKeyContent != nil {
|
||||||
proxyClientCert, proxyClientKey := c.proxyCurrentCertKeyContent()
|
proxyClientCert, proxyClientKey := c.proxyCurrentCertKeyContent()
|
||||||
|
|
||||||
restConfig.TLSClientConfig.CertData = proxyClientCert
|
transportConfig.TLS.CertData = proxyClientCert
|
||||||
restConfig.TLSClientConfig.KeyData = proxyClientKey
|
transportConfig.TLS.KeyData = proxyClientKey
|
||||||
}
|
}
|
||||||
if c.dialContext != nil {
|
restTransport, err := transport.New(transportConfig)
|
||||||
restConfig.Dial = c.dialContext
|
|
||||||
}
|
|
||||||
// TLS config with customized dialer cannot be cached by the client-go
|
|
||||||
// tlsTransportCache. Use a local cache here to reduce the chance of
|
|
||||||
// the controller spamming idle connections with short-lived transports.
|
|
||||||
// NOTE: the cache works because we assume that the transports constructed
|
|
||||||
// by the controller only vary on the dynamic cert/key.
|
|
||||||
restTransport, err := c.tlsCache.get(restConfig)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -18,7 +18,6 @@ package apiserver
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"net/url"
|
"net/url"
|
||||||
@ -134,8 +133,7 @@ func setupAPIServices(apiServices []*apiregistration.APIService) (*AvailableCond
|
|||||||
// the maximum disruption time to a minimum, but it does prevent hot loops.
|
// the maximum disruption time to a minimum, but it does prevent hot loops.
|
||||||
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second),
|
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second),
|
||||||
"AvailableConditionController"),
|
"AvailableConditionController"),
|
||||||
tlsCache: &tlsTransportCache{transports: make(map[tlsCacheKey]http.RoundTripper)},
|
metrics: newAvailabilityMetrics(),
|
||||||
metrics: newAvailabilityMetrics(),
|
|
||||||
}
|
}
|
||||||
for _, svc := range apiServices {
|
for _, svc := range apiServices {
|
||||||
c.addAPIService(svc)
|
c.addAPIService(svc)
|
||||||
@ -206,54 +204,6 @@ func TestBuildCache(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTLSCache(t *testing.T) {
|
|
||||||
apiServices := []*apiregistration.APIService{newRemoteAPIService("remote.group")}
|
|
||||||
services := []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)}
|
|
||||||
c, _ := setupAPIServices(apiServices)
|
|
||||||
// TLS configs with customized dialers are uncacheable by the client-go
|
|
||||||
// TLS transport cache. The local cache will be used.
|
|
||||||
c.dialContext = (&net.Dialer{
|
|
||||||
Timeout: 30 * time.Second,
|
|
||||||
KeepAlive: 30 * time.Second,
|
|
||||||
}).DialContext
|
|
||||||
for _, svc := range services {
|
|
||||||
c.addService(svc)
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
proxyCurrentCertKeyContent certKeyFunc
|
|
||||||
expectedCacheSize int
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "nil certKeyFunc",
|
|
||||||
expectedCacheSize: 1,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "empty certKeyFunc",
|
|
||||||
proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() },
|
|
||||||
// the tlsCacheKey is the same, reuse existing transport
|
|
||||||
expectedCacheSize: 1,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "different certKeyFunc",
|
|
||||||
proxyCurrentCertKeyContent: testCertKeyFunc,
|
|
||||||
// the tlsCacheKey is different, create a new transport
|
|
||||||
expectedCacheSize: 2,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
for _, tc := range tests {
|
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
|
||||||
c.proxyCurrentCertKeyContent = tc.proxyCurrentCertKeyContent
|
|
||||||
for _, apiService := range apiServices {
|
|
||||||
c.sync(apiService.Name)
|
|
||||||
}
|
|
||||||
if len(c.tlsCache.transports) != tc.expectedCacheSize {
|
|
||||||
t.Fatalf("%v cache size expected %v, got %v", tc.name, tc.expectedCacheSize, len(c.tlsCache.transports))
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSync(t *testing.T) {
|
func TestSync(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
@ -445,7 +395,6 @@ func TestSync(t *testing.T) {
|
|||||||
endpointsLister: v1listers.NewEndpointsLister(endpointsIndexer),
|
endpointsLister: v1listers.NewEndpointsLister(endpointsIndexer),
|
||||||
serviceResolver: &fakeServiceResolver{url: testServer.URL},
|
serviceResolver: &fakeServiceResolver{url: testServer.URL},
|
||||||
proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() },
|
proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() },
|
||||||
tlsCache: &tlsTransportCache{transports: make(map[tlsCacheKey]http.RoundTripper)},
|
|
||||||
metrics: newAvailabilityMetrics(),
|
metrics: newAvailabilityMetrics(),
|
||||||
}
|
}
|
||||||
c.sync(tc.apiServiceName)
|
c.sync(tc.apiServiceName)
|
||||||
@ -516,33 +465,3 @@ func TestUpdateAPIServiceStatus(t *testing.T) {
|
|||||||
func emptyCert() []byte {
|
func emptyCert() []byte {
|
||||||
return []byte{}
|
return []byte{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testCertKeyFunc() ([]byte, []byte) {
|
|
||||||
return []byte(`-----BEGIN CERTIFICATE-----
|
|
||||||
MIICBDCCAW2gAwIBAgIJAPgVBh+4xbGoMA0GCSqGSIb3DQEBCwUAMBsxGTAXBgNV
|
|
||||||
BAMMEHdlYmhvb2tfdGVzdHNfY2EwIBcNMTcwNzI4MjMxNTI4WhgPMjI5MTA1MTMy
|
|
||||||
MzE1MjhaMB8xHTAbBgNVBAMMFHdlYmhvb2tfdGVzdHNfY2xpZW50MIGfMA0GCSqG
|
|
||||||
SIb3DQEBAQUAA4GNADCBiQKBgQDkGXXSm6Yun5o3Jlmx45rItcQ2pmnoDk4eZfl0
|
|
||||||
rmPa674s2pfYo3KywkXQ1Fp3BC8GUgzPLSfJ8xXya9Lg1Wo8sHrDln0iRg5HXxGu
|
|
||||||
uFNhRBvj2S0sIff0ZG/IatB9I6WXVOUYuQj6+A0CdULNj1vBqH9+7uWbLZ6lrD4b
|
|
||||||
a44x/wIDAQABo0owSDAJBgNVHRMEAjAAMAsGA1UdDwQEAwIF4DAdBgNVHSUEFjAU
|
|
||||||
BggrBgEFBQcDAgYIKwYBBQUHAwEwDwYDVR0RBAgwBocEfwAAATANBgkqhkiG9w0B
|
|
||||||
AQsFAAOBgQCpN27uh/LjUVCaBK7Noko25iih/JSSoWzlvc8CaipvSPofNWyGx3Vu
|
|
||||||
OdcSwNGYX/pp4ZoAzFij/Y5u0vKTVLkWXATeTMVmlPvhmpYjj9gPkCSY6j/SiKlY
|
|
||||||
kGy0xr+0M5UQkMBcfIh9oAp9um1fZHVWAJAGP/ikZgkcUey0LmBn8w==
|
|
||||||
-----END CERTIFICATE-----`), []byte(`-----BEGIN RSA PRIVATE KEY-----
|
|
||||||
MIICWwIBAAKBgQDkGXXSm6Yun5o3Jlmx45rItcQ2pmnoDk4eZfl0rmPa674s2pfY
|
|
||||||
o3KywkXQ1Fp3BC8GUgzPLSfJ8xXya9Lg1Wo8sHrDln0iRg5HXxGuuFNhRBvj2S0s
|
|
||||||
Iff0ZG/IatB9I6WXVOUYuQj6+A0CdULNj1vBqH9+7uWbLZ6lrD4ba44x/wIDAQAB
|
|
||||||
AoGAZbWwowvCq1GBq4vPPRI3h739Uz0bRl1ymf1woYXNguXRtCB4yyH+2BTmmrrF
|
|
||||||
6AIWkePuUEdbUaKyK5nGu3iOWM+/i6NP3kopQANtbAYJ2ray3kwvFlhqyn1bxX4n
|
|
||||||
gl/Cbdw1If4zrDrB66y8mYDsjzK7n/gFaDNcY4GArjvOXKkCQQD9Lgv+WD73y4RP
|
|
||||||
yS+cRarlEeLLWVsX/pg2oEBLM50jsdUnrLSW071MjBgP37oOXzqynF9SoDbP2Y5C
|
|
||||||
x+aGux9LAkEA5qPlQPv0cv8Wc3qTI+LixZ/86PPHKWnOnwaHm3b9vQjZAkuVQg3n
|
|
||||||
Wgg9YDmPM87t3UFH7ZbDihUreUxwr9ZjnQJAZ9Z95shMsxbOYmbSVxafu6m1Sc+R
|
|
||||||
M+sghK7/D5jQpzYlhUspGf8n0YBX0hLhXUmjamQGGH5LXL4Owcb4/mM6twJAEVio
|
|
||||||
SF/qva9jv+GrKVrKFXT374lOJFY53Qn/rvifEtWUhLCslCA5kzLlctRBafMZPrfH
|
|
||||||
Mh5RrJP1BhVysDbenQJASGcc+DiF7rB6K++ZGyC11E2AP29DcZ0pgPESSV7npOGg
|
|
||||||
+NqPRZNVCSZOiVmNuejZqmwKhZNGZnBFx1Y+ChAAgw==
|
|
||||||
-----END RSA PRIVATE KEY-----`)
|
|
||||||
}
|
|
||||||
|
Loading…
Reference in New Issue
Block a user