mirror of
				https://github.com/k3s-io/kubernetes.git
				synced 2025-11-03 23:40:03 +00:00 
			
		
		
		
	Merge pull request #60890 from rphillips/tests/lease_endpoint
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. apiserver: master count and lease endpoint test **What this PR does / why we need it**: Adds a test to make sure master count and lease endpoint reconcilers work well together, so we can bump LeaseEndpoint to beta. Based on Jordan's comment https://github.com/kubernetes/kubernetes/pull/58474#issuecomment-369954890. **Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*: Issue: #57617 Followup PR: #58474 **Special notes for your reviewer**: **Release note**: ```release-note NONE ``` /cc @kubernetes/sig-cluster-lifecycle-api-reviews @kubernetes/sig-cluster-lifecycle-api-reviews
This commit is contained in:
		@@ -37,6 +37,12 @@ import (
 | 
			
		||||
// TearDownFunc is to be called to tear down a test server.
 | 
			
		||||
type TearDownFunc func()
 | 
			
		||||
 | 
			
		||||
// TestServerInstanceOptions Instance options the TestServer
 | 
			
		||||
type TestServerInstanceOptions struct {
 | 
			
		||||
	// DisableStorageCleanup Disable the automatic storage cleanup
 | 
			
		||||
	DisableStorageCleanup bool
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TestServer return values supplied by kube-test-ApiServer
 | 
			
		||||
type TestServer struct {
 | 
			
		||||
	ClientConfig *restclient.Config        // Rest client config
 | 
			
		||||
@@ -52,22 +58,36 @@ type Logger interface {
 | 
			
		||||
	Logf(format string, args ...interface{})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewDefaultTestServerOptions Default options for TestServer instances
 | 
			
		||||
func NewDefaultTestServerOptions() *TestServerInstanceOptions {
 | 
			
		||||
	return &TestServerInstanceOptions{
 | 
			
		||||
		DisableStorageCleanup: false,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// StartTestServer starts a etcd server and kube-apiserver. A rest client config and a tear-down func,
 | 
			
		||||
// and location of the tmpdir are returned.
 | 
			
		||||
//
 | 
			
		||||
// Note: we return a tear-down func instead of a stop channel because the later will leak temporary
 | 
			
		||||
// 		 files that because Golang testing's call to os.Exit will not give a stop channel go routine
 | 
			
		||||
// 		 enough time to remove temporary files.
 | 
			
		||||
func StartTestServer(t Logger, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) {
 | 
			
		||||
func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) {
 | 
			
		||||
	if instanceOptions == nil {
 | 
			
		||||
		instanceOptions = NewDefaultTestServerOptions()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// TODO : Remove TrackStorageCleanup below when PR
 | 
			
		||||
	// https://github.com/kubernetes/kubernetes/pull/50690
 | 
			
		||||
	// merges as that shuts down storage properly
 | 
			
		||||
	if !instanceOptions.DisableStorageCleanup {
 | 
			
		||||
		registry.TrackStorageCleanup()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	stopCh := make(chan struct{})
 | 
			
		||||
	tearDown := func() {
 | 
			
		||||
		if !instanceOptions.DisableStorageCleanup {
 | 
			
		||||
			registry.CleanupStorage()
 | 
			
		||||
		}
 | 
			
		||||
		close(stopCh)
 | 
			
		||||
		if len(result.TmpDir) != 0 {
 | 
			
		||||
			os.RemoveAll(result.TmpDir)
 | 
			
		||||
@@ -148,9 +168,8 @@ func StartTestServer(t Logger, customFlags []string, storageConfig *storagebacke
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// StartTestServerOrDie calls StartTestServer t.Fatal if it does not succeed.
 | 
			
		||||
func StartTestServerOrDie(t Logger, flags []string, storageConfig *storagebackend.Config) *TestServer {
 | 
			
		||||
 | 
			
		||||
	result, err := StartTestServer(t, flags, storageConfig)
 | 
			
		||||
func StartTestServerOrDie(t Logger, instanceOptions *TestServerInstanceOptions, flags []string, storageConfig *storagebackend.Config) *TestServer {
 | 
			
		||||
	result, err := StartTestServer(t, instanceOptions, flags, storageConfig)
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		return &result
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
@@ -51,7 +51,7 @@ func (b *readDelayer) Read(p []byte) (n int, err error) {
 | 
			
		||||
 | 
			
		||||
func TestClusterScopedOwners(t *testing.T) {
 | 
			
		||||
	// Start the test server and wrap the client to delay PV watch responses
 | 
			
		||||
	server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.SharedEtcd())
 | 
			
		||||
	server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
 | 
			
		||||
	server.ClientConfig.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
 | 
			
		||||
		return roundTripFunc(func(req *http.Request) (*http.Response, error) {
 | 
			
		||||
			if req.URL.Query().Get("watch") != "true" || !strings.Contains(req.URL.String(), "persistentvolumes") {
 | 
			
		||||
 
 | 
			
		||||
@@ -201,7 +201,7 @@ type testContext struct {
 | 
			
		||||
 | 
			
		||||
// if workerCount > 0, will start the GC, otherwise it's up to the caller to Run() the GC.
 | 
			
		||||
func setup(t *testing.T, workerCount int) *testContext {
 | 
			
		||||
	return setupWithServer(t, kubeapiservertesting.StartTestServerOrDie(t, nil, framework.SharedEtcd()), workerCount)
 | 
			
		||||
	return setupWithServer(t, kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd()), workerCount)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func setupWithServer(t *testing.T, result *kubeapiservertesting.TestServer, workerCount int) *testContext {
 | 
			
		||||
 
 | 
			
		||||
@@ -78,6 +78,7 @@ go_test(
 | 
			
		||||
        "//vendor/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apiserver/pkg/authorization/authorizerfactory:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apiserver/pkg/features:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apiserver/pkg/server/options/encryptionconfig:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apiserver/pkg/storage/value:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apiserver/pkg/storage/value/encrypt/aes:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -41,7 +41,7 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestCRDShadowGroup(t *testing.T) {
 | 
			
		||||
	result := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.SharedEtcd())
 | 
			
		||||
	result := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
 | 
			
		||||
	defer result.TearDownFn()
 | 
			
		||||
 | 
			
		||||
	kubeclient, err := kubernetes.NewForConfig(result.ClientConfig)
 | 
			
		||||
@@ -109,7 +109,7 @@ func TestCRDShadowGroup(t *testing.T) {
 | 
			
		||||
func TestCRD(t *testing.T) {
 | 
			
		||||
	defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.Initializers, true)()
 | 
			
		||||
 | 
			
		||||
	result := kubeapiservertesting.StartTestServerOrDie(t, []string{"--admission-control", "Initializers"}, framework.SharedEtcd())
 | 
			
		||||
	result := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--admission-control", "Initializers"}, framework.SharedEtcd())
 | 
			
		||||
	defer result.TearDownFn()
 | 
			
		||||
 | 
			
		||||
	kubeclient, err := kubernetes.NewForConfig(result.ClientConfig)
 | 
			
		||||
 
 | 
			
		||||
@@ -18,13 +18,18 @@ package master
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"reflect"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	appsv1beta1 "k8s.io/api/apps/v1beta1"
 | 
			
		||||
	corev1 "k8s.io/api/core/v1"
 | 
			
		||||
	"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	"k8s.io/apiserver/pkg/registry/generic/registry"
 | 
			
		||||
	"k8s.io/client-go/kubernetes"
 | 
			
		||||
	"k8s.io/kube-aggregator/pkg/apis/apiregistration"
 | 
			
		||||
	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
 | 
			
		||||
@@ -32,7 +37,7 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestRun(t *testing.T) {
 | 
			
		||||
	server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.SharedEtcd())
 | 
			
		||||
	server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
 | 
			
		||||
	defer server.TearDownFn()
 | 
			
		||||
 | 
			
		||||
	client, err := kubernetes.NewForConfig(server.ClientConfig)
 | 
			
		||||
@@ -82,7 +87,7 @@ func TestRun(t *testing.T) {
 | 
			
		||||
// apiextensions-server and the kube-aggregator server, both part of
 | 
			
		||||
// the delegation chain in kube-apiserver.
 | 
			
		||||
func TestOpenAPIDelegationChainPlumbing(t *testing.T) {
 | 
			
		||||
	server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.SharedEtcd())
 | 
			
		||||
	server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
 | 
			
		||||
	defer server.TearDownFn()
 | 
			
		||||
 | 
			
		||||
	kubeclient, err := kubernetes.NewForConfig(server.ClientConfig)
 | 
			
		||||
@@ -138,3 +143,110 @@ func TestOpenAPIDelegationChainPlumbing(t *testing.T) {
 | 
			
		||||
		t.Errorf("missing path: %q", registrationPrefix)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// return the unique endpoint IPs
 | 
			
		||||
func getEndpointIPs(endpoints *corev1.Endpoints) []string {
 | 
			
		||||
	endpointMap := make(map[string]bool)
 | 
			
		||||
	ips := make([]string, 0)
 | 
			
		||||
	for _, subset := range endpoints.Subsets {
 | 
			
		||||
		for _, address := range subset.Addresses {
 | 
			
		||||
			if _, ok := endpointMap[address.IP]; !ok {
 | 
			
		||||
				endpointMap[address.IP] = true
 | 
			
		||||
				ips = append(ips, address.IP)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return ips
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func verifyEndpointsWithIPs(servers []*kubeapiservertesting.TestServer, ips []string) bool {
 | 
			
		||||
	listenAddresses := make([]string, 0)
 | 
			
		||||
	for _, server := range servers {
 | 
			
		||||
		listenAddresses = append(listenAddresses, server.ServerOpts.GenericServerRunOptions.AdvertiseAddress.String())
 | 
			
		||||
	}
 | 
			
		||||
	return reflect.DeepEqual(listenAddresses, ips)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func testReconcilersMasterLease(t *testing.T, leaseCount int, masterCount int) {
 | 
			
		||||
	var leaseServers []*kubeapiservertesting.TestServer
 | 
			
		||||
	var masterCountServers []*kubeapiservertesting.TestServer
 | 
			
		||||
	etcd := framework.SharedEtcd()
 | 
			
		||||
 | 
			
		||||
	instanceOptions := &kubeapiservertesting.TestServerInstanceOptions{
 | 
			
		||||
		DisableStorageCleanup: true,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// cleanup the registry storage
 | 
			
		||||
	defer registry.CleanupStorage()
 | 
			
		||||
 | 
			
		||||
	// 1. start masterCount api servers
 | 
			
		||||
	for i := 0; i < masterCount; i++ {
 | 
			
		||||
		// start master count api server
 | 
			
		||||
		server := kubeapiservertesting.StartTestServerOrDie(t, instanceOptions, []string{
 | 
			
		||||
			"--endpoint-reconciler-type", "master-count",
 | 
			
		||||
			"--advertise-address", fmt.Sprintf("10.0.1.%v", i+1),
 | 
			
		||||
			"--apiserver-count", fmt.Sprintf("%v", masterCount),
 | 
			
		||||
		}, etcd)
 | 
			
		||||
		masterCountServers = append(masterCountServers, server)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 2. verify master count servers have registered
 | 
			
		||||
	if err := wait.PollImmediate(3*time.Second, 2*time.Minute, func() (bool, error) {
 | 
			
		||||
		client, err := kubernetes.NewForConfig(masterCountServers[0].ClientConfig)
 | 
			
		||||
		endpoints, err := client.CoreV1().Endpoints("default").Get("kubernetes", metav1.GetOptions{})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Logf("error fetching endpoints: %v", err)
 | 
			
		||||
			return false, nil
 | 
			
		||||
		}
 | 
			
		||||
		return verifyEndpointsWithIPs(masterCountServers, getEndpointIPs(endpoints)), nil
 | 
			
		||||
	}); err != nil {
 | 
			
		||||
		t.Fatalf("master count endpoints failed to register: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 3. start lease api servers
 | 
			
		||||
	for i := 0; i < leaseCount; i++ {
 | 
			
		||||
		options := []string{
 | 
			
		||||
			"--endpoint-reconciler-type", "lease",
 | 
			
		||||
			"--advertise-address", fmt.Sprintf("10.0.1.%v", i+10),
 | 
			
		||||
		}
 | 
			
		||||
		server := kubeapiservertesting.StartTestServerOrDie(t, instanceOptions, options, etcd)
 | 
			
		||||
		defer server.TearDownFn()
 | 
			
		||||
		leaseServers = append(leaseServers, server)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	time.Sleep(3 * time.Second)
 | 
			
		||||
 | 
			
		||||
	// 4. Shutdown the masterCount server
 | 
			
		||||
	for _, server := range masterCountServers {
 | 
			
		||||
		server.TearDownFn()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 5. verify only leaseEndpoint servers left
 | 
			
		||||
	if err := wait.PollImmediate(3*time.Second, 2*time.Minute, func() (bool, error) {
 | 
			
		||||
		client, err := kubernetes.NewForConfig(leaseServers[0].ClientConfig)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Logf("create client error: %v", err)
 | 
			
		||||
			return false, nil
 | 
			
		||||
		}
 | 
			
		||||
		endpoints, err := client.CoreV1().Endpoints("default").Get("kubernetes", metav1.GetOptions{})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Logf("error fetching endpoints: %v", err)
 | 
			
		||||
			return false, nil
 | 
			
		||||
		}
 | 
			
		||||
		return verifyEndpointsWithIPs(leaseServers, getEndpointIPs(endpoints)), nil
 | 
			
		||||
	}); err != nil {
 | 
			
		||||
		t.Fatalf("did not find only lease endpoints: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestReconcilerMasterLeaseCombined(t *testing.T) {
 | 
			
		||||
	testReconcilersMasterLease(t, 1, 3)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestReconcilerMasterLeaseMultiMoreMasters(t *testing.T) {
 | 
			
		||||
	testReconcilersMasterLease(t, 3, 2)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestReconcilerMasterLeaseMultiCombined(t *testing.T) {
 | 
			
		||||
	testReconcilersMasterLease(t, 3, 3)
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -78,7 +78,7 @@ func newTransformTest(l kubeapiservertesting.Logger, transformerConfigYAML strin
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if e.kubeAPIServer, err = kubeapiservertesting.StartTestServer(l, e.getEncryptionOptions(), e.storageConfig); err != nil {
 | 
			
		||||
	if e.kubeAPIServer, err = kubeapiservertesting.StartTestServer(l, nil, e.getEncryptionOptions(), e.storageConfig); err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("failed to start KubeAPI server: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -215,7 +215,7 @@ var (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func setup(t *testing.T) (client kubernetes.Interface, tearDown func()) {
 | 
			
		||||
	result := apitesting.StartTestServerOrDie(t, nil, framework.SharedEtcd())
 | 
			
		||||
	result := apitesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
 | 
			
		||||
 | 
			
		||||
	// TODO: Disable logging here until we resolve teardown issues which result in
 | 
			
		||||
	// massive log spam. Another path forward would be to refactor
 | 
			
		||||
 
 | 
			
		||||
@@ -29,7 +29,7 @@ import (
 | 
			
		||||
 | 
			
		||||
func runBasicSecureAPIServer(t *testing.T, ciphers []string) (kubeapiservertesting.TearDownFunc, int) {
 | 
			
		||||
	flags := []string{"--tls-cipher-suites", strings.Join(ciphers, ",")}
 | 
			
		||||
	testServer := kubeapiservertesting.StartTestServerOrDie(t, flags, framework.SharedEtcd())
 | 
			
		||||
	testServer := kubeapiservertesting.StartTestServerOrDie(t, nil, flags, framework.SharedEtcd())
 | 
			
		||||
	return testServer.TearDownFn, testServer.ServerOpts.SecureServing.BindPort
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user