Compare commits

..

17 Commits

Author SHA1 Message Date
Kubernetes Publisher
150c3c96ff Update dependencies to v0.22.15 tag 2022-09-21 22:54:23 +00:00
Kubernetes Publisher
02baeca50e Merge pull request #112339 from enj/automated-cherry-pick-of-#112017-upstream-release-1.22
Automated cherry pick of #112017: exec auth: support TLS config caching

Kubernetes-commit: c400c8bb1229d739ec6119a8b79a1f0852ab5905
2022-09-09 00:25:24 -07:00
Monis Khan
2d6ac8e675 exec auth: support TLS config caching
This change updates the transport.Config .Dial and .TLS.GetCert fields
to use a struct wrapper.  This indirection via a pointer allows the
functions to be compared and thus makes them valid to use as map keys.
This change is then leveraged by the existing global exec auth and TLS
config caches to return the same authenticator and TLS config even when
distinct but identical rest configs were used to create distinct
clientsets.

Signed-off-by: Monis Khan <mok@microsoft.com>

Kubernetes-commit: b1b8578ae937b77e64a1ce75c23f0dacb6d45064
2022-08-24 16:04:19 +00:00
Monis Khan
6f28fe1a58 client-go exec: make sure round tripper can be unwrapped
Signed-off-by: Monis Khan <mok@vmware.com>

Kubernetes-commit: 02b0b8d4c8daa2030672de6d6dbc86b009f5db32
2021-10-29 17:59:52 -04:00
Kubernetes Publisher
48190f8df6 Merge pull request #111272 from Abirdcfly/automated-cherry-pick-of-#111235-upstream-release-1.22
Automated cherry pick of #111235: fix a possible panic because of taking the address of nil

Kubernetes-commit: cdfdbd03d6b159e8a765b7872363dce068dd785c
2022-08-11 14:41:06 +00:00
Abirdcfly
595613289c fix a possible panic because of taking the address of nil
Signed-off-by: Abirdcfly <fp544037857@gmail.com>

Kubernetes-commit: b4431bcac3530dab757d345c2a177fb57015e7e8
2022-07-19 10:39:08 +08:00
Kubernetes Publisher
b626b5b2e9 sync: initially remove files BUILD */BUILD BUILD.bazel */BUILD.bazel Gopkg.toml */.gitattributes 2022-03-25 17:21:24 +00:00
Kubernetes Publisher
be23bb76ca Merge pull request #107637 from gjkim42/cherry-pick-of-#106473-upstream-release-1.22
Update k/utils to v0.0.0-20211116205334-6203023598ed

Kubernetes-commit: b2604798aaa65dcecf94a67ede54c2cf0c3c3e63
2022-01-24 17:29:54 +00:00
Gunju Kim
561c55c5a6 Update k/utils to v0.0.0-20211116205334-6203023598ed
Kubernetes-commit: 723c1b334ecf48c43fb9ddf6abf4b45406f1110b
2022-01-19 20:47:53 +09:00
Kubernetes Publisher
74d3410324 Merge pull request #107568 from jiahuif-forks/automated-cherry-pick-of-#107565-upstream-release-1.22
Automated cherry pick of #107565: upgrade sigs.k8s.io/structured-merge-diff/v4 to v4.2.1

Kubernetes-commit: 487eef699dc8ad07106876e7fb045962606035ec
2022-01-15 13:27:08 +00:00
Kubernetes Publisher
558b220907 Merge pull request #106582 from hzxuzhonghu/automated-cherry-pick-of-#104991-#105031-origin-release-1.22
Automated cherry pick of #104991: Fix workqueue memory leak
#105031: workqueue: fix leak in queue preventing objects from being

