Add TLS support to exec authenticator plugin

https://github.com/kubernetes/community/blob/master/contributors/design-proposals/auth/kubectl-exec-plugins.md#tls-client-certificate-support

Allows exec plugin to return raw TLS key/cert data. This data populates
transport.Config.TLS fields.
transport.Config.TLS propagates custom credentials using
tls.Config.GetClientCertificate callback.
On key/cert rotation, all connections using old credentials are
closed

Kubernetes-commit: cd89f9473faa60c15b8e9d223e5c4f9dab53627a
This commit is contained in:
Andrew Lytvynov
2018-05-30 14:03:32 -07:00
committed by Kubernetes Publisher
parent 33a8186d0a
commit c669580288
12 changed files with 605 additions and 112 deletions

View File

@@ -18,11 +18,15 @@ package exec
import (
"bytes"
"context"
"crypto/tls"
"fmt"
"io"
"net"
"net/http"
"os"
"os/exec"
"reflect"
"sync"
"time"
@@ -35,6 +39,8 @@ import (
"k8s.io/client-go/pkg/apis/clientauthentication"
"k8s.io/client-go/pkg/apis/clientauthentication/v1alpha1"
"k8s.io/client-go/tools/clientcmd/api"
"k8s.io/client-go/transport"
"k8s.io/client-go/util/connrotation"
)
const execInfoEnv = "KUBERNETES_EXEC_INFO"
@@ -147,14 +153,55 @@ type Authenticator struct {
// The mutex also guards calling the plugin. Since the plugin could be
// interactive we want to make sure it's only called once.
mu sync.Mutex
cachedToken string
cachedCreds *credentials
exp time.Time
onRotate func()
}
// WrapTransport instruments an existing http.RoundTripper with credentials returned
// by the plugin.
func (a *Authenticator) WrapTransport(rt http.RoundTripper) http.RoundTripper {
return &roundTripper{a, rt}
type credentials struct {
token string
cert *tls.Certificate
}
// UpdateTransportConfig updates the transport.Config to use credentials
// returned by the plugin.
func (a *Authenticator) UpdateTransportConfig(c *transport.Config) error {
wt := c.WrapTransport
c.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
if wt != nil {
rt = wt(rt)
}
return &roundTripper{a, rt}
}
getCert := c.TLS.GetCert
c.TLS.GetCert = func() (*tls.Certificate, error) {
// If previous GetCert is present and returns a valid non-nil
// certificate, use that. Otherwise use cert from exec plugin.
if getCert != nil {
cert, err := getCert()
if err != nil {
return nil, err
}
if cert != nil {
return cert, nil
}
}
return a.cert()
}
var dial func(ctx context.Context, network, addr string) (net.Conn, error)
if c.Dial != nil {
dial = c.Dial
} else {
dial = (&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext
}
d := connrotation.NewDialer(dial)
a.onRotate = d.CloseAll
c.Dial = d.DialContext
return nil
}
type roundTripper struct {
@@ -169,11 +216,13 @@ func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
return r.base.RoundTrip(req)
}
token, err := r.a.token()
creds, err := r.a.getCreds()
if err != nil {
return nil, fmt.Errorf("getting token: %v", err)
return nil, fmt.Errorf("getting credentials: %v", err)
}
if creds.token != "" {
req.Header.Set("Authorization", "Bearer "+creds.token)
}
req.Header.Set("Authorization", "Bearer "+token)
res, err := r.base.RoundTrip(req)
if err != nil {
@@ -184,47 +233,60 @@ func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
Header: res.Header,
Code: int32(res.StatusCode),
}
if err := r.a.refresh(token, resp); err != nil {
glog.Errorf("refreshing token: %v", err)
if err := r.a.maybeRefreshCreds(creds, resp); err != nil {
glog.Errorf("refreshing credentials: %v", err)
}
}
return res, nil
}
func (a *Authenticator) tokenExpired() bool {
func (a *Authenticator) credsExpired() bool {
if a.exp.IsZero() {
return false
}
return a.now().After(a.exp)
}
func (a *Authenticator) token() (string, error) {
a.mu.Lock()
defer a.mu.Unlock()
if a.cachedToken != "" && !a.tokenExpired() {
return a.cachedToken, nil
func (a *Authenticator) cert() (*tls.Certificate, error) {
creds, err := a.getCreds()
if err != nil {
return nil, err
}
return a.getToken(nil)
return creds.cert, nil
}
// refresh executes the plugin to force a rotation of the token.
func (a *Authenticator) refresh(token string, r *clientauthentication.Response) error {
func (a *Authenticator) getCreds() (*credentials, error) {
a.mu.Lock()
defer a.mu.Unlock()
if a.cachedCreds != nil && !a.credsExpired() {
return a.cachedCreds, nil
}
if err := a.refreshCredsLocked(nil); err != nil {
return nil, err
}
return a.cachedCreds, nil
}
// maybeRefreshCreds executes the plugin to force a rotation of the
// credentials, unless they were rotated already.
func (a *Authenticator) maybeRefreshCreds(creds *credentials, r *clientauthentication.Response) error {
a.mu.Lock()
defer a.mu.Unlock()
if token != a.cachedToken {
// Token already rotated.
// Since we're not making a new pointer to a.cachedCreds in getCreds, no
// need to do deep comparison.
if creds != a.cachedCreds {
// Credentials already rotated.
return nil
}
_, err := a.getToken(r)
return err
return a.refreshCredsLocked(r)
}
// getToken executes the plugin and reads the credentials from stdout. It must be
// called while holding the Authenticator's mutex.
func (a *Authenticator) getToken(r *clientauthentication.Response) (string, error) {
// refreshCredsLocked executes the plugin and reads the credentials from
// stdout. It must be called while holding the Authenticator's mutex.
func (a *Authenticator) refreshCredsLocked(r *clientauthentication.Response) error {
cred := &clientauthentication.ExecCredential{
Spec: clientauthentication.ExecCredentialSpec{
Response: r,
@@ -234,7 +296,7 @@ func (a *Authenticator) getToken(r *clientauthentication.Response) (string, erro
data, err := runtime.Encode(codecs.LegacyCodec(a.group), cred)
if err != nil {
return "", fmt.Errorf("encode ExecCredentials: %v", err)
return fmt.Errorf("encode ExecCredentials: %v", err)
}
env := append(a.environ(), a.env...)
@@ -250,23 +312,26 @@ func (a *Authenticator) getToken(r *clientauthentication.Response) (string, erro
}
if err := cmd.Run(); err != nil {
return "", fmt.Errorf("exec: %v", err)
return fmt.Errorf("exec: %v", err)
}
_, gvk, err := codecs.UniversalDecoder(a.group).Decode(stdout.Bytes(), nil, cred)
if err != nil {
return "", fmt.Errorf("decode stdout: %v", err)
return fmt.Errorf("decoding stdout: %v", err)
}
if gvk.Group != a.group.Group || gvk.Version != a.group.Version {
return "", fmt.Errorf("exec plugin is configured to use API version %s, plugin returned version %s",
return fmt.Errorf("exec plugin is configured to use API version %s, plugin returned version %s",
a.group, schema.GroupVersion{Group: gvk.Group, Version: gvk.Version})
}
if cred.Status == nil {
return "", fmt.Errorf("exec plugin didn't return a status field")
return fmt.Errorf("exec plugin didn't return a status field")
}
if cred.Status.Token == "" {
return "", fmt.Errorf("exec plugin didn't return a token")
if cred.Status.Token == "" && cred.Status.ClientCertificateData == "" && cred.Status.ClientKeyData == "" {
return fmt.Errorf("exec plugin didn't return a token or cert/key pair")
}
if (cred.Status.ClientCertificateData == "") != (cred.Status.ClientKeyData == "") {
return fmt.Errorf("exec plugin returned only certificate or key, not both")
}
if cred.Status.ExpirationTimestamp != nil {
@@ -274,7 +339,24 @@ func (a *Authenticator) getToken(r *clientauthentication.Response) (string, erro
} else {
a.exp = time.Time{}
}
a.cachedToken = cred.Status.Token
return a.cachedToken, nil
newCreds := &credentials{
token: cred.Status.Token,
}
if cred.Status.ClientKeyData != "" && cred.Status.ClientCertificateData != "" {
cert, err := tls.X509KeyPair([]byte(cred.Status.ClientCertificateData), []byte(cred.Status.ClientKeyData))
if err != nil {
return fmt.Errorf("failed parsing client key/certificate: %v", err)
}
newCreds.cert = &cert
}
oldCreds := a.cachedCreds
a.cachedCreds = newCreds
// Only close all connections when TLS cert rotates. Token rotation doesn't
// need the extra noise.
if a.onRotate != nil && oldCreds != nil && !reflect.DeepEqual(oldCreds.cert, a.cachedCreds.cert) {
a.onRotate()
}
return nil
}