Merge pull request #33310 from deads2k/controller-04-auto-create-SA

Automatic merge from submit-queue

use service accounts as clients for controllers

Makes it possible for the controller-manager to use service accounts to run individual controllers.  To start, this only enables this feature if this particular controller manager has the power to create service account tokens.  Otherwise, the full-powered client is used instead.  This is a necessary step on the way to subdividing the authority of controllers.

@kubernetes/sig-auth 
@erictune I know you care about this
@ncdc fyi
This commit is contained in:
Kubernetes Submit Queue 2016-10-05 13:46:44 -07:00 committed by GitHub
commit 234c562d4a
6 changed files with 540 additions and 36 deletions

View File

@ -163,7 +163,21 @@ func Run(s *options.CMServer) error {
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controller-manager"})
run := func(stop <-chan struct{}) {
err := StartControllers(s, kubeconfig, stop, recorder)
rootClientBuilder := controller.SimpleControllerClientBuilder{
ClientConfig: kubeconfig,
}
var clientBuilder controller.ControllerClientBuilder
if len(s.ServiceAccountKeyFile) > 0 {
clientBuilder = controller.SAControllerClientBuilder{
ClientConfig: restclient.AnonymousClientConfig(kubeconfig),
CoreClient: kubeClient.Core(),
Namespace: "kube-system",
}
} else {
clientBuilder = rootClientBuilder
}
err := StartControllers(s, kubeconfig, rootClientBuilder, clientBuilder, stop, recorder)
glog.Fatalf("error running controllers: %v", err)
panic("unreachable")
}
@ -206,20 +220,50 @@ func Run(s *options.CMServer) error {
panic("unreachable")
}
func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, stop <-chan struct{}, recorder record.EventRecorder) error {
client := func(userAgent string) clientset.Interface {
return clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, userAgent))
func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}, recorder record.EventRecorder) error {
client := func(serviceAccountName string) clientset.Interface {
return rootClientBuilder.ClientOrDie(serviceAccountName)
}
discoveryClient := client("controller-discovery").Discovery()
sharedInformers := informers.NewSharedInformerFactory(client("shared-informers"), ResyncPeriod(s)())
// always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
if len(s.ServiceAccountKeyFile) > 0 {
privateKey, err := serviceaccount.ReadPrivateKey(s.ServiceAccountKeyFile)
if err != nil {
return fmt.Errorf("Error reading key for service account token controller: %v", err)
} else {
var rootCA []byte
if s.RootCAFile != "" {
rootCA, err = ioutil.ReadFile(s.RootCAFile)
if err != nil {
return fmt.Errorf("error reading root-ca-file at %s: %v", s.RootCAFile, err)
}
if _, err := certutil.ParseCertsPEM(rootCA); err != nil {
return fmt.Errorf("error parsing root-ca-file at %s: %v", s.RootCAFile, err)
}
} else {
rootCA = kubeconfig.CAData
}
go serviceaccountcontroller.NewTokensController(
rootClientBuilder.ClientOrDie("tokens-controller"),
serviceaccountcontroller.TokensControllerOptions{
TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey),
RootCA: rootCA,
},
).Run(int(s.ConcurrentSATokenSyncs), wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
}
go endpointcontroller.NewEndpointController(sharedInformers.Pods().Informer(), client("endpoint-controller")).
Run(int(s.ConcurrentEndpointSyncs), wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
go replicationcontroller.NewReplicationManager(
sharedInformers.Pods().Informer(),
client("replication-controller"),
clientBuilder.ClientOrDie("replication-controller"),
ResyncPeriod(s),
replicationcontroller.BurstReplicas,
int(s.LookupCacheSizeForRC),
@ -487,36 +531,6 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, stop <
}
}
var rootCA []byte
if s.RootCAFile != "" {
rootCA, err = ioutil.ReadFile(s.RootCAFile)
if err != nil {
return fmt.Errorf("error reading root-ca-file at %s: %v", s.RootCAFile, err)
}
if _, err := certutil.ParseCertsPEM(rootCA); err != nil {
return fmt.Errorf("error parsing root-ca-file at %s: %v", s.RootCAFile, err)
}
} else {
rootCA = kubeconfig.CAData
}
if len(s.ServiceAccountKeyFile) > 0 {
privateKey, err := serviceaccount.ReadPrivateKey(s.ServiceAccountKeyFile)
if err != nil {
glog.Errorf("Error reading key for service account token controller: %v", err)
} else {
go serviceaccountcontroller.NewTokensController(
client("tokens-controller"),
serviceaccountcontroller.TokensControllerOptions{
TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey),
RootCA: rootCA,
},
).Run(int(s.ConcurrentSATokenSyncs), wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
}
serviceaccountcontroller.NewServiceAccountsController(
client("service-account-controller"),
serviceaccountcontroller.DefaultServiceAccountsControllerOptions(),

View File

@ -333,3 +333,25 @@ func AddUserAgent(config *Config, userAgent string) *Config {
config.UserAgent = fullUserAgent
return config
}
// AnonymousClientConfig returns a copy of the given config with all user credentials (cert/key, bearer token, and username/password) removed
func AnonymousClientConfig(config *Config) *Config {
// copy only known safe fields
return &Config{
Host: config.Host,
APIPath: config.APIPath,
Prefix: config.Prefix,
ContentConfig: config.ContentConfig,
TLSClientConfig: TLSClientConfig{
CAFile: config.TLSClientConfig.CAFile,
CAData: config.TLSClientConfig.CAData,
},
RateLimiter: config.RateLimiter,
Insecure: config.Insecure,
UserAgent: config.UserAgent,
Transport: config.Transport,
WrapTransport: config.WrapTransport,
QPS: config.QPS,
Burst: config.Burst,
}
}

View File

@ -17,10 +17,20 @@ limitations under the License.
package restclient
import (
"io"
"net/http"
"reflect"
"strings"
"testing"
fuzz "github.com/google/gofuzz"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/diff"
"k8s.io/kubernetes/pkg/util/flowcontrol"
)
func TestIsConfigTransportTLS(t *testing.T) {
@ -97,3 +107,136 @@ func TestRESTClientRequires(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
}
type fakeLimiter struct {
FakeSaturation float64
FakeQPS float32
}
func (t *fakeLimiter) TryAccept() bool {
return true
}
func (t *fakeLimiter) Saturation() float64 {
return t.FakeSaturation
}
func (t *fakeLimiter) QPS() float32 {
return t.FakeQPS
}
func (t *fakeLimiter) Stop() {}
func (t *fakeLimiter) Accept() {}
type fakeCodec struct{}
func (c *fakeCodec) Decode([]byte, *unversioned.GroupVersionKind, runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) {
return nil, nil, nil
}
func (c *fakeCodec) Encode(obj runtime.Object, stream io.Writer) error {
return nil
}
type fakeRoundTripper struct{}
func (r *fakeRoundTripper) RoundTrip(*http.Request) (*http.Response, error) {
return nil, nil
}
var fakeWrapperFunc = func(http.RoundTripper) http.RoundTripper {
return &fakeRoundTripper{}
}
type fakeNegotiatedSerializer struct{}
func (n *fakeNegotiatedSerializer) SupportedMediaTypes() []string {
return []string{}
}
func (n *fakeNegotiatedSerializer) SerializerForMediaType(mediaType string, params map[string]string) (s runtime.SerializerInfo, ok bool) {
return runtime.SerializerInfo{}, true
}
func (n *fakeNegotiatedSerializer) SupportedStreamingMediaTypes() []string {
return []string{}
}
func (n *fakeNegotiatedSerializer) StreamingSerializerForMediaType(mediaType string, params map[string]string) (s runtime.StreamSerializerInfo, ok bool) {
return runtime.StreamSerializerInfo{}, true
}
func (n *fakeNegotiatedSerializer) EncoderForVersion(serializer runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder {
return &fakeCodec{}
}
func (n *fakeNegotiatedSerializer) DecoderToVersion(serializer runtime.Decoder, gv runtime.GroupVersioner) runtime.Decoder {
return &fakeCodec{}
}
func TestAnonymousConfig(t *testing.T) {
f := fuzz.New().NilChance(0.0).NumElements(1, 1)
f.Funcs(
func(r *runtime.Codec, f fuzz.Continue) {
codec := &fakeCodec{}
f.Fuzz(codec)
*r = codec
},
func(r *http.RoundTripper, f fuzz.Continue) {
roundTripper := &fakeRoundTripper{}
f.Fuzz(roundTripper)
*r = roundTripper
},
func(fn *func(http.RoundTripper) http.RoundTripper, f fuzz.Continue) {
*fn = fakeWrapperFunc
},
func(r *runtime.NegotiatedSerializer, f fuzz.Continue) {
serializer := &fakeNegotiatedSerializer{}
f.Fuzz(serializer)
*r = serializer
},
func(r *flowcontrol.RateLimiter, f fuzz.Continue) {
limiter := &fakeLimiter{}
f.Fuzz(limiter)
*r = limiter
},
// Authentication does not require fuzzer
func(r *AuthProviderConfigPersister, f fuzz.Continue) {},
func(r *api.AuthProviderConfig, f fuzz.Continue) {
r.Config = map[string]string{}
},
)
for i := 0; i < 20; i++ {
original := &Config{}
f.Fuzz(original)
actual := AnonymousClientConfig(original)
expected := *original
// this is the list of known security related fields, add to this list if a new field
// is added to Config, update AnonymousClientConfig to preserve the field otherwise.
expected.Impersonate = ""
expected.BearerToken = ""
expected.Username = ""
expected.Password = ""
expected.AuthProvider = nil
expected.AuthConfigPersister = nil
expected.TLSClientConfig.CertData = nil
expected.TLSClientConfig.CertFile = ""
expected.TLSClientConfig.KeyData = nil
expected.TLSClientConfig.KeyFile = ""
// The DeepEqual cannot handle the func comparison, so we just verify if the
// function return the expected object.
if actual.WrapTransport == nil || !reflect.DeepEqual(expected.WrapTransport(nil), &fakeRoundTripper{}) {
t.Fatalf("AnonymousClientConfig dropped the WrapTransport field")
} else {
actual.WrapTransport = nil
expected.WrapTransport = nil
}
if !reflect.DeepEqual(*actual, expected) {
t.Fatalf("AnonymousClientConfig dropped unexpected fields, identify whether they are security related or not: %s", diff.ObjectGoPrintDiff(expected, actual))
}
}
}

View File

@ -0,0 +1,166 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"fmt"
"time"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
)
// ControllerClientBuilder allow syou to get clients and configs for controllers
type ControllerClientBuilder interface {
Config(name string) (*restclient.Config, error)
Client(name string) (clientset.Interface, error)
ClientOrDie(name string) clientset.Interface
}
// SimpleControllerClientBuilder returns a fixed client with different user agents
type SimpleControllerClientBuilder struct {
// ClientConfig is a skeleton config to clone and use as the basis for each controller client
ClientConfig *restclient.Config
}
func (b SimpleControllerClientBuilder) Config(name string) (*restclient.Config, error) {
clientConfig := *b.ClientConfig
return &clientConfig, nil
}
func (b SimpleControllerClientBuilder) Client(name string) (clientset.Interface, error) {
clientConfig, err := b.Config(name)
if err != nil {
return nil, err
}
return clientset.NewForConfig(restclient.AddUserAgent(clientConfig, name))
}
func (b SimpleControllerClientBuilder) ClientOrDie(name string) clientset.Interface {
client, err := b.Client(name)
if err != nil {
glog.Fatal(err)
}
return client
}
// SAControllerClientBuilder is a ControllerClientBuilder that returns clients identifying as
// service accounts
type SAControllerClientBuilder struct {
// ClientConfig is a skeleton config to clone and use as the basis for each controller client
ClientConfig *restclient.Config
// CoreClient is used to provision service accounts if needed and watch for their associated tokens
// to construct a controller client
CoreClient unversionedcore.CoreInterface
// Namespace is the namespace used to host the service accounts that will back the
// controllers. It must be highly privileged namespace which normal users cannot inspect.
Namespace string
}
// config returns a complete clientConfig for constructing clients. This is separate in anticipation of composition
// which means that not all clientsets are known here
func (b SAControllerClientBuilder) Config(name string) (*restclient.Config, error) {
clientConfig := restclient.AnonymousClientConfig(b.ClientConfig)
// we need the SA UID to find a matching SA token
sa, err := b.CoreClient.ServiceAccounts(b.Namespace).Get(name)
if err != nil && !apierrors.IsNotFound(err) {
return nil, err
} else if apierrors.IsNotFound(err) {
// check to see if the namespace exists. If it isn't a NotFound, just try to create the SA.
// It'll probably fail, but perhaps that will have a better message.
if _, err := b.CoreClient.Namespaces().Get(b.Namespace); apierrors.IsNotFound(err) {
_, err = b.CoreClient.Namespaces().Create(&api.Namespace{ObjectMeta: api.ObjectMeta{Name: b.Namespace}})
if err != nil && !apierrors.IsAlreadyExists(err) {
return nil, err
}
}
sa, err = b.CoreClient.ServiceAccounts(b.Namespace).Create(
&api.ServiceAccount{ObjectMeta: api.ObjectMeta{Namespace: b.Namespace, Name: name}})
if err != nil {
return nil, err
}
}
lw := &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
options.FieldSelector = fields.SelectorFromSet(map[string]string{api.SecretTypeField: string(api.SecretTypeServiceAccountToken)})
return b.CoreClient.Secrets(b.Namespace).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
options.FieldSelector = fields.SelectorFromSet(map[string]string{api.SecretTypeField: string(api.SecretTypeServiceAccountToken)})
return b.CoreClient.Secrets(b.Namespace).Watch(options)
},
}
_, err = watch.ListWatchUntil(30*time.Second, lw,
func(event watch.Event) (bool, error) {
switch event.Type {
case watch.Deleted:
return false, nil
case watch.Error:
return false, fmt.Errorf("error watching")
case watch.Added, watch.Modified:
secret := event.Object.(*api.Secret)
if !serviceaccount.IsServiceAccountToken(secret, sa) ||
len(secret.Data[api.ServiceAccountTokenKey]) == 0 {
return false, nil
}
// TODO maybe verify the token is valid
clientConfig.BearerToken = string(secret.Data[api.ServiceAccountTokenKey])
restclient.AddUserAgent(clientConfig, serviceaccount.MakeUsername(b.Namespace, name))
return true, nil
default:
return false, fmt.Errorf("unexpected event type: %v", event.Type)
}
})
if err != nil {
return nil, fmt.Errorf("unable to get token for service account: %v", err)
}
return clientConfig, nil
}
func (b SAControllerClientBuilder) Client(name string) (clientset.Interface, error) {
clientConfig, err := b.Config(name)
if err != nil {
return nil, err
}
return clientset.NewForConfig(clientConfig)
}
func (b SAControllerClientBuilder) ClientOrDie(name string) clientset.Interface {
client, err := b.Client(name)
if err != nil {
glog.Fatal(err)
}
return client
}

View File

@ -19,6 +19,9 @@ package watch
import (
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/wait"
)
@ -53,7 +56,7 @@ func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc
return lastEvent, err
}
if done {
break
continue
}
}
ConditionSucceeded:
@ -81,3 +84,78 @@ func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc
}
return lastEvent, nil
}
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
// List should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place.
List(options api.ListOptions) (runtime.Object, error)
// Watch should begin a watch at the specified version.
Watch(options api.ListOptions) (Interface, error)
}
// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...ConditionFunc) (*Event, error) {
if len(conditions) == 0 {
return nil, nil
}
list, err := lw.List(api.ListOptions{})
if err != nil {
return nil, err
}
initialItems, err := meta.ExtractList(list)
if err != nil {
return nil, err
}
// use the initial items as simulated "adds"
var lastEvent *Event
currIndex := 0
passedConditions := 0
for _, condition := range conditions {
// check the next condition against the previous event and short circuit waiting for the next watch
if lastEvent != nil {
done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
passedConditions = passedConditions + 1
continue
}
}
ConditionSucceeded:
for currIndex < len(initialItems) {
lastEvent = &Event{Type: Added, Object: initialItems[currIndex]}
currIndex++
done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
passedConditions = passedConditions + 1
break ConditionSucceeded
}
}
}
if passedConditions == len(conditions) {
return lastEvent, nil
}
remainingConditions := conditions[passedConditions:]
metaObj, err := meta.ListAccessor(list)
if err != nil {
return nil, err
}
currResourceVersion := metaObj.GetResourceVersion()
watch, err := lw.Watch(api.ListOptions{ResourceVersion: currResourceVersion})
if err != nil {
return nil, err
}
return Until(timeout, watch, remainingConditions...)
}