Kubernetes-commit: dcb59ac5e2ae03a5284398248fe5162d5814ef67
2022-01-15 05:48:19 +00:00
Jiahui Feng
920722c69c generated: ./hack/update-vendor.sh
Kubernetes-commit: e0550f5f956b7fac6033bb58c678f733d972c97d
2022-01-14 10:31:44 -08:00
Jiahui Feng
a33ed41d67 upgrade sigs.k8s.io/structured-merge-diff/v4 to v4.2.1
Kubernetes-commit: fa2db4831f720fcc31139a026cb012130058bca3
2022-01-14 10:30:23 -08:00
Kubernetes Publisher
81c3757ed9 Merge pull request #107335 from fasaxc/automated-cherry-pick-of-#107311-upstream-release-1.22
Automated cherry pick of #107311: client-go: Clear the ResourceVersionMatch on paged list calls

Kubernetes-commit: 7ae06380be3782c228a4f6ca44d5af5975f286c5
2022-01-05 17:38:27 -08:00
Shaun Crampton
584bf4e4e8 client-go: Clear the ResourceVersionMatch on paged list calls
API server rejects continuations with ResourceVersionMatch set.

Kubernetes-commit: a1ab5cbbf147aab16d087adb5b2ff58239fa67d1
2022-01-04 16:05:32 +00:00
John Howard
973eb806bc workqueue: fix leak in queue preventing objects from being GCed
See https://github.com/grpc/grpc-go/issues/4758 for a real world example
of this leaking 2gb+ of data.

Basically, when we do `q.queue[1:]` we are just repositioning the slice.
The underlying array is still active, which contains the object formerly
known as `q.queue[0]`. Because its referencing this object, it will not
be GCed. The only thing that will trigger it to free is eventually when
we add enough to the queue that we allocate a whole new array.

Instead, we should explicitly clear out the old space when we remove it
from the queue. This ensures the object can be GCed, assuming the users'
application doesn't reference it anymore.

Kubernetes-commit: fe4e0cb3f518144b8892414992350783f2f20c07
2021-09-14 15:51:09 -07:00
xuzhonghu
7f47bbe3fe Fix workqueue memory leak
Kubernetes-commit: ccc8303df1970632e9ce8576f24ca5b20115f5fa
2021-09-14 09:53:53 +08:00
15 changed files with 435 additions and 33 deletions

12
go.mod
View File

@@ -30,15 +30,15 @@ require (
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac
google.golang.org/protobuf v1.26.0
k8s.io/api v0.22.5
k8s.io/apimachinery v0.22.5
k8s.io/api v0.22.15
k8s.io/apimachinery v0.22.15
k8s.io/klog/v2 v2.9.0
k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a
sigs.k8s.io/structured-merge-diff/v4 v4.1.2
k8s.io/utils v0.0.0-20211116205334-6203023598ed
sigs.k8s.io/structured-merge-diff/v4 v4.2.1
sigs.k8s.io/yaml v1.2.0
)
replace (
k8s.io/api => k8s.io/api v0.22.5
k8s.io/apimachinery => k8s.io/apimachinery v0.22.5
k8s.io/api => k8s.io/api v0.22.15
k8s.io/apimachinery => k8s.io/apimachinery v0.22.15
)

16
go.sum
View File

