client-go: add an exec-based client auth provider

Kubernetes-commit: 6463e9efd9ba552e60d2555a3e6526ef90196473
This commit is contained in:
Eric Chiang
2018-02-07 15:43:12 -08:00
committed by Kubernetes Publisher
parent d902e7da4b
commit 19c591bac2
20 changed files with 1272 additions and 4 deletions

View File

@@ -0,0 +1,280 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package exec
import (
"bytes"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"sync"
"time"
"github.com/golang/glog"
"golang.org/x/crypto/ssh/terminal"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/pkg/apis/clientauthentication"
"k8s.io/client-go/pkg/apis/clientauthentication/v1alpha1"
"k8s.io/client-go/tools/clientcmd/api"
)
const execInfoEnv = "KUBERNETES_EXEC_INFO"
var scheme = runtime.NewScheme()
var codecs = serializer.NewCodecFactory(scheme)
func init() {
v1.AddToGroupVersion(scheme, schema.GroupVersion{Version: "v1"})
v1alpha1.AddToScheme(scheme)
clientauthentication.AddToScheme(scheme)
}
var (
// Since transports can be constantly re-initialized by programs like kubectl,
// keep a cache of initialized authenticators keyed by a hash of their config.
globalCache = newCache()
// The list of API versions we accept.
apiVersions = map[string]schema.GroupVersion{
v1alpha1.SchemeGroupVersion.String(): v1alpha1.SchemeGroupVersion,
}
)
func newCache() *cache {
return &cache{m: make(map[string]*Authenticator)}
}
func cacheKey(c *api.ExecConfig) string {
return fmt.Sprintf("%#v", c)
}
type cache struct {
mu sync.Mutex
m map[string]*Authenticator
}
func (c *cache) get(s string) (*Authenticator, bool) {
c.mu.Lock()
defer c.mu.Unlock()
a, ok := c.m[s]
return a, ok
}
// put inserts an authenticator into the cache. If an authenticator is already
// associated with the key, the first one is returned instead.
func (c *cache) put(s string, a *Authenticator) *Authenticator {
c.mu.Lock()
defer c.mu.Unlock()
existing, ok := c.m[s]
if ok {
return existing
}
c.m[s] = a
return a
}
// GetAuthenticator returns an exec-based plugin for providing client credentials.
func GetAuthenticator(config *api.ExecConfig) (*Authenticator, error) {
return newAuthenticator(globalCache, config)
}
func newAuthenticator(c *cache, config *api.ExecConfig) (*Authenticator, error) {
key := cacheKey(config)
if a, ok := c.get(key); ok {
return a, nil
}
gv, ok := apiVersions[config.APIVersion]
if !ok {
return nil, fmt.Errorf("exec plugin: invalid apiVersion %q", config.APIVersion)
}
a := &Authenticator{
cmd: config.Command,
args: config.Args,
group: gv,
stdin: os.Stdin,
stderr: os.Stderr,
interactive: terminal.IsTerminal(int(os.Stdout.Fd())),
now: time.Now,
environ: os.Environ,
}
for _, env := range config.Env {
a.env = append(a.env, env.Name+"="+env.Value)
}
return c.put(key, a), nil
}
// Authenticator is a client credential provider that rotates credentials by executing a plugin.
// The plugin input and output are defined by the API group client.authentication.k8s.io.
type Authenticator struct {
// Set by the config
cmd string
args []string
group schema.GroupVersion
env []string
// Stubbable for testing
stdin io.Reader
stderr io.Writer
interactive bool
now func() time.Time
environ func() []string
// Cached results.
//
// The mutex also guards calling the plugin. Since the plugin could be
// interactive we want to make sure it's only called once.
mu sync.Mutex
cachedToken string
exp time.Time
}
// WrapTransport instruments an existing http.RoundTripper with credentials returned
// by the plugin.
func (a *Authenticator) WrapTransport(rt http.RoundTripper) http.RoundTripper {
return &roundTripper{a, rt}
}
type roundTripper struct {
a *Authenticator
base http.RoundTripper
}
func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
// If a user has already set credentials, use that. This makes commands like
// "kubectl get --token (token) pods" work.
if req.Header.Get("Authorization") != "" {
return r.base.RoundTrip(req)
}
token, err := r.a.token()
if err != nil {
return nil, fmt.Errorf("getting token: %v", err)
}
req.Header.Set("Authorization", "Bearer "+token)
res, err := r.base.RoundTrip(req)
if err != nil {
return nil, err
}
if res.StatusCode == http.StatusUnauthorized {
resp := &clientauthentication.Response{
Header: res.Header,
Code: int32(res.StatusCode),
}
if err := r.a.refresh(token, resp); err != nil {
glog.Errorf("refreshing token: %v", err)
}
}
return res, nil
}
func (a *Authenticator) tokenExpired() bool {
if a.exp.IsZero() {
return false
}
return a.now().After(a.exp)
}
func (a *Authenticator) token() (string, error) {
a.mu.Lock()
defer a.mu.Unlock()
if a.cachedToken != "" && !a.tokenExpired() {
return a.cachedToken, nil
}
return a.getToken(nil)
}
// refresh executes the plugin to force a rotation of the token.
func (a *Authenticator) refresh(token string, r *clientauthentication.Response) error {
a.mu.Lock()
defer a.mu.Unlock()
if token != a.cachedToken {
// Token already rotated.
return nil
}
_, err := a.getToken(r)
return err
}
// getToken executes the plugin and reads the credentials from stdout. It must be
// called while holding the Authenticator's mutex.
func (a *Authenticator) getToken(r *clientauthentication.Response) (string, error) {
cred := &clientauthentication.ExecCredential{
Spec: clientauthentication.ExecCredentialSpec{
Response: r,
Interactive: a.interactive,
},
}
data, err := runtime.Encode(codecs.LegacyCodec(a.group), cred)
if err != nil {
return "", fmt.Errorf("encode ExecCredentials: %v", err)
}
env := append(a.environ(), a.env...)
env = append(env, fmt.Sprintf("%s=%s", execInfoEnv, data))
stdout := &bytes.Buffer{}
cmd := exec.Command(a.cmd, a.args...)
cmd.Env = env
cmd.Stderr = a.stderr
cmd.Stdout = stdout
if a.interactive {
cmd.Stdin = a.stdin
}
if err := cmd.Run(); err != nil {
return "", fmt.Errorf("exec: %v", err)
}
_, gvk, err := codecs.UniversalDecoder(a.group).Decode(stdout.Bytes(), nil, cred)
if err != nil {
return "", fmt.Errorf("decode stdout: %v", err)
}
if gvk.Group != a.group.Group || gvk.Version != a.group.Version {
return "", fmt.Errorf("exec plugin is configured to use API version %s, plugin returned version %s",
a.group, schema.GroupVersion{Group: gvk.Group, Version: gvk.Version})
}
if cred.Status == nil {
return "", fmt.Errorf("exec plugin didn't return a status field")
}
if cred.Status.Token == "" {
return "", fmt.Errorf("exec plugin didn't return a token")
}
if cred.Status.ExpirationTimestamp != nil {
a.exp = cred.Status.ExpirationTimestamp.Time
} else {
a.exp = time.Time{}
}
a.cachedToken = cred.Status.Token
return a.cachedToken, nil
}

