diff --git a/staging/src/k8s.io/client-go/transport/cache.go b/staging/src/k8s.io/client-go/transport/cache.go index 3ec4e19357d..fa2afb1f161 100644 --- a/staging/src/k8s.io/client-go/transport/cache.go +++ b/staging/src/k8s.io/client-go/transport/cache.go @@ -47,12 +47,9 @@ type tlsCacheKey struct { keyData string certFile string keyFile string - getCert string serverName string nextProtos string - dial string disableCompression bool - proxy string } func (t tlsCacheKey) String() string { @@ -60,22 +57,24 @@ func (t tlsCacheKey) String() string { if len(t.keyData) > 0 { keyText = "" } - return fmt.Sprintf("insecure:%v, caData:%#v, certData:%#v, keyData:%s, getCert: %s, serverName:%s, dial:%s disableCompression:%t, proxy: %s", t.insecure, t.caData, t.certData, keyText, t.getCert, t.serverName, t.dial, t.disableCompression, t.proxy) + return fmt.Sprintf("insecure:%v, caData:%#v, certData:%#v, keyData:%s, serverName:%s, disableCompression:%t", t.insecure, t.caData, t.certData, keyText, t.serverName, t.disableCompression) } func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) { - key, err := tlsConfigKey(config) + key, canCache, err := tlsConfigKey(config) if err != nil { return nil, err } - // Ensure we only create a single transport for the given TLS options - c.mu.Lock() - defer c.mu.Unlock() + if canCache { + // Ensure we only create a single transport for the given TLS options + c.mu.Lock() + defer c.mu.Unlock() - // See if we already have a custom transport for this config - if t, ok := c.transports[key]; ok { - return t, nil + // See if we already have a custom transport for this config + if t, ok := c.transports[key]; ok { + return t, nil + } } // Get the TLS options for this client config @@ -110,8 +109,7 @@ func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) { proxy = config.Proxy } - // Cache a single transport for these options - c.transports[key] = utilnet.SetTransportDefaults(&http.Transport{ + transport := utilnet.SetTransportDefaults(&http.Transport{ Proxy: proxy, TLSHandshakeTimeout: 10 * time.Second, TLSClientConfig: tlsConfig, @@ -119,24 +117,33 @@ func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) { DialContext: dial, DisableCompression: config.DisableCompression, }) - return c.transports[key], nil + + if canCache { + // Cache a single transport for these options + c.transports[key] = transport + } + + return transport, nil } // tlsConfigKey returns a unique key for tls.Config objects returned from TLSConfigFor -func tlsConfigKey(c *Config) (tlsCacheKey, error) { +func tlsConfigKey(c *Config) (tlsCacheKey, bool, error) { // Make sure ca/key/cert content is loaded if err := loadTLSFiles(c); err != nil { - return tlsCacheKey{}, err + return tlsCacheKey{}, false, err } + + if c.TLS.GetCert != nil || c.Dial != nil || c.Proxy != nil { + // cannot determine equality for functions + return tlsCacheKey{}, false, nil + } + k := tlsCacheKey{ insecure: c.TLS.Insecure, caData: string(c.TLS.CAData), - getCert: fmt.Sprintf("%p", c.TLS.GetCert), serverName: c.TLS.ServerName, nextProtos: strings.Join(c.TLS.NextProtos, ","), - dial: fmt.Sprintf("%p", c.Dial), disableCompression: c.DisableCompression, - proxy: fmt.Sprintf("%p", c.Proxy), } if c.TLS.ReloadTLSFiles { @@ -147,5 +154,5 @@ func tlsConfigKey(c *Config) (tlsCacheKey, error) { k.keyData = string(c.TLS.KeyData) } - return k, nil + return k, true, nil } diff --git a/staging/src/k8s.io/client-go/transport/cache_test.go b/staging/src/k8s.io/client-go/transport/cache_test.go index 11ee6253575..c6d06fcab37 100644 --- a/staging/src/k8s.io/client-go/transport/cache_test.go +++ b/staging/src/k8s.io/client-go/transport/cache_test.go @@ -21,7 +21,6 @@ import ( "crypto/tls" "net" "net/http" - "net/url" "testing" ) @@ -37,16 +36,24 @@ func TestTLSConfigKey(t *testing.T) { } for nameA, valueA := range identicalConfigurations { for nameB, valueB := range identicalConfigurations { - keyA, err := tlsConfigKey(valueA) + keyA, canCache, err := tlsConfigKey(valueA) if err != nil { t.Errorf("Unexpected error for %q: %v", nameA, err) continue } - keyB, err := tlsConfigKey(valueB) + if !canCache { + t.Errorf("Unexpected canCache=false") + continue + } + keyB, canCache, err := tlsConfigKey(valueB) if err != nil { t.Errorf("Unexpected error for %q: %v", nameB, err) continue } + if !canCache { + t.Errorf("Unexpected canCache=false") + continue + } if keyA != keyB { t.Errorf("Expected identical cache keys for %q and %q, got:\n\t%s\n\t%s", nameA, nameB, keyA, keyB) continue @@ -132,12 +139,12 @@ func TestTLSConfigKey(t *testing.T) { } for nameA, valueA := range uniqueConfigurations { for nameB, valueB := range uniqueConfigurations { - keyA, err := tlsConfigKey(valueA) + keyA, canCacheA, err := tlsConfigKey(valueA) if err != nil { t.Errorf("Unexpected error for %q: %v", nameA, err) continue } - keyB, err := tlsConfigKey(valueB) + keyB, canCacheB, err := tlsConfigKey(valueB) if err != nil { t.Errorf("Unexpected error for %q: %v", nameB, err) continue @@ -148,33 +155,18 @@ func TestTLSConfigKey(t *testing.T) { if keyA != keyB { t.Errorf("Expected identical cache keys for %q and %q, got:\n\t%s\n\t%s", nameA, nameB, keyA, keyB) } + if canCacheA != canCacheB { + t.Errorf("Expected identical canCache %q and %q, got:\n\t%v\n\t%v", nameA, nameB, canCacheA, canCacheB) + } continue } - if keyA == keyB { - t.Errorf("Expected unique cache keys for %q and %q, got:\n\t%s\n\t%s", nameA, nameB, keyA, keyB) - continue + if canCacheA && canCacheB { + if keyA == keyB { + t.Errorf("Expected unique cache keys for %q and %q, got:\n\t%s\n\t%s", nameA, nameB, keyA, keyB) + continue + } } } } } - -func TestTLSConfigKeyFuncPtr(t *testing.T) { - keys := make(map[tlsCacheKey]struct{}) - makeKey := func(p func(*http.Request) (*url.URL, error)) tlsCacheKey { - key, err := tlsConfigKey(&Config{Proxy: p}) - if err != nil { - t.Fatalf("Unexpected error creating cache key: %v", err) - } - return key - } - - keys[makeKey(http.ProxyFromEnvironment)] = struct{}{} - keys[makeKey(http.ProxyFromEnvironment)] = struct{}{} - keys[makeKey(http.ProxyURL(nil))] = struct{}{} - keys[makeKey(nil)] = struct{}{} - - if got, want := len(keys), 3; got != want { - t.Fatalf("Unexpected number of keys: got=%d want=%d", got, want) - } -} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go index 7cf38ced4c2..51ae2d8a3ca 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go @@ -86,6 +86,53 @@ type AvailableConditionController struct { cache map[string]map[string][]string // this lock protects operations on the above cache cacheLock sync.RWMutex + + // TLS config with customized dialer cannot be cached by the client-go + // tlsTransportCache. Use a local cache here to reduce the chance of + // the controller spamming idle connections with short-lived transports. + // NOTE: the cache works because we assume that the transports constructed + // by the controller only vary on the dynamic cert/key. + tlsCache *tlsTransportCache +} + +type tlsTransportCache struct { + mu sync.Mutex + transports map[tlsCacheKey]http.RoundTripper +} + +func (c *tlsTransportCache) get(config *rest.Config) (http.RoundTripper, error) { + // If the available controller doesn't customzie the dialer (and we know from + // the code that the controller doesn't customzie other functions i.e. Proxy + // and GetCert (ExecProvider)), the config is cacheable by the client-go TLS + // transport cache. Let's skip the local cache and depend on the client-go cache. + if config.Dial == nil { + return rest.TransportFor(config) + } + c.mu.Lock() + defer c.mu.Unlock() + // See if we already have a custom transport for this config + key := tlsConfigKey(config) + if t, ok := c.transports[key]; ok { + return t, nil + } + restTransport, err := rest.TransportFor(config) + if err != nil { + return nil, err + } + c.transports[key] = restTransport + return restTransport, nil +} + +type tlsCacheKey struct { + certData string + keyData string +} + +func tlsConfigKey(c *rest.Config) tlsCacheKey { + return tlsCacheKey{ + certData: string(c.TLSClientConfig.CertData), + keyData: string(c.TLSClientConfig.KeyData), + } } // NewAvailableConditionController returns a new AvailableConditionController. @@ -115,6 +162,7 @@ func NewAvailableConditionController( workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second), "AvailableConditionController"), proxyCurrentCertKeyContent: proxyCurrentCertKeyContent, + tlsCache: &tlsTransportCache{transports: make(map[tlsCacheKey]http.RoundTripper)}, } if egressSelector != nil { @@ -185,7 +233,12 @@ func (c *AvailableConditionController) sync(key string) error { if c.dialContext != nil { restConfig.Dial = c.dialContext } - restTransport, err := rest.TransportFor(restConfig) + // TLS config with customized dialer cannot be cached by the client-go + // tlsTransportCache. Use a local cache here to reduce the chance of + // the controller spamming idle connections with short-lived transports. + // NOTE: the cache works because we assume that the transports constructed + // by the controller only vary on the dynamic cert/key. + restTransport, err := c.tlsCache.get(restConfig) if err != nil { return err } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go index ff389b5bf97..1bb0b403f7f 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go @@ -18,6 +18,7 @@ package apiserver import ( "fmt" + "net" "net/http" "net/http/httptest" "net/url" @@ -133,6 +134,7 @@ func setupAPIServices(apiServices []*apiregistration.APIService) (*AvailableCond // the maximum disruption time to a minimum, but it does prevent hot loops. workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second), "AvailableConditionController"), + tlsCache: &tlsTransportCache{transports: make(map[tlsCacheKey]http.RoundTripper)}, } for _, svc := range apiServices { c.addAPIService(svc) @@ -202,6 +204,55 @@ func TestBuildCache(t *testing.T) { }) } } + +func TestTLSCache(t *testing.T) { + apiServices := []*apiregistration.APIService{newRemoteAPIService("remote.group")} + services := []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)} + c, _ := setupAPIServices(apiServices) + // TLS configs with customized dialers are uncacheable by the client-go + // TLS transport cache. The local cache will be used. + c.dialContext = (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext + for _, svc := range services { + c.addService(svc) + } + tests := []struct { + name string + proxyCurrentCertKeyContent certKeyFunc + expectedCacheSize int + }{ + { + name: "nil certKeyFunc", + expectedCacheSize: 1, + }, + { + name: "empty certKeyFunc", + proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() }, + // the tlsCacheKey is the same, reuse existing transport + expectedCacheSize: 1, + }, + { + name: "different certKeyFunc", + proxyCurrentCertKeyContent: testCertKeyFunc, + // the tlsCacheKey is different, create a new transport + expectedCacheSize: 2, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + c.proxyCurrentCertKeyContent = tc.proxyCurrentCertKeyContent + for _, apiService := range apiServices { + c.sync(apiService.Name) + } + if len(c.tlsCache.transports) != tc.expectedCacheSize { + t.Fatalf("%v cache size expected %v, got %v", tc.name, tc.expectedCacheSize, len(c.tlsCache.transports)) + } + }) + } +} + func TestSync(t *testing.T) { tests := []struct { name string @@ -356,6 +407,7 @@ func TestSync(t *testing.T) { endpointsLister: v1listers.NewEndpointsLister(endpointsIndexer), serviceResolver: &fakeServiceResolver{url: testServer.URL}, proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() }, + tlsCache: &tlsTransportCache{transports: make(map[tlsCacheKey]http.RoundTripper)}, } c.sync(tc.apiServiceName) @@ -420,3 +472,33 @@ func TestUpdateAPIServiceStatus(t *testing.T) { func emptyCert() []byte { return []byte{} } + +func testCertKeyFunc() ([]byte, []byte) { + return []byte(`-----BEGIN CERTIFICATE----- +MIICBDCCAW2gAwIBAgIJAPgVBh+4xbGoMA0GCSqGSIb3DQEBCwUAMBsxGTAXBgNV +BAMMEHdlYmhvb2tfdGVzdHNfY2EwIBcNMTcwNzI4MjMxNTI4WhgPMjI5MTA1MTMy +MzE1MjhaMB8xHTAbBgNVBAMMFHdlYmhvb2tfdGVzdHNfY2xpZW50MIGfMA0GCSqG +SIb3DQEBAQUAA4GNADCBiQKBgQDkGXXSm6Yun5o3Jlmx45rItcQ2pmnoDk4eZfl0 +rmPa674s2pfYo3KywkXQ1Fp3BC8GUgzPLSfJ8xXya9Lg1Wo8sHrDln0iRg5HXxGu +uFNhRBvj2S0sIff0ZG/IatB9I6WXVOUYuQj6+A0CdULNj1vBqH9+7uWbLZ6lrD4b +a44x/wIDAQABo0owSDAJBgNVHRMEAjAAMAsGA1UdDwQEAwIF4DAdBgNVHSUEFjAU +BggrBgEFBQcDAgYIKwYBBQUHAwEwDwYDVR0RBAgwBocEfwAAATANBgkqhkiG9w0B +AQsFAAOBgQCpN27uh/LjUVCaBK7Noko25iih/JSSoWzlvc8CaipvSPofNWyGx3Vu +OdcSwNGYX/pp4ZoAzFij/Y5u0vKTVLkWXATeTMVmlPvhmpYjj9gPkCSY6j/SiKlY +kGy0xr+0M5UQkMBcfIh9oAp9um1fZHVWAJAGP/ikZgkcUey0LmBn8w== +-----END CERTIFICATE-----`), []byte(`-----BEGIN RSA PRIVATE KEY----- +MIICWwIBAAKBgQDkGXXSm6Yun5o3Jlmx45rItcQ2pmnoDk4eZfl0rmPa674s2pfY +o3KywkXQ1Fp3BC8GUgzPLSfJ8xXya9Lg1Wo8sHrDln0iRg5HXxGuuFNhRBvj2S0s +Iff0ZG/IatB9I6WXVOUYuQj6+A0CdULNj1vBqH9+7uWbLZ6lrD4ba44x/wIDAQAB +AoGAZbWwowvCq1GBq4vPPRI3h739Uz0bRl1ymf1woYXNguXRtCB4yyH+2BTmmrrF +6AIWkePuUEdbUaKyK5nGu3iOWM+/i6NP3kopQANtbAYJ2ray3kwvFlhqyn1bxX4n +gl/Cbdw1If4zrDrB66y8mYDsjzK7n/gFaDNcY4GArjvOXKkCQQD9Lgv+WD73y4RP +yS+cRarlEeLLWVsX/pg2oEBLM50jsdUnrLSW071MjBgP37oOXzqynF9SoDbP2Y5C +x+aGux9LAkEA5qPlQPv0cv8Wc3qTI+LixZ/86PPHKWnOnwaHm3b9vQjZAkuVQg3n +Wgg9YDmPM87t3UFH7ZbDihUreUxwr9ZjnQJAZ9Z95shMsxbOYmbSVxafu6m1Sc+R +M+sghK7/D5jQpzYlhUspGf8n0YBX0hLhXUmjamQGGH5LXL4Owcb4/mM6twJAEVio +SF/qva9jv+GrKVrKFXT374lOJFY53Qn/rvifEtWUhLCslCA5kzLlctRBafMZPrfH +Mh5RrJP1BhVysDbenQJASGcc+DiF7rB6K++ZGyC11E2AP29DcZ0pgPESSV7npOGg ++NqPRZNVCSZOiVmNuejZqmwKhZNGZnBFx1Y+ChAAgw== +-----END RSA PRIVATE KEY-----`) +}