From af61e8fbdf3d83e2b287721fe0d57dd4a2234450 Mon Sep 17 00:00:00 2001 From: wojtekt Date: Tue, 1 Sep 2020 11:00:31 +0200 Subject: [PATCH] Test watchcache being updated in multietcd setup --- test/integration/apiserver/BUILD | 2 + test/integration/apiserver/watchcache_test.go | 170 ++++++++++++++++++ test/integration/framework/etcd.go | 47 +++-- 3 files changed, 201 insertions(+), 18 deletions(-) create mode 100644 test/integration/apiserver/watchcache_test.go diff --git a/test/integration/apiserver/BUILD b/test/integration/apiserver/BUILD index c67227c5f31..a31fe9aa444 100644 --- a/test/integration/apiserver/BUILD +++ b/test/integration/apiserver/BUILD @@ -15,6 +15,7 @@ go_test( "max_request_body_bytes_test.go", "patch_test.go", "print_test.go", + "watchcache_test.go", ], rundir = ".", tags = [ @@ -25,6 +26,7 @@ go_test( "//cmd/kube-apiserver/app/options:go_default_library", "//pkg/api/legacyscheme:go_default_library", "//pkg/controlplane:go_default_library", + "//pkg/controlplane/reconcilers:go_default_library", "//pkg/features:go_default_library", "//pkg/printers:go_default_library", "//pkg/printers/internalversion:go_default_library", diff --git a/test/integration/apiserver/watchcache_test.go b/test/integration/apiserver/watchcache_test.go new file mode 100644 index 00000000000..f29c8d6cd9d --- /dev/null +++ b/test/integration/apiserver/watchcache_test.go @@ -0,0 +1,170 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "context" + "fmt" + "testing" + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/features" + utilfeature "k8s.io/apiserver/pkg/util/feature" + clientset "k8s.io/client-go/kubernetes" + restclient "k8s.io/client-go/rest" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/kubernetes/pkg/controlplane/reconcilers" + "k8s.io/kubernetes/test/integration/framework" +) + +// setup create kube-apiserver backed up by two separate etcds, +// with one of them containing events and the other all other objects. +func multiEtcdSetup(t testing.TB) (clientset.Interface, framework.CloseFunc) { + etcdArgs := []string{"--experimental-watch-progress-notify-interval", "1s"} + etcd0URL, stopEtcd0, err := framework.RunCustomEtcd("etcd_watchcache0", etcdArgs) + if err != nil { + t.Fatalf("Couldn't start etcd: %v", err) + } + + etcd1URL, stopEtcd1, err := framework.RunCustomEtcd("etcd_watchcache1", etcdArgs) + if err != nil { + t.Fatalf("Couldn't start etcd: %v", err) + } + + etcdOptions := framework.DefaultEtcdOptions() + // Overwrite etcd setup to our custom etcd instances. + etcdOptions.StorageConfig.Transport.ServerList = []string{etcd0URL} + etcdOptions.EtcdServersOverrides = []string{fmt.Sprintf("/events#%s", etcd1URL)} + etcdOptions.EnableWatchCache = true + + opts := framework.MasterConfigOptions{EtcdOptions: etcdOptions} + masterConfig := framework.NewIntegrationTestMasterConfigWithOptions(&opts) + // Switch off endpoints reconciler to avoid unnecessary operations. + masterConfig.ExtraConfig.EndpointReconcilerType = reconcilers.NoneEndpointReconcilerType + _, s, stopMaster := framework.RunAMaster(masterConfig) + + closeFn := func() { + stopMaster() + stopEtcd1() + stopEtcd0() + } + + clientSet, err := clientset.NewForConfig(&restclient.Config{Host: s.URL, QPS: -1}) + if err != nil { + t.Fatalf("Error in create clientset: %v", err) + } + + // Wait for apiserver to be stabilized. + // Everything but default service creation is checked in RunAMaster above by + // waiting for post start hooks, so we just wait for default service to exist. + // TODO(wojtek-t): Figure out less fragile way. + ctx := context.Background() + if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { + _, err := clientSet.CoreV1().Services("default").Get(ctx, "kubernetes", metav1.GetOptions{}) + return err == nil, nil + }); err != nil { + t.Fatalf("Failed to wait for kubernetes service: %v:", err) + } + return clientSet, closeFn +} + +func TestWatchCacheUpdatedByEtcd(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EfficientWatchResumption, true)() + + c, closeFn := multiEtcdSetup(t) + defer closeFn() + + ctx := context.Background() + + makeConfigMap := func(name string) *v1.ConfigMap { + return &v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: name}} + } + makeSecret := func(name string) *v1.Secret { + return &v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: name}} + } + makeEvent := func(name string) *v1.Event { + return &v1.Event{ObjectMeta: metav1.ObjectMeta{Name: name}} + } + + cm, err := c.CoreV1().ConfigMaps("default").Create(ctx, makeConfigMap("name"), metav1.CreateOptions{}) + if err != nil { + t.Errorf("Couldn't create configmap: %v", err) + } + ev, err := c.CoreV1().Events("default").Create(ctx, makeEvent("name"), metav1.CreateOptions{}) + if err != nil { + t.Errorf("Couldn't create event: %v", err) + } + + listOptions := metav1.ListOptions{ + ResourceVersion: "0", + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + } + + // Wait until listing from cache returns resource version of corresponding + // resources (being the last updates). + t.Logf("Waiting for configmaps watchcache synced to %s", cm.ResourceVersion) + if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { + res, err := c.CoreV1().ConfigMaps("default").List(ctx, listOptions) + if err != nil { + return false, nil + } + return res.ResourceVersion == cm.ResourceVersion, nil + }); err != nil { + t.Errorf("Failed to wait for configmaps watchcache synced: %v", err) + } + t.Logf("Waiting for events watchcache synced to %s", ev.ResourceVersion) + if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { + res, err := c.CoreV1().Events("default").List(ctx, listOptions) + if err != nil { + return false, nil + } + return res.ResourceVersion == ev.ResourceVersion, nil + }); err != nil { + t.Errorf("Failed to wait for events watchcache synced: %v", err) + } + + // Create a secret, that is stored in the same etcd as configmap, but + // different than events. + se, err := c.CoreV1().Secrets("default").Create(ctx, makeSecret("name"), metav1.CreateOptions{}) + if err != nil { + t.Errorf("Couldn't create secret: %v", err) + } + + t.Logf("Waiting for configmaps watchcache synced to %s", se.ResourceVersion) + if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { + res, err := c.CoreV1().ConfigMaps("default").List(ctx, listOptions) + if err != nil { + return false, nil + } + return res.ResourceVersion == se.ResourceVersion, nil + }); err != nil { + t.Errorf("Failed to wait for configmaps watchcache synced: %v", err) + } + t.Logf("Waiting for events watchcache NOT synced to %s", se.ResourceVersion) + if err := wait.Poll(100*time.Millisecond, 5*time.Second, func() (bool, error) { + res, err := c.CoreV1().Events("default").List(ctx, listOptions) + if err != nil { + return false, nil + } + return res.ResourceVersion == se.ResourceVersion, nil + }); err == nil || err != wait.ErrWaitTimeout { + t.Errorf("Events watchcache unexpected synced: %v", err) + } +} diff --git a/test/integration/framework/etcd.go b/test/integration/framework/etcd.go index b6d2ae8ce6b..4be970e21b3 100644 --- a/test/integration/framework/etcd.go +++ b/test/integration/framework/etcd.go @@ -84,41 +84,54 @@ func startEtcd() (func(), error) { } klog.V(1).Infof("could not connect to etcd: %v", err) + currentURL, stop, err := RunCustomEtcd("integration_test_etcd_data", nil) + if err != nil { + return nil, err + } + + etcdURL = currentURL + os.Setenv("KUBE_INTEGRATION_ETCD_URL", etcdURL) + + return stop, nil +} + +// RunCustomEtcd starts a custom etcd instance for test purposes. +func RunCustomEtcd(dataDir string, customFlags []string) (url string, stopFn func(), err error) { // TODO: Check for valid etcd version. etcdPath, err := getEtcdPath() if err != nil { fmt.Fprintf(os.Stderr, installEtcd) - return nil, fmt.Errorf("could not find etcd in PATH: %v", err) + return "", nil, fmt.Errorf("could not find etcd in PATH: %v", err) } etcdPort, err := getAvailablePort() if err != nil { - return nil, fmt.Errorf("could not get a port: %v", err) + return "", nil, fmt.Errorf("could not get a port: %v", err) } - etcdURL = fmt.Sprintf("http://127.0.0.1:%d", etcdPort) + customURL := fmt.Sprintf("http://127.0.0.1:%d", etcdPort) - klog.Infof("starting etcd on %s", etcdURL) + klog.Infof("starting etcd on %s", customURL) - etcdDataDir, err := ioutil.TempDir(os.TempDir(), "integration_test_etcd_data") + etcdDataDir, err := ioutil.TempDir(os.TempDir(), dataDir) if err != nil { - return nil, fmt.Errorf("unable to make temp etcd data dir: %v", err) + return "", nil, fmt.Errorf("unable to make temp etcd data dir %s: %v", dataDir, err) } klog.Infof("storing etcd data in: %v", etcdDataDir) ctx, cancel := context.WithCancel(context.Background()) - cmd := exec.CommandContext( - ctx, - etcdPath, + args := []string{ "--data-dir", etcdDataDir, "--listen-client-urls", - GetEtcdURL(), + customURL, "--advertise-client-urls", - GetEtcdURL(), + customURL, "--listen-peer-urls", "http://127.0.0.1:0", "--log-package-levels", "*=NOTICE", // set to INFO or DEBUG for more logs - ) + } + args = append(args, customFlags...) + cmd := exec.CommandContext(ctx, etcdPath, args...) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr stop := func() { @@ -136,14 +149,14 @@ func startEtcd() (func(), error) { clientv3.SetLogger(grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, os.Stderr)) if err := cmd.Start(); err != nil { - return nil, fmt.Errorf("failed to run etcd: %v", err) + return "", nil, fmt.Errorf("failed to run etcd: %v", err) } var i int32 = 1 const pollCount = int32(300) for i <= pollCount { - conn, err = net.DialTimeout("tcp", strings.TrimPrefix(etcdURL, "http://"), 1*time.Second) + conn, err := net.DialTimeout("tcp", strings.TrimPrefix(customURL, "http://"), 1*time.Second) if err == nil { conn.Close() break @@ -151,16 +164,14 @@ func startEtcd() (func(), error) { if i == pollCount { stop() - return nil, fmt.Errorf("could not start etcd") + return "", nil, fmt.Errorf("could not start etcd") } time.Sleep(100 * time.Millisecond) i = i + 1 } - os.Setenv("KUBE_INTEGRATION_ETCD_URL", etcdURL) - - return stop, nil + return customURL, stop, nil } // EtcdMain starts an etcd instance before running tests.