diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go index 7cf38ced4c2..51ae2d8a3ca 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go @@ -86,6 +86,53 @@ type AvailableConditionController struct { cache map[string]map[string][]string // this lock protects operations on the above cache cacheLock sync.RWMutex + + // TLS config with customized dialer cannot be cached by the client-go + // tlsTransportCache. Use a local cache here to reduce the chance of + // the controller spamming idle connections with short-lived transports. + // NOTE: the cache works because we assume that the transports constructed + // by the controller only vary on the dynamic cert/key. + tlsCache *tlsTransportCache +} + +type tlsTransportCache struct { + mu sync.Mutex + transports map[tlsCacheKey]http.RoundTripper +} + +func (c *tlsTransportCache) get(config *rest.Config) (http.RoundTripper, error) { + // If the available controller doesn't customzie the dialer (and we know from + // the code that the controller doesn't customzie other functions i.e. Proxy + // and GetCert (ExecProvider)), the config is cacheable by the client-go TLS + // transport cache. Let's skip the local cache and depend on the client-go cache. + if config.Dial == nil { + return rest.TransportFor(config) + } + c.mu.Lock() + defer c.mu.Unlock() + // See if we already have a custom transport for this config + key := tlsConfigKey(config) + if t, ok := c.transports[key]; ok { + return t, nil + } + restTransport, err := rest.TransportFor(config) + if err != nil { + return nil, err + } + c.transports[key] = restTransport + return restTransport, nil +} + +type tlsCacheKey struct { + certData string + keyData string +} + +func tlsConfigKey(c *rest.Config) tlsCacheKey { + return tlsCacheKey{ + certData: string(c.TLSClientConfig.CertData), + keyData: string(c.TLSClientConfig.KeyData), + } } // NewAvailableConditionController returns a new AvailableConditionController. @@ -115,6 +162,7 @@ func NewAvailableConditionController( workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second), "AvailableConditionController"), proxyCurrentCertKeyContent: proxyCurrentCertKeyContent, + tlsCache: &tlsTransportCache{transports: make(map[tlsCacheKey]http.RoundTripper)}, } if egressSelector != nil { @@ -185,7 +233,12 @@ func (c *AvailableConditionController) sync(key string) error { if c.dialContext != nil { restConfig.Dial = c.dialContext } - restTransport, err := rest.TransportFor(restConfig) + // TLS config with customized dialer cannot be cached by the client-go + // tlsTransportCache. Use a local cache here to reduce the chance of + // the controller spamming idle connections with short-lived transports. + // NOTE: the cache works because we assume that the transports constructed + // by the controller only vary on the dynamic cert/key. + restTransport, err := c.tlsCache.get(restConfig) if err != nil { return err } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go index ff389b5bf97..1bb0b403f7f 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go @@ -18,6 +18,7 @@ package apiserver import ( "fmt" + "net" "net/http" "net/http/httptest" "net/url" @@ -133,6 +134,7 @@ func setupAPIServices(apiServices []*apiregistration.APIService) (*AvailableCond // the maximum disruption time to a minimum, but it does prevent hot loops. workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second), "AvailableConditionController"), + tlsCache: &tlsTransportCache{transports: make(map[tlsCacheKey]http.RoundTripper)}, } for _, svc := range apiServices { c.addAPIService(svc) @@ -202,6 +204,55 @@ func TestBuildCache(t *testing.T) { }) } } + +func TestTLSCache(t *testing.T) { + apiServices := []*apiregistration.APIService{newRemoteAPIService("remote.group")} + services := []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)} + c, _ := setupAPIServices(apiServices) + // TLS configs with customized dialers are uncacheable by the client-go + // TLS transport cache. The local cache will be used. + c.dialContext = (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext + for _, svc := range services { + c.addService(svc) + } + tests := []struct { + name string + proxyCurrentCertKeyContent certKeyFunc + expectedCacheSize int + }{ + { + name: "nil certKeyFunc", + expectedCacheSize: 1, + }, + { + name: "empty certKeyFunc", + proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() }, + // the tlsCacheKey is the same, reuse existing transport + expectedCacheSize: 1, + }, + { + name: "different certKeyFunc", + proxyCurrentCertKeyContent: testCertKeyFunc, + // the tlsCacheKey is different, create a new transport + expectedCacheSize: 2, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + c.proxyCurrentCertKeyContent = tc.proxyCurrentCertKeyContent + for _, apiService := range apiServices { + c.sync(apiService.Name) + } + if len(c.tlsCache.transports) != tc.expectedCacheSize { + t.Fatalf("%v cache size expected %v, got %v", tc.name, tc.expectedCacheSize, len(c.tlsCache.transports)) + } + }) + } +} + func TestSync(t *testing.T) { tests := []struct { name string @@ -356,6 +407,7 @@ func TestSync(t *testing.T) { endpointsLister: v1listers.NewEndpointsLister(endpointsIndexer), serviceResolver: &fakeServiceResolver{url: testServer.URL}, proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() }, + tlsCache: &tlsTransportCache{transports: make(map[tlsCacheKey]http.RoundTripper)}, } c.sync(tc.apiServiceName) @@ -420,3 +472,33 @@ func TestUpdateAPIServiceStatus(t *testing.T) { func emptyCert() []byte { return []byte{} } + +func testCertKeyFunc() ([]byte, []byte) { + return []byte(`-----BEGIN CERTIFICATE----- +MIICBDCCAW2gAwIBAgIJAPgVBh+4xbGoMA0GCSqGSIb3DQEBCwUAMBsxGTAXBgNV +BAMMEHdlYmhvb2tfdGVzdHNfY2EwIBcNMTcwNzI4MjMxNTI4WhgPMjI5MTA1MTMy +MzE1MjhaMB8xHTAbBgNVBAMMFHdlYmhvb2tfdGVzdHNfY2xpZW50MIGfMA0GCSqG +SIb3DQEBAQUAA4GNADCBiQKBgQDkGXXSm6Yun5o3Jlmx45rItcQ2pmnoDk4eZfl0 +rmPa674s2pfYo3KywkXQ1Fp3BC8GUgzPLSfJ8xXya9Lg1Wo8sHrDln0iRg5HXxGu +uFNhRBvj2S0sIff0ZG/IatB9I6WXVOUYuQj6+A0CdULNj1vBqH9+7uWbLZ6lrD4b +a44x/wIDAQABo0owSDAJBgNVHRMEAjAAMAsGA1UdDwQEAwIF4DAdBgNVHSUEFjAU +BggrBgEFBQcDAgYIKwYBBQUHAwEwDwYDVR0RBAgwBocEfwAAATANBgkqhkiG9w0B +AQsFAAOBgQCpN27uh/LjUVCaBK7Noko25iih/JSSoWzlvc8CaipvSPofNWyGx3Vu +OdcSwNGYX/pp4ZoAzFij/Y5u0vKTVLkWXATeTMVmlPvhmpYjj9gPkCSY6j/SiKlY +kGy0xr+0M5UQkMBcfIh9oAp9um1fZHVWAJAGP/ikZgkcUey0LmBn8w== +-----END CERTIFICATE-----`), []byte(`-----BEGIN RSA PRIVATE KEY----- +MIICWwIBAAKBgQDkGXXSm6Yun5o3Jlmx45rItcQ2pmnoDk4eZfl0rmPa674s2pfY +o3KywkXQ1Fp3BC8GUgzPLSfJ8xXya9Lg1Wo8sHrDln0iRg5HXxGuuFNhRBvj2S0s +Iff0ZG/IatB9I6WXVOUYuQj6+A0CdULNj1vBqH9+7uWbLZ6lrD4ba44x/wIDAQAB +AoGAZbWwowvCq1GBq4vPPRI3h739Uz0bRl1ymf1woYXNguXRtCB4yyH+2BTmmrrF +6AIWkePuUEdbUaKyK5nGu3iOWM+/i6NP3kopQANtbAYJ2ray3kwvFlhqyn1bxX4n +gl/Cbdw1If4zrDrB66y8mYDsjzK7n/gFaDNcY4GArjvOXKkCQQD9Lgv+WD73y4RP +yS+cRarlEeLLWVsX/pg2oEBLM50jsdUnrLSW071MjBgP37oOXzqynF9SoDbP2Y5C +x+aGux9LAkEA5qPlQPv0cv8Wc3qTI+LixZ/86PPHKWnOnwaHm3b9vQjZAkuVQg3n +Wgg9YDmPM87t3UFH7ZbDihUreUxwr9ZjnQJAZ9Z95shMsxbOYmbSVxafu6m1Sc+R +M+sghK7/D5jQpzYlhUspGf8n0YBX0hLhXUmjamQGGH5LXL4Owcb4/mM6twJAEVio +SF/qva9jv+GrKVrKFXT374lOJFY53Qn/rvifEtWUhLCslCA5kzLlctRBafMZPrfH +Mh5RrJP1BhVysDbenQJASGcc+DiF7rB6K++ZGyC11E2AP29DcZ0pgPESSV7npOGg ++NqPRZNVCSZOiVmNuejZqmwKhZNGZnBFx1Y+ChAAgw== +-----END RSA PRIVATE KEY-----`) +}