kube-aggregator: correctly use client-go TLS cache with custom dialer

Signed-off-by: Monis Khan <mok@microsoft.com>
This commit is contained in:
Monis Khan 2023-04-12 17:00:13 -04:00
parent 9082903f94
commit 804d9f47c9
No known key found for this signature in database
5 changed files with 49 additions and 205 deletions

View File

@ -35,6 +35,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/version"
"k8s.io/client-go/transport"
openapicommon "k8s.io/kube-openapi/pkg/common"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
@ -129,7 +130,7 @@ type APIAggregator struct {
// proxyCurrentCertKeyContent holds he client cert used to identify this proxy. Backing APIServices use this to confirm the proxy's identity
proxyCurrentCertKeyContent certKeyFunc
proxyTransport *http.Transport
proxyTransportDial *transport.DialHolder
// proxyHandlers are the proxy handlers that are currently registered, keyed by apiservice.name
proxyHandlers map[string]*proxyHandler
@ -160,10 +161,6 @@ type APIAggregator struct {
// when discovery with resources are requested
discoveryAggregationController DiscoveryAggregationController
// egressSelector selects the proper egress dialer to communicate with the custom apiserver
// overwrites proxyTransport dialer if not nil
egressSelector *egressselector.EgressSelector
// rejectForwardingRedirects is whether to allow to forward redirect response
rejectForwardingRedirects bool
}
@ -210,10 +207,23 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
return nil, err
}
var proxyTransportDial *transport.DialHolder
if c.GenericConfig.EgressSelector != nil {
egressDialer, err := c.GenericConfig.EgressSelector.Lookup(egressselector.Cluster.AsNetworkContext())
if err != nil {
return nil, err
}
if egressDialer != nil {
proxyTransportDial = &transport.DialHolder{Dial: egressDialer}
}
} else if c.ExtraConfig.ProxyTransport != nil && c.ExtraConfig.ProxyTransport.DialContext != nil {
proxyTransportDial = &transport.DialHolder{Dial: c.ExtraConfig.ProxyTransport.DialContext}
}
s := &APIAggregator{
GenericAPIServer: genericServer,
delegateHandler: delegationTarget.UnprotectedHandler(),
proxyTransport: c.ExtraConfig.ProxyTransport,
proxyTransportDial: proxyTransportDial,
proxyHandlers: map[string]*proxyHandler{},
handledGroups: sets.String{},
lister: informerFactory.Apiregistration().V1().APIServices().Lister(),
@ -221,7 +231,6 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
serviceResolver: c.ExtraConfig.ServiceResolver,
openAPIConfig: c.GenericConfig.OpenAPIConfig,
openAPIV3Config: c.GenericConfig.OpenAPIV3Config,
egressSelector: c.GenericConfig.EgressSelector,
proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
rejectForwardingRedirects: c.ExtraConfig.RejectForwardingRedirects,
}
@ -295,10 +304,9 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
c.GenericConfig.SharedInformerFactory.Core().V1().Services(),
c.GenericConfig.SharedInformerFactory.Core().V1().Endpoints(),
apiregistrationClient.ApiregistrationV1(),
c.ExtraConfig.ProxyTransport,
proxyTransportDial,
(func() ([]byte, []byte))(s.proxyCurrentCertKeyContent),
s.serviceResolver,
c.GenericConfig.EgressSelector,
)
if err != nil {
return nil, err
@ -463,7 +471,7 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
// Forward calls to discovery manager to update discovery document
if s.discoveryAggregationController != nil {
handlerCopy := *proxyHandler
handlerCopy.setServiceAvailable(true)
handlerCopy.setServiceAvailable()
s.discoveryAggregationController.AddAPIService(apiService, &handlerCopy)
}
return nil
@ -479,9 +487,8 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
proxyHandler := &proxyHandler{
localDelegate: s.delegateHandler,
proxyCurrentCertKeyContent: s.proxyCurrentCertKeyContent,
proxyTransport: s.proxyTransport,
proxyTransportDial: s.proxyTransportDial,
serviceResolver: s.serviceResolver,
egressSelector: s.egressSelector,
rejectForwardingRedirects: s.rejectForwardingRedirects,
}
proxyHandler.updateAPIService(apiService)

View File

@ -33,10 +33,8 @@ import (
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
endpointmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/server/egressselector"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/apiserver/pkg/util/x509metrics"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/transport"
"k8s.io/klog/v2"
apiregistrationv1api "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
@ -59,17 +57,13 @@ type proxyHandler struct {
// proxyCurrentCertKeyContent holds the client cert used to identify this proxy. Backing APIServices use this to confirm the proxy's identity
proxyCurrentCertKeyContent certKeyFunc
proxyTransport *http.Transport
proxyTransportDial *transport.DialHolder
// Endpoints based routing to map from cluster IP to routable IP
serviceResolver ServiceResolver
handlingInfo atomic.Value
// egressSelector selects the proper egress dialer to communicate with the custom apiserver
// overwrites proxyTransport dialer if not nil
egressSelector *egressselector.EgressSelector
// reject to forward redirect response
rejectForwardingRedirects bool
}
@ -80,8 +74,8 @@ type proxyHandlingInfo struct {
// name is the name of the APIService
name string
// restConfig holds the information for building a roundtripper
restConfig *restclient.Config
// transportConfig holds the information for building a roundtripper
transportConfig *transport.Config
// transportBuildingError is an error produced while building the transport. If this
// is non-nil, it will be reported to clients.
transportBuildingError error
@ -233,7 +227,7 @@ func (r *responder) Error(_ http.ResponseWriter, _ *http.Request, err error) {
// Sets serviceAvailable value on proxyHandler
// not thread safe
func (r *proxyHandler) setServiceAvailable(value bool) {
func (r *proxyHandler) setServiceAvailable() {
info := r.handlingInfo.Load().(proxyHandlingInfo)
info.serviceAvailable = true
r.handlingInfo.Store(info)
@ -247,41 +241,30 @@ func (r *proxyHandler) updateAPIService(apiService *apiregistrationv1api.APIServ
proxyClientCert, proxyClientKey := r.proxyCurrentCertKeyContent()
clientConfig := &restclient.Config{
TLSClientConfig: restclient.TLSClientConfig{
transportConfig := &transport.Config{
TLS: transport.TLSConfig{
Insecure: apiService.Spec.InsecureSkipTLSVerify,
ServerName: apiService.Spec.Service.Name + "." + apiService.Spec.Service.Namespace + ".svc",
CertData: proxyClientCert,
KeyData: proxyClientKey,
CAData: apiService.Spec.CABundle,
},
DialHolder: r.proxyTransportDial,
}
clientConfig.Wrap(x509metrics.NewDeprecatedCertificateRoundTripperWrapperConstructor(
transportConfig.Wrap(x509metrics.NewDeprecatedCertificateRoundTripperWrapperConstructor(
x509MissingSANCounter,
x509InsecureSHA1Counter,
))
newInfo := proxyHandlingInfo{
name: apiService.Name,
restConfig: clientConfig,
transportConfig: transportConfig,
serviceName: apiService.Spec.Service.Name,
serviceNamespace: apiService.Spec.Service.Namespace,
servicePort: *apiService.Spec.Service.Port,
serviceAvailable: apiregistrationv1apihelper.IsAPIServiceConditionTrue(apiService, apiregistrationv1api.Available),
}
if r.egressSelector != nil {
networkContext := egressselector.Cluster.AsNetworkContext()
var egressDialer utilnet.DialFunc
egressDialer, err := r.egressSelector.Lookup(networkContext)
if err != nil {
klog.Warning(err.Error())
} else {
newInfo.restConfig.Dial = egressDialer
}
} else if r.proxyTransport != nil && r.proxyTransport.DialContext != nil {
newInfo.restConfig.Dial = r.proxyTransport.DialContext
}
newInfo.proxyRoundTripper, newInfo.transportBuildingError = restclient.TransportFor(newInfo.restConfig)
newInfo.proxyRoundTripper, newInfo.transportBuildingError = transport.New(newInfo.transportConfig)
if newInfo.transportBuildingError != nil {
klog.Warning(newInfo.transportBuildingError.Error())
}

View File

@ -36,6 +36,7 @@ import (
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
"k8s.io/client-go/transport"
"golang.org/x/net/websocket"
@ -325,7 +326,6 @@ func TestProxyHandler(t *testing.T) {
handler := &proxyHandler{
localDelegate: http.NewServeMux(),
serviceResolver: serviceResolver,
proxyTransport: &http.Transport{},
proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() },
}
server := httptest.NewServer(contextHandler(handler, tc.user))
@ -551,7 +551,6 @@ func TestProxyUpgrade(t *testing.T) {
serverURL, _ := url.Parse(backendServer.URL)
proxyHandler := &proxyHandler{
serviceResolver: &mockedRouter{destinationHost: serverURL.Host},
proxyTransport: &http.Transport{},
proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() },
}
@ -559,7 +558,14 @@ func TestProxyUpgrade(t *testing.T) {
var selector *egressselector.EgressSelector
if tc.NewEgressSelector != nil {
dialer, selector = tc.NewEgressSelector()
proxyHandler.egressSelector = selector
egressDialer, err := selector.Lookup(egressselector.Cluster.AsNetworkContext())
if err != nil {
t.Fatal(err)
}
if egressDialer != nil {
proxyHandler.proxyTransportDial = &transport.DialHolder{Dial: egressDialer}
}
}
proxyHandler.updateAPIService(tc.APIService)

View File

@ -19,7 +19,6 @@ package apiserver
import (
"context"
"fmt"
"net"
"net/http"
"net/url"
"reflect"
@ -33,13 +32,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilnet "k8s.io/apimachinery/pkg/util/net"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/egressselector"
v1informers "k8s.io/client-go/informers/core/v1"
v1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/transport"
"k8s.io/client-go/util/workqueue"
@ -77,8 +73,8 @@ type AvailableConditionController struct {
endpointsLister v1listers.EndpointsLister
endpointsSynced cache.InformerSynced
// dialContext specifies the dial function for creating unencrypted TCP connections.
dialContext func(ctx context.Context, network, address string) (net.Conn, error)
// proxyTransportDial specifies the dial function for creating unencrypted TCP connections.
proxyTransportDial *transport.DialHolder
proxyCurrentCertKeyContent certKeyFunc
serviceResolver ServiceResolver
@ -91,67 +87,19 @@ type AvailableConditionController struct {
// this lock protects operations on the above cache
cacheLock sync.RWMutex
// TLS config with customized dialer cannot be cached by the client-go
// tlsTransportCache. Use a local cache here to reduce the chance of
// the controller spamming idle connections with short-lived transports.
// NOTE: the cache works because we assume that the transports constructed
// by the controller only vary on the dynamic cert/key.
tlsCache *tlsTransportCache
// metrics registered into legacy registry
metrics *availabilityMetrics
}
type tlsTransportCache struct {
mu sync.Mutex
transports map[tlsCacheKey]http.RoundTripper
}
func (c *tlsTransportCache) get(config *rest.Config) (http.RoundTripper, error) {
// If the available controller doesn't customzie the dialer (and we know from
// the code that the controller doesn't customzie other functions i.e. Proxy
// and GetCert (ExecProvider)), the config is cacheable by the client-go TLS
// transport cache. Let's skip the local cache and depend on the client-go cache.
if config.Dial == nil {
return rest.TransportFor(config)
}
c.mu.Lock()
defer c.mu.Unlock()
// See if we already have a custom transport for this config
key := tlsConfigKey(config)
if t, ok := c.transports[key]; ok {
return t, nil
}
restTransport, err := rest.TransportFor(config)
if err != nil {
return nil, err
}
c.transports[key] = restTransport
return restTransport, nil
}
type tlsCacheKey struct {
certData string
keyData string `datapolicy:"secret-key"`
}
func tlsConfigKey(c *rest.Config) tlsCacheKey {
return tlsCacheKey{
certData: string(c.TLSClientConfig.CertData),
keyData: string(c.TLSClientConfig.KeyData),
}
}
// NewAvailableConditionController returns a new AvailableConditionController.
func NewAvailableConditionController(
apiServiceInformer informers.APIServiceInformer,
serviceInformer v1informers.ServiceInformer,
endpointsInformer v1informers.EndpointsInformer,
apiServiceClient apiregistrationclient.APIServicesGetter,
proxyTransport *http.Transport,
proxyTransportDial *transport.DialHolder,
proxyCurrentCertKeyContent certKeyFunc,
serviceResolver ServiceResolver,
egressSelector *egressselector.EgressSelector,
) (*AvailableConditionController, error) {
c := &AvailableConditionController{
apiServiceClient: apiServiceClient,
@ -165,23 +113,11 @@ func NewAvailableConditionController(
// the maximum disruption time to a minimum, but it does prevent hot loops.
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second),
"AvailableConditionController"),
proxyTransportDial: proxyTransportDial,
proxyCurrentCertKeyContent: proxyCurrentCertKeyContent,
tlsCache: &tlsTransportCache{transports: make(map[tlsCacheKey]http.RoundTripper)},
metrics: newAvailabilityMetrics(),
}
if egressSelector != nil {
networkContext := egressselector.Cluster.AsNetworkContext()
var egressDialer utilnet.DialFunc
egressDialer, err := egressSelector.Lookup(networkContext)
if err != nil {
return nil, err
}
c.dialContext = egressDialer
} else if proxyTransport != nil && proxyTransport.DialContext != nil {
c.dialContext = proxyTransport.DialContext
}
// resync on this one because it is low cardinality and rechecking the actual discovery
// allows us to detect health in a more timely fashion when network connectivity to
// nodes is snipped, but the network still attempts to route there. See
@ -236,27 +172,20 @@ func (c *AvailableConditionController) sync(key string) error {
// if a particular transport was specified, use that otherwise build one
// construct an http client that will ignore TLS verification (if someone owns the network and messes with your status
// that's not so bad) and sets a very short timeout. This is a best effort GET that provides no additional information
restConfig := &rest.Config{
TLSClientConfig: rest.TLSClientConfig{
transportConfig := &transport.Config{
TLS: transport.TLSConfig{
Insecure: true,
},
DialHolder: c.proxyTransportDial,
}
if c.proxyCurrentCertKeyContent != nil {
proxyClientCert, proxyClientKey := c.proxyCurrentCertKeyContent()
restConfig.TLSClientConfig.CertData = proxyClientCert
restConfig.TLSClientConfig.KeyData = proxyClientKey
transportConfig.TLS.CertData = proxyClientCert
transportConfig.TLS.KeyData = proxyClientKey
}
if c.dialContext != nil {
restConfig.Dial = c.dialContext
}
// TLS config with customized dialer cannot be cached by the client-go
// tlsTransportCache. Use a local cache here to reduce the chance of
// the controller spamming idle connections with short-lived transports.
// NOTE: the cache works because we assume that the transports constructed
// by the controller only vary on the dynamic cert/key.
restTransport, err := c.tlsCache.get(restConfig)
restTransport, err := transport.New(transportConfig)
if err != nil {
return err
}

View File

@ -18,7 +18,6 @@ package apiserver
import (
"fmt"
"net"
"net/http"
"net/http/httptest"
"net/url"
@ -134,8 +133,7 @@ func setupAPIServices(apiServices []*apiregistration.APIService) (*AvailableCond
// the maximum disruption time to a minimum, but it does prevent hot loops.
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second),
"AvailableConditionController"),
tlsCache: &tlsTransportCache{transports: make(map[tlsCacheKey]http.RoundTripper)},
metrics: newAvailabilityMetrics(),
metrics: newAvailabilityMetrics(),
}
for _, svc := range apiServices {
c.addAPIService(svc)
@ -206,54 +204,6 @@ func TestBuildCache(t *testing.T) {
}
}
func TestTLSCache(t *testing.T) {
apiServices := []*apiregistration.APIService{newRemoteAPIService("remote.group")}
services := []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)}
c, _ := setupAPIServices(apiServices)
// TLS configs with customized dialers are uncacheable by the client-go
// TLS transport cache. The local cache will be used.
c.dialContext = (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext
for _, svc := range services {
c.addService(svc)
}
tests := []struct {
name string
proxyCurrentCertKeyContent certKeyFunc
expectedCacheSize int
}{
{
name: "nil certKeyFunc",
expectedCacheSize: 1,
},
{
name: "empty certKeyFunc",
proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() },
// the tlsCacheKey is the same, reuse existing transport
expectedCacheSize: 1,
},
{
name: "different certKeyFunc",
proxyCurrentCertKeyContent: testCertKeyFunc,
// the tlsCacheKey is different, create a new transport
expectedCacheSize: 2,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
c.proxyCurrentCertKeyContent = tc.proxyCurrentCertKeyContent
for _, apiService := range apiServices {
c.sync(apiService.Name)
}
if len(c.tlsCache.transports) != tc.expectedCacheSize {
t.Fatalf("%v cache size expected %v, got %v", tc.name, tc.expectedCacheSize, len(c.tlsCache.transports))
}
})
}
}
func TestSync(t *testing.T) {
tests := []struct {
name string
@ -445,7 +395,6 @@ func TestSync(t *testing.T) {
endpointsLister: v1listers.NewEndpointsLister(endpointsIndexer),
serviceResolver: &fakeServiceResolver{url: testServer.URL},
proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() },
tlsCache: &tlsTransportCache{transports: make(map[tlsCacheKey]http.RoundTripper)},
metrics: newAvailabilityMetrics(),
}
c.sync(tc.apiServiceName)
@ -516,33 +465,3 @@ func TestUpdateAPIServiceStatus(t *testing.T) {
func emptyCert() []byte {
return []byte{}
}
func testCertKeyFunc() ([]byte, []byte) {
return []byte(`-----BEGIN CERTIFICATE-----
MIICBDCCAW2gAwIBAgIJAPgVBh+4xbGoMA0GCSqGSIb3DQEBCwUAMBsxGTAXBgNV
BAMMEHdlYmhvb2tfdGVzdHNfY2EwIBcNMTcwNzI4MjMxNTI4WhgPMjI5MTA1MTMy
MzE1MjhaMB8xHTAbBgNVBAMMFHdlYmhvb2tfdGVzdHNfY2xpZW50MIGfMA0GCSqG
SIb3DQEBAQUAA4GNADCBiQKBgQDkGXXSm6Yun5o3Jlmx45rItcQ2pmnoDk4eZfl0
rmPa674s2pfYo3KywkXQ1Fp3BC8GUgzPLSfJ8xXya9Lg1Wo8sHrDln0iRg5HXxGu
uFNhRBvj2S0sIff0ZG/IatB9I6WXVOUYuQj6+A0CdULNj1vBqH9+7uWbLZ6lrD4b
a44x/wIDAQABo0owSDAJBgNVHRMEAjAAMAsGA1UdDwQEAwIF4DAdBgNVHSUEFjAU
BggrBgEFBQcDAgYIKwYBBQUHAwEwDwYDVR0RBAgwBocEfwAAATANBgkqhkiG9w0B
AQsFAAOBgQCpN27uh/LjUVCaBK7Noko25iih/JSSoWzlvc8CaipvSPofNWyGx3Vu
OdcSwNGYX/pp4ZoAzFij/Y5u0vKTVLkWXATeTMVmlPvhmpYjj9gPkCSY6j/SiKlY
kGy0xr+0M5UQkMBcfIh9oAp9um1fZHVWAJAGP/ikZgkcUey0LmBn8w==
-----END CERTIFICATE-----`), []byte(`-----BEGIN RSA PRIVATE KEY-----
MIICWwIBAAKBgQDkGXXSm6Yun5o3Jlmx45rItcQ2pmnoDk4eZfl0rmPa674s2pfY
o3KywkXQ1Fp3BC8GUgzPLSfJ8xXya9Lg1Wo8sHrDln0iRg5HXxGuuFNhRBvj2S0s
Iff0ZG/IatB9I6WXVOUYuQj6+A0CdULNj1vBqH9+7uWbLZ6lrD4ba44x/wIDAQAB
AoGAZbWwowvCq1GBq4vPPRI3h739Uz0bRl1ymf1woYXNguXRtCB4yyH+2BTmmrrF
6AIWkePuUEdbUaKyK5nGu3iOWM+/i6NP3kopQANtbAYJ2ray3kwvFlhqyn1bxX4n
gl/Cbdw1If4zrDrB66y8mYDsjzK7n/gFaDNcY4GArjvOXKkCQQD9Lgv+WD73y4RP
yS+cRarlEeLLWVsX/pg2oEBLM50jsdUnrLSW071MjBgP37oOXzqynF9SoDbP2Y5C
x+aGux9LAkEA5qPlQPv0cv8Wc3qTI+LixZ/86PPHKWnOnwaHm3b9vQjZAkuVQg3n
Wgg9YDmPM87t3UFH7ZbDihUreUxwr9ZjnQJAZ9Z95shMsxbOYmbSVxafu6m1Sc+R
M+sghK7/D5jQpzYlhUspGf8n0YBX0hLhXUmjamQGGH5LXL4Owcb4/mM6twJAEVio
SF/qva9jv+GrKVrKFXT374lOJFY53Qn/rvifEtWUhLCslCA5kzLlctRBafMZPrfH
Mh5RrJP1BhVysDbenQJASGcc+DiF7rB6K++ZGyC11E2AP29DcZ0pgPESSV7npOGg
+NqPRZNVCSZOiVmNuejZqmwKhZNGZnBFx1Y+ChAAgw==
-----END RSA PRIVATE KEY-----`)
}