mirror of
https://github.com/kubernetes/client-go.git
synced 2025-09-13 13:52:01 +00:00
Merge pull request #97857 from liggitt/exec-auth-reuse-dialer
Track opened connections with a single tracker per authenticator Kubernetes-commit: e9dba7a627520f89778b367fc0d955776f220638
This commit is contained in:
4
Godeps/Godeps.json
generated
4
Godeps/Godeps.json
generated
@@ -460,11 +460,11 @@
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/api",
|
"ImportPath": "k8s.io/api",
|
||||||
"Rev": "9ab310c4912f"
|
"Rev": "4e9f5db10201"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/apimachinery",
|
"ImportPath": "k8s.io/apimachinery",
|
||||||
"Rev": "0ca7b349afd2"
|
"Rev": "6c16abd71758"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/gengo",
|
"ImportPath": "k8s.io/gengo",
|
||||||
|
8
go.mod
8
go.mod
@@ -26,14 +26,14 @@ require (
|
|||||||
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b
|
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b
|
||||||
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
|
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
|
||||||
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e
|
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e
|
||||||
k8s.io/api v0.0.0-20210105210245-9ab310c4912f
|
k8s.io/api v0.0.0-20210107085826-4e9f5db10201
|
||||||
k8s.io/apimachinery v0.0.0-20210103120354-0ca7b349afd2
|
k8s.io/apimachinery v0.0.0-20210106165743-6c16abd71758
|
||||||
k8s.io/klog/v2 v2.4.0
|
k8s.io/klog/v2 v2.4.0
|
||||||
k8s.io/utils v0.0.0-20201110183641-67b214c5f920
|
k8s.io/utils v0.0.0-20201110183641-67b214c5f920
|
||||||
sigs.k8s.io/yaml v1.2.0
|
sigs.k8s.io/yaml v1.2.0
|
||||||
)
|
)
|
||||||
|
|
||||||
replace (
|
replace (
|
||||||
k8s.io/api => k8s.io/api v0.0.0-20210105210245-9ab310c4912f
|
k8s.io/api => k8s.io/api v0.0.0-20210107085826-4e9f5db10201
|
||||||
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20210103120354-0ca7b349afd2
|
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20210106165743-6c16abd71758
|
||||||
)
|
)
|
||||||
|
4
go.sum
4
go.sum
@@ -427,8 +427,8 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh
|
|||||||
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||||
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
|
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
|
||||||
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
|
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
|
||||||
k8s.io/api v0.0.0-20210105210245-9ab310c4912f/go.mod h1:ugSMEpFie0GsLA3IjyrjlN2Qh3uydrt9YCmiq35Uwk0=
|
k8s.io/api v0.0.0-20210107085826-4e9f5db10201/go.mod h1:3Xl3BjPKHhLlv0+0TYKMZ8NNiKsby57AFDZIBy5Rv0o=
|
||||||
k8s.io/apimachinery v0.0.0-20210103120354-0ca7b349afd2/go.mod h1:WlLqWAHZGg07AeltaI0MV5uk1Omp8xaN0JGLY6gkRpU=
|
k8s.io/apimachinery v0.0.0-20210106165743-6c16abd71758/go.mod h1:WlLqWAHZGg07AeltaI0MV5uk1Omp8xaN0JGLY6gkRpU=
|
||||||
k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
|
k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
|
||||||
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
|
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
|
||||||
k8s.io/klog/v2 v2.4.0 h1:7+X0fUguPyrKEC4WjH8iGDg3laWgMo5tMnRTIGTTxGQ=
|
k8s.io/klog/v2 v2.4.0 h1:7+X0fUguPyrKEC4WjH8iGDg3laWgMo5tMnRTIGTTxGQ=
|
||||||
|
@@ -18,7 +18,6 @@ package exec
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"crypto/x509"
|
"crypto/x509"
|
||||||
"errors"
|
"errors"
|
||||||
@@ -52,7 +51,6 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const execInfoEnv = "KUBERNETES_EXEC_INFO"
|
const execInfoEnv = "KUBERNETES_EXEC_INFO"
|
||||||
const onRotateListWarningLength = 1000
|
|
||||||
const installHintVerboseHelp = `
|
const installHintVerboseHelp = `
|
||||||
|
|
||||||
It looks like you are trying to use a client-go credential plugin that is not installed.
|
It looks like you are trying to use a client-go credential plugin that is not installed.
|
||||||
@@ -177,6 +175,12 @@ func newAuthenticator(c *cache, config *api.ExecConfig, cluster *clientauthentic
|
|||||||
return nil, fmt.Errorf("exec plugin: invalid apiVersion %q", config.APIVersion)
|
return nil, fmt.Errorf("exec plugin: invalid apiVersion %q", config.APIVersion)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
connTracker := connrotation.NewConnectionTracker()
|
||||||
|
defaultDialer := connrotation.NewDialerWithTracker(
|
||||||
|
(&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext,
|
||||||
|
connTracker,
|
||||||
|
)
|
||||||
|
|
||||||
a := &Authenticator{
|
a := &Authenticator{
|
||||||
cmd: config.Command,
|
cmd: config.Command,
|
||||||
args: config.Args,
|
args: config.Args,
|
||||||
@@ -196,6 +200,9 @@ func newAuthenticator(c *cache, config *api.ExecConfig, cluster *clientauthentic
|
|||||||
interactive: terminal.IsTerminal(int(os.Stdout.Fd())),
|
interactive: terminal.IsTerminal(int(os.Stdout.Fd())),
|
||||||
now: time.Now,
|
now: time.Now,
|
||||||
environ: os.Environ,
|
environ: os.Environ,
|
||||||
|
|
||||||
|
defaultDialer: defaultDialer,
|
||||||
|
connTracker: connTracker,
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, env := range config.Env {
|
for _, env := range config.Env {
|
||||||
@@ -229,6 +236,11 @@ type Authenticator struct {
|
|||||||
now func() time.Time
|
now func() time.Time
|
||||||
environ func() []string
|
environ func() []string
|
||||||
|
|
||||||
|
// defaultDialer is used for clients which don't specify a custom dialer
|
||||||
|
defaultDialer *connrotation.Dialer
|
||||||
|
// connTracker tracks all connections opened that we need to close when rotating a client certificate
|
||||||
|
connTracker *connrotation.ConnectionTracker
|
||||||
|
|
||||||
// Cached results.
|
// Cached results.
|
||||||
//
|
//
|
||||||
// The mutex also guards calling the plugin. Since the plugin could be
|
// The mutex also guards calling the plugin. Since the plugin could be
|
||||||
@@ -236,8 +248,6 @@ type Authenticator struct {
|
|||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
cachedCreds *credentials
|
cachedCreds *credentials
|
||||||
exp time.Time
|
exp time.Time
|
||||||
|
|
||||||
onRotateList []func()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type credentials struct {
|
type credentials struct {
|
||||||
@@ -266,20 +276,12 @@ func (a *Authenticator) UpdateTransportConfig(c *transport.Config) error {
|
|||||||
}
|
}
|
||||||
c.TLS.GetCert = a.cert
|
c.TLS.GetCert = a.cert
|
||||||
|
|
||||||
var dial func(ctx context.Context, network, addr string) (net.Conn, error)
|
var d *connrotation.Dialer
|
||||||
if c.Dial != nil {
|
if c.Dial != nil {
|
||||||
dial = c.Dial
|
// if c has a custom dialer, we have to wrap it
|
||||||
|
d = connrotation.NewDialerWithTracker(c.Dial, a.connTracker)
|
||||||
} else {
|
} else {
|
||||||
dial = (&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext
|
d = a.defaultDialer
|
||||||
}
|
|
||||||
d := connrotation.NewDialer(dial)
|
|
||||||
|
|
||||||
a.mu.Lock()
|
|
||||||
defer a.mu.Unlock()
|
|
||||||
a.onRotateList = append(a.onRotateList, d.CloseAll)
|
|
||||||
onRotateListLength := len(a.onRotateList)
|
|
||||||
if onRotateListLength > onRotateListWarningLength {
|
|
||||||
klog.Warningf("constructing many client instances from the same exec auth config can cause performance problems during cert rotation and can exhaust available network connections; %d clients constructed calling %q", onRotateListLength, a.cmd)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
c.Dial = d.DialContext
|
c.Dial = d.DialContext
|
||||||
@@ -458,9 +460,7 @@ func (a *Authenticator) refreshCredsLocked(r *clientauthentication.Response) err
|
|||||||
if oldCreds.cert != nil && oldCreds.cert.Leaf != nil {
|
if oldCreds.cert != nil && oldCreds.cert.Leaf != nil {
|
||||||
metrics.ClientCertRotationAge.Observe(time.Now().Sub(oldCreds.cert.Leaf.NotBefore))
|
metrics.ClientCertRotationAge.Observe(time.Now().Sub(oldCreds.cert.Leaf.NotBefore))
|
||||||
}
|
}
|
||||||
for _, onRotate := range a.onRotateList {
|
a.connTracker.CloseAll()
|
||||||
onRotate()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
expiry := time.Time{}
|
expiry := time.Time{}
|
||||||
|
@@ -33,18 +33,40 @@ type DialFunc func(ctx context.Context, network, address string) (net.Conn, erro
|
|||||||
// Dialer opens connections through Dial and tracks them.
|
// Dialer opens connections through Dial and tracks them.
|
||||||
type Dialer struct {
|
type Dialer struct {
|
||||||
dial DialFunc
|
dial DialFunc
|
||||||
|
*ConnectionTracker
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewDialer creates a new Dialer instance.
|
||||||
|
// Equivalent to NewDialerWithTracker(dial, nil).
|
||||||
|
func NewDialer(dial DialFunc) *Dialer {
|
||||||
|
return NewDialerWithTracker(dial, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewDialerWithTracker creates a new Dialer instance.
|
||||||
|
//
|
||||||
|
// If dial is not nil, it will be used to create new underlying connections.
|
||||||
|
// Otherwise net.DialContext is used.
|
||||||
|
// If tracker is not nil, it will be used to track new underlying connections.
|
||||||
|
// Otherwise NewConnectionTracker() is used.
|
||||||
|
func NewDialerWithTracker(dial DialFunc, tracker *ConnectionTracker) *Dialer {
|
||||||
|
if tracker == nil {
|
||||||
|
tracker = NewConnectionTracker()
|
||||||
|
}
|
||||||
|
return &Dialer{
|
||||||
|
dial: dial,
|
||||||
|
ConnectionTracker: tracker,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ConnectionTracker keeps track of opened connections
|
||||||
|
type ConnectionTracker struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
conns map[*closableConn]struct{}
|
conns map[*closableConn]struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDialer creates a new Dialer instance.
|
// NewConnectionTracker returns a connection tracker for use with NewDialerWithTracker
|
||||||
//
|
func NewConnectionTracker() *ConnectionTracker {
|
||||||
// If dial is not nil, it will be used to create new underlying connections.
|
return &ConnectionTracker{
|
||||||
// Otherwise net.DialContext is used.
|
|
||||||
func NewDialer(dial DialFunc) *Dialer {
|
|
||||||
return &Dialer{
|
|
||||||
dial: dial,
|
|
||||||
conns: make(map[*closableConn]struct{}),
|
conns: make(map[*closableConn]struct{}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -52,17 +74,40 @@ func NewDialer(dial DialFunc) *Dialer {
|
|||||||
// CloseAll forcibly closes all tracked connections.
|
// CloseAll forcibly closes all tracked connections.
|
||||||
//
|
//
|
||||||
// Note: new connections may get created before CloseAll returns.
|
// Note: new connections may get created before CloseAll returns.
|
||||||
func (d *Dialer) CloseAll() {
|
func (c *ConnectionTracker) CloseAll() {
|
||||||
d.mu.Lock()
|
c.mu.Lock()
|
||||||
conns := d.conns
|
conns := c.conns
|
||||||
d.conns = make(map[*closableConn]struct{})
|
c.conns = make(map[*closableConn]struct{})
|
||||||
d.mu.Unlock()
|
c.mu.Unlock()
|
||||||
|
|
||||||
for conn := range conns {
|
for conn := range conns {
|
||||||
conn.Close()
|
conn.Close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Track adds the connection to the list of tracked connections,
|
||||||
|
// and returns a wrapped copy of the connection that stops tracking the connection
|
||||||
|
// when it is closed.
|
||||||
|
func (c *ConnectionTracker) Track(conn net.Conn) net.Conn {
|
||||||
|
closable := &closableConn{Conn: conn}
|
||||||
|
|
||||||
|
// When the connection is closed, remove it from the map. This will
|
||||||
|
// be no-op if the connection isn't in the map, e.g. if CloseAll()
|
||||||
|
// is called.
|
||||||
|
closable.onClose = func() {
|
||||||
|
c.mu.Lock()
|
||||||
|
delete(c.conns, closable)
|
||||||
|
c.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start tracking the connection
|
||||||
|
c.mu.Lock()
|
||||||
|
c.conns[closable] = struct{}{}
|
||||||
|
c.mu.Unlock()
|
||||||
|
|
||||||
|
return closable
|
||||||
|
}
|
||||||
|
|
||||||
// Dial creates a new tracked connection.
|
// Dial creates a new tracked connection.
|
||||||
func (d *Dialer) Dial(network, address string) (net.Conn, error) {
|
func (d *Dialer) Dial(network, address string) (net.Conn, error) {
|
||||||
return d.DialContext(context.Background(), network, address)
|
return d.DialContext(context.Background(), network, address)
|
||||||
@@ -74,24 +119,7 @@ func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
return d.ConnectionTracker.Track(conn), nil
|
||||||
closable := &closableConn{Conn: conn}
|
|
||||||
|
|
||||||
// When the connection is closed, remove it from the map. This will
|
|
||||||
// be no-op if the connection isn't in the map, e.g. if CloseAll()
|
|
||||||
// is called.
|
|
||||||
closable.onClose = func() {
|
|
||||||
d.mu.Lock()
|
|
||||||
delete(d.conns, closable)
|
|
||||||
d.mu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start tracking the connection
|
|
||||||
d.mu.Lock()
|
|
||||||
d.conns[closable] = struct{}{}
|
|
||||||
d.mu.Unlock()
|
|
||||||
|
|
||||||
return closable, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type closableConn struct {
|
type closableConn struct {
|
||||||
|
Reference in New Issue
Block a user