master count and lease endpoint tests

Ryan Phillips 2018-02-28 13:37:42 -06:00
parent c0d1ab8e99
commit efe19e4acb
9 changed files with 147 additions and 15 deletions

View File

@@ -37,6 +37,12 @@ import (
// TearDownFunc is to be called to tear down a test server.
type TearDownFunc func()
// TestServerInstanceOptions Instance options for the TestServer
type TestServerInstanceOptions struct {
// DisableStorageCleanup disables the automatic storage cleanup
DisableStorageCleanup bool
}
// TestServer holds the return values supplied by kube-test-ApiServer
type TestServer struct {
ClientConfig *restclient.Config // Rest client config
@@ -52,22 +58,36 @@ type Logger interface {
Logf(format string, args ...interface{})
}
// NewDefaultTestServerOptions returns the default options for TestServer instances
func NewDefaultTestServerOptions() *TestServerInstanceOptions {
return &TestServerInstanceOptions{
DisableStorageCleanup: false,
}
}
// StartTestServer starts an etcd server and kube-apiserver. A rest client config, a tear-down func,
// and the location of the tmpdir are returned.
//
// Note: we return a tear-down func instead of a stop channel because the latter would leak temporary
// files: Golang testing's call to os.Exit does not give a stop-channel goroutine enough time to
// remove them.
func StartTestServer(t Logger, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) {
func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) {
if instanceOptions == nil {
instanceOptions = NewDefaultTestServerOptions()
}
// TODO: Remove TrackStorageCleanup below when PR
// https://github.com/kubernetes/kubernetes/pull/50690
// merges as that shuts down storage properly
if !instanceOptions.DisableStorageCleanup {
registry.TrackStorageCleanup()
}
stopCh := make(chan struct{})
tearDown := func() {
if !instanceOptions.DisableStorageCleanup {
registry.CleanupStorage()
}
close(stopCh)
if len(result.TmpDir) != 0 {
os.RemoveAll(result.TmpDir)
@@ -147,9 +167,8 @@ func StartTestServer(t Logger, customFlags []string, storageConfig *storagebacke
}
// StartTestServerOrDie calls StartTestServer and calls t.Fatal if it does not succeed.
func StartTestServerOrDie(t Logger, flags []string, storageConfig *storagebackend.Config) *TestServer {
result, err := StartTestServer(t, flags, storageConfig)
func StartTestServerOrDie(t Logger, instanceOptions *TestServerInstanceOptions, flags []string, storageConfig *storagebackend.Config) *TestServer {
result, err := StartTestServer(t, instanceOptions, flags, storageConfig)
if err == nil {
return &result
}
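Both entry points now thread the new options through, and passing nil keeps the old behavior. A minimal sketch of the two calling styles (hypothetical call sites, mirroring the ones updated in the files below):

// keep the defaults: automatic storage cleanup stays enabled
server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
defer server.TearDownFn()
// opt out of automatic storage cleanup (as the reconciler tests below do)
instanceOptions := &kubeapiservertesting.TestServerInstanceOptions{DisableStorageCleanup: true}
server = kubeapiservertesting.StartTestServerOrDie(t, instanceOptions, nil, framework.SharedEtcd())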

View File

@@ -51,7 +51,7 @@ func (b *readDelayer) Read(p []byte) (n int, err error) {
func TestClusterScopedOwners(t *testing.T) {
// Start the test server and wrap the client to delay PV watch responses
server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.SharedEtcd())
server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
server.ClientConfig.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
return roundTripFunc(func(req *http.Request) (*http.Response, error) {
if req.URL.Query().Get("watch") != "true" || !strings.Contains(req.URL.String(), "persistentvolumes") {

View File

@@ -200,7 +200,7 @@ type testContext struct {
// if workerCount > 0, will start the GC, otherwise it's up to the caller to Run() the GC.
func setup(t *testing.T, workerCount int) *testContext {
return setupWithServer(t, kubeapiservertesting.StartTestServerOrDie(t, nil, framework.SharedEtcd()), workerCount)
return setupWithServer(t, kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd()), workerCount)
}
func setupWithServer(t *testing.T, result *kubeapiservertesting.TestServer, workerCount int) *testContext {

View File

@@ -78,6 +78,7 @@ go_test(
"//vendor/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authorization/authorizerfactory:go_default_library",
"//vendor/k8s.io/apiserver/pkg/features:go_default_library",
"//vendor/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/options/encryptionconfig:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage/value:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage/value/encrypt/aes:go_default_library",

View File

@@ -41,7 +41,7 @@ import (
)
func TestCRDShadowGroup(t *testing.T) {
result := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.SharedEtcd())
result := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
defer result.TearDownFn()
kubeclient, err := kubernetes.NewForConfig(result.ClientConfig)
@@ -109,7 +109,7 @@ func TestCRDShadowGroup(t *testing.T) {
func TestCRD(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.Initializers, true)()
result := kubeapiservertesting.StartTestServerOrDie(t, []string{"--admission-control", "Initializers"}, framework.SharedEtcd())
result := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--admission-control", "Initializers"}, framework.SharedEtcd())
defer result.TearDownFn()
kubeclient, err := kubernetes.NewForConfig(result.ClientConfig)

View File

@@ -18,13 +18,18 @@ package master
import (
"encoding/json"
"fmt"
"reflect"
"strings"
"testing"
"time"
appsv1beta1 "k8s.io/api/apps/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/client-go/kubernetes"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
@@ -32,7 +37,7 @@ import (
)
func TestRun(t *testing.T) {
server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.SharedEtcd())
server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
defer server.TearDownFn()
client, err := kubernetes.NewForConfig(server.ClientConfig)
@@ -82,7 +87,7 @@ func TestRun(t *testing.T) {
// apiextensions-server and the kube-aggregator server, both part of
// the delegation chain in kube-apiserver.
func TestOpenAPIDelegationChainPlumbing(t *testing.T) {
server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.SharedEtcd())
server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
defer server.TearDownFn()
kubeclient, err := kubernetes.NewForConfig(server.ClientConfig)
@@ -138,3 +143,110 @@ func TestOpenAPIDelegationChainPlumbing(t *testing.T) {
t.Errorf("missing path: %q", registrationPrefix)
}
}
// getEndpointIPs returns the unique endpoint IPs across all subsets
func getEndpointIPs(endpoints *corev1.Endpoints) []string {
endpointMap := make(map[string]bool)
ips := make([]string, 0)
for _, subset := range endpoints.Subsets {
for _, address := range subset.Addresses {
if _, ok := endpointMap[address.IP]; !ok {
endpointMap[address.IP] = true
ips = append(ips, address.IP)
}
}
}
return ips
}
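// As a minimal sketch (not part of this commit), getEndpointIPs
// deduplicates addresses that appear in multiple subsets while
// preserving first-seen order:
//
// eps := &corev1.Endpoints{Subsets: []corev1.EndpointSubset{
// {Addresses: []corev1.EndpointAddress{{IP: "10.0.1.1"}, {IP: "10.0.1.2"}}},
// {Addresses: []corev1.EndpointAddress{{IP: "10.0.1.1"}}},
// }}
// getEndpointIPs(eps) // ["10.0.1.1" "10.0.1.2"]
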
func verifyEndpointsWithIPs(servers []*kubeapiservertesting.TestServer, ips []string) bool {
listenAddresses := make([]string, 0)
for _, server := range servers {
listenAddresses = append(listenAddresses, server.ServerOpts.GenericServerRunOptions.AdvertiseAddress.String())
}
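// note: reflect.DeepEqual is order-sensitive, so this check relies on
// the reconcilers listing endpoint IPs in the same ascending order in
// which the advertise addresses were assigned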
return reflect.DeepEqual(listenAddresses, ips)
}
func testReconcilersMasterLease(t *testing.T, leaseCount int, masterCount int) {
var leaseServers []*kubeapiservertesting.TestServer
var masterCountServers []*kubeapiservertesting.TestServer
etcd := framework.SharedEtcd()
instanceOptions := &kubeapiservertesting.TestServerInstanceOptions{
DisableStorageCleanup: true,
}
// clean up the registry storage once the test completes
defer registry.CleanupStorage()
// 1. start masterCount API servers using the master-count reconciler
for i := 0; i < masterCount; i++ {
server := kubeapiservertesting.StartTestServerOrDie(t, instanceOptions, []string{
"--endpoint-reconciler-type", "master-count",
"--advertise-address", fmt.Sprintf("10.0.1.%v", i+1),
"--apiserver-count", fmt.Sprintf("%v", masterCount),
}, etcd)
masterCountServers = append(masterCountServers, server)
}
// 2. verify master count servers have registered
if err := wait.PollImmediate(3*time.Second, 2*time.Minute, func() (bool, error) {
client, err := kubernetes.NewForConfig(masterCountServers[0].ClientConfig)
if err != nil {
t.Logf("create client error: %v", err)
return false, nil
}
endpoints, err := client.CoreV1().Endpoints("default").Get("kubernetes", metav1.GetOptions{})
if err != nil {
t.Logf("error fetching endpoints: %v", err)
return false, nil
}
return verifyEndpointsWithIPs(masterCountServers, getEndpointIPs(endpoints)), nil
}); err != nil {
t.Fatalf("master count endpoints failed to register: %v", err)
}
// 3. start lease api servers
for i := 0; i < leaseCount; i++ {
options := []string{
"--endpoint-reconciler-type", "lease",
"--advertise-address", fmt.Sprintf("10.0.1.%v", i+10),
}
server := kubeapiservertesting.StartTestServerOrDie(t, instanceOptions, options, etcd)
defer server.TearDownFn()
leaseServers = append(leaseServers, server)
}
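// give the lease reconcilers time to register their endpoints
// before the master-count servers go away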
time.Sleep(3 * time.Second)
// 4. shut down the masterCount servers
for _, server := range masterCountServers {
server.TearDownFn()
}
// 5. verify that only the lease-endpoint servers remain registered
if err := wait.PollImmediate(3*time.Second, 2*time.Minute, func() (bool, error) {
client, err := kubernetes.NewForConfig(leaseServers[0].ClientConfig)
if err != nil {
t.Logf("create client error: %v", err)
return false, nil
}
endpoints, err := client.CoreV1().Endpoints("default").Get("kubernetes", metav1.GetOptions{})
if err != nil {
t.Logf("error fetching endpoints: %v", err)
return false, nil
}
return verifyEndpointsWithIPs(leaseServers, getEndpointIPs(endpoints)), nil
}); err != nil {
t.Fatalf("did not find only lease endpoints: %v", err)
}
}
func TestReconcilerMasterLeaseCombined(t *testing.T) {
testReconcilersMasterLease(t, 1, 3)
}
func TestReconcilerMasterLeaseMultiMoreMasters(t *testing.T) {
testReconcilersMasterLease(t, 3, 2)
}
func TestReconcilerMasterLeaseMultiCombined(t *testing.T) {
testReconcilersMasterLease(t, 3, 3)
}
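The three wrappers above exercise the helper with different lease/master-count mixes (1/3, 3/2, 3/3). Because the helper is parameterized, further combinations are one-liners; a hypothetical example (not part of this commit):

func TestReconcilerMasterLeaseSingleCombined(t *testing.T) {
testReconcilersMasterLease(t, 1, 1)
}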

View File

@@ -78,7 +78,7 @@ func newTransformTest(l kubeapiservertesting.Logger, transformerConfigYAML strin
}
}
if e.kubeAPIServer, err = kubeapiservertesting.StartTestServer(l, e.getEncryptionOptions(), e.storageConfig); err != nil {
if e.kubeAPIServer, err = kubeapiservertesting.StartTestServer(l, nil, e.getEncryptionOptions(), e.storageConfig); err != nil {
return nil, fmt.Errorf("failed to start KubeAPI server: %v", err)
}

View File

@@ -215,7 +215,7 @@ var (
)
func setup(t *testing.T) (client kubernetes.Interface, tearDown func()) {
result := apitesting.StartTestServerOrDie(t, nil, framework.SharedEtcd())
result := apitesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
// TODO: Disable logging here until we resolve teardown issues which result in
// massive log spam. Another path forward would be to refactor

View File

@@ -29,7 +29,7 @@ import (
func runBasicSecureAPIServer(t *testing.T, ciphers []string) (kubeapiservertesting.TearDownFunc, int) {
flags := []string{"--tls-cipher-suites", strings.Join(ciphers, ",")}
testServer := kubeapiservertesting.StartTestServerOrDie(t, flags, framework.SharedEtcd())
testServer := kubeapiservertesting.StartTestServerOrDie(t, nil, flags, framework.SharedEtcd())
return testServer.TearDownFn, testServer.ServerOpts.SecureServing.BindPort
}