View File

@@ -0,0 +1,413 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package exec
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"k8s.io/client-go/pkg/apis/clientauthentication"
"k8s.io/client-go/tools/clientcmd/api"
)
func TestCacheKey(t *testing.T) {
c1 := &api.ExecConfig{
Command: "foo-bar",
Args: []string{"1", "2"},
Env: []api.ExecEnvVar{
{Name: "3", Value: "4"},
{Name: "5", Value: "6"},
{Name: "7", Value: "8"},
},
APIVersion: "client.authentication.k8s.io/v1alpha1",
}
c2 := &api.ExecConfig{
Command: "foo-bar",
Args: []string{"1", "2"},
Env: []api.ExecEnvVar{
{Name: "3", Value: "4"},
{Name: "5", Value: "6"},
{Name: "7", Value: "8"},
},
APIVersion: "client.authentication.k8s.io/v1alpha1",
}
c3 := &api.ExecConfig{
Command: "foo-bar",
Args: []string{"1", "2"},
Env: []api.ExecEnvVar{
{Name: "3", Value: "4"},
{Name: "5", Value: "6"},
},
APIVersion: "client.authentication.k8s.io/v1alpha1",
}
key1 := cacheKey(c1)
key2 := cacheKey(c2)
key3 := cacheKey(c3)
if key1 != key2 {
t.Error("key1 and key2 didn't match")
}
if key1 == key3 {
t.Error("key1 and key3 matched")
}
if key2 == key3 {
t.Error("key2 and key3 matched")
}
}
func compJSON(t *testing.T, got, want []byte) {
t.Helper()
gotJSON := &bytes.Buffer{}
wantJSON := &bytes.Buffer{}
if err := json.Indent(gotJSON, got, "", " "); err != nil {
t.Errorf("got invalid JSON: %v", err)
}
if err := json.Indent(wantJSON, want, "", " "); err != nil {
t.Errorf("want invalid JSON: %v", err)
}
g := strings.TrimSpace(gotJSON.String())
w := strings.TrimSpace(wantJSON.String())
if g != w {
t.Errorf("wanted %q, got %q", w, g)
}
}
func TestGetToken(t *testing.T) {
tests := []struct {
name string
config api.ExecConfig
output string
interactive bool
response *clientauthentication.Response
wantInput string
wantToken string
wantExpiry time.Time
wantErr bool
}{
{
name: "basic-request",
config: api.ExecConfig{
APIVersion: "client.authentication.k8s.io/v1alpha1",
},
wantInput: `{
"kind":"ExecCredential",
"apiVersion":"client.authentication.k8s.io/v1alpha1",
"spec": {}
}`,
output: `{
"kind": "ExecCredential",
"apiVersion": "client.authentication.k8s.io/v1alpha1",
"status": {
"token": "foo-bar"
}
}`,
wantToken: "foo-bar",
},
{
name: "interactive",
config: api.ExecConfig{
APIVersion: "client.authentication.k8s.io/v1alpha1",
},
interactive: true,
wantInput: `{
"kind":"ExecCredential",
"apiVersion":"client.authentication.k8s.io/v1alpha1",
"spec": {
"interactive": true
}
}`,
output: `{
"kind": "ExecCredential",
"apiVersion": "client.authentication.k8s.io/v1alpha1",
"status": {
"token": "foo-bar"
}
}`,
wantToken: "foo-bar",
},
{
name: "response",
config: api.ExecConfig{
APIVersion: "client.authentication.k8s.io/v1alpha1",
},
response: &clientauthentication.Response{
Header: map[string][]string{
"WWW-Authenticate": {`Basic realm="Access to the staging site", charset="UTF-8"`},
},
Code: 401,
},
wantInput: `{
"kind":"ExecCredential",
"apiVersion":"client.authentication.k8s.io/v1alpha1",
"spec": {
"response": {
"header": {
"WWW-Authenticate": [
"Basic realm=\"Access to the staging site\", charset=\"UTF-8\""
]
},
"code": 401
}
}
}`,
output: `{
"kind": "ExecCredential",
"apiVersion": "client.authentication.k8s.io/v1alpha1",
"status": {
"token": "foo-bar"
}
}`,
wantToken: "foo-bar",
},
{
name: "expiry",
config: api.ExecConfig{
APIVersion: "client.authentication.k8s.io/v1alpha1",
},
wantInput: `{
"kind":"ExecCredential",
"apiVersion":"client.authentication.k8s.io/v1alpha1",
"spec": {}
}`,
output: `{
"kind": "ExecCredential",
"apiVersion": "client.authentication.k8s.io/v1alpha1",
"status": {
"token": "foo-bar",
"expirationTimestamp": "2006-01-02T15:04:05Z"
}
}`,
wantExpiry: time.Date(2006, 01, 02, 15, 04, 05, 0, time.UTC),
wantToken: "foo-bar",
},
{
name: "no-group-version",
config: api.ExecConfig{
APIVersion: "client.authentication.k8s.io/v1alpha1",
},
wantInput: `{
"kind":"ExecCredential",
"apiVersion":"client.authentication.k8s.io/v1alpha1",
"spec": {}
}`,
output: `{
"kind": "ExecCredential",
"status": {
"token": "foo-bar"
}
}`,
wantErr: true,
},
{
name: "no-status",
config: api.ExecConfig{
APIVersion: "client.authentication.k8s.io/v1alpha1",
},
wantInput: `{
"kind":"ExecCredential",
"apiVersion":"client.authentication.k8s.io/v1alpha1",
"spec": {}
}`,
output: `{
"kind": "ExecCredential",
"apiVersion":"client.authentication.k8s.io/v1alpha1"
}`,
wantErr: true,
},
{
name: "no-token",
config: api.ExecConfig{
APIVersion: "client.authentication.k8s.io/v1alpha1",
},
wantInput: `{
"kind":"ExecCredential",
"apiVersion":"client.authentication.k8s.io/v1alpha1",
"spec": {}
}`,
output: `{
"kind": "ExecCredential",
"apiVersion":"client.authentication.k8s.io/v1alpha1",
"status": {}
}`,
wantErr: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
c := test.config
c.Command = "./testdata/test-plugin.sh"
c.Env = append(c.Env, api.ExecEnvVar{
Name: "TEST_OUTPUT",
Value: test.output,
})
a, err := newAuthenticator(newCache(), &c)
if err != nil {
t.Fatal(err)
}
stderr := &bytes.Buffer{}
a.stderr = stderr
a.interactive = test.interactive
a.environ = func() []string { return nil }
token, err := a.getToken(test.response)
if err != nil {
if !test.wantErr {
t.Errorf("get token %v", err)
}
return
}
if test.wantErr {
t.Fatal("expected error getting token")
}
if token != test.wantToken {
t.Errorf("expected token %q got %q", test.wantToken, token)
}
if !a.exp.Equal(test.wantExpiry) {
t.Errorf("expected expiry %v got %v", test.wantExpiry, a.exp)
}
compJSON(t, stderr.Bytes(), []byte(test.wantInput))
})
}
}
func TestRoundTripper(t *testing.T) {
wantToken := ""
n := time.Now()
now := func() time.Time { return n }
env := []string{""}
environ := func() []string {
s := make([]string, len(env))
copy(s, env)
return s
}
setOutput := func(s string) {
env[0] = "TEST_OUTPUT=" + s
}
handler := func(w http.ResponseWriter, r *http.Request) {
gotToken := ""
parts := strings.Split(r.Header.Get("Authorization"), " ")
if len(parts) > 1 && strings.EqualFold(parts[0], "bearer") {
gotToken = parts[1]
}
if wantToken != gotToken {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
fmt.Fprintln(w, "ok")
}
server := httptest.NewServer(http.HandlerFunc(handler))
c := api.ExecConfig{
Command: "./testdata/test-plugin.sh",
APIVersion: "client.authentication.k8s.io/v1alpha1",
}
a, err := newAuthenticator(newCache(), &c)
if err != nil {
t.Fatal(err)
}
a.environ = environ
a.now = now
a.stderr = ioutil.Discard
client := http.Client{
Transport: a.WrapTransport(http.DefaultTransport),
}
get := func(t *testing.T, statusCode int) {
t.Helper()
resp, err := client.Get(server.URL)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != statusCode {
t.Errorf("wanted status %d got %d", statusCode, resp.StatusCode)
}
}
setOutput(`{
"kind": "ExecCredential",
"apiVersion": "client.authentication.k8s.io/v1alpha1",
"status": {
"token": "token1"
}
}`)
wantToken = "token1"
get(t, http.StatusOK)
setOutput(`{
"kind": "ExecCredential",
"apiVersion": "client.authentication.k8s.io/v1alpha1",
"status": {
"token": "token2"
}
}`)
// Previous token should be cached
get(t, http.StatusOK)
wantToken = "token2"
// Token is still cached, hits unauthorized but causes token to rotate.
get(t, http.StatusUnauthorized)
// Follow up request uses the rotated token.
get(t, http.StatusOK)
setOutput(`{
"kind": "ExecCredential",
"apiVersion": "client.authentication.k8s.io/v1alpha1",
"status": {
"token": "token3",
"expirationTimestamp": "` + now().Add(time.Hour).Format(time.RFC3339Nano) + `"
}
}`)
wantToken = "token3"
// Token is still cached, hit's unauthorized but causes rotation to token with an expiry.
get(t, http.StatusUnauthorized)
get(t, http.StatusOK)
// Move time forward 2 hours, "token3" is now expired.
n = n.Add(time.Hour * 2)
setOutput(`{
"kind": "ExecCredential",
"apiVersion": "client.authentication.k8s.io/v1alpha1",
"status": {
"token": "token4",
"expirationTimestamp": "` + now().Add(time.Hour).Format(time.RFC3339Nano) + `"
}
}`)
wantToken = "token4"
// Old token is expired, should refresh automatically without hitting a 401.
get(t, http.StatusOK)
}

View File

@@ -0,0 +1,18 @@
#!/bin/sh -e
# Copyright 2018 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
>&2 echo "$KUBERNETES_EXEC_INFO"
echo "$TEST_OUTPUT"