@@ -444,23 +444,23 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.22.5 h1:xk7C+rMjF/EGELiD560jdmwzrB788mfcHiNbMQLIVI8=
k8s.io/api v0.22.5/go.mod h1:mEhXyLaSD1qTOf40rRiKXkc+2iCem09rWLlFwhCEiAs=
k8s.io/apimachinery v0.22.5 h1:cIPwldOYm1Slq9VLBRPtEYpyhjIm1C6aAMAoENuvN9s=
k8s.io/apimachinery v0.22.5/go.mod h1:xziclGKwuuJ2RM5/rSFQSYAj0zdbci3DH8kj+WvyN0U=
k8s.io/api v0.22.15 h1:ersJ+vN3OD+3xutZ+T/VT8hHKpjfzFOftw05li3S2VI=
k8s.io/api v0.22.15/go.mod h1:sJ9GkTzy/ni7FnB8Rf9o1TAHrlrUIiqD+bW/nlrjjHk=
k8s.io/apimachinery v0.22.15 h1:Ad8XfYmIwYVHznV2iNatXujRVTEOs0h2RCYm81Ql5EM=
k8s.io/apimachinery v0.22.15/go.mod h1:ZvVLP5iLhwVFg2Yx9Gh5W0um0DUauExbRhe+2Z8I1EU=
k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
k8s.io/klog/v2 v2.9.0 h1:D7HV+n1V57XeZ0m6tdRkfknthUaM06VFbWldOFh8kzM=
k8s.io/klog/v2 v2.9.0/go.mod h1:hy9LJ/NvuK+iVyP4Ehqva4HxZG/oXyIS3n3Jmire4Ec=
k8s.io/kube-openapi v0.0.0-20211109043538-20434351676c h1:jvamsI1tn9V0S8jicyX82qaFC0H/NKxv2e5mbqsgR80=
k8s.io/kube-openapi v0.0.0-20211109043538-20434351676c/go.mod h1:vHXdDvt9+2spS2Rx9ql3I8tycm3H9FDfdUoIuKCefvw=
k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a h1:8dYfu/Fc9Gz2rNJKB9IQRGgQOh2clmRzNIPPY1xLY5g=
k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20211116205334-6203023598ed h1:ck1fRPWPJWsMd8ZRFsWc6mh/zHp5fZ/shhbrgPUxDAE=
k8s.io/utils v0.0.0-20211116205334-6203023598ed/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/structured-merge-diff/v4 v4.0.2/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw=
sigs.k8s.io/structured-merge-diff/v4 v4.1.2 h1:Hr/htKFmJEbtMgS/UD0N+gtgctAqz81t3nu+sPzynno=
sigs.k8s.io/structured-merge-diff/v4 v4.1.2/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4=
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 h1:bKCqE9GvQ5tiVHn5rfn1r+yao3aLQEaLzkkmAkf+A6Y=
sigs.k8s.io/structured-merge-diff/v4 v4.2.1/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4=
sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=

View File

@@ -1 +0,0 @@
base.go export-subst

View File

