mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 04:33:26 +00:00
test/integration/client: add TestExecPluginRotationViaInformer
Signed-off-by: Andrew Keesler <akeesler@vmware.com>
This commit is contained in:
parent
9126048c9c
commit
a14cd8e3de
@ -21,16 +21,18 @@ import (
|
|||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"crypto/x509"
|
"crypto/x509"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/davecgh/go-spew/spew"
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
@ -39,9 +41,11 @@ import (
|
|||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||||
"k8s.io/client-go/rest"
|
"k8s.io/client-go/rest"
|
||||||
|
"k8s.io/client-go/tools/cache"
|
||||||
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
|
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
|
||||||
|
"k8s.io/client-go/transport"
|
||||||
"k8s.io/client-go/util/cert"
|
"k8s.io/client-go/util/cert"
|
||||||
|
"k8s.io/client-go/util/connrotation"
|
||||||
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
||||||
"k8s.io/kubernetes/test/integration/framework"
|
"k8s.io/kubernetes/test/integration/framework"
|
||||||
)
|
)
|
||||||
@ -50,7 +54,8 @@ import (
|
|||||||
|
|
||||||
// These constants are used to communicate behavior to the testdata/exec-plugin.sh test fixture.
|
// These constants are used to communicate behavior to the testdata/exec-plugin.sh test fixture.
|
||||||
const (
|
const (
|
||||||
outputEnvVar = "EXEC_PLUGIN_OUTPUT"
|
outputEnvVar = "EXEC_PLUGIN_OUTPUT"
|
||||||
|
outputFileEnvVar = "EXEC_PLUGIN_OUTPUT_FILE"
|
||||||
)
|
)
|
||||||
|
|
||||||
type roundTripperFunc func(*http.Request) (*http.Response, error)
|
type roundTripperFunc func(*http.Request) (*http.Response, error)
|
||||||
@ -412,32 +417,53 @@ type informerSpy struct {
|
|||||||
deletes []interface{}
|
deletes []interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (es *informerSpy) OnAdd(obj interface{}) {
|
func (is *informerSpy) OnAdd(obj interface{}) {
|
||||||
es.mu.Lock()
|
is.mu.Lock()
|
||||||
defer es.mu.Unlock()
|
defer is.mu.Unlock()
|
||||||
es.adds = append(es.adds, obj)
|
is.adds = append(is.adds, obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (es *informerSpy) OnUpdate(old, new interface{}) {
|
func (is *informerSpy) OnUpdate(old, new interface{}) {
|
||||||
es.mu.Lock()
|
is.mu.Lock()
|
||||||
defer es.mu.Unlock()
|
defer is.mu.Unlock()
|
||||||
es.updates = append(es.updates, oldNew{old: old, new: new})
|
is.updates = append(is.updates, oldNew{old: old, new: new})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (es *informerSpy) OnDelete(obj interface{}) {
|
func (is *informerSpy) OnDelete(obj interface{}) {
|
||||||
es.mu.Lock()
|
is.mu.Lock()
|
||||||
defer es.mu.Unlock()
|
defer is.mu.Unlock()
|
||||||
es.deletes = append(es.deletes, obj)
|
is.deletes = append(is.deletes, obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
// waitForEvents waits for adds, updates, and deletes to be filled with at least one event.
|
func (is *informerSpy) clear() {
|
||||||
func (es *informerSpy) waitForEvents(t *testing.T) {
|
is.mu.Lock()
|
||||||
if err := wait.PollImmediate(time.Millisecond*250, time.Second*20, func() (bool, error) {
|
defer is.mu.Unlock()
|
||||||
es.mu.Lock()
|
is.adds = []interface{}{}
|
||||||
defer es.mu.Unlock()
|
is.updates = []oldNew{}
|
||||||
return len(es.adds) > 0 && len(es.updates) > 0 && len(es.deletes) > 0, nil
|
is.deletes = []interface{}{}
|
||||||
}); err != nil {
|
}
|
||||||
t.Fatalf("failed to wait for events: %v", err)
|
|
||||||
|
// waitForEvents waits for adds, updates, and deletes to be populated with at least one event.
|
||||||
|
func (is *informerSpy) waitForEvents(t *testing.T, wantEvents bool) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
err := wait.PollImmediate(time.Second, time.Second*30, func() (bool, error) {
|
||||||
|
is.mu.Lock()
|
||||||
|
defer is.mu.Unlock()
|
||||||
|
return len(is.adds) > 0 && len(is.updates) > 0 && len(is.deletes) > 0, nil
|
||||||
|
})
|
||||||
|
if wantEvents {
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("wanted events, but got error: %v", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if !errors.Is(err, wait.ErrWaitTimeout) {
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("wanted no events, but got error: %v", err)
|
||||||
|
} else {
|
||||||
|
t.Fatalf("wanted no events, but got some: %s", spew.Sprintf("%#v", is))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -504,26 +530,128 @@ func TestExecPluginViaInformer(t *testing.T) {
|
|||||||
if test.clientConfigFunc != nil {
|
if test.clientConfigFunc != nil {
|
||||||
test.clientConfigFunc(clientConfig)
|
test.clientConfigFunc(clientConfig)
|
||||||
}
|
}
|
||||||
client := clientset.NewForConfigOrDie(clientConfig)
|
|
||||||
|
|
||||||
informerSpy := startConfigMapInformer(ctx, t, client, ns.Name)
|
informer, informerSpy := startConfigMapInformer(ctx, t, clientset.NewForConfigOrDie(clientConfig), ns.Name)
|
||||||
createdCM, updatedCM, deletedCM := createUpdateDeleteConfigMap(ctx, t, client.CoreV1().ConfigMaps(ns.Name))
|
waitForInformerSync(ctx, t, informer, true, "")
|
||||||
informerSpy.waitForEvents(t)
|
createdCM, updatedCM, deletedCM := createUpdateDeleteConfigMap(ctx, t, adminClient.CoreV1().ConfigMaps(ns.Name))
|
||||||
|
informerSpy.waitForEvents(t, true)
|
||||||
// Validate that the informer was called correctly.
|
assertInformerEvents(t, informerSpy, createdCM, updatedCM, deletedCM)
|
||||||
if diff := cmp.Diff([]interface{}{createdCM}, informerSpy.adds, objectMetaSansResourceVersionComparer); diff != "" {
|
|
||||||
t.Errorf("unexpected add event(s), -want, +got:\n%s", diff)
|
|
||||||
}
|
|
||||||
if diff := cmp.Diff([]oldNew{{createdCM, updatedCM}}, informerSpy.updates, oldNewComparer); diff != "" {
|
|
||||||
t.Errorf("unexpected update event(s), -want, +got:\n%s", diff)
|
|
||||||
}
|
|
||||||
if diff := cmp.Diff([]interface{}{deletedCM}, informerSpy.deletes, objectMetaSansResourceVersionComparer); diff != "" {
|
|
||||||
t.Errorf("unexpected deleted event(s), -want, +got:\n%s", diff)
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type execPlugin struct {
|
||||||
|
t *testing.T
|
||||||
|
outputFile *os.File
|
||||||
|
}
|
||||||
|
|
||||||
|
func newExecPlugin(t *testing.T) *execPlugin {
|
||||||
|
t.Helper()
|
||||||
|
outputFile, err := ioutil.TempFile("", "kubernetes-client-exec-test-plugin-output-file-*")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
return &execPlugin{t: t, outputFile: outputFile}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *execPlugin) config() *clientcmdapi.ExecConfig {
|
||||||
|
return &clientcmdapi.ExecConfig{
|
||||||
|
Command: "testdata/exec-plugin.sh",
|
||||||
|
// TODO(ankeesler): move to v1 once exec plugins go GA.
|
||||||
|
APIVersion: "client.authentication.k8s.io/v1beta1",
|
||||||
|
Env: []clientcmdapi.ExecEnvVar{
|
||||||
|
{
|
||||||
|
Name: outputFileEnvVar,
|
||||||
|
Value: e.outputFile.Name(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *execPlugin) rotateToken(newToken string, lifetime time.Duration) {
|
||||||
|
e.t.Helper()
|
||||||
|
|
||||||
|
expirationTimestamp := metav1.NewTime(time.Now().Add(lifetime)).Format(time.RFC3339Nano)
|
||||||
|
newOutput := fmt.Sprintf(`{
|
||||||
|
"kind": "ExecCredential",
|
||||||
|
"apiVersion": "client.authentication.k8s.io/v1beta1",
|
||||||
|
"status": {
|
||||||
|
"expirationTimestamp": %q,
|
||||||
|
"token": %q
|
||||||
|
}
|
||||||
|
}`, expirationTimestamp, newToken)
|
||||||
|
if err := os.WriteFile(e.outputFile.Name(), []byte(newOutput), 0644); err != nil {
|
||||||
|
e.t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExecPluginRotationViaInformer(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
result, clientAuthorizedToken, _, _ := startTestServer(t)
|
||||||
|
const clientUnauthorizedToken = "invalid-token"
|
||||||
|
const tokenLifetime = time.Second * 5
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
adminClient := clientset.NewForConfigOrDie(result.ClientConfig)
|
||||||
|
ns := createNamespace(ctx, t, adminClient)
|
||||||
|
|
||||||
|
clientDialer := connrotation.NewDialer((&net.Dialer{
|
||||||
|
Timeout: 30 * time.Second,
|
||||||
|
KeepAlive: 30 * time.Second,
|
||||||
|
}).DialContext)
|
||||||
|
|
||||||
|
execPlugin := newExecPlugin(t)
|
||||||
|
|
||||||
|
clientConfig := rest.AnonymousClientConfig(result.ClientConfig)
|
||||||
|
clientConfig.ExecProvider = execPlugin.config()
|
||||||
|
clientConfig.Dial = clientDialer.DialContext
|
||||||
|
clientConfig.Wrap(func(rt http.RoundTripper) http.RoundTripper {
|
||||||
|
// This makes it helpful to see what is happening with the informer's client.
|
||||||
|
return transport.NewDebuggingRoundTripper(rt, transport.DebugCurlCommand, transport.DebugURLTiming)
|
||||||
|
})
|
||||||
|
|
||||||
|
// Initialize informer spy wth invalid token.
|
||||||
|
// Make sure informer never syncs because it can't authenticate.
|
||||||
|
execPlugin.rotateToken(clientUnauthorizedToken, tokenLifetime)
|
||||||
|
informer, informerSpy := startConfigMapInformer(ctx, t, clientset.NewForConfigOrDie(clientConfig), ns.Name)
|
||||||
|
waitForInformerSync(ctx, t, informer, false, "")
|
||||||
|
createUpdateDeleteConfigMap(ctx, t, adminClient.CoreV1().ConfigMaps(ns.Name))
|
||||||
|
informerSpy.waitForEvents(t, false)
|
||||||
|
|
||||||
|
// Rotate token to valid token.
|
||||||
|
// Make sure informer sees events because it now has a valid token with which it can authenticate.
|
||||||
|
execPlugin.rotateToken(clientAuthorizedToken, tokenLifetime)
|
||||||
|
waitForInformerSync(ctx, t, informer, true, "")
|
||||||
|
informerSpy.clear()
|
||||||
|
createdCM, updatedCM, deletedCM := createUpdateDeleteConfigMap(ctx, t, adminClient.CoreV1().ConfigMaps(ns.Name))
|
||||||
|
informerSpy.waitForEvents(t, true)
|
||||||
|
assertInformerEvents(t, informerSpy, createdCM, updatedCM, deletedCM)
|
||||||
|
|
||||||
|
// Rotate token to something invalid and clip watch connection.
|
||||||
|
// Informer should recreate connection with invalid token.
|
||||||
|
// Make sure informer does not see events since it is using the invalid token.
|
||||||
|
execPlugin.rotateToken(clientUnauthorizedToken, tokenLifetime)
|
||||||
|
time.Sleep(tokenLifetime) // wait for old token to expire to make sure the watch is restarted with clientUnauthorizedToken
|
||||||
|
clientDialer.CloseAll()
|
||||||
|
waitForInformerSync(ctx, t, informer, true, "")
|
||||||
|
informerSpy.clear()
|
||||||
|
createUpdateDeleteConfigMap(ctx, t, adminClient.CoreV1().ConfigMaps(ns.Name))
|
||||||
|
informerSpy.waitForEvents(t, false)
|
||||||
|
|
||||||
|
// Rotate token to valid token.
|
||||||
|
// Make sure informer sees events because it now has a valid token with which it can authenticate.
|
||||||
|
lastSyncResourceVersion := informer.LastSyncResourceVersion()
|
||||||
|
execPlugin.rotateToken(clientAuthorizedToken, tokenLifetime)
|
||||||
|
waitForInformerSync(ctx, t, informer, true, lastSyncResourceVersion)
|
||||||
|
informerSpy.clear()
|
||||||
|
createdCM, updatedCM, deletedCM = createUpdateDeleteConfigMap(ctx, t, adminClient.CoreV1().ConfigMaps(ns.Name))
|
||||||
|
informerSpy.waitForEvents(t, true)
|
||||||
|
assertInformerEvents(t, informerSpy, createdCM, updatedCM, deletedCM)
|
||||||
|
}
|
||||||
|
|
||||||
func startTestServer(t *testing.T) (result *kubeapiservertesting.TestServer, clientAuthorizedToken string, clientCertFileName string, clientKeyFileName string) {
|
func startTestServer(t *testing.T) (result *kubeapiservertesting.TestServer, clientAuthorizedToken string, clientCertFileName string, clientKeyFileName string) {
|
||||||
certDir, err := ioutil.TempDir("", "kubernetes-client-exec-test-cert-dir-*")
|
certDir, err := ioutil.TempDir("", "kubernetes-client-exec-test-cert-dir-*")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -634,22 +762,39 @@ func createNamespace(ctx context.Context, t *testing.T, client clientset.Interfa
|
|||||||
return ns
|
return ns
|
||||||
}
|
}
|
||||||
|
|
||||||
func startConfigMapInformer(ctx context.Context, t *testing.T, client clientset.Interface, namespace string) *informerSpy {
|
func startConfigMapInformer(ctx context.Context, t *testing.T, client clientset.Interface, namespace string) (cache.SharedIndexInformer, *informerSpy) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
var informerSpy informerSpy
|
var informerSpy informerSpy
|
||||||
informerFactory := informers.NewSharedInformerFactoryWithOptions(client, 0, informers.WithNamespace(namespace))
|
informerFactory := informers.NewSharedInformerFactoryWithOptions(client, 0, informers.WithNamespace(namespace))
|
||||||
informerFactory.Core().V1().ConfigMaps().Informer().AddEventHandler(&informerSpy)
|
cmInformer := informerFactory.Core().V1().ConfigMaps().Informer()
|
||||||
informerFactory.Start(ctx.Done())
|
cmInformer.AddEventHandler(&informerSpy)
|
||||||
synced := informerFactory.WaitForCacheSync(ctx.Done())
|
if err := cmInformer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
|
||||||
if len(synced) != 1 {
|
// t.Logf("watch error handler: failure in reflector %#v: %v", r, err) // Uncomment for more verbose logging
|
||||||
t.Fatalf("expected only 1 synced type, got %v", synced)
|
}); err != nil {
|
||||||
|
t.Fatalf("could not set watch error handler: %v", err)
|
||||||
}
|
}
|
||||||
if cmSynced, ok := synced[reflect.TypeOf(&corev1.ConfigMap{})]; !(cmSynced && ok) {
|
informerFactory.Start(ctx.Done())
|
||||||
t.Fatalf("expected ConfigMaps to be synced, got %v", synced)
|
|
||||||
|
return cmInformer, &informerSpy
|
||||||
|
}
|
||||||
|
|
||||||
|
func waitForInformerSync(ctx context.Context, t *testing.T, informer cache.SharedIndexInformer, wantSynced bool, lastSyncResourceVersion string) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
syncCtx, cancel := context.WithTimeout(ctx, time.Second*30)
|
||||||
|
defer cancel()
|
||||||
|
if gotSynced := cache.WaitForCacheSync(syncCtx.Done(), informer.HasSynced); wantSynced != gotSynced {
|
||||||
|
t.Fatalf("wanted sync %t, got sync %t", wantSynced, gotSynced)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &informerSpy
|
if len(lastSyncResourceVersion) != 0 {
|
||||||
|
if err := wait.PollImmediate(time.Second, time.Second*30, func() (bool, error) {
|
||||||
|
return informer.LastSyncResourceVersion() != lastSyncResourceVersion, nil
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("informer never changed resource versions from %q: %v", lastSyncResourceVersion, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func createUpdateDeleteConfigMap(ctx context.Context, t *testing.T, cms v1.ConfigMapInterface) (created, updated, deleted *corev1.ConfigMap) {
|
func createUpdateDeleteConfigMap(ctx context.Context, t *testing.T, cms v1.ConfigMapInterface) (created, updated, deleted *corev1.ConfigMap) {
|
||||||
@ -658,21 +803,37 @@ func createUpdateDeleteConfigMap(ctx context.Context, t *testing.T, cms v1.Confi
|
|||||||
var err error
|
var err error
|
||||||
created, err = cms.Create(ctx, &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "cm"}}, metav1.CreateOptions{})
|
created, err = cms.Create(ctx, &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "cm"}}, metav1.CreateOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal("could not create ConfigMap:", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
updated = created.DeepCopy()
|
updated = created.DeepCopy()
|
||||||
updated.Annotations = map[string]string{"tuna": "fish"}
|
updated.Annotations = map[string]string{"tuna": "fish"}
|
||||||
updated, err = cms.Update(ctx, updated, metav1.UpdateOptions{})
|
updated, err = cms.Update(ctx, updated, metav1.UpdateOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal("could not update ConfigMap:", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := cms.Delete(ctx, updated.Name, metav1.DeleteOptions{}); err != nil {
|
if err := cms.Delete(ctx, updated.Name, metav1.DeleteOptions{}); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal("could not delete ConfigMap:", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
deleted = updated.DeepCopy()
|
deleted = updated.DeepCopy()
|
||||||
|
|
||||||
return created, updated, deleted
|
return created, updated, deleted
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func assertInformerEvents(t *testing.T, informerSpy *informerSpy, created, updated, deleted interface{}) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
// Validate that the informer was called correctly.
|
||||||
|
if diff := cmp.Diff([]interface{}{created}, informerSpy.adds, objectMetaSansResourceVersionComparer); diff != "" {
|
||||||
|
t.Errorf("unexpected add event(s), -want, +got:\n%s", diff)
|
||||||
|
}
|
||||||
|
if diff := cmp.Diff([]oldNew{{created, updated}}, informerSpy.updates, oldNewComparer); diff != "" {
|
||||||
|
t.Errorf("unexpected update event(s), -want, +got:\n%s", diff)
|
||||||
|
}
|
||||||
|
if diff := cmp.Diff([]interface{}{deleted}, informerSpy.deletes, objectMetaSansResourceVersionComparer); diff != "" {
|
||||||
|
t.Errorf("unexpected deleted event(s), -want, +got:\n%s", diff)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
@ -18,4 +18,9 @@ set -o errexit
|
|||||||
set -o nounset
|
set -o nounset
|
||||||
set -o pipefail
|
set -o pipefail
|
||||||
|
|
||||||
|
if [[ -n "${EXEC_PLUGIN_OUTPUT_FILE-""}" ]]; then
|
||||||
|
cat "$EXEC_PLUGIN_OUTPUT_FILE"
|
||||||
|
exit
|
||||||
|
fi
|
||||||
|
|
||||||
echo "$EXEC_PLUGIN_OUTPUT"
|
echo "$EXEC_PLUGIN_OUTPUT"
|
||||||
|
Loading…
Reference in New Issue
Block a user