mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
Merge pull request #42328 from deads2k/agg-23-testport
get fresh ports on startup failure for integration test
This commit is contained in:
commit
3769404b32
@ -26,6 +26,7 @@ import (
|
|||||||
"path"
|
"path"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -73,10 +74,6 @@ func TestAggregatedAPIServer(t *testing.T) {
|
|||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
defer close(stopCh)
|
defer close(stopCh)
|
||||||
|
|
||||||
kubePort, err := localPort()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
certDir, _ := ioutil.TempDir("", "test-integration-apiserver")
|
certDir, _ := ioutil.TempDir("", "test-integration-apiserver")
|
||||||
defer os.RemoveAll(certDir)
|
defer os.RemoveAll(certDir)
|
||||||
_, defaultServiceClusterIPRange, _ := net.ParseCIDR("10.0.0.0/24")
|
_, defaultServiceClusterIPRange, _ := net.ParseCIDR("10.0.0.0/24")
|
||||||
@ -105,6 +102,15 @@ func TestAggregatedAPIServer(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
kubeClientConfigValue := atomic.Value{}
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
// always get a fresh port in case something claimed the old one
|
||||||
|
kubePort, err := localPort()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
kubeAPIServerOptions := options.NewServerRunOptions()
|
kubeAPIServerOptions := options.NewServerRunOptions()
|
||||||
kubeAPIServerOptions.SecureServing.ServingOptions.BindAddress = net.ParseIP("127.0.0.1")
|
kubeAPIServerOptions.SecureServing.ServingOptions.BindAddress = net.ParseIP("127.0.0.1")
|
||||||
kubeAPIServerOptions.SecureServing.ServingOptions.BindPort = kubePort
|
kubeAPIServerOptions.SecureServing.ServingOptions.BindPort = kubePort
|
||||||
@ -124,9 +130,8 @@ func TestAggregatedAPIServer(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
kubeClientConfigValue.Store(config.GenericConfig.LoopbackClientConfig)
|
||||||
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
if err := app.RunServer(config, sharedInformers, stopCh); err != nil {
|
if err := app.RunServer(config, sharedInformers, stopCh); err != nil {
|
||||||
t.Log(err)
|
t.Log(err)
|
||||||
}
|
}
|
||||||
@ -135,10 +140,20 @@ func TestAggregatedAPIServer(t *testing.T) {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
// just use json because everyone speaks it
|
// just use json because everyone speaks it
|
||||||
config.GenericConfig.LoopbackClientConfig.ContentType = ""
|
|
||||||
config.GenericConfig.LoopbackClientConfig.AcceptContentTypes = ""
|
|
||||||
kubeClient := client.NewForConfigOrDie(config.GenericConfig.LoopbackClientConfig)
|
|
||||||
err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) {
|
err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) {
|
||||||
|
obj := kubeClientConfigValue.Load()
|
||||||
|
if obj == nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
kubeClientConfig := kubeClientConfigValue.Load().(*rest.Config)
|
||||||
|
kubeClientConfig.ContentType = ""
|
||||||
|
kubeClientConfig.AcceptContentTypes = ""
|
||||||
|
kubeClient, err := client.NewForConfig(kubeClientConfig)
|
||||||
|
if err != nil {
|
||||||
|
// this happens because we race the API server start
|
||||||
|
t.Log(err)
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
if _, err := kubeClient.Discovery().ServerVersion(); err != nil {
|
if _, err := kubeClient.Discovery().ServerVersion(); err != nil {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
@ -148,23 +163,31 @@ func TestAggregatedAPIServer(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// after this point we won't be mutating, so the race detector will be fine
|
||||||
|
kubeClientConfig := kubeClientConfigValue.Load().(*rest.Config)
|
||||||
|
|
||||||
// write a kubeconfig out for starting other API servers with delegated auth. remember, no in-cluster config
|
// write a kubeconfig out for starting other API servers with delegated auth. remember, no in-cluster config
|
||||||
adminKubeConfig := createKubeConfig(config.GenericConfig.LoopbackClientConfig)
|
adminKubeConfig := createKubeConfig(kubeClientConfig)
|
||||||
kubeconfigFile, _ := ioutil.TempFile("", "")
|
kubeconfigFile, _ := ioutil.TempFile("", "")
|
||||||
defer os.Remove(kubeconfigFile.Name())
|
defer os.Remove(kubeconfigFile.Name())
|
||||||
clientcmd.WriteToFile(*adminKubeConfig, kubeconfigFile.Name())
|
clientcmd.WriteToFile(*adminKubeConfig, kubeconfigFile.Name())
|
||||||
|
wardleCertDir, _ := ioutil.TempDir("", "test-integration-wardle-server")
|
||||||
|
defer os.RemoveAll(wardleCertDir)
|
||||||
|
wardlePort := new(int32)
|
||||||
|
|
||||||
// start the wardle server to prove we can aggregate it
|
// start the wardle server to prove we can aggregate it
|
||||||
wardlePort, err := localPort()
|
go func() {
|
||||||
|
for {
|
||||||
|
// always get a fresh port in case something claimed the old one
|
||||||
|
wardlePortInt, err := localPort()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
wardleCertDir, _ := ioutil.TempDir("", "test-integration-wardle-server")
|
atomic.StoreInt32(wardlePort, int32(wardlePortInt))
|
||||||
defer os.RemoveAll(wardleCertDir)
|
|
||||||
wardleCmd := sampleserver.NewCommandStartWardleServer(os.Stdout, os.Stderr, stopCh)
|
wardleCmd := sampleserver.NewCommandStartWardleServer(os.Stdout, os.Stderr, stopCh)
|
||||||
wardleCmd.SetArgs([]string{
|
wardleCmd.SetArgs([]string{
|
||||||
"--bind-address", "127.0.0.1",
|
"--bind-address", "127.0.0.1",
|
||||||
"--secure-port", strconv.Itoa(wardlePort),
|
"--secure-port", strconv.Itoa(wardlePortInt),
|
||||||
"--requestheader-username-headers=X-Remote-User",
|
"--requestheader-username-headers=X-Remote-User",
|
||||||
"--requestheader-group-headers=X-Remote-Group",
|
"--requestheader-group-headers=X-Remote-Group",
|
||||||
"--requestheader-extra-headers-prefix=X-Remote-Extra-",
|
"--requestheader-extra-headers-prefix=X-Remote-Extra-",
|
||||||
@ -175,8 +198,6 @@ func TestAggregatedAPIServer(t *testing.T) {
|
|||||||
"--etcd-servers", framework.GetEtcdURLFromEnv(),
|
"--etcd-servers", framework.GetEtcdURLFromEnv(),
|
||||||
"--cert-dir", wardleCertDir,
|
"--cert-dir", wardleCertDir,
|
||||||
})
|
})
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
if err := wardleCmd.Execute(); err != nil {
|
if err := wardleCmd.Execute(); err != nil {
|
||||||
t.Log(err)
|
t.Log(err)
|
||||||
}
|
}
|
||||||
@ -184,17 +205,17 @@ func TestAggregatedAPIServer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
wardleClientConfig := rest.AnonymousClientConfig(config.GenericConfig.LoopbackClientConfig)
|
wardleClientConfig := rest.AnonymousClientConfig(kubeClientConfig)
|
||||||
wardleClientConfig.Host = fmt.Sprintf("https://127.0.0.1:%d", wardlePort)
|
|
||||||
wardleClientConfig.CAFile = path.Join(wardleCertDir, "apiserver.crt")
|
wardleClientConfig.CAFile = path.Join(wardleCertDir, "apiserver.crt")
|
||||||
wardleClientConfig.CAData = nil
|
wardleClientConfig.CAData = nil
|
||||||
wardleClientConfig.ServerName = ""
|
wardleClientConfig.ServerName = ""
|
||||||
wardleClientConfig.BearerToken = config.GenericConfig.LoopbackClientConfig.BearerToken
|
wardleClientConfig.BearerToken = kubeClientConfig.BearerToken
|
||||||
var wardleClient client.Interface
|
var wardleClient client.Interface
|
||||||
err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) {
|
err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) {
|
||||||
|
wardleClientConfig.Host = fmt.Sprintf("https://127.0.0.1:%d", atomic.LoadInt32(wardlePort))
|
||||||
wardleClient, err = client.NewForConfig(wardleClientConfig)
|
wardleClient, err = client.NewForConfig(wardleClientConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// this happens if we race the API server for writing the cert
|
// this happens because we race the API server start
|
||||||
t.Log(err)
|
t.Log(err)
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
@ -209,10 +230,6 @@ func TestAggregatedAPIServer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// start the aggregator
|
// start the aggregator
|
||||||
aggregatorPort, err := localPort()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
aggregatorCertDir, _ := ioutil.TempDir("", "test-integration-aggregator")
|
aggregatorCertDir, _ := ioutil.TempDir("", "test-integration-aggregator")
|
||||||
defer os.RemoveAll(aggregatorCertDir)
|
defer os.RemoveAll(aggregatorCertDir)
|
||||||
proxyClientKey, err := cert.NewPrivateKey()
|
proxyClientKey, err := cert.NewPrivateKey()
|
||||||
@ -234,10 +251,20 @@ func TestAggregatedAPIServer(t *testing.T) {
|
|||||||
if err := ioutil.WriteFile(proxyClientKeyFile.Name(), cert.EncodePrivateKeyPEM(proxyClientKey), 0644); err != nil {
|
if err := ioutil.WriteFile(proxyClientKeyFile.Name(), cert.EncodePrivateKeyPEM(proxyClientKey), 0644); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
aggregatorPort := new(int32)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
// always get a fresh port in case something claimed the old one
|
||||||
|
aggregatorPortInt, err := localPort()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
atomic.StoreInt32(aggregatorPort, int32(aggregatorPortInt))
|
||||||
aggregatorCmd := kubeaggregatorserver.NewCommandStartAggregator(os.Stdout, os.Stderr, stopCh)
|
aggregatorCmd := kubeaggregatorserver.NewCommandStartAggregator(os.Stdout, os.Stderr, stopCh)
|
||||||
aggregatorCmd.SetArgs([]string{
|
aggregatorCmd.SetArgs([]string{
|
||||||
"--bind-address", "127.0.0.1",
|
"--bind-address", "127.0.0.1",
|
||||||
"--secure-port", strconv.Itoa(aggregatorPort),
|
"--secure-port", strconv.Itoa(aggregatorPortInt),
|
||||||
"--requestheader-username-headers", "",
|
"--requestheader-username-headers", "",
|
||||||
"--proxy-client-cert-file", proxyClientCertFile.Name(),
|
"--proxy-client-cert-file", proxyClientCertFile.Name(),
|
||||||
"--proxy-client-key-file", proxyClientKeyFile.Name(),
|
"--proxy-client-key-file", proxyClientKeyFile.Name(),
|
||||||
@ -247,8 +274,6 @@ func TestAggregatedAPIServer(t *testing.T) {
|
|||||||
"--etcd-servers", framework.GetEtcdURLFromEnv(),
|
"--etcd-servers", framework.GetEtcdURLFromEnv(),
|
||||||
"--cert-dir", aggregatorCertDir,
|
"--cert-dir", aggregatorCertDir,
|
||||||
})
|
})
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
if err := aggregatorCmd.Execute(); err != nil {
|
if err := aggregatorCmd.Execute(); err != nil {
|
||||||
t.Log(err)
|
t.Log(err)
|
||||||
}
|
}
|
||||||
@ -256,14 +281,14 @@ func TestAggregatedAPIServer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
aggregatorClientConfig := rest.AnonymousClientConfig(config.GenericConfig.LoopbackClientConfig)
|
aggregatorClientConfig := rest.AnonymousClientConfig(kubeClientConfig)
|
||||||
aggregatorClientConfig.Host = fmt.Sprintf("https://127.0.0.1:%d", aggregatorPort)
|
|
||||||
aggregatorClientConfig.CAFile = path.Join(aggregatorCertDir, "apiserver.crt")
|
aggregatorClientConfig.CAFile = path.Join(aggregatorCertDir, "apiserver.crt")
|
||||||
aggregatorClientConfig.CAData = nil
|
aggregatorClientConfig.CAData = nil
|
||||||
aggregatorClientConfig.ServerName = ""
|
aggregatorClientConfig.ServerName = ""
|
||||||
aggregatorClientConfig.BearerToken = config.GenericConfig.LoopbackClientConfig.BearerToken
|
aggregatorClientConfig.BearerToken = kubeClientConfig.BearerToken
|
||||||
var aggregatorDiscoveryClient client.Interface
|
var aggregatorDiscoveryClient client.Interface
|
||||||
err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) {
|
err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) {
|
||||||
|
aggregatorClientConfig.Host = fmt.Sprintf("https://127.0.0.1:%d", atomic.LoadInt32(aggregatorPort))
|
||||||
aggregatorDiscoveryClient, err = client.NewForConfig(aggregatorClientConfig)
|
aggregatorDiscoveryClient, err = client.NewForConfig(aggregatorClientConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// this happens if we race the API server for writing the cert
|
// this happens if we race the API server for writing the cert
|
||||||
@ -321,7 +346,7 @@ func TestAggregatedAPIServer(t *testing.T) {
|
|||||||
},
|
},
|
||||||
Group: "",
|
Group: "",
|
||||||
Version: "v1",
|
Version: "v1",
|
||||||
CABundle: config.GenericConfig.LoopbackClientConfig.CAData,
|
CABundle: kubeClientConfig.CAData,
|
||||||
Priority: 100,
|
Priority: 100,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
Loading…
Reference in New Issue
Block a user