mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
Merge pull request #80284 from danielqsj/exec
Fix a racing issue in client-go UpdateTransportConfig
This commit is contained in:
commit
48ddf3be87
@ -48,6 +48,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const execInfoEnv = "KUBERNETES_EXEC_INFO"
|
const execInfoEnv = "KUBERNETES_EXEC_INFO"
|
||||||
|
const onRotateListWarningLength = 1000
|
||||||
|
|
||||||
var scheme = runtime.NewScheme()
|
var scheme = runtime.NewScheme()
|
||||||
var codecs = serializer.NewCodecFactory(scheme)
|
var codecs = serializer.NewCodecFactory(scheme)
|
||||||
@ -164,7 +165,7 @@ type Authenticator struct {
|
|||||||
cachedCreds *credentials
|
cachedCreds *credentials
|
||||||
exp time.Time
|
exp time.Time
|
||||||
|
|
||||||
onRotate func()
|
onRotateList []func()
|
||||||
}
|
}
|
||||||
|
|
||||||
type credentials struct {
|
type credentials struct {
|
||||||
@ -191,7 +192,15 @@ func (a *Authenticator) UpdateTransportConfig(c *transport.Config) error {
|
|||||||
dial = (&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext
|
dial = (&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext
|
||||||
}
|
}
|
||||||
d := connrotation.NewDialer(dial)
|
d := connrotation.NewDialer(dial)
|
||||||
a.onRotate = d.CloseAll
|
|
||||||
|
a.mu.Lock()
|
||||||
|
defer a.mu.Unlock()
|
||||||
|
a.onRotateList = append(a.onRotateList, d.CloseAll)
|
||||||
|
onRotateListLength := len(a.onRotateList)
|
||||||
|
if onRotateListLength > onRotateListWarningLength {
|
||||||
|
klog.Warningf("constructing many client instances from the same exec auth config can cause performance problems during cert rotation and can exhaust available network connections; %d clients constructed calling %q", onRotateListLength, a.cmd)
|
||||||
|
}
|
||||||
|
|
||||||
c.Dial = d.DialContext
|
c.Dial = d.DialContext
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -353,8 +362,10 @@ func (a *Authenticator) refreshCredsLocked(r *clientauthentication.Response) err
|
|||||||
a.cachedCreds = newCreds
|
a.cachedCreds = newCreds
|
||||||
// Only close all connections when TLS cert rotates. Token rotation doesn't
|
// Only close all connections when TLS cert rotates. Token rotation doesn't
|
||||||
// need the extra noise.
|
// need the extra noise.
|
||||||
if a.onRotate != nil && oldCreds != nil && !reflect.DeepEqual(oldCreds.cert, a.cachedCreds.cert) {
|
if len(a.onRotateList) > 0 && oldCreds != nil && !reflect.DeepEqual(oldCreds.cert, a.cachedCreds.cert) {
|
||||||
a.onRotate()
|
for _, onRotate := range a.onRotateList {
|
||||||
|
onRotate()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -713,6 +713,52 @@ func TestTLSCredentials(t *testing.T) {
|
|||||||
get(t, "valid TLS cert again", false)
|
get(t, "valid TLS cert again", false)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConcurrentUpdateTransportConfig(t *testing.T) {
|
||||||
|
n := time.Now()
|
||||||
|
now := func() time.Time { return n }
|
||||||
|
|
||||||
|
env := []string{""}
|
||||||
|
environ := func() []string {
|
||||||
|
s := make([]string, len(env))
|
||||||
|
copy(s, env)
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
c := api.ExecConfig{
|
||||||
|
Command: "./testdata/test-plugin.sh",
|
||||||
|
APIVersion: "client.authentication.k8s.io/v1alpha1",
|
||||||
|
}
|
||||||
|
a, err := newAuthenticator(newCache(), &c)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
a.environ = environ
|
||||||
|
a.now = now
|
||||||
|
a.stderr = ioutil.Discard
|
||||||
|
|
||||||
|
stopCh := make(chan struct{})
|
||||||
|
defer close(stopCh)
|
||||||
|
|
||||||
|
numConcurrent := 2
|
||||||
|
|
||||||
|
for i := 0; i < numConcurrent; i++ {
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
tc := &transport.Config{}
|
||||||
|
a.UpdateTransportConfig(tc)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-stopCh:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
// genClientCert generates an x509 certificate for testing. Certificate and key
|
// genClientCert generates an x509 certificate for testing. Certificate and key
|
||||||
// are returned in PEM encoding.
|
// are returned in PEM encoding.
|
||||||
func genClientCert(t *testing.T) ([]byte, []byte) {
|
func genClientCert(t *testing.T) ([]byte, []byte) {
|
||||||
|
Loading…
Reference in New Issue
Block a user