mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
add a local tlsTransportCache to available_controller
The available_controller creates short-lived clients to sync remote APIService objects. These clients are constructed with HTTP transports that cannot be cached by client-go (because client-go won't know whether the TLS configs have dynamic functions or not), which may spam idle connections. A local cache works because we know all the configs share the same dialer function, and can only vary on the dynamic cert/key.
This commit is contained in:
parent
0765ba8e54
commit
fac48d2538
@ -86,6 +86,53 @@ type AvailableConditionController struct {
|
|||||||
cache map[string]map[string][]string
|
cache map[string]map[string][]string
|
||||||
// this lock protects operations on the above cache
|
// this lock protects operations on the above cache
|
||||||
cacheLock sync.RWMutex
|
cacheLock sync.RWMutex
|
||||||
|
|
||||||
|
// TLS config with customized dialer cannot be cached by the client-go
|
||||||
|
// tlsTransportCache. Use a local cache here to reduce the chance of
|
||||||
|
// the controller spamming idle connections with short-lived transports.
|
||||||
|
// NOTE: the cache works because we assume that the transports constructed
|
||||||
|
// by the controller only vary on the dynamic cert/key.
|
||||||
|
tlsCache *tlsTransportCache
|
||||||
|
}
|
||||||
|
|
||||||
|
type tlsTransportCache struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
transports map[tlsCacheKey]http.RoundTripper
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *tlsTransportCache) get(config *rest.Config) (http.RoundTripper, error) {
|
||||||
|
// If the available controller doesn't customzie the dialer (and we know from
|
||||||
|
// the code that the controller doesn't customzie other functions i.e. Proxy
|
||||||
|
// and GetCert (ExecProvider)), the config is cacheable by the client-go TLS
|
||||||
|
// transport cache. Let's skip the local cache and depend on the client-go cache.
|
||||||
|
if config.Dial == nil {
|
||||||
|
return rest.TransportFor(config)
|
||||||
|
}
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
// See if we already have a custom transport for this config
|
||||||
|
key := tlsConfigKey(config)
|
||||||
|
if t, ok := c.transports[key]; ok {
|
||||||
|
return t, nil
|
||||||
|
}
|
||||||
|
restTransport, err := rest.TransportFor(config)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
c.transports[key] = restTransport
|
||||||
|
return restTransport, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type tlsCacheKey struct {
|
||||||
|
certData string
|
||||||
|
keyData string
|
||||||
|
}
|
||||||
|
|
||||||
|
func tlsConfigKey(c *rest.Config) tlsCacheKey {
|
||||||
|
return tlsCacheKey{
|
||||||
|
certData: string(c.TLSClientConfig.CertData),
|
||||||
|
keyData: string(c.TLSClientConfig.KeyData),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewAvailableConditionController returns a new AvailableConditionController.
|
// NewAvailableConditionController returns a new AvailableConditionController.
|
||||||
@ -115,6 +162,7 @@ func NewAvailableConditionController(
|
|||||||
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second),
|
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second),
|
||||||
"AvailableConditionController"),
|
"AvailableConditionController"),
|
||||||
proxyCurrentCertKeyContent: proxyCurrentCertKeyContent,
|
proxyCurrentCertKeyContent: proxyCurrentCertKeyContent,
|
||||||
|
tlsCache: &tlsTransportCache{transports: make(map[tlsCacheKey]http.RoundTripper)},
|
||||||
}
|
}
|
||||||
|
|
||||||
if egressSelector != nil {
|
if egressSelector != nil {
|
||||||
@ -185,7 +233,12 @@ func (c *AvailableConditionController) sync(key string) error {
|
|||||||
if c.dialContext != nil {
|
if c.dialContext != nil {
|
||||||
restConfig.Dial = c.dialContext
|
restConfig.Dial = c.dialContext
|
||||||
}
|
}
|
||||||
restTransport, err := rest.TransportFor(restConfig)
|
// TLS config with customized dialer cannot be cached by the client-go
|
||||||
|
// tlsTransportCache. Use a local cache here to reduce the chance of
|
||||||
|
// the controller spamming idle connections with short-lived transports.
|
||||||
|
// NOTE: the cache works because we assume that the transports constructed
|
||||||
|
// by the controller only vary on the dynamic cert/key.
|
||||||
|
restTransport, err := c.tlsCache.get(restConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -18,6 +18,7 @@ package apiserver
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"net/url"
|
"net/url"
|
||||||
@ -133,6 +134,7 @@ func setupAPIServices(apiServices []*apiregistration.APIService) (*AvailableCond
|
|||||||
// the maximum disruption time to a minimum, but it does prevent hot loops.
|
// the maximum disruption time to a minimum, but it does prevent hot loops.
|
||||||
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second),
|
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second),
|
||||||
"AvailableConditionController"),
|
"AvailableConditionController"),
|
||||||
|
tlsCache: &tlsTransportCache{transports: make(map[tlsCacheKey]http.RoundTripper)},
|
||||||
}
|
}
|
||||||
for _, svc := range apiServices {
|
for _, svc := range apiServices {
|
||||||
c.addAPIService(svc)
|
c.addAPIService(svc)
|
||||||
@ -202,6 +204,55 @@ func TestBuildCache(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTLSCache(t *testing.T) {
|
||||||
|
apiServices := []*apiregistration.APIService{newRemoteAPIService("remote.group")}
|
||||||
|
services := []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)}
|
||||||
|
c, _ := setupAPIServices(apiServices)
|
||||||
|
// TLS configs with customized dialers are uncacheable by the client-go
|
||||||
|
// TLS transport cache. The local cache will be used.
|
||||||
|
c.dialContext = (&net.Dialer{
|
||||||
|
Timeout: 30 * time.Second,
|
||||||
|
KeepAlive: 30 * time.Second,
|
||||||
|
}).DialContext
|
||||||
|
for _, svc := range services {
|
||||||
|
c.addService(svc)
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
proxyCurrentCertKeyContent certKeyFunc
|
||||||
|
expectedCacheSize int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "nil certKeyFunc",
|
||||||
|
expectedCacheSize: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "empty certKeyFunc",
|
||||||
|
proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() },
|
||||||
|
// the tlsCacheKey is the same, reuse existing transport
|
||||||
|
expectedCacheSize: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "different certKeyFunc",
|
||||||
|
proxyCurrentCertKeyContent: testCertKeyFunc,
|
||||||
|
// the tlsCacheKey is different, create a new transport
|
||||||
|
expectedCacheSize: 2,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range tests {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
c.proxyCurrentCertKeyContent = tc.proxyCurrentCertKeyContent
|
||||||
|
for _, apiService := range apiServices {
|
||||||
|
c.sync(apiService.Name)
|
||||||
|
}
|
||||||
|
if len(c.tlsCache.transports) != tc.expectedCacheSize {
|
||||||
|
t.Fatalf("%v cache size expected %v, got %v", tc.name, tc.expectedCacheSize, len(c.tlsCache.transports))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestSync(t *testing.T) {
|
func TestSync(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
@ -356,6 +407,7 @@ func TestSync(t *testing.T) {
|
|||||||
endpointsLister: v1listers.NewEndpointsLister(endpointsIndexer),
|
endpointsLister: v1listers.NewEndpointsLister(endpointsIndexer),
|
||||||
serviceResolver: &fakeServiceResolver{url: testServer.URL},
|
serviceResolver: &fakeServiceResolver{url: testServer.URL},
|
||||||
proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() },
|
proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() },
|
||||||
|
tlsCache: &tlsTransportCache{transports: make(map[tlsCacheKey]http.RoundTripper)},
|
||||||
}
|
}
|
||||||
c.sync(tc.apiServiceName)
|
c.sync(tc.apiServiceName)
|
||||||
|
|
||||||
@ -420,3 +472,33 @@ func TestUpdateAPIServiceStatus(t *testing.T) {
|
|||||||
func emptyCert() []byte {
|
func emptyCert() []byte {
|
||||||
return []byte{}
|
return []byte{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func testCertKeyFunc() ([]byte, []byte) {
|
||||||
|
return []byte(`-----BEGIN CERTIFICATE-----
|
||||||
|
MIICBDCCAW2gAwIBAgIJAPgVBh+4xbGoMA0GCSqGSIb3DQEBCwUAMBsxGTAXBgNV
|
||||||
|
BAMMEHdlYmhvb2tfdGVzdHNfY2EwIBcNMTcwNzI4MjMxNTI4WhgPMjI5MTA1MTMy
|
||||||
|
MzE1MjhaMB8xHTAbBgNVBAMMFHdlYmhvb2tfdGVzdHNfY2xpZW50MIGfMA0GCSqG
|
||||||
|
SIb3DQEBAQUAA4GNADCBiQKBgQDkGXXSm6Yun5o3Jlmx45rItcQ2pmnoDk4eZfl0
|
||||||
|
rmPa674s2pfYo3KywkXQ1Fp3BC8GUgzPLSfJ8xXya9Lg1Wo8sHrDln0iRg5HXxGu
|
||||||
|
uFNhRBvj2S0sIff0ZG/IatB9I6WXVOUYuQj6+A0CdULNj1vBqH9+7uWbLZ6lrD4b
|
||||||
|
a44x/wIDAQABo0owSDAJBgNVHRMEAjAAMAsGA1UdDwQEAwIF4DAdBgNVHSUEFjAU
|
||||||
|
BggrBgEFBQcDAgYIKwYBBQUHAwEwDwYDVR0RBAgwBocEfwAAATANBgkqhkiG9w0B
|
||||||
|
AQsFAAOBgQCpN27uh/LjUVCaBK7Noko25iih/JSSoWzlvc8CaipvSPofNWyGx3Vu
|
||||||
|
OdcSwNGYX/pp4ZoAzFij/Y5u0vKTVLkWXATeTMVmlPvhmpYjj9gPkCSY6j/SiKlY
|
||||||
|
kGy0xr+0M5UQkMBcfIh9oAp9um1fZHVWAJAGP/ikZgkcUey0LmBn8w==
|
||||||
|
-----END CERTIFICATE-----`), []byte(`-----BEGIN RSA PRIVATE KEY-----
|
||||||
|
MIICWwIBAAKBgQDkGXXSm6Yun5o3Jlmx45rItcQ2pmnoDk4eZfl0rmPa674s2pfY
|
||||||
|
o3KywkXQ1Fp3BC8GUgzPLSfJ8xXya9Lg1Wo8sHrDln0iRg5HXxGuuFNhRBvj2S0s
|
||||||
|
Iff0ZG/IatB9I6WXVOUYuQj6+A0CdULNj1vBqH9+7uWbLZ6lrD4ba44x/wIDAQAB
|
||||||
|
AoGAZbWwowvCq1GBq4vPPRI3h739Uz0bRl1ymf1woYXNguXRtCB4yyH+2BTmmrrF
|
||||||
|
6AIWkePuUEdbUaKyK5nGu3iOWM+/i6NP3kopQANtbAYJ2ray3kwvFlhqyn1bxX4n
|
||||||
|
gl/Cbdw1If4zrDrB66y8mYDsjzK7n/gFaDNcY4GArjvOXKkCQQD9Lgv+WD73y4RP
|
||||||
|
yS+cRarlEeLLWVsX/pg2oEBLM50jsdUnrLSW071MjBgP37oOXzqynF9SoDbP2Y5C
|
||||||
|
x+aGux9LAkEA5qPlQPv0cv8Wc3qTI+LixZ/86PPHKWnOnwaHm3b9vQjZAkuVQg3n
|
||||||
|
Wgg9YDmPM87t3UFH7ZbDihUreUxwr9ZjnQJAZ9Z95shMsxbOYmbSVxafu6m1Sc+R
|
||||||
|
M+sghK7/D5jQpzYlhUspGf8n0YBX0hLhXUmjamQGGH5LXL4Owcb4/mM6twJAEVio
|
||||||
|
SF/qva9jv+GrKVrKFXT374lOJFY53Qn/rvifEtWUhLCslCA5kzLlctRBafMZPrfH
|
||||||
|
Mh5RrJP1BhVysDbenQJASGcc+DiF7rB6K++ZGyC11E2AP29DcZ0pgPESSV7npOGg
|
||||||
|
+NqPRZNVCSZOiVmNuejZqmwKhZNGZnBFx1Y+ChAAgw==
|
||||||
|
-----END RSA PRIVATE KEY-----`)
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user