mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-20 18:31:15 +00:00
pass listener in integration test to prevent port in use flake
This commit is contained in:
parent
84408378f9
commit
a6c43c6a5c
@ -35,7 +35,8 @@ import (
|
|||||||
|
|
||||||
type SecureServingOptions struct {
|
type SecureServingOptions struct {
|
||||||
BindAddress net.IP
|
BindAddress net.IP
|
||||||
BindPort int
|
// BindPort is ignored when Listener is set, will serve https even with 0.
|
||||||
|
BindPort int
|
||||||
// BindNetwork is the type of network to bind to - defaults to "tcp", accepts "tcp",
|
// BindNetwork is the type of network to bind to - defaults to "tcp", accepts "tcp",
|
||||||
// "tcp4", and "tcp6".
|
// "tcp4", and "tcp6".
|
||||||
BindNetwork string
|
BindNetwork string
|
||||||
@ -160,7 +161,7 @@ func (s *SecureServingOptions) ApplyTo(c *server.Config) error {
|
|||||||
if s == nil {
|
if s == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if s.BindPort <= 0 {
|
if s.BindPort <= 0 && s.Listener == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -171,6 +172,12 @@ func (s *SecureServingOptions) ApplyTo(c *server.Config) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to create listener: %v", err)
|
return fmt.Errorf("failed to create listener: %v", err)
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
if _, ok := s.Listener.Addr().(*net.TCPAddr); !ok {
|
||||||
|
return fmt.Errorf("failed to parse ip and port from listener")
|
||||||
|
}
|
||||||
|
s.BindPort = s.Listener.Addr().(*net.TCPAddr).Port
|
||||||
|
s.BindAddress = s.Listener.Addr().(*net.TCPAddr).IP
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.applyServingInfoTo(c); err != nil {
|
if err := s.applyServingInfoTo(c); err != nil {
|
||||||
|
@ -44,7 +44,8 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
stopCh := genericapiserver.SetupSignalHandler()
|
stopCh := genericapiserver.SetupSignalHandler()
|
||||||
cmd := server.NewCommandStartAggregator(os.Stdout, os.Stderr, stopCh)
|
options := server.NewDefaultOptions(os.Stdout, os.Stderr)
|
||||||
|
cmd := server.NewCommandStartAggregator(options, stopCh)
|
||||||
cmd.Flags().AddGoFlagSet(flag.CommandLine)
|
cmd.Flags().AddGoFlagSet(flag.CommandLine)
|
||||||
if err := cmd.Execute(); err != nil {
|
if err := cmd.Execute(); err != nil {
|
||||||
glog.Fatal(err)
|
glog.Fatal(err)
|
||||||
|
@ -49,9 +49,9 @@ type AggregatorOptions struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewCommandStartAggregator provides a CLI handler for 'start master' command
|
// NewCommandStartAggregator provides a CLI handler for 'start master' command
|
||||||
func NewCommandStartAggregator(out, err io.Writer, stopCh <-chan struct{}) *cobra.Command {
|
// with a default AggregatorOptions.
|
||||||
o := NewDefaultOptions(out, err)
|
func NewCommandStartAggregator(defaults *AggregatorOptions, stopCh <-chan struct{}) *cobra.Command {
|
||||||
|
o := *defaults
|
||||||
cmd := &cobra.Command{
|
cmd := &cobra.Command{
|
||||||
Short: "Launch a API aggregator and proxy server",
|
Short: "Launch a API aggregator and proxy server",
|
||||||
Long: "Launch a API aggregator and proxy server",
|
Long: "Launch a API aggregator and proxy server",
|
||||||
|
@ -37,7 +37,8 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
stopCh := genericapiserver.SetupSignalHandler()
|
stopCh := genericapiserver.SetupSignalHandler()
|
||||||
cmd := server.NewCommandStartWardleServer(os.Stdout, os.Stderr, stopCh)
|
options := server.NewWardleServerOptions(os.Stdout, os.Stderr)
|
||||||
|
cmd := server.NewCommandStartWardleServer(options, stopCh)
|
||||||
cmd.Flags().AddGoFlagSet(flag.CommandLine)
|
cmd.Flags().AddGoFlagSet(flag.CommandLine)
|
||||||
if err := cmd.Execute(); err != nil {
|
if err := cmd.Execute(); err != nil {
|
||||||
glog.Fatal(err)
|
glog.Fatal(err)
|
||||||
|
@ -57,9 +57,9 @@ func NewWardleServerOptions(out, errOut io.Writer) *WardleServerOptions {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewCommandStartWardleServer provides a CLI handler for 'start master' command
|
// NewCommandStartWardleServer provides a CLI handler for 'start master' command
|
||||||
func NewCommandStartWardleServer(out, errOut io.Writer, stopCh <-chan struct{}) *cobra.Command {
|
// with a default WardleServerOptions.
|
||||||
o := NewWardleServerOptions(out, errOut)
|
func NewCommandStartWardleServer(defaults *WardleServerOptions, stopCh <-chan struct{}) *cobra.Command {
|
||||||
|
o := *defaults
|
||||||
cmd := &cobra.Command{
|
cmd := &cobra.Command{
|
||||||
Short: "Launch a wardle API server",
|
Short: "Launch a wardle API server",
|
||||||
Long: "Launch a wardle API server",
|
Long: "Launch a wardle API server",
|
||||||
|
@ -41,6 +41,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||||
|
genericapiserveroptions "k8s.io/apiserver/pkg/server/options"
|
||||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
restclient "k8s.io/client-go/rest"
|
restclient "k8s.io/client-go/rest"
|
||||||
@ -704,47 +705,41 @@ func startRealMasterOrDie(t *testing.T, certDir string) (*allClient, clientv3.KV
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
for {
|
listener, _, err := genericapiserveroptions.CreateListener("tcp", "127.0.0.1:0")
|
||||||
kubeAPIServerOptions := options.NewServerRunOptions()
|
if err != nil {
|
||||||
kubeAPIServerOptions.SecureServing.BindAddress = net.ParseIP("127.0.0.1")
|
t.Fatal(err)
|
||||||
kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir
|
}
|
||||||
kubeAPIServerOptions.Etcd.StorageConfig.ServerList = []string{framework.GetEtcdURL()}
|
|
||||||
kubeAPIServerOptions.Etcd.DefaultStorageMediaType = runtime.ContentTypeJSON // TODO use protobuf?
|
|
||||||
kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange
|
|
||||||
kubeAPIServerOptions.Authorization.Mode = "RBAC"
|
|
||||||
|
|
||||||
// always get a fresh port in case something claimed the old one
|
kubeAPIServerOptions := options.NewServerRunOptions()
|
||||||
kubePort, err := framework.FindFreeLocalPort()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
kubeAPIServerOptions.SecureServing.BindPort = kubePort
|
kubeAPIServerOptions.SecureServing.Listener = listener
|
||||||
|
kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir
|
||||||
|
kubeAPIServerOptions.Etcd.StorageConfig.ServerList = []string{framework.GetEtcdURL()}
|
||||||
|
kubeAPIServerOptions.Etcd.DefaultStorageMediaType = runtime.ContentTypeJSON // TODO use protobuf?
|
||||||
|
kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange
|
||||||
|
kubeAPIServerOptions.Authorization.Mode = "RBAC"
|
||||||
|
|
||||||
tunneler, proxyTransport, err := app.CreateNodeDialer(kubeAPIServerOptions)
|
tunneler, proxyTransport, err := app.CreateNodeDialer(kubeAPIServerOptions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
kubeAPIServerConfig, sharedInformers, versionedInformers, _, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport)
|
kubeAPIServerConfig, sharedInformers, versionedInformers, _, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
kubeAPIServerConfig.ExtraConfig.APIResourceConfigSource = &allResourceSource{} // force enable all resources
|
kubeAPIServerConfig.ExtraConfig.APIResourceConfigSource = &allResourceSource{} // force enable all resources
|
||||||
|
|
||||||
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.EmptyDelegate, sharedInformers, versionedInformers)
|
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.EmptyDelegate, sharedInformers, versionedInformers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
kubeClientConfigValue.Store(kubeAPIServerConfig.GenericConfig.LoopbackClientConfig)
|
kubeClientConfigValue.Store(kubeAPIServerConfig.GenericConfig.LoopbackClientConfig)
|
||||||
storageConfigValue.Store(kubeAPIServerOptions.Etcd.StorageConfig)
|
storageConfigValue.Store(kubeAPIServerOptions.Etcd.StorageConfig)
|
||||||
|
|
||||||
if err := kubeAPIServer.GenericAPIServer.PrepareRun().Run(wait.NeverStop); err != nil {
|
if err := kubeAPIServer.GenericAPIServer.PrepareRun().Run(wait.NeverStop); err != nil {
|
||||||
t.Log(err)
|
t.Fatal(err)
|
||||||
}
|
|
||||||
|
|
||||||
time.Sleep(time.Second)
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -22,9 +22,9 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net"
|
"net"
|
||||||
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"strconv"
|
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -34,6 +34,7 @@ import (
|
|||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||||
|
genericapiserveroptions "k8s.io/apiserver/pkg/server/options"
|
||||||
client "k8s.io/client-go/kubernetes"
|
client "k8s.io/client-go/kubernetes"
|
||||||
"k8s.io/client-go/rest"
|
"k8s.io/client-go/rest"
|
||||||
"k8s.io/client-go/tools/clientcmd"
|
"k8s.io/client-go/tools/clientcmd"
|
||||||
@ -90,52 +91,48 @@ func TestAggregatedAPIServer(t *testing.T) {
|
|||||||
|
|
||||||
kubeClientConfigValue := atomic.Value{}
|
kubeClientConfigValue := atomic.Value{}
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
listener, _, err := genericapiserveroptions.CreateListener("tcp", "127.0.0.1:0")
|
||||||
// always get a fresh port in case something claimed the old one
|
if err != nil {
|
||||||
kubePort, err := framework.FindFreeLocalPort()
|
t.Fatal(err)
|
||||||
if err != nil {
|
}
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
kubeAPIServerOptions := options.NewServerRunOptions()
|
kubeAPIServerOptions := options.NewServerRunOptions()
|
||||||
kubeAPIServerOptions.SecureServing.BindAddress = net.ParseIP("127.0.0.1")
|
kubeAPIServerOptions.SecureServing.Listener = listener
|
||||||
kubeAPIServerOptions.SecureServing.BindPort = kubePort
|
kubeAPIServerOptions.SecureServing.BindAddress = net.ParseIP("127.0.0.1")
|
||||||
kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir
|
kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir
|
||||||
kubeAPIServerOptions.InsecureServing.BindPort = 0
|
kubeAPIServerOptions.InsecureServing.BindPort = 0
|
||||||
kubeAPIServerOptions.Etcd.StorageConfig.ServerList = []string{framework.GetEtcdURL()}
|
kubeAPIServerOptions.Etcd.StorageConfig.ServerList = []string{framework.GetEtcdURL()}
|
||||||
kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange
|
kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange
|
||||||
kubeAPIServerOptions.Authentication.RequestHeader.UsernameHeaders = []string{"X-Remote-User"}
|
kubeAPIServerOptions.Authentication.RequestHeader.UsernameHeaders = []string{"X-Remote-User"}
|
||||||
kubeAPIServerOptions.Authentication.RequestHeader.GroupHeaders = []string{"X-Remote-Group"}
|
kubeAPIServerOptions.Authentication.RequestHeader.GroupHeaders = []string{"X-Remote-Group"}
|
||||||
kubeAPIServerOptions.Authentication.RequestHeader.ExtraHeaderPrefixes = []string{"X-Remote-Extra-"}
|
kubeAPIServerOptions.Authentication.RequestHeader.ExtraHeaderPrefixes = []string{"X-Remote-Extra-"}
|
||||||
kubeAPIServerOptions.Authentication.RequestHeader.AllowedNames = []string{"kube-aggregator"}
|
kubeAPIServerOptions.Authentication.RequestHeader.AllowedNames = []string{"kube-aggregator"}
|
||||||
kubeAPIServerOptions.Authentication.RequestHeader.ClientCAFile = proxyCACertFile.Name()
|
kubeAPIServerOptions.Authentication.RequestHeader.ClientCAFile = proxyCACertFile.Name()
|
||||||
kubeAPIServerOptions.Authentication.ClientCert.ClientCA = clientCACertFile.Name()
|
kubeAPIServerOptions.Authentication.ClientCert.ClientCA = clientCACertFile.Name()
|
||||||
kubeAPIServerOptions.Authorization.Mode = "RBAC"
|
kubeAPIServerOptions.Authorization.Mode = "RBAC"
|
||||||
|
|
||||||
tunneler, proxyTransport, err := app.CreateNodeDialer(kubeAPIServerOptions)
|
tunneler, proxyTransport, err := app.CreateNodeDialer(kubeAPIServerOptions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
kubeAPIServerConfig, sharedInformers, versionedInformers, _, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport)
|
kubeAPIServerConfig, sharedInformers, versionedInformers, _, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
// Adjust the loopback config for external use (external server name and CA)
|
// Adjust the loopback config for external use (external server name and CA)
|
||||||
kubeAPIServerClientConfig := rest.CopyConfig(kubeAPIServerConfig.GenericConfig.LoopbackClientConfig)
|
kubeAPIServerClientConfig := rest.CopyConfig(kubeAPIServerConfig.GenericConfig.LoopbackClientConfig)
|
||||||
kubeAPIServerClientConfig.CAFile = path.Join(certDir, "apiserver.crt")
|
kubeAPIServerClientConfig.CAFile = path.Join(certDir, "apiserver.crt")
|
||||||
kubeAPIServerClientConfig.CAData = nil
|
kubeAPIServerClientConfig.CAData = nil
|
||||||
kubeAPIServerClientConfig.ServerName = ""
|
kubeAPIServerClientConfig.ServerName = ""
|
||||||
kubeClientConfigValue.Store(kubeAPIServerClientConfig)
|
kubeClientConfigValue.Store(kubeAPIServerClientConfig)
|
||||||
|
|
||||||
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.EmptyDelegate, sharedInformers, versionedInformers)
|
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.EmptyDelegate, sharedInformers, versionedInformers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := kubeAPIServer.GenericAPIServer.PrepareRun().Run(wait.NeverStop); err != nil {
|
if err := kubeAPIServer.GenericAPIServer.PrepareRun().Run(wait.NeverStop); err != nil {
|
||||||
t.Log(err)
|
t.Fatal(err)
|
||||||
}
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -154,9 +151,13 @@ func TestAggregatedAPIServer(t *testing.T) {
|
|||||||
t.Log(err)
|
t.Log(err)
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
if _, err := kubeClient.Discovery().ServerVersion(); err != nil {
|
|
||||||
|
healthStatus := 0
|
||||||
|
kubeClient.Discovery().RESTClient().Get().AbsPath("/healthz").Do().StatusCode(&healthStatus)
|
||||||
|
if healthStatus != http.StatusOK {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return true, nil
|
return true, nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -177,32 +178,30 @@ func TestAggregatedAPIServer(t *testing.T) {
|
|||||||
|
|
||||||
// start the wardle server to prove we can aggregate it
|
// start the wardle server to prove we can aggregate it
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
listener, port, err := genericapiserveroptions.CreateListener("tcp", "127.0.0.1:0")
|
||||||
// always get a fresh port in case something claimed the old one
|
if err != nil {
|
||||||
wardlePortInt, err := framework.FindFreeLocalPort()
|
t.Fatal(err)
|
||||||
if err != nil {
|
}
|
||||||
t.Fatal(err)
|
atomic.StoreInt32(wardlePort, int32(port))
|
||||||
}
|
|
||||||
atomic.StoreInt32(wardlePort, int32(wardlePortInt))
|
o := sampleserver.NewWardleServerOptions(os.Stdout, os.Stderr)
|
||||||
wardleCmd := sampleserver.NewCommandStartWardleServer(os.Stdout, os.Stderr, stopCh)
|
o.RecommendedOptions.SecureServing.Listener = listener
|
||||||
wardleCmd.SetArgs([]string{
|
o.RecommendedOptions.SecureServing.BindAddress = net.ParseIP("127.0.0.1")
|
||||||
"--bind-address", "127.0.0.1",
|
wardleCmd := sampleserver.NewCommandStartWardleServer(o, stopCh)
|
||||||
"--secure-port", strconv.Itoa(wardlePortInt),
|
wardleCmd.SetArgs([]string{
|
||||||
"--requestheader-username-headers=X-Remote-User",
|
"--requestheader-username-headers=X-Remote-User",
|
||||||
"--requestheader-group-headers=X-Remote-Group",
|
"--requestheader-group-headers=X-Remote-Group",
|
||||||
"--requestheader-extra-headers-prefix=X-Remote-Extra-",
|
"--requestheader-extra-headers-prefix=X-Remote-Extra-",
|
||||||
"--requestheader-client-ca-file=" + proxyCACertFile.Name(),
|
"--requestheader-client-ca-file=" + proxyCACertFile.Name(),
|
||||||
"--requestheader-allowed-names=kube-aggregator",
|
"--requestheader-allowed-names=kube-aggregator",
|
||||||
"--authentication-kubeconfig", kubeconfigFile.Name(),
|
"--authentication-kubeconfig", kubeconfigFile.Name(),
|
||||||
"--authorization-kubeconfig", kubeconfigFile.Name(),
|
"--authorization-kubeconfig", kubeconfigFile.Name(),
|
||||||
"--etcd-servers", framework.GetEtcdURL(),
|
"--etcd-servers", framework.GetEtcdURL(),
|
||||||
"--cert-dir", wardleCertDir,
|
"--cert-dir", wardleCertDir,
|
||||||
"--kubeconfig", kubeconfigFile.Name(),
|
"--kubeconfig", kubeconfigFile.Name(),
|
||||||
})
|
})
|
||||||
if err := wardleCmd.Execute(); err != nil {
|
if err := wardleCmd.Execute(); err != nil {
|
||||||
t.Log(err)
|
t.Fatal(err)
|
||||||
}
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -220,8 +219,9 @@ func TestAggregatedAPIServer(t *testing.T) {
|
|||||||
t.Log(err)
|
t.Log(err)
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
if _, err := wardleClient.Discovery().ServerVersion(); err != nil {
|
healthStatus := 0
|
||||||
t.Log(err)
|
wardleClient.Discovery().RESTClient().Get().AbsPath("/healthz").Do().StatusCode(&healthStatus)
|
||||||
|
if healthStatus != http.StatusOK {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
return true, nil
|
return true, nil
|
||||||
@ -255,30 +255,29 @@ func TestAggregatedAPIServer(t *testing.T) {
|
|||||||
aggregatorPort := new(int32)
|
aggregatorPort := new(int32)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
listener, port, err := genericapiserveroptions.CreateListener("tcp", "127.0.0.1:0")
|
||||||
// always get a fresh port in case something claimed the old one
|
if err != nil {
|
||||||
aggregatorPortInt, err := framework.FindFreeLocalPort()
|
t.Fatal(err)
|
||||||
if err != nil {
|
}
|
||||||
t.Fatal(err)
|
atomic.StoreInt32(aggregatorPort, int32(port))
|
||||||
}
|
|
||||||
atomic.StoreInt32(aggregatorPort, int32(aggregatorPortInt))
|
o := kubeaggregatorserver.NewDefaultOptions(os.Stdout, os.Stderr)
|
||||||
aggregatorCmd := kubeaggregatorserver.NewCommandStartAggregator(os.Stdout, os.Stderr, stopCh)
|
o.RecommendedOptions.SecureServing.Listener = listener
|
||||||
aggregatorCmd.SetArgs([]string{
|
o.RecommendedOptions.SecureServing.BindAddress = net.ParseIP("127.0.0.1")
|
||||||
"--bind-address", "127.0.0.1",
|
aggregatorCmd := kubeaggregatorserver.NewCommandStartAggregator(o, stopCh)
|
||||||
"--secure-port", strconv.Itoa(aggregatorPortInt),
|
aggregatorCmd.SetArgs([]string{
|
||||||
"--requestheader-username-headers", "",
|
"--requestheader-username-headers", "",
|
||||||
"--proxy-client-cert-file", proxyClientCertFile.Name(),
|
"--proxy-client-cert-file", proxyClientCertFile.Name(),
|
||||||
"--proxy-client-key-file", proxyClientKeyFile.Name(),
|
"--proxy-client-key-file", proxyClientKeyFile.Name(),
|
||||||
"--kubeconfig", kubeconfigFile.Name(),
|
"--kubeconfig", kubeconfigFile.Name(),
|
||||||
"--authentication-kubeconfig", kubeconfigFile.Name(),
|
"--authentication-kubeconfig", kubeconfigFile.Name(),
|
||||||
"--authorization-kubeconfig", kubeconfigFile.Name(),
|
"--authorization-kubeconfig", kubeconfigFile.Name(),
|
||||||
"--etcd-servers", framework.GetEtcdURL(),
|
"--etcd-servers", framework.GetEtcdURL(),
|
||||||
"--cert-dir", aggregatorCertDir,
|
"--cert-dir", aggregatorCertDir,
|
||||||
})
|
})
|
||||||
if err := aggregatorCmd.Execute(); err != nil {
|
|
||||||
t.Log(err)
|
if err := aggregatorCmd.Execute(); err != nil {
|
||||||
}
|
t.Fatal(err)
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -295,7 +294,9 @@ func TestAggregatedAPIServer(t *testing.T) {
|
|||||||
// this happens if we race the API server for writing the cert
|
// this happens if we race the API server for writing the cert
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
if _, err := aggregatorDiscoveryClient.Discovery().ServerVersion(); err != nil {
|
healthStatus := 0
|
||||||
|
aggregatorDiscoveryClient.Discovery().RESTClient().Get().AbsPath("/healthz").Do().StatusCode(&healthStatus)
|
||||||
|
if healthStatus != http.StatusOK {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
return true, nil
|
return true, nil
|
||||||
@ -304,7 +305,7 @@ func TestAggregatedAPIServer(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// now we're finally ready to test. These are what's run by defautl now
|
// now we're finally ready to test. These are what's run by default now
|
||||||
testAPIGroupList(t, wardleClient.Discovery().RESTClient())
|
testAPIGroupList(t, wardleClient.Discovery().RESTClient())
|
||||||
testAPIGroup(t, wardleClient.Discovery().RESTClient())
|
testAPIGroup(t, wardleClient.Discovery().RESTClient())
|
||||||
testAPIResourceList(t, wardleClient.Discovery().RESTClient())
|
testAPIResourceList(t, wardleClient.Discovery().RESTClient())
|
||||||
@ -342,8 +343,8 @@ func TestAggregatedAPIServer(t *testing.T) {
|
|||||||
_, err = aggregatorClient.ApiregistrationV1beta1().APIServices().Create(&apiregistrationv1beta1.APIService{
|
_, err = aggregatorClient.ApiregistrationV1beta1().APIServices().Create(&apiregistrationv1beta1.APIService{
|
||||||
ObjectMeta: metav1.ObjectMeta{Name: "v1."},
|
ObjectMeta: metav1.ObjectMeta{Name: "v1."},
|
||||||
Spec: apiregistrationv1beta1.APIServiceSpec{
|
Spec: apiregistrationv1beta1.APIServiceSpec{
|
||||||
// register this as a loca service so it doesn't try to lookup the default kubernetes service
|
// register this as a local service so it doesn't try to lookup the default kubernetes service
|
||||||
// which will have an unroutable IP address since its fake.
|
// which will have an unroutable IP address since it's fake.
|
||||||
Group: "",
|
Group: "",
|
||||||
Version: "v1",
|
Version: "v1",
|
||||||
GroupPriorityMinimum: 100,
|
GroupPriorityMinimum: 100,
|
||||||
@ -460,8 +461,3 @@ func testAPIResourceList(t *testing.T, client rest.Interface) {
|
|||||||
assert.Equal(t, "flunders", apiResourceList.APIResources[1].Name)
|
assert.Equal(t, "flunders", apiResourceList.APIResources[1].Name)
|
||||||
assert.True(t, apiResourceList.APIResources[1].Namespaced)
|
assert.True(t, apiResourceList.APIResources[1].Namespaced)
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
|
||||||
policyCachePollInterval = 100 * time.Millisecond
|
|
||||||
policyCachePollTimeout = 5 * time.Second
|
|
||||||
)
|
|
||||||
|
@ -21,7 +21,6 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"path"
|
"path"
|
||||||
"strconv"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/go-openapi/spec"
|
"github.com/go-openapi/spec"
|
||||||
@ -328,28 +327,6 @@ func RunAMasterUsingServer(masterConfig *master.Config, s *httptest.Server, mast
|
|||||||
return startMasterOrDie(masterConfig, s, masterReceiver)
|
return startMasterOrDie(masterConfig, s, masterReceiver)
|
||||||
}
|
}
|
||||||
|
|
||||||
// FindFreeLocalPort returns the number of an available port number on
|
|
||||||
// the loopback interface. Useful for determining the port to launch
|
|
||||||
// a server on. Error handling required - there is a non-zero chance
|
|
||||||
// that the returned port number will be bound by another process
|
|
||||||
// after this function returns.
|
|
||||||
func FindFreeLocalPort() (int, error) {
|
|
||||||
l, err := net.Listen("tcp", ":0")
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
defer l.Close()
|
|
||||||
_, portStr, err := net.SplitHostPort(l.Addr().String())
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
port, err := strconv.Atoi(portStr)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
return port, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// SharedEtcd creates a storage config for a shared etcd instance, with a unique prefix.
|
// SharedEtcd creates a storage config for a shared etcd instance, with a unique prefix.
|
||||||
func SharedEtcd() *storagebackend.Config {
|
func SharedEtcd() *storagebackend.Config {
|
||||||
cfg := storagebackend.NewDefaultConfig(path.Join(uuid.New(), "registry"), nil)
|
cfg := storagebackend.NewDefaultConfig(path.Join(uuid.New(), "registry"), nil)
|
||||||
|
@ -29,6 +29,7 @@ import (
|
|||||||
|
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||||
|
genericapiserveroptions "k8s.io/apiserver/pkg/server/options"
|
||||||
client "k8s.io/client-go/kubernetes"
|
client "k8s.io/client-go/kubernetes"
|
||||||
"k8s.io/client-go/rest"
|
"k8s.io/client-go/rest"
|
||||||
"k8s.io/kubernetes/cmd/kube-apiserver/app"
|
"k8s.io/kubernetes/cmd/kube-apiserver/app"
|
||||||
@ -44,17 +45,17 @@ func runBasicSecureAPIServer(t *testing.T, ciphers []string) (uint32, error) {
|
|||||||
var kubePort uint32
|
var kubePort uint32
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
// always get a fresh port in case something claimed the old one
|
listener, port, err := genericapiserveroptions.CreateListener("tcp", "127.0.0.1:0")
|
||||||
freePort, err := framework.FindFreeLocalPort()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic.StoreUint32(&kubePort, uint32(freePort))
|
atomic.StoreUint32(&kubePort, uint32(port))
|
||||||
|
|
||||||
kubeAPIServerOptions := options.NewServerRunOptions()
|
kubeAPIServerOptions := options.NewServerRunOptions()
|
||||||
kubeAPIServerOptions.SecureServing.BindAddress = net.ParseIP("127.0.0.1")
|
kubeAPIServerOptions.SecureServing.BindAddress = net.ParseIP("127.0.0.1")
|
||||||
kubeAPIServerOptions.SecureServing.BindPort = freePort
|
kubeAPIServerOptions.SecureServing.BindPort = port
|
||||||
|
kubeAPIServerOptions.SecureServing.Listener = listener
|
||||||
kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir
|
kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir
|
||||||
kubeAPIServerOptions.SecureServing.CipherSuites = ciphers
|
kubeAPIServerOptions.SecureServing.CipherSuites = ciphers
|
||||||
kubeAPIServerOptions.InsecureServing.BindPort = 0
|
kubeAPIServerOptions.InsecureServing.BindPort = 0
|
||||||
|
Loading…
Reference in New Issue
Block a user