View File

@ -23,6 +23,8 @@ import (
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/wait"
)
func TestUntil(t *testing.T) {
@ -80,6 +82,34 @@ func TestUntilMultipleConditions(t *testing.T) {
}
}
func TestUntilMultipleConditionsFail(t *testing.T) {
fw := NewFake()
go func() {
var obj *api.Pod
fw.Add(obj)
}()
conditions := []ConditionFunc{
func(event Event) (bool, error) { return event.Type == Added, nil },
func(event Event) (bool, error) { return event.Type == Added, nil },
func(event Event) (bool, error) { return event.Type == Deleted, nil },
}
timeout := 10 * time.Second
lastEvent, err := Until(timeout, fw, conditions...)
if err != wait.ErrWaitTimeout {
t.Fatalf("expected ErrWaitTimeout error, got %#v", err)
}
if lastEvent == nil {
t.Fatal("expected an event")
}
if lastEvent.Type != Added {
t.Fatalf("expected ADDED event type, got %v", lastEvent.Type)
}
if got, isPod := lastEvent.Object.(*api.Pod); !isPod {
t.Fatalf("expected a pod event, got %#v", got)
}
}
func TestUntilTimeout(t *testing.T) {
fw := NewFake()
go func() {
@ -133,3 +163,54 @@ func TestUntilErrorCondition(t *testing.T) {
t.Fatalf("expected %q in error string, got %q", expected, err.Error())
}
}
type lw struct {
list runtime.Object
watch Interface
}
func (w lw) List(options api.ListOptions) (runtime.Object, error) {
return w.list, nil
}
func (w lw) Watch(options api.ListOptions) (Interface, error) {
return w.watch, nil
}
func TestListWatchUntil(t *testing.T) {
fw := NewFake()
go func() {
var obj *api.Pod
fw.Modify(obj)
}()
listwatch := lw{
list: &api.PodList{Items: []api.Pod{{}}},
watch: fw,
}
conditions := []ConditionFunc{
func(event Event) (bool, error) {
t.Logf("got %#v", event)
return event.Type == Added, nil
},
func(event Event) (bool, error) {
t.Logf("got %#v", event)
return event.Type == Modified, nil
},
}
timeout := 10 * time.Second
lastEvent, err := ListWatchUntil(timeout, listwatch, conditions...)
if err != nil {
t.Fatalf("expected nil error, got %#v", err)
}
if lastEvent == nil {
t.Fatal("expected an event")
}
if lastEvent.Type != Modified {
t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
}
if got, isPod := lastEvent.Object.(*api.Pod); !isPod {
t.Fatalf("expected a pod event, got %#v", got)
}
}