@@ -39,6 +39,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/clock"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/client-go/pkg/apis/clientauthentication"
"k8s.io/client-go/pkg/apis/clientauthentication/install"
clientauthenticationv1 "k8s.io/client-go/pkg/apis/clientauthentication/v1"
@@ -200,14 +201,18 @@ func newAuthenticator(c *cache, isTerminalFunc func(int) bool, config *api.ExecC
now: time.Now,
environ: os.Environ,
defaultDialer: defaultDialer,
connTracker: connTracker,
connTracker: connTracker,
}
for _, env := range config.Env {
a.env = append(a.env, env.Name+"="+env.Value)
}
// these functions are made comparable and stored in the cache so that repeated clientset
// construction with the same rest.Config results in a single TLS cache and Authenticator
a.getCert = &transport.GetCertHolder{GetCert: a.cert}
a.dial = &transport.DialHolder{Dial: defaultDialer.DialContext}
return c.put(key, a), nil
}
@@ -262,8 +267,6 @@ type Authenticator struct {
now func() time.Time
environ func() []string
// defaultDialer is used for clients which don't specify a custom dialer
defaultDialer *connrotation.Dialer
// connTracker tracks all connections opened that we need to close when rotating a client certificate
connTracker *connrotation.ConnectionTracker
@@ -274,6 +277,12 @@ type Authenticator struct {
mu sync.Mutex
cachedCreds *credentials
exp time.Time
// getCert makes Authenticator.cert comparable to support TLS config caching
getCert *transport.GetCertHolder
// dial is used for clients which do not specify a custom dialer
// it is comparable to support TLS config caching
dial *transport.DialHolder
}
type credentials struct {
@@ -301,26 +310,34 @@ func (a *Authenticator) UpdateTransportConfig(c *transport.Config) error {
if c.TLS.GetCert != nil {
return errors.New("can't add TLS certificate callback: transport.Config.TLS.GetCert already set")
}
c.TLS.GetCert = a.cert
c.TLS.GetCert = a.getCert.GetCert
c.TLS.GetCertHolder = a.getCert // comparable for TLS config caching
var d *connrotation.Dialer
if c.Dial != nil {
// if c has a custom dialer, we have to wrap it
d = connrotation.NewDialerWithTracker(c.Dial, a.connTracker)
// TLS config caching is not supported for this config
d := connrotation.NewDialerWithTracker(c.Dial, a.connTracker)
c.Dial = d.DialContext
c.DialHolder = nil
} else {
d = a.defaultDialer
c.Dial = a.dial.Dial
c.DialHolder = a.dial // comparable for TLS config caching
}
c.Dial = d.DialContext
return nil
}
var _ utilnet.RoundTripperWrapper = &roundTripper{}
type roundTripper struct {
a *Authenticator
base http.RoundTripper
}
func (r *roundTripper) WrappedRoundTripper() http.RoundTripper {
return r.base
}
func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
// If a user has already set credentials, use that. This makes commands like
// "kubectl get --token (token) pods" work.

View File

@@ -0,0 +1,106 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package exec_test // separate package to prevent circular import
import (
"context"
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilnet "k8s.io/apimachinery/pkg/util/net"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)
// TestExecTLSCache asserts the semantics of the TLS cache when exec auth is used.
//
// In particular, when:
// - multiple identical rest configs exist as distinct objects, and
// - these rest configs use exec auth, and
// - these rest configs are used to create distinct clientsets, then
//
// the underlying TLS config is shared between those clientsets.
func TestExecTLSCache(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
t.Cleanup(cancel)
config1 := &rest.Config{
Host: "https://localhost",
ExecProvider: &clientcmdapi.ExecConfig{
Command: "./testdata/test-plugin.sh",
APIVersion: "client.authentication.k8s.io/v1",
InteractiveMode: clientcmdapi.IfAvailableExecInteractiveMode,
},
}
client1 := clientset.NewForConfigOrDie(config1)
config2 := &rest.Config{
Host: "https://localhost",
ExecProvider: &clientcmdapi.ExecConfig{
Command: "./testdata/test-plugin.sh",
APIVersion: "client.authentication.k8s.io/v1",
InteractiveMode: clientcmdapi.IfAvailableExecInteractiveMode,
},
}
client2 := clientset.NewForConfigOrDie(config2)
config3 := &rest.Config{
Host: "https://localhost",
ExecProvider: &clientcmdapi.ExecConfig{
Command: "./testdata/test-plugin.sh",
Args: []string{"make this exec auth different"},
APIVersion: "client.authentication.k8s.io/v1",
InteractiveMode: clientcmdapi.IfAvailableExecInteractiveMode,
},
}
client3 := clientset.NewForConfigOrDie(config3)
_, _ = client1.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
_, _ = client2.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
_, _ = client3.CoreV1().PersistentVolumes().List(ctx, metav1.ListOptions{})
rt1 := client1.RESTClient().(*rest.RESTClient).Client.Transport
rt2 := client2.RESTClient().(*rest.RESTClient).Client.Transport
rt3 := client3.RESTClient().(*rest.RESTClient).Client.Transport
tlsConfig1, err := utilnet.TLSClientConfig(rt1)
if err != nil {
t.Fatal(err)
}
tlsConfig2, err := utilnet.TLSClientConfig(rt2)
if err != nil {
t.Fatal(err)
}
tlsConfig3, err := utilnet.TLSClientConfig(rt3)
if err != nil {
t.Fatal(err)
}
if tlsConfig1 == nil || tlsConfig2 == nil || tlsConfig3 == nil {
t.Fatal("expected non-nil TLS configs")
}
if tlsConfig1 != tlsConfig2 {
t.Fatal("expected the same TLS config for matching exec config via rest config")
}
if tlsConfig1 == tlsConfig3 {
t.Fatal("expected different TLS config for non-matching exec config via rest config")
}
}

View File

@@ -51,10 +51,10 @@ func (a *PromptingAuthLoader) LoadAuth(path string) (*clientauth.Info, error) {
// Prompt for user/pass and write a file if none exists.
if _, err := os.Stat(path); os.IsNotExist(err) {
authPtr, err := a.Prompt()
auth := *authPtr
if err != nil {
return nil, err
}
auth := *authPtr
data, err := json.Marshal(auth)
if err != nil {
return &auth, err

View File

@@ -78,6 +78,7 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
options.Limit = p.PageSize
}
requestedResourceVersion := options.ResourceVersion
requestedResourceVersionMatch := options.ResourceVersionMatch
var list *metainternalversion.List
paginatedResult := false
@@ -102,6 +103,7 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
options.Limit = 0
options.Continue = ""
options.ResourceVersion = requestedResourceVersion
options.ResourceVersionMatch = requestedResourceVersionMatch
result, err := p.PageFn(ctx, options)
return result, paginatedResult, err
}
@@ -135,10 +137,11 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
// set the next loop up
options.Continue = m.GetContinue()
// Clear the ResourceVersion on the subsequent List calls to avoid the
// Clear the ResourceVersion(Match) on the subsequent List calls to avoid the
// `specifying resource version is not allowed when using continue` error.
// See https://github.com/kubernetes/kubernetes/issues/85221#issuecomment-553748143.
options.ResourceVersion = ""
options.ResourceVersionMatch = ""
// At this point, result is already paginated.
paginatedResult = true
}

View File

@@ -76,6 +76,10 @@ func (p *testPager) PagedList(ctx context.Context, options metav1.ListOptions) (
p.t.Errorf("invariant violated, specifying resource version (%s) is not allowed when using continue (%s).", options.ResourceVersion, options.Continue)
return nil, fmt.Errorf("invariant violated")
}
if options.Continue != "" && options.ResourceVersionMatch != "" {
p.t.Errorf("invariant violated, specifying resource version match type (%s) is not allowed when using continue (%s).", options.ResourceVersionMatch, options.Continue)
return nil, fmt.Errorf("invariant violated")
}
var list metainternalversion.List
total := options.Limit
if total == 0 {
@@ -201,6 +205,13 @@ func TestListPager_List(t *testing.T) {
want: list(11, "rv:20"),
wantPaged: true,
},
{
name: "two pages with resourceVersion and resourceVersionMatch",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
args: args{options: metav1.ListOptions{ResourceVersion: "rv:10", ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan}},
want: list(11, "rv:20"),
wantPaged: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

View File

@@ -17,6 +17,7 @@ limitations under the License.
package transport
import (
"context"
"fmt"
"net"
"net/http"
@@ -50,6 +51,9 @@ type tlsCacheKey struct {
serverName string
nextProtos string
disableCompression bool
// these functions are wrapped to allow them to be used as map keys
getCert *GetCertHolder
dial *DialHolder
}
func (t tlsCacheKey) String() string {
@@ -57,7 +61,8 @@ func (t tlsCacheKey) String() string {
if len(t.keyData) > 0 {
keyText = "<redacted>"
}
return fmt.Sprintf("insecure:%v, caData:%#v, certData:%#v, keyData:%s, serverName:%s, disableCompression:%t", t.insecure, t.caData, t.certData, keyText, t.serverName, t.disableCompression)
return fmt.Sprintf("insecure:%v, caData:%#v, certData:%#v, keyData:%s, serverName:%s, disableCompression:%t, getCert:%p, dial:%p",
t.insecure, t.caData, t.certData, keyText, t.serverName, t.disableCompression, t.getCert, t.dial)
}
func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) {
@@ -87,8 +92,10 @@ func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) {
return http.DefaultTransport, nil
}
dial := config.Dial
if dial == nil {
var dial func(ctx context.Context, network, address string) (net.Conn, error)
if config.Dial != nil {
dial = config.Dial
} else {
dial = (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
@@ -133,10 +140,18 @@ func tlsConfigKey(c *Config) (tlsCacheKey, bool, error) {
return tlsCacheKey{}, false, err
}
if c.TLS.GetCert != nil || c.Dial != nil || c.Proxy != nil {
if c.Proxy != nil {
// cannot determine equality for functions
return tlsCacheKey{}, false, nil
}
if c.Dial != nil && c.DialHolder == nil {
// cannot determine equality for dial function that doesn't have non-nil DialHolder set as well
return tlsCacheKey{}, false, nil
}
if c.TLS.GetCert != nil && c.TLS.GetCertHolder == nil {
// cannot determine equality for getCert function that doesn't have non-nil GetCertHolder set as well
return tlsCacheKey{}, false, nil
}
k := tlsCacheKey{
insecure: c.TLS.Insecure,
@@ -144,6 +159,8 @@ func tlsConfigKey(c *Config) (tlsCacheKey, bool, error) {
serverName: c.TLS.ServerName,
nextProtos: strings.Join(c.TLS.NextProtos, ","),
disableCompression: c.DisableCompression,
getCert: c.TLS.GetCertHolder,
dial: c.DialHolder,
}
if c.TLS.ReloadTLSFiles {

View File

@@ -21,6 +21,7 @@ import (
"crypto/tls"
"net"
"net/http"
"net/url"
"testing"
)
@@ -58,16 +59,24 @@ func TestTLSConfigKey(t *testing.T) {
t.Errorf("Expected identical cache keys for %q and %q, got:\n\t%s\n\t%s", nameA, nameB, keyA, keyB)
continue
}
if keyA != (tlsCacheKey{}) {
t.Errorf("Expected empty cache keys for %q and %q, got:\n\t%s\n\t%s", nameA, nameB, keyA, keyB)
continue
}
}
}
// Make sure config fields that affect the tls config affect the cache key
dialer := net.Dialer{}
getCert := func() (*tls.Certificate, error) { return nil, nil }
getCertHolder := &GetCertHolder{GetCert: getCert}
uniqueConfigurations := map[string]*Config{
"proxy": {Proxy: func(request *http.Request) (*url.URL, error) { return nil, nil }},
"no tls": {},
"dialer": {Dial: dialer.DialContext},
"dialer2": {Dial: func(ctx context.Context, network, address string) (net.Conn, error) { return nil, nil }},
"dialer3": {Dial: dialer.DialContext, DialHolder: &DialHolder{Dial: dialer.DialContext}},
"dialer4": {Dial: func(ctx context.Context, network, address string) (net.Conn, error) { return nil, nil }, DialHolder: &DialHolder{Dial: func(ctx context.Context, network, address string) (net.Conn, error) { return nil, nil }}},
"insecure": {TLS: TLSConfig{Insecure: true}},
"cadata 1": {TLS: TLSConfig{CAData: []byte{1}}},
"cadata 2": {TLS: TLSConfig{CAData: []byte{2}}},
@@ -128,6 +137,13 @@ func TestTLSConfigKey(t *testing.T) {
GetCert: func() (*tls.Certificate, error) { return nil, nil },
},
},
"getCert3": {
TLS: TLSConfig{
KeyData: []byte{1},
GetCert: getCert,
GetCertHolder: getCertHolder,
},
},
"getCert1, key 2": {
TLS: TLSConfig{
KeyData: []byte{2},

View File

@@ -68,7 +68,11 @@ type Config struct {
WrapTransport WrapperFunc
// Dial specifies the dial function for creating unencrypted TCP connections.
// If specified, this transport will be non-cacheable unless DialHolder is also set.
Dial func(ctx context.Context, network, address string) (net.Conn, error)
// DialHolder can be populated to make transport configs cacheable.
// If specified, DialHolder.Dial must be equal to Dial.
DialHolder *DialHolder
// Proxy is the proxy func to be used for all requests made by this
// transport. If Proxy is nil, http.ProxyFromEnvironment is used. If Proxy
@@ -78,6 +82,11 @@ type Config struct {
Proxy func(*http.Request) (*url.URL, error)
}
// DialHolder is used to make the wrapped function comparable so that it can be used as a map key.
type DialHolder struct {
Dial func(ctx context.Context, network, address string) (net.Conn, error)
}
// ImpersonationConfig has all the available impersonation options
type ImpersonationConfig struct {
// UserName matches user.Info.GetName()
@@ -141,5 +150,15 @@ type TLSConfig struct {
// To use only http/1.1, set to ["http/1.1"].
NextProtos []string
GetCert func() (*tls.Certificate, error) // Callback that returns a TLS client certificate. CertData, CertFile, KeyData and KeyFile supercede this field.
// Callback that returns a TLS client certificate. CertData, CertFile, KeyData and KeyFile supercede this field.
// If specified, this transport is non-cacheable unless CertHolder is populated.
GetCert func() (*tls.Certificate, error)
// CertHolder can be populated to make transport configs that set GetCert cacheable.
// If set, CertHolder.GetCert must be equal to GetCert.
GetCertHolder *GetCertHolder
}
// GetCertHolder is used to make the wrapped function comparable so that it can be used as a map key.
type GetCertHolder struct {
GetCert func() (*tls.Certificate, error)
}

View File

@@ -24,6 +24,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
"reflect"
"sync"
"time"
@@ -39,6 +40,10 @@ func New(config *Config) (http.RoundTripper, error) {
return nil, fmt.Errorf("using a custom transport with TLS certificate options or the insecure flag is not allowed")
}
if !isValidHolders(config) {
return nil, fmt.Errorf("misconfigured holder for dialer or cert callback")
}
var (
rt http.RoundTripper
err error
@@ -56,6 +61,26 @@ func New(config *Config) (http.RoundTripper, error) {
return HTTPWrappersForConfig(config, rt)
}
func isValidHolders(config *Config) bool {
if config.TLS.GetCertHolder != nil {
if config.TLS.GetCertHolder.GetCert == nil ||
config.TLS.GetCert == nil ||
reflect.ValueOf(config.TLS.GetCertHolder.GetCert).Pointer() != reflect.ValueOf(config.TLS.GetCert).Pointer() {
return false
}
}
if config.DialHolder != nil {
if config.DialHolder.Dial == nil ||
config.Dial == nil ||
reflect.ValueOf(config.DialHolder.Dial).Pointer() != reflect.ValueOf(config.Dial).Pointer() {
return false
}
}
return true
}
// TLSConfigFor returns a tls.Config that will provide the transport level security defined
// by the provided Config. Will return nil if no transport level security is requested.
func TLSConfigFor(c *Config) (*tls.Config, error) {

View File

@@ -21,6 +21,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"net"
"net/http"
"testing"
)
@@ -94,6 +95,13 @@ stR0Yiw0buV6DL/moUO0HIM9Bjh96HJp+LxiIS6UCdIhMPp5HoQa
)
func TestNew(t *testing.T) {
globalGetCert := &GetCertHolder{
GetCert: func() (*tls.Certificate, error) { return nil, nil },
}
globalDial := &DialHolder{
Dial: func(ctx context.Context, network, address string) (net.Conn, error) { return nil, nil },
}
testCases := map[string]struct {
Config *Config
Err bool
@@ -255,6 +263,144 @@ func TestNew(t *testing.T) {
},
},
},
"nil holders and nil regular": {
Config: &Config{
TLS: TLSConfig{
GetCert: nil,
GetCertHolder: nil,
},
Dial: nil,
DialHolder: nil,
},
Err: false,
TLS: false,
TLSCert: false,
TLSErr: false,
Default: true,
Insecure: false,
DefaultRoots: false,
},
"nil holders and non-nil regular get cert": {
Config: &Config{
TLS: TLSConfig{
GetCert: func() (*tls.Certificate, error) { return nil, nil },
GetCertHolder: nil,
},
Dial: nil,
DialHolder: nil,
},
Err: false,
TLS: true,
TLSCert: true,
TLSErr: false,
Default: false,
Insecure: false,
DefaultRoots: true,
},
"nil holders and non-nil regular dial": {
Config: &Config{
TLS: TLSConfig{
GetCert: nil,
GetCertHolder: nil,
},
Dial: func(ctx context.Context, network, address string) (net.Conn, error) { return nil, nil },
DialHolder: nil,
},
Err: false,
TLS: true,
TLSCert: false,
TLSErr: false,
Default: false,
Insecure: false,
DefaultRoots: true,
},
"non-nil dial holder and nil regular": {
Config: &Config{
TLS: TLSConfig{
GetCert: nil,
GetCertHolder: nil,
},
Dial: nil,
DialHolder: &DialHolder{},
},
Err: true,
},
"non-nil cert holder and nil regular": {
Config: &Config{
TLS: TLSConfig{
GetCert: nil,
GetCertHolder: &GetCertHolder{},
},
Dial: nil,
DialHolder: nil,
},
Err: true,
},
"non-nil dial holder and non-nil regular": {
Config: &Config{
TLS: TLSConfig{
GetCert: nil,
GetCertHolder: nil,
},
Dial: func(ctx context.Context, network, address string) (net.Conn, error) { return nil, nil },
DialHolder: &DialHolder{},
},
Err: true,
},
"non-nil cert holder and non-nil regular": {
Config: &Config{
TLS: TLSConfig{
GetCert: func() (*tls.Certificate, error) { return nil, nil },
GetCertHolder: &GetCertHolder{},
},
Dial: nil,
DialHolder: nil,
},
Err: true,
},
"non-nil dial holder+internal and non-nil regular": {
Config: &Config{
TLS: TLSConfig{
GetCert: nil,
GetCertHolder: nil,
},
Dial: func(ctx context.Context, network, address string) (net.Conn, error) { return nil, nil },
DialHolder: &DialHolder{
Dial: func(ctx context.Context, network, address string) (net.Conn, error) { return nil, nil },
},
},
Err: true,
},
"non-nil cert holder+internal and non-nil regular": {
Config: &Config{
TLS: TLSConfig{
GetCert: func() (*tls.Certificate, error) { return nil, nil },
GetCertHolder: &GetCertHolder{
GetCert: func() (*tls.Certificate, error) { return nil, nil },
},
},
Dial: nil,
DialHolder: nil,
},
Err: true,
},
"non-nil holders+internal and non-nil regular with correct address": {
Config: &Config{
TLS: TLSConfig{
GetCert: globalGetCert.GetCert,
GetCertHolder: globalGetCert,
},
Dial: globalDial.Dial,
DialHolder: globalDial,
},
Err: false,
TLS: true,
TLSCert: true,
TLSErr: false,
Default: false,
Insecure: false,
DefaultRoots: true,
},
}
for k, testCase := range testCases {
t.Run(k, func(t *testing.T) {

View File

@@ -155,7 +155,10 @@ func (q *Type) Get() (item interface{}, shutdown bool) {
return nil, true
}
item, q.queue = q.queue[0], q.queue[1:]
item = q.queue[0]
// The underlying array still exists and reference this object, so the object will not be garbage collected.
q.queue[0] = nil
q.queue = q.queue[1:]
q.metrics.get(item)

View File

@@ -17,10 +17,13 @@ limitations under the License.
package workqueue_test
import (
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
)
@@ -159,3 +162,40 @@ func TestReinsert(t *testing.T) {
t.Errorf("Expected queue to be empty. Has %v items", a)
}
}
// TestGarbageCollection ensures that objects that are added then removed from the queue are
// able to be garbage collected.
func TestGarbageCollection(t *testing.T) {
type bigObject struct {
data []byte
}
leakQueue := workqueue.New()
t.Cleanup(func() {
// Make sure leakQueue doesn't go out of scope too early
runtime.KeepAlive(leakQueue)
})
c := &bigObject{data: []byte("hello")}
mustGarbageCollect(t, c)
leakQueue.Add(c)
o, _ := leakQueue.Get()
leakQueue.Done(o)
}
// mustGarbageCollect asserts than an object was garbage collected by the end of the test.
// The input must be a pointer to an object.
func mustGarbageCollect(t *testing.T, i interface{}) {
t.Helper()
var collected int32 = 0
runtime.SetFinalizer(i, func(x interface{}) {
atomic.StoreInt32(&collected, 1)
})
t.Cleanup(func() {
if err := wait.PollImmediate(time.Millisecond*100, wait.ForeverTestTimeout, func() (done bool, err error) {
// Trigger GC explicitly, otherwise we may need to wait a long time for it to run
runtime.GC()
return atomic.LoadInt32(&collected) == 1, nil
}); err != nil {
t.Errorf("object was not garbage collected")
}
})
}