From efe19e4acbe2b9a96fe855ea545b340861c55feb Mon Sep 17 00:00:00 2001 From: Ryan Phillips Date: Wed, 28 Feb 2018 13:37:42 -0600 Subject: [PATCH] master count and lease endpoint tests --- cmd/kube-apiserver/app/testing/testserver.go | 31 ++++- .../cluster_scoped_owner_test.go | 2 +- .../garbage_collector_test.go | 2 +- test/integration/master/BUILD | 1 + test/integration/master/crd_test.go | 4 +- .../integration/master/kube_apiserver_test.go | 116 +++++++++++++++++- .../master/transformation_testcase.go | 2 +- test/integration/scale/scale_test.go | 2 +- test/integration/tls/ciphers_test.go | 2 +- 9 files changed, 147 insertions(+), 15 deletions(-) diff --git a/cmd/kube-apiserver/app/testing/testserver.go b/cmd/kube-apiserver/app/testing/testserver.go index 3b16c510d5f..98ee421dc0c 100644 --- a/cmd/kube-apiserver/app/testing/testserver.go +++ b/cmd/kube-apiserver/app/testing/testserver.go @@ -37,6 +37,12 @@ import ( // TearDownFunc is to be called to tear down a test server. type TearDownFunc func() +// TestServerInstanceOptions Instance options the TestServer +type TestServerInstanceOptions struct { + // DisableStorageCleanup Disable the automatic storage cleanup + DisableStorageCleanup bool +} + // TestServer return values supplied by kube-test-ApiServer type TestServer struct { ClientConfig *restclient.Config // Rest client config @@ -52,22 +58,36 @@ type Logger interface { Logf(format string, args ...interface{}) } +// NewDefaultTestServerOptions Default options for TestServer instances +func NewDefaultTestServerOptions() *TestServerInstanceOptions { + return &TestServerInstanceOptions{ + DisableStorageCleanup: false, + } +} + // StartTestServer starts a etcd server and kube-apiserver. A rest client config and a tear-down func, // and location of the tmpdir are returned. // // Note: we return a tear-down func instead of a stop channel because the later will leak temporary // files that because Golang testing's call to os.Exit will not give a stop channel go routine // enough time to remove temporary files. -func StartTestServer(t Logger, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) { +func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) { + if instanceOptions == nil { + instanceOptions = NewDefaultTestServerOptions() + } // TODO : Remove TrackStorageCleanup below when PR // https://github.com/kubernetes/kubernetes/pull/50690 // merges as that shuts down storage properly - registry.TrackStorageCleanup() + if !instanceOptions.DisableStorageCleanup { + registry.TrackStorageCleanup() + } stopCh := make(chan struct{}) tearDown := func() { - registry.CleanupStorage() + if !instanceOptions.DisableStorageCleanup { + registry.CleanupStorage() + } close(stopCh) if len(result.TmpDir) != 0 { os.RemoveAll(result.TmpDir) @@ -147,9 +167,8 @@ func StartTestServer(t Logger, customFlags []string, storageConfig *storagebacke } // StartTestServerOrDie calls StartTestServer t.Fatal if it does not succeed. -func StartTestServerOrDie(t Logger, flags []string, storageConfig *storagebackend.Config) *TestServer { - - result, err := StartTestServer(t, flags, storageConfig) +func StartTestServerOrDie(t Logger, instanceOptions *TestServerInstanceOptions, flags []string, storageConfig *storagebackend.Config) *TestServer { + result, err := StartTestServer(t, instanceOptions, flags, storageConfig) if err == nil { return &result } diff --git a/test/integration/garbagecollector/cluster_scoped_owner_test.go b/test/integration/garbagecollector/cluster_scoped_owner_test.go index a1ae4d2025d..1f8092f7890 100644 --- a/test/integration/garbagecollector/cluster_scoped_owner_test.go +++ b/test/integration/garbagecollector/cluster_scoped_owner_test.go @@ -51,7 +51,7 @@ func (b *readDelayer) Read(p []byte) (n int, err error) { func TestClusterScopedOwners(t *testing.T) { // Start the test server and wrap the client to delay PV watch responses - server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.SharedEtcd()) + server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd()) server.ClientConfig.WrapTransport = func(rt http.RoundTripper) http.RoundTripper { return roundTripFunc(func(req *http.Request) (*http.Response, error) { if req.URL.Query().Get("watch") != "true" || !strings.Contains(req.URL.String(), "persistentvolumes") { diff --git a/test/integration/garbagecollector/garbage_collector_test.go b/test/integration/garbagecollector/garbage_collector_test.go index bc38a27975e..cfa03748fc6 100644 --- a/test/integration/garbagecollector/garbage_collector_test.go +++ b/test/integration/garbagecollector/garbage_collector_test.go @@ -200,7 +200,7 @@ type testContext struct { // if workerCount > 0, will start the GC, otherwise it's up to the caller to Run() the GC. func setup(t *testing.T, workerCount int) *testContext { - return setupWithServer(t, kubeapiservertesting.StartTestServerOrDie(t, nil, framework.SharedEtcd()), workerCount) + return setupWithServer(t, kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd()), workerCount) } func setupWithServer(t *testing.T, result *kubeapiservertesting.TestServer, workerCount int) *testContext { diff --git a/test/integration/master/BUILD b/test/integration/master/BUILD index 72ba0580af5..15cac881d38 100644 --- a/test/integration/master/BUILD +++ b/test/integration/master/BUILD @@ -78,6 +78,7 @@ go_test( "//vendor/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library", "//vendor/k8s.io/apiserver/pkg/authorization/authorizerfactory:go_default_library", "//vendor/k8s.io/apiserver/pkg/features:go_default_library", + "//vendor/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library", "//vendor/k8s.io/apiserver/pkg/server/options/encryptionconfig:go_default_library", "//vendor/k8s.io/apiserver/pkg/storage/value:go_default_library", "//vendor/k8s.io/apiserver/pkg/storage/value/encrypt/aes:go_default_library", diff --git a/test/integration/master/crd_test.go b/test/integration/master/crd_test.go index f2bc91a5c87..ebe69b54645 100644 --- a/test/integration/master/crd_test.go +++ b/test/integration/master/crd_test.go @@ -41,7 +41,7 @@ import ( ) func TestCRDShadowGroup(t *testing.T) { - result := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.SharedEtcd()) + result := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd()) defer result.TearDownFn() kubeclient, err := kubernetes.NewForConfig(result.ClientConfig) @@ -109,7 +109,7 @@ func TestCRDShadowGroup(t *testing.T) { func TestCRD(t *testing.T) { defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.Initializers, true)() - result := kubeapiservertesting.StartTestServerOrDie(t, []string{"--admission-control", "Initializers"}, framework.SharedEtcd()) + result := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--admission-control", "Initializers"}, framework.SharedEtcd()) defer result.TearDownFn() kubeclient, err := kubernetes.NewForConfig(result.ClientConfig) diff --git a/test/integration/master/kube_apiserver_test.go b/test/integration/master/kube_apiserver_test.go index 6beadcd67c5..187b1da1b00 100644 --- a/test/integration/master/kube_apiserver_test.go +++ b/test/integration/master/kube_apiserver_test.go @@ -18,13 +18,18 @@ package master import ( "encoding/json" + "fmt" + "reflect" "strings" "testing" + "time" appsv1beta1 "k8s.io/api/apps/v1beta1" corev1 "k8s.io/api/core/v1" "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/registry/generic/registry" "k8s.io/client-go/kubernetes" "k8s.io/kube-aggregator/pkg/apis/apiregistration" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" @@ -32,7 +37,7 @@ import ( ) func TestRun(t *testing.T) { - server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.SharedEtcd()) + server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd()) defer server.TearDownFn() client, err := kubernetes.NewForConfig(server.ClientConfig) @@ -82,7 +87,7 @@ func TestRun(t *testing.T) { // apiextensions-server and the kube-aggregator server, both part of // the delegation chain in kube-apiserver. func TestOpenAPIDelegationChainPlumbing(t *testing.T) { - server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.SharedEtcd()) + server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd()) defer server.TearDownFn() kubeclient, err := kubernetes.NewForConfig(server.ClientConfig) @@ -138,3 +143,110 @@ func TestOpenAPIDelegationChainPlumbing(t *testing.T) { t.Errorf("missing path: %q", registrationPrefix) } } + +// return the unique endpoint IPs +func getEndpointIPs(endpoints *corev1.Endpoints) []string { + endpointMap := make(map[string]bool) + ips := make([]string, 0) + for _, subset := range endpoints.Subsets { + for _, address := range subset.Addresses { + if _, ok := endpointMap[address.IP]; !ok { + endpointMap[address.IP] = true + ips = append(ips, address.IP) + } + } + } + return ips +} + +func verifyEndpointsWithIPs(servers []*kubeapiservertesting.TestServer, ips []string) bool { + listenAddresses := make([]string, 0) + for _, server := range servers { + listenAddresses = append(listenAddresses, server.ServerOpts.GenericServerRunOptions.AdvertiseAddress.String()) + } + return reflect.DeepEqual(listenAddresses, ips) +} + +func testReconcilersMasterLease(t *testing.T, leaseCount int, masterCount int) { + var leaseServers []*kubeapiservertesting.TestServer + var masterCountServers []*kubeapiservertesting.TestServer + etcd := framework.SharedEtcd() + + instanceOptions := &kubeapiservertesting.TestServerInstanceOptions{ + DisableStorageCleanup: true, + } + + // cleanup the registry storage + defer registry.CleanupStorage() + + // 1. start masterCount api servers + for i := 0; i < masterCount; i++ { + // start master count api server + server := kubeapiservertesting.StartTestServerOrDie(t, instanceOptions, []string{ + "--endpoint-reconciler-type", "master-count", + "--advertise-address", fmt.Sprintf("10.0.1.%v", i+1), + "--apiserver-count", fmt.Sprintf("%v", masterCount), + }, etcd) + masterCountServers = append(masterCountServers, server) + } + + // 2. verify master count servers have registered + if err := wait.PollImmediate(3*time.Second, 2*time.Minute, func() (bool, error) { + client, err := kubernetes.NewForConfig(masterCountServers[0].ClientConfig) + endpoints, err := client.CoreV1().Endpoints("default").Get("kubernetes", metav1.GetOptions{}) + if err != nil { + t.Logf("error fetching endpoints: %v", err) + return false, nil + } + return verifyEndpointsWithIPs(masterCountServers, getEndpointIPs(endpoints)), nil + }); err != nil { + t.Fatalf("master count endpoints failed to register: %v", err) + } + + // 3. start lease api servers + for i := 0; i < leaseCount; i++ { + options := []string{ + "--endpoint-reconciler-type", "lease", + "--advertise-address", fmt.Sprintf("10.0.1.%v", i+10), + } + server := kubeapiservertesting.StartTestServerOrDie(t, instanceOptions, options, etcd) + defer server.TearDownFn() + leaseServers = append(leaseServers, server) + } + + time.Sleep(3 * time.Second) + + // 4. Shutdown the masterCount server + for _, server := range masterCountServers { + server.TearDownFn() + } + + // 5. verify only leaseEndpoint servers left + if err := wait.PollImmediate(3*time.Second, 2*time.Minute, func() (bool, error) { + client, err := kubernetes.NewForConfig(leaseServers[0].ClientConfig) + if err != nil { + t.Logf("create client error: %v", err) + return false, nil + } + endpoints, err := client.CoreV1().Endpoints("default").Get("kubernetes", metav1.GetOptions{}) + if err != nil { + t.Logf("error fetching endpoints: %v", err) + return false, nil + } + return verifyEndpointsWithIPs(leaseServers, getEndpointIPs(endpoints)), nil + }); err != nil { + t.Fatalf("did not find only lease endpoints: %v", err) + } +} + +func TestReconcilerMasterLeaseCombined(t *testing.T) { + testReconcilersMasterLease(t, 1, 3) +} + +func TestReconcilerMasterLeaseMultiMoreMasters(t *testing.T) { + testReconcilersMasterLease(t, 3, 2) +} + +func TestReconcilerMasterLeaseMultiCombined(t *testing.T) { + testReconcilersMasterLease(t, 3, 3) +} diff --git a/test/integration/master/transformation_testcase.go b/test/integration/master/transformation_testcase.go index 53bfdba864f..aa692a561ed 100644 --- a/test/integration/master/transformation_testcase.go +++ b/test/integration/master/transformation_testcase.go @@ -78,7 +78,7 @@ func newTransformTest(l kubeapiservertesting.Logger, transformerConfigYAML strin } } - if e.kubeAPIServer, err = kubeapiservertesting.StartTestServer(l, e.getEncryptionOptions(), e.storageConfig); err != nil { + if e.kubeAPIServer, err = kubeapiservertesting.StartTestServer(l, nil, e.getEncryptionOptions(), e.storageConfig); err != nil { return nil, fmt.Errorf("failed to start KubeAPI server: %v", err) } diff --git a/test/integration/scale/scale_test.go b/test/integration/scale/scale_test.go index fe92420ec1e..6c4db98a3b1 100644 --- a/test/integration/scale/scale_test.go +++ b/test/integration/scale/scale_test.go @@ -215,7 +215,7 @@ var ( ) func setup(t *testing.T) (client kubernetes.Interface, tearDown func()) { - result := apitesting.StartTestServerOrDie(t, nil, framework.SharedEtcd()) + result := apitesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd()) // TODO: Disable logging here until we resolve teardown issues which result in // massive log spam. Another path forward would be to refactor diff --git a/test/integration/tls/ciphers_test.go b/test/integration/tls/ciphers_test.go index 479fbbb3ea7..070960e386f 100644 --- a/test/integration/tls/ciphers_test.go +++ b/test/integration/tls/ciphers_test.go @@ -29,7 +29,7 @@ import ( func runBasicSecureAPIServer(t *testing.T, ciphers []string) (kubeapiservertesting.TearDownFunc, int) { flags := []string{"--tls-cipher-suites", strings.Join(ciphers, ",")} - testServer := kubeapiservertesting.StartTestServerOrDie(t, flags, framework.SharedEtcd()) + testServer := kubeapiservertesting.StartTestServerOrDie(t, nil, flags, framework.SharedEtcd()) return testServer.TearDownFn, testServer.ServerOpts.SecureServing.BindPort }