From a14cd8e3de619e81375cf5418a392a05093aaef2 Mon Sep 17 00:00:00 2001 From: Andrew Keesler Date: Tue, 4 May 2021 09:58:23 -0400 Subject: [PATCH] test/integration/client: add TestExecPluginRotationViaInformer Signed-off-by: Andrew Keesler --- test/integration/client/exec_test.go | 261 ++++++++++++++---- .../client/testdata/exec-plugin.sh | 5 + 2 files changed, 216 insertions(+), 50 deletions(-) diff --git a/test/integration/client/exec_test.go b/test/integration/client/exec_test.go index c14eca0b211..fcaae20748f 100644 --- a/test/integration/client/exec_test.go +++ b/test/integration/client/exec_test.go @@ -21,16 +21,18 @@ import ( "crypto/tls" "crypto/x509" "encoding/base64" + "errors" "fmt" "io/ioutil" + "net" "net/http" "os" - "reflect" "strings" "sync" "testing" "time" + "github.com/davecgh/go-spew/spew" "github.com/google/go-cmp/cmp" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -39,9 +41,11 @@ import ( clientset "k8s.io/client-go/kubernetes" v1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" + "k8s.io/client-go/transport" "k8s.io/client-go/util/cert" - + "k8s.io/client-go/util/connrotation" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" "k8s.io/kubernetes/test/integration/framework" ) @@ -50,7 +54,8 @@ import ( // These constants are used to communicate behavior to the testdata/exec-plugin.sh test fixture. const ( - outputEnvVar = "EXEC_PLUGIN_OUTPUT" + outputEnvVar = "EXEC_PLUGIN_OUTPUT" + outputFileEnvVar = "EXEC_PLUGIN_OUTPUT_FILE" ) type roundTripperFunc func(*http.Request) (*http.Response, error) @@ -412,32 +417,53 @@ type informerSpy struct { deletes []interface{} } -func (es *informerSpy) OnAdd(obj interface{}) { - es.mu.Lock() - defer es.mu.Unlock() - es.adds = append(es.adds, obj) +func (is *informerSpy) OnAdd(obj interface{}) { + is.mu.Lock() + defer is.mu.Unlock() + is.adds = append(is.adds, obj) } -func (es *informerSpy) OnUpdate(old, new interface{}) { - es.mu.Lock() - defer es.mu.Unlock() - es.updates = append(es.updates, oldNew{old: old, new: new}) +func (is *informerSpy) OnUpdate(old, new interface{}) { + is.mu.Lock() + defer is.mu.Unlock() + is.updates = append(is.updates, oldNew{old: old, new: new}) } -func (es *informerSpy) OnDelete(obj interface{}) { - es.mu.Lock() - defer es.mu.Unlock() - es.deletes = append(es.deletes, obj) +func (is *informerSpy) OnDelete(obj interface{}) { + is.mu.Lock() + defer is.mu.Unlock() + is.deletes = append(is.deletes, obj) } -// waitForEvents waits for adds, updates, and deletes to be filled with at least one event. -func (es *informerSpy) waitForEvents(t *testing.T) { - if err := wait.PollImmediate(time.Millisecond*250, time.Second*20, func() (bool, error) { - es.mu.Lock() - defer es.mu.Unlock() - return len(es.adds) > 0 && len(es.updates) > 0 && len(es.deletes) > 0, nil - }); err != nil { - t.Fatalf("failed to wait for events: %v", err) +func (is *informerSpy) clear() { + is.mu.Lock() + defer is.mu.Unlock() + is.adds = []interface{}{} + is.updates = []oldNew{} + is.deletes = []interface{}{} +} + +// waitForEvents waits for adds, updates, and deletes to be populated with at least one event. +func (is *informerSpy) waitForEvents(t *testing.T, wantEvents bool) { + t.Helper() + + err := wait.PollImmediate(time.Second, time.Second*30, func() (bool, error) { + is.mu.Lock() + defer is.mu.Unlock() + return len(is.adds) > 0 && len(is.updates) > 0 && len(is.deletes) > 0, nil + }) + if wantEvents { + if err != nil { + t.Fatalf("wanted events, but got error: %v", err) + } + } else { + if !errors.Is(err, wait.ErrWaitTimeout) { + if err != nil { + t.Fatalf("wanted no events, but got error: %v", err) + } else { + t.Fatalf("wanted no events, but got some: %s", spew.Sprintf("%#v", is)) + } + } } } @@ -504,26 +530,128 @@ func TestExecPluginViaInformer(t *testing.T) { if test.clientConfigFunc != nil { test.clientConfigFunc(clientConfig) } - client := clientset.NewForConfigOrDie(clientConfig) - informerSpy := startConfigMapInformer(ctx, t, client, ns.Name) - createdCM, updatedCM, deletedCM := createUpdateDeleteConfigMap(ctx, t, client.CoreV1().ConfigMaps(ns.Name)) - informerSpy.waitForEvents(t) - - // Validate that the informer was called correctly. - if diff := cmp.Diff([]interface{}{createdCM}, informerSpy.adds, objectMetaSansResourceVersionComparer); diff != "" { - t.Errorf("unexpected add event(s), -want, +got:\n%s", diff) - } - if diff := cmp.Diff([]oldNew{{createdCM, updatedCM}}, informerSpy.updates, oldNewComparer); diff != "" { - t.Errorf("unexpected update event(s), -want, +got:\n%s", diff) - } - if diff := cmp.Diff([]interface{}{deletedCM}, informerSpy.deletes, objectMetaSansResourceVersionComparer); diff != "" { - t.Errorf("unexpected deleted event(s), -want, +got:\n%s", diff) - } + informer, informerSpy := startConfigMapInformer(ctx, t, clientset.NewForConfigOrDie(clientConfig), ns.Name) + waitForInformerSync(ctx, t, informer, true, "") + createdCM, updatedCM, deletedCM := createUpdateDeleteConfigMap(ctx, t, adminClient.CoreV1().ConfigMaps(ns.Name)) + informerSpy.waitForEvents(t, true) + assertInformerEvents(t, informerSpy, createdCM, updatedCM, deletedCM) }) } } +type execPlugin struct { + t *testing.T + outputFile *os.File +} + +func newExecPlugin(t *testing.T) *execPlugin { + t.Helper() + outputFile, err := ioutil.TempFile("", "kubernetes-client-exec-test-plugin-output-file-*") + if err != nil { + t.Fatal(err) + } + return &execPlugin{t: t, outputFile: outputFile} +} + +func (e *execPlugin) config() *clientcmdapi.ExecConfig { + return &clientcmdapi.ExecConfig{ + Command: "testdata/exec-plugin.sh", + // TODO(ankeesler): move to v1 once exec plugins go GA. + APIVersion: "client.authentication.k8s.io/v1beta1", + Env: []clientcmdapi.ExecEnvVar{ + { + Name: outputFileEnvVar, + Value: e.outputFile.Name(), + }, + }, + } +} + +func (e *execPlugin) rotateToken(newToken string, lifetime time.Duration) { + e.t.Helper() + + expirationTimestamp := metav1.NewTime(time.Now().Add(lifetime)).Format(time.RFC3339Nano) + newOutput := fmt.Sprintf(`{ + "kind": "ExecCredential", + "apiVersion": "client.authentication.k8s.io/v1beta1", + "status": { + "expirationTimestamp": %q, + "token": %q + } + }`, expirationTimestamp, newToken) + if err := os.WriteFile(e.outputFile.Name(), []byte(newOutput), 0644); err != nil { + e.t.Fatal(err) + } +} + +func TestExecPluginRotationViaInformer(t *testing.T) { + t.Parallel() + + result, clientAuthorizedToken, _, _ := startTestServer(t) + const clientUnauthorizedToken = "invalid-token" + const tokenLifetime = time.Second * 5 + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) + defer cancel() + + adminClient := clientset.NewForConfigOrDie(result.ClientConfig) + ns := createNamespace(ctx, t, adminClient) + + clientDialer := connrotation.NewDialer((&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext) + + execPlugin := newExecPlugin(t) + + clientConfig := rest.AnonymousClientConfig(result.ClientConfig) + clientConfig.ExecProvider = execPlugin.config() + clientConfig.Dial = clientDialer.DialContext + clientConfig.Wrap(func(rt http.RoundTripper) http.RoundTripper { + // This makes it helpful to see what is happening with the informer's client. + return transport.NewDebuggingRoundTripper(rt, transport.DebugCurlCommand, transport.DebugURLTiming) + }) + + // Initialize informer spy wth invalid token. + // Make sure informer never syncs because it can't authenticate. + execPlugin.rotateToken(clientUnauthorizedToken, tokenLifetime) + informer, informerSpy := startConfigMapInformer(ctx, t, clientset.NewForConfigOrDie(clientConfig), ns.Name) + waitForInformerSync(ctx, t, informer, false, "") + createUpdateDeleteConfigMap(ctx, t, adminClient.CoreV1().ConfigMaps(ns.Name)) + informerSpy.waitForEvents(t, false) + + // Rotate token to valid token. + // Make sure informer sees events because it now has a valid token with which it can authenticate. + execPlugin.rotateToken(clientAuthorizedToken, tokenLifetime) + waitForInformerSync(ctx, t, informer, true, "") + informerSpy.clear() + createdCM, updatedCM, deletedCM := createUpdateDeleteConfigMap(ctx, t, adminClient.CoreV1().ConfigMaps(ns.Name)) + informerSpy.waitForEvents(t, true) + assertInformerEvents(t, informerSpy, createdCM, updatedCM, deletedCM) + + // Rotate token to something invalid and clip watch connection. + // Informer should recreate connection with invalid token. + // Make sure informer does not see events since it is using the invalid token. + execPlugin.rotateToken(clientUnauthorizedToken, tokenLifetime) + time.Sleep(tokenLifetime) // wait for old token to expire to make sure the watch is restarted with clientUnauthorizedToken + clientDialer.CloseAll() + waitForInformerSync(ctx, t, informer, true, "") + informerSpy.clear() + createUpdateDeleteConfigMap(ctx, t, adminClient.CoreV1().ConfigMaps(ns.Name)) + informerSpy.waitForEvents(t, false) + + // Rotate token to valid token. + // Make sure informer sees events because it now has a valid token with which it can authenticate. + lastSyncResourceVersion := informer.LastSyncResourceVersion() + execPlugin.rotateToken(clientAuthorizedToken, tokenLifetime) + waitForInformerSync(ctx, t, informer, true, lastSyncResourceVersion) + informerSpy.clear() + createdCM, updatedCM, deletedCM = createUpdateDeleteConfigMap(ctx, t, adminClient.CoreV1().ConfigMaps(ns.Name)) + informerSpy.waitForEvents(t, true) + assertInformerEvents(t, informerSpy, createdCM, updatedCM, deletedCM) +} + func startTestServer(t *testing.T) (result *kubeapiservertesting.TestServer, clientAuthorizedToken string, clientCertFileName string, clientKeyFileName string) { certDir, err := ioutil.TempDir("", "kubernetes-client-exec-test-cert-dir-*") if err != nil { @@ -634,22 +762,39 @@ func createNamespace(ctx context.Context, t *testing.T, client clientset.Interfa return ns } -func startConfigMapInformer(ctx context.Context, t *testing.T, client clientset.Interface, namespace string) *informerSpy { +func startConfigMapInformer(ctx context.Context, t *testing.T, client clientset.Interface, namespace string) (cache.SharedIndexInformer, *informerSpy) { t.Helper() var informerSpy informerSpy informerFactory := informers.NewSharedInformerFactoryWithOptions(client, 0, informers.WithNamespace(namespace)) - informerFactory.Core().V1().ConfigMaps().Informer().AddEventHandler(&informerSpy) - informerFactory.Start(ctx.Done()) - synced := informerFactory.WaitForCacheSync(ctx.Done()) - if len(synced) != 1 { - t.Fatalf("expected only 1 synced type, got %v", synced) + cmInformer := informerFactory.Core().V1().ConfigMaps().Informer() + cmInformer.AddEventHandler(&informerSpy) + if err := cmInformer.SetWatchErrorHandler(func(r *cache.Reflector, err error) { + // t.Logf("watch error handler: failure in reflector %#v: %v", r, err) // Uncomment for more verbose logging + }); err != nil { + t.Fatalf("could not set watch error handler: %v", err) } - if cmSynced, ok := synced[reflect.TypeOf(&corev1.ConfigMap{})]; !(cmSynced && ok) { - t.Fatalf("expected ConfigMaps to be synced, got %v", synced) + informerFactory.Start(ctx.Done()) + + return cmInformer, &informerSpy +} + +func waitForInformerSync(ctx context.Context, t *testing.T, informer cache.SharedIndexInformer, wantSynced bool, lastSyncResourceVersion string) { + t.Helper() + + syncCtx, cancel := context.WithTimeout(ctx, time.Second*30) + defer cancel() + if gotSynced := cache.WaitForCacheSync(syncCtx.Done(), informer.HasSynced); wantSynced != gotSynced { + t.Fatalf("wanted sync %t, got sync %t", wantSynced, gotSynced) } - return &informerSpy + if len(lastSyncResourceVersion) != 0 { + if err := wait.PollImmediate(time.Second, time.Second*30, func() (bool, error) { + return informer.LastSyncResourceVersion() != lastSyncResourceVersion, nil + }); err != nil { + t.Fatalf("informer never changed resource versions from %q: %v", lastSyncResourceVersion, err) + } + } } func createUpdateDeleteConfigMap(ctx context.Context, t *testing.T, cms v1.ConfigMapInterface) (created, updated, deleted *corev1.ConfigMap) { @@ -658,21 +803,37 @@ func createUpdateDeleteConfigMap(ctx context.Context, t *testing.T, cms v1.Confi var err error created, err = cms.Create(ctx, &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "cm"}}, metav1.CreateOptions{}) if err != nil { - t.Fatal(err) + t.Fatal("could not create ConfigMap:", err) } updated = created.DeepCopy() updated.Annotations = map[string]string{"tuna": "fish"} updated, err = cms.Update(ctx, updated, metav1.UpdateOptions{}) if err != nil { - t.Fatal(err) + t.Fatal("could not update ConfigMap:", err) } if err := cms.Delete(ctx, updated.Name, metav1.DeleteOptions{}); err != nil { - t.Fatal(err) + t.Fatal("could not delete ConfigMap:", err) } deleted = updated.DeepCopy() return created, updated, deleted } + +func assertInformerEvents(t *testing.T, informerSpy *informerSpy, created, updated, deleted interface{}) { + t.Helper() + + // Validate that the informer was called correctly. + if diff := cmp.Diff([]interface{}{created}, informerSpy.adds, objectMetaSansResourceVersionComparer); diff != "" { + t.Errorf("unexpected add event(s), -want, +got:\n%s", diff) + } + if diff := cmp.Diff([]oldNew{{created, updated}}, informerSpy.updates, oldNewComparer); diff != "" { + t.Errorf("unexpected update event(s), -want, +got:\n%s", diff) + } + if diff := cmp.Diff([]interface{}{deleted}, informerSpy.deletes, objectMetaSansResourceVersionComparer); diff != "" { + t.Errorf("unexpected deleted event(s), -want, +got:\n%s", diff) + } + +} diff --git a/test/integration/client/testdata/exec-plugin.sh b/test/integration/client/testdata/exec-plugin.sh index 6ad3e6ef6ab..de20bcea27b 100755 --- a/test/integration/client/testdata/exec-plugin.sh +++ b/test/integration/client/testdata/exec-plugin.sh @@ -18,4 +18,9 @@ set -o errexit set -o nounset set -o pipefail +if [[ -n "${EXEC_PLUGIN_OUTPUT_FILE-""}" ]]; then + cat "$EXEC_PLUGIN_OUTPUT_FILE" + exit +fi + echo "$EXEC_PLUGIN_OUTPUT"