Merge pull request #42059 from deads2k/agg-19-integration-test

Automatic merge from submit-queue (batch tested with PRs 35094, 42095, 42059, 42143, 41944)

add aggregation integration test

Wires up an integration test which runs a full kube-apiserver, the wardle server, and the kube-aggregator and creates the APIservice object for the wardle server.  Without services and DNS the aggregator doesn't proxy, but it does ensure we don't have an obvious panic or bring up failure.

@sttts @ncdc
This commit is contained in:
Kubernetes Submit Queue 2017-02-28 09:20:16 -08:00 committed by GitHub
commit b26fb689f6
13 changed files with 477 additions and 238 deletions

View File

@ -65,6 +65,7 @@ go_library(
"//plugin/cmd/kube-scheduler/app:go_default_library",
"//plugin/cmd/kube-scheduler/app/options:go_default_library",
"//vendor:github.com/spf13/pflag",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apiserver/pkg/server/healthz",
"//vendor:k8s.io/apiserver/pkg/util/flag",
"//vendor:k8s.io/apiserver/pkg/util/logs",

View File

@ -19,6 +19,7 @@ package main
import (
"os"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kube-aggregator/pkg/cmd/server"
)
@ -39,7 +40,7 @@ func NewKubeAggregator() *Server {
if err := o.Validate(args); err != nil {
return err
}
if err := o.RunAggregator(); err != nil {
if err := o.RunAggregator(wait.NeverStop); err != nil {
return err
}
return nil

View File

@ -85,26 +85,47 @@ cluster's shared state through which all other components interact.`,
// Run runs the specified APIServer. This should never exit.
func Run(s *options.ServerRunOptions) error {
config, sharedInformers, err := BuildMasterConfig(s)
if err != nil {
return err
}
return RunServer(config, sharedInformers, wait.NeverStop)
}
// RunServer uses the provided config and shared informers to run the apiserver. It does not return.
func RunServer(config *master.Config, sharedInformers informers.SharedInformerFactory, stopCh <-chan struct{}) error {
m, err := config.Complete().New()
if err != nil {
return err
}
sharedInformers.Start(stopCh)
return m.GenericAPIServer.PrepareRun().Run(stopCh)
}
// BuildMasterConfig creates all the resources for running the API server, but runs none of them
func BuildMasterConfig(s *options.ServerRunOptions) (*master.Config, informers.SharedInformerFactory, error) {
// set defaults
if err := s.GenericServerRunOptions.DefaultAdvertiseAddress(s.SecureServing, s.InsecureServing); err != nil {
return err
return nil, nil, err
}
serviceIPRange, apiServerServiceIP, err := master.DefaultServiceIPRange(s.ServiceClusterIPRange)
if err != nil {
return fmt.Errorf("error determining service IP ranges: %v", err)
return nil, nil, fmt.Errorf("error determining service IP ranges: %v", err)
}
if err := s.SecureServing.MaybeDefaultWithSelfSignedCerts(s.GenericServerRunOptions.AdvertiseAddress.String(), apiServerServiceIP); err != nil {
return fmt.Errorf("error creating self-signed certificates: %v", err)
return nil, nil, fmt.Errorf("error creating self-signed certificates: %v", err)
}
if err := s.CloudProvider.DefaultExternalHost(s.GenericServerRunOptions); err != nil {
return fmt.Errorf("error setting the external host value: %v", err)
return nil, nil, fmt.Errorf("error setting the external host value: %v", err)
}
s.Authentication.ApplyAuthorization(s.Authorization)
// validate options
if errs := s.Validate(); len(errs) != 0 {
return utilerrors.NewAggregate(errs)
return nil, nil, utilerrors.NewAggregate(errs)
}
// create config from options
@ -112,22 +133,22 @@ func Run(s *options.ServerRunOptions) error {
WithSerializer(api.Codecs)
if err := s.GenericServerRunOptions.ApplyTo(genericConfig); err != nil {
return err
return nil, nil, err
}
if err := s.InsecureServing.ApplyTo(genericConfig); err != nil {
return err
return nil, nil, err
}
if err := s.SecureServing.ApplyTo(genericConfig); err != nil {
return err
return nil, nil, err
}
if err := s.Authentication.ApplyTo(genericConfig); err != nil {
return err
return nil, nil, err
}
if err := s.Audit.ApplyTo(genericConfig); err != nil {
return err
return nil, nil, err
}
if err := s.Features.ApplyTo(genericConfig); err != nil {
return err
return nil, nil, err
}
// Use protobufs for self-communication.
@ -155,7 +176,7 @@ func Run(s *options.ServerRunOptions) error {
var installSSHKey tunneler.InstallSSHKey
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider.CloudProvider, s.CloudProvider.CloudConfigFile)
if err != nil {
return fmt.Errorf("cloud provider could not be initialized: %v", err)
return nil, nil, fmt.Errorf("cloud provider could not be initialized: %v", err)
}
if cloud != nil {
if instances, supported := cloud.Instances(); supported {
@ -163,10 +184,10 @@ func Run(s *options.ServerRunOptions) error {
}
}
if s.KubeletConfig.Port == 0 {
return fmt.Errorf("must enable kubelet port if proxy ssh-tunneling is specified")
return nil, nil, fmt.Errorf("must enable kubelet port if proxy ssh-tunneling is specified")
}
if s.KubeletConfig.ReadOnlyPort == 0 {
return fmt.Errorf("must enable kubelet readonly port if proxy ssh-tunneling is specified")
return nil, nil, fmt.Errorf("must enable kubelet readonly port if proxy ssh-tunneling is specified")
}
// Set up the nodeTunneler
// TODO(cjcullen): If we want this to handle per-kubelet ports or other
@ -211,7 +232,7 @@ func Run(s *options.ServerRunOptions) error {
storageGroupsToEncodingVersion, err := s.StorageSerialization.StorageGroupsToEncodingVersion()
if err != nil {
return fmt.Errorf("error generating storage version map: %s", err)
return nil, nil, fmt.Errorf("error generating storage version map: %s", err)
}
storageFactory, err := kubeapiserver.NewStorageFactory(
s.Etcd.StorageConfig, s.Etcd.DefaultStorageMediaType, api.Codecs,
@ -220,7 +241,7 @@ func Run(s *options.ServerRunOptions) error {
[]schema.GroupVersionResource{batch.Resource("cronjobs").WithVersion("v2alpha1")},
master.DefaultAPIResourceConfigSource(), s.APIEnablement.RuntimeConfig)
if err != nil {
return fmt.Errorf("error in initializing storage factory: %s", err)
return nil, nil, fmt.Errorf("error in initializing storage factory: %s", err)
}
for _, override := range s.Etcd.EtcdServersOverrides {
tokens := strings.Split(override, "#")
@ -257,7 +278,7 @@ func Run(s *options.ServerRunOptions) error {
// go directly to etcd to avoid recursive auth insanity
storageConfig, err := storageFactory.NewConfig(api.Resource("serviceaccounts"))
if err != nil {
return fmt.Errorf("unable to get serviceaccounts storage: %v", err)
return nil, nil, fmt.Errorf("unable to get serviceaccounts storage: %v", err)
}
authenticatorConfig.ServiceAccountTokenGetter = serviceaccountcontroller.NewGetterFromStorageInterface(storageConfig, storageFactory.ResourcePrefix(api.Resource("serviceaccounts")), storageFactory.ResourcePrefix(api.Resource("secrets")))
}
@ -266,7 +287,7 @@ func Run(s *options.ServerRunOptions) error {
if err != nil {
kubeAPIVersions := os.Getenv("KUBE_API_VERSIONS")
if len(kubeAPIVersions) == 0 {
return fmt.Errorf("failed to create clientset: %v", err)
return nil, nil, fmt.Errorf("failed to create clientset: %v", err)
}
// KUBE_API_VERSIONS is used in test-update-storage-objects.sh, disabling a number of API
@ -289,24 +310,24 @@ func Run(s *options.ServerRunOptions) error {
apiAuthenticator, securityDefinitions, err := authenticatorConfig.New()
if err != nil {
return fmt.Errorf("invalid authentication config: %v", err)
return nil, nil, fmt.Errorf("invalid authentication config: %v", err)
}
authorizationConfig := s.Authorization.ToAuthorizationConfig(sharedInformers)
apiAuthorizer, err := authorizationConfig.New()
if err != nil {
return fmt.Errorf("invalid Authorization Config: %v", err)
return nil, nil, fmt.Errorf("invalid Authorization Config: %v", err)
}
admissionControlPluginNames := strings.Split(s.GenericServerRunOptions.AdmissionControl, ",")
pluginInitializer := kubeadmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer)
admissionConfigProvider, err := admission.ReadAdmissionConfiguration(admissionControlPluginNames, s.GenericServerRunOptions.AdmissionControlConfigFile)
if err != nil {
return fmt.Errorf("failed to read plugin config: %v", err)
return nil, nil, fmt.Errorf("failed to read plugin config: %v", err)
}
admissionController, err := admission.NewFromPlugins(admissionControlPluginNames, admissionConfigProvider, pluginInitializer)
if err != nil {
return fmt.Errorf("failed to initialize plugins: %v", err)
return nil, nil, fmt.Errorf("failed to initialize plugins: %v", err)
}
proxyTransport := utilnet.SetTransportDefaults(&http.Transport{
@ -331,16 +352,16 @@ func Run(s *options.ServerRunOptions) error {
)
if err := s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); err != nil {
return err
return nil, nil, err
}
clientCA, err := readCAorNil(s.Authentication.ClientCert.ClientCA)
if err != nil {
return err
return nil, nil, err
}
requestHeaderProxyCA, err := readCAorNil(s.Authentication.RequestHeader.ClientCAFile)
if err != nil {
return err
return nil, nil, err
}
config := &master.Config{
GenericConfig: genericConfig,
@ -381,14 +402,7 @@ func Run(s *options.ServerRunOptions) error {
cachesize.SetWatchCacheSizes(s.GenericServerRunOptions.WatchCacheSizes)
}
m, err := config.Complete().New()
if err != nil {
return err
}
sharedInformers.Start(wait.NeverStop)
m.GenericAPIServer.PrepareRun().Run(wait.NeverStop)
return nil
return config, sharedInformers, nil
}
func readCAorNil(file string) ([]byte, error) {

View File

@ -213,8 +213,7 @@ func Run(s *options.ServerRunOptions) error {
installAutoscalingAPIs(m, genericConfig.RESTOptionsGetter)
sharedInformers.Start(wait.NeverStop)
m.PrepareRun().Run(wait.NeverStop)
return nil
return m.PrepareRun().Run(wait.NeverStop)
}
// PostProcessSpec adds removed definitions for backward compatibility

View File

@ -189,16 +189,16 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
// Run spawns the http servers (secure and insecure). It only returns if stopCh is closed
// or one of the ports cannot be listened on initially.
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) {
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
if s.SecureServingInfo != nil && s.Handler != nil {
if err := s.serveSecurely(stopCh); err != nil {
glog.Fatal(err)
return err
}
}
if s.InsecureServingInfo != nil && s.InsecureHandler != nil {
if err := s.serveInsecurely(stopCh); err != nil {
glog.Fatal(err)
return err
}
}
@ -210,6 +210,7 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) {
}
<-stopCh
return nil
}
// EffectiveSecurePort returns the secure port we bound to.

View File

@ -448,6 +448,8 @@ NextTest:
}
stopCh := make(chan struct{})
func() {
defer close(stopCh)
// launch server
config := setUp(t)
@ -472,14 +474,14 @@ NextTest:
config.LoopbackClientConfig = &restclient.Config{}
if err := secureOptions.ApplyTo(&config); err != nil {
t.Errorf("%q - failed applying the SecureServingOptions: %v", title, err)
continue NextTest
return
}
config.InsecureServingInfo = nil
s, err := config.Complete().New()
if err != nil {
t.Errorf("%q - failed creating the server: %v", title, err)
continue NextTest
return
}
// patch in a 0-port to enable auto port allocation
@ -492,7 +494,11 @@ NextTest:
return nil
})
preparedServer := s.PrepareRun()
go preparedServer.Run(stopCh)
go func() {
if err := preparedServer.Run(stopCh); err != nil {
t.Fatal(err)
}
}()
// load ca certificates into a pool
roots := x509.NewCertPool()
@ -512,7 +518,7 @@ NextTest:
})
if err != nil {
t.Errorf("%q - failed to connect: %v", title, err)
continue NextTest
return
}
// check returned server certificate
@ -537,25 +543,26 @@ NextTest:
if err == nil {
t.Errorf("%q - expected error creating loopback client config", title)
}
continue NextTest
return
}
if err != nil {
t.Errorf("%q - failed creating loopback client config: %v", title, err)
continue NextTest
return
}
client, err := discovery.NewDiscoveryClientForConfig(s.LoopbackClientConfig)
if err != nil {
t.Errorf("%q - failed to create loopback client: %v", title, err)
continue NextTest
return
}
got, err := client.ServerVersion()
if err != nil {
t.Errorf("%q - failed to connect with loopback client: %v", title, err)
continue NextTest
return
}
if expected := &v; !reflect.DeepEqual(got, expected) {
t.Errorf("%q - loopback client didn't get correct version info: expected=%v got=%v", title, expected, got)
}
}()
}
}

View File

@ -21,6 +21,7 @@ import (
"os"
"runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/util/logs"
"k8s.io/kube-aggregator/pkg/cmd/server"
@ -40,7 +41,7 @@ func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
}
cmd := server.NewCommandStartAggregator(os.Stdout, os.Stderr)
cmd := server.NewCommandStartAggregator(os.Stdout, os.Stderr, wait.NeverStop)
cmd.Flags().AddGoFlagSet(flag.CommandLine)
if err := cmd.Execute(); err != nil {
panic(err)

View File

@ -22,7 +22,6 @@ import (
"time"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
@ -106,7 +105,7 @@ func (c *Config) SkipComplete() completedConfig {
}
// New returns a new instance of APIAggregator from the given config.
func (c completedConfig) New() (*APIAggregator, error) {
func (c completedConfig) New(stopCh <-chan struct{}) (*APIAggregator, error) {
informerFactory := informers.NewSharedInformerFactory(
internalclientset.NewForConfigOrDie(c.Config.GenericConfig.LoopbackClientConfig),
5*time.Minute, // this is effectively used as a refresh interval right now. Might want to do something nicer later on.
@ -154,12 +153,12 @@ func (c completedConfig) New() (*APIAggregator, error) {
apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().InternalVersion().APIServices(), s)
s.GenericAPIServer.AddPostStartHook("start-informers", func(context genericapiserver.PostStartHookContext) error {
informerFactory.Start(wait.NeverStop)
kubeInformers.Start(wait.NeverStop)
informerFactory.Start(stopCh)
kubeInformers.Start(stopCh)
return nil
})
s.GenericAPIServer.AddPostStartHook("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error {
apiserviceRegistrationController.Run(wait.NeverStop)
apiserviceRegistrationController.Run(stopCh)
return nil
})

View File

@ -25,7 +25,6 @@ import (
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/filters"
genericoptions "k8s.io/apiserver/pkg/server/options"
@ -56,7 +55,7 @@ type AggregatorOptions struct {
}
// NewCommandStartAggregator provides a CLI handler for 'start master' command
func NewCommandStartAggregator(out, err io.Writer) *cobra.Command {
func NewCommandStartAggregator(out, err io.Writer, stopCh <-chan struct{}) *cobra.Command {
o := NewDefaultOptions(out, err)
cmd := &cobra.Command{
@ -69,7 +68,7 @@ func NewCommandStartAggregator(out, err io.Writer) *cobra.Command {
if err := o.Validate(args); err != nil {
return err
}
if err := o.RunAggregator(); err != nil {
if err := o.RunAggregator(stopCh); err != nil {
return err
}
return nil
@ -110,7 +109,7 @@ func (o *AggregatorOptions) Complete() error {
return nil
}
func (o AggregatorOptions) RunAggregator() error {
func (o AggregatorOptions) RunAggregator(stopCh <-chan struct{}) error {
// TODO have a "real" external address
if err := o.RecommendedOptions.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost"); err != nil {
return fmt.Errorf("error creating self-signed certificates: %v", err)
@ -161,11 +160,9 @@ func (o AggregatorOptions) RunAggregator() error {
return err
}
server, err := config.Complete().New()
server, err := config.Complete().New(stopCh)
if err != nil {
return err
}
server.GenericAPIServer.PrepareRun().Run(wait.NeverStop)
return nil
return server.GenericAPIServer.PrepareRun().Run(stopCh)
}

View File

@ -19,6 +19,7 @@ package server
import (
"fmt"
"io"
"net"
"github.com/spf13/cobra"
@ -85,7 +86,7 @@ func (o *WardleServerOptions) Complete() error {
func (o WardleServerOptions) Config() (*apiserver.Config, error) {
// TODO have a "real" external address
if err := o.RecommendedOptions.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost"); err != nil {
if err := o.RecommendedOptions.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", net.ParseIP("127.0.0.1")); err != nil {
return nil, fmt.Errorf("error creating self-signed certificates: %v", err)
}
@ -110,7 +111,5 @@ func (o WardleServerOptions) RunWardleServer(stopCh <-chan struct{}) error {
if err != nil {
return err
}
server.GenericAPIServer.PrepareRun().Run(stopCh)
return nil
return server.GenericAPIServer.PrepareRun().Run(stopCh)
}

View File

@ -12,13 +12,20 @@ go_test(
srcs = ["apiserver_test.go"],
tags = ["automanaged"],
deps = [
"//cmd/kube-apiserver/app:go_default_library",
"//cmd/kube-apiserver/app/options:go_default_library",
"//test/integration/framework:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:github.com/stretchr/testify/assert",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/client-go/kubernetes",
"//vendor:k8s.io/client-go/rest",
"//vendor:k8s.io/client-go/tools/clientcmd",
"//vendor:k8s.io/client-go/tools/clientcmd/api",
"//vendor:k8s.io/client-go/util/cert",
"//vendor:k8s.io/kube-aggregator/pkg/apis/apiregistration/v1alpha1",
"//vendor:k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset",
"//vendor:k8s.io/kube-aggregator/pkg/cmd/server",
"//vendor:k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1",
"//vendor:k8s.io/sample-apiserver/pkg/cmd/server",
],

View File

@ -17,29 +17,37 @@ limitations under the License.
package apiserver
import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net"
"os"
"path"
"strconv"
"strings"
"testing"
"time"
"github.com/golang/glog"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
client "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/client-go/util/cert"
apiregistrationv1alpha1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1alpha1"
aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
kubeaggregatorserver "k8s.io/kube-aggregator/pkg/cmd/server"
"k8s.io/kubernetes/cmd/kube-apiserver/app"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1"
"k8s.io/sample-apiserver/pkg/cmd/server"
sampleserver "k8s.io/sample-apiserver/pkg/cmd/server"
)
const securePort = "6444"
var groupVersion = v1alpha1.SchemeGroupVersion
var groupVersionForDiscovery = metav1.GroupVersionForDiscovery{
@ -47,41 +55,277 @@ var groupVersionForDiscovery = metav1.GroupVersionForDiscovery{
Version: groupVersion.Version,
}
func TestRunServer(t *testing.T) {
masterConfig := framework.NewIntegrationTestMasterConfig()
_, s := framework.RunAMaster(masterConfig)
defer s.Close()
func localPort() (int, error) {
l, err := net.Listen("tcp", ":0")
if err != nil {
return 0, err
}
defer l.Close()
addr := strings.Split(l.Addr().String(), ":")
port, err := strconv.Atoi(addr[len(addr)-1])
if err != nil {
return 0, err
}
return port, nil
}
adminKubeConfig := createKubeConfig(masterConfig.GenericConfig.LoopbackClientConfig)
func TestAggregatedAPIServer(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
kubePort, err := localPort()
if err != nil {
t.Fatal(err)
}
certDir, _ := ioutil.TempDir("", "test-integration-apiserver")
defer os.RemoveAll(certDir)
_, defaultServiceClusterIPRange, _ := net.ParseCIDR("10.0.0.0/24")
proxySigningKey, err := cert.NewPrivateKey()
if err != nil {
t.Fatal(err)
}
proxySigningCert, err := cert.NewSelfSignedCACert(cert.Config{CommonName: "front-proxy-ca"}, proxySigningKey)
if err != nil {
t.Fatal(err)
}
proxyCACertFile, _ := ioutil.TempFile(certDir, "proxy-ca.crt")
if err := ioutil.WriteFile(proxyCACertFile.Name(), cert.EncodeCertPEM(proxySigningCert), 0644); err != nil {
t.Fatal(err)
}
kubeAPIServerOptions := options.NewServerRunOptions()
kubeAPIServerOptions.SecureServing.ServingOptions.BindAddress = net.ParseIP("127.0.0.1")
kubeAPIServerOptions.SecureServing.ServingOptions.BindPort = kubePort
kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir
kubeAPIServerOptions.InsecureServing.BindPort = 0
kubeAPIServerOptions.Etcd.StorageConfig.ServerList = []string{framework.GetEtcdURLFromEnv()}
kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange
kubeAPIServerOptions.Authentication.RequestHeader.UsernameHeaders = []string{"X-Remote-User"}
kubeAPIServerOptions.Authentication.RequestHeader.GroupHeaders = []string{"X-Remote-Group"}
kubeAPIServerOptions.Authentication.RequestHeader.ExtraHeaderPrefixes = []string{"X-Remote-Extra-"}
kubeAPIServerOptions.Authentication.RequestHeader.AllowedNames = []string{"kube-aggregator"}
kubeAPIServerOptions.Authentication.RequestHeader.ClientCAFile = proxyCACertFile.Name()
kubeAPIServerOptions.Authorization.Mode = "RBAC"
config, sharedInformers, err := app.BuildMasterConfig(kubeAPIServerOptions)
if err != nil {
t.Fatal(err)
}
go func() {
for {
if err := app.RunServer(config, sharedInformers, stopCh); err != nil {
t.Log(err)
}
time.Sleep(100 * time.Millisecond)
}
}()
// just use json because everyone speaks it
config.GenericConfig.LoopbackClientConfig.ContentType = ""
config.GenericConfig.LoopbackClientConfig.AcceptContentTypes = ""
kubeClient := client.NewForConfigOrDie(config.GenericConfig.LoopbackClientConfig)
err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) {
if _, err := kubeClient.Discovery().ServerVersion(); err != nil {
return false, nil
}
return true, nil
})
if err != nil {
t.Fatal(err)
}
// write a kubeconfig out for starting other API servers with delegated auth. remember, no in-cluster config
adminKubeConfig := createKubeConfig(config.GenericConfig.LoopbackClientConfig)
kubeconfigFile, _ := ioutil.TempFile("", "")
defer os.Remove(kubeconfigFile.Name())
clientcmd.WriteToFile(*adminKubeConfig, kubeconfigFile.Name())
// Avoid default cert-dir of /var/run/kubernetes to allow this to run on darwin
certDir, _ := ioutil.TempDir("", "test-integration-apiserver")
defer os.Remove(certDir)
stopCh := make(chan struct{})
defer close(stopCh)
cmd := server.NewCommandStartWardleServer(os.Stdout, os.Stderr, stopCh)
cmd.SetArgs([]string{
"--secure-port", securePort,
"--requestheader-username-headers", "",
// start the wardle server to prove we can aggregate it
wardlePort, err := localPort()
if err != nil {
t.Fatal(err)
}
wardleCertDir, _ := ioutil.TempDir("", "test-integration-wardle-server")
defer os.RemoveAll(wardleCertDir)
wardleCmd := sampleserver.NewCommandStartWardleServer(os.Stdout, os.Stderr, stopCh)
wardleCmd.SetArgs([]string{
"--bind-address", "127.0.0.1",
"--secure-port", strconv.Itoa(wardlePort),
"--requestheader-username-headers=X-Remote-User",
"--requestheader-group-headers=X-Remote-Group",
"--requestheader-extra-headers-prefix=X-Remote-Extra-",
"--requestheader-client-ca-file=" + proxyCACertFile.Name(),
"--requestheader-allowed-names=kube-aggregator",
"--authentication-kubeconfig", kubeconfigFile.Name(),
"--authorization-kubeconfig", kubeconfigFile.Name(),
"--etcd-servers", framework.GetEtcdURLFromEnv(),
"--cert-dir", certDir,
"--cert-dir", wardleCertDir,
})
go cmd.Execute()
go func() {
for {
if err := wardleCmd.Execute(); err != nil {
t.Log(err)
}
time.Sleep(100 * time.Millisecond)
}
}()
serverLocation := fmt.Sprintf("https://localhost:%s", securePort)
if err := waitForApiserverUp(serverLocation); err != nil {
t.Fatalf("%v", err)
wardleClientConfig := rest.AnonymousClientConfig(config.GenericConfig.LoopbackClientConfig)
wardleClientConfig.Host = fmt.Sprintf("https://127.0.0.1:%d", wardlePort)
wardleClientConfig.CAFile = path.Join(wardleCertDir, "apiserver.crt")
wardleClientConfig.CAData = nil
wardleClientConfig.ServerName = ""
wardleClientConfig.BearerToken = config.GenericConfig.LoopbackClientConfig.BearerToken
var wardleClient client.Interface
err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) {
wardleClient, err = client.NewForConfig(wardleClientConfig)
if err != nil {
// this happens if we race the API server for writing the cert
t.Log(err)
return false, nil
}
if _, err := wardleClient.Discovery().ServerVersion(); err != nil {
t.Log(err)
return false, nil
}
return true, nil
})
if err != nil {
t.Fatal(err)
}
testAPIGroupList(t, serverLocation)
testAPIGroup(t, serverLocation)
testAPIResourceList(t, serverLocation)
// start the aggregator
aggregatorPort, err := localPort()
if err != nil {
t.Fatal(err)
}
aggregatorCertDir, _ := ioutil.TempDir("", "test-integration-aggregator")
defer os.RemoveAll(aggregatorCertDir)
proxyClientKey, err := cert.NewPrivateKey()
if err != nil {
t.Fatal(err)
}
proxyClientCert, err := cert.NewSignedCert(
cert.Config{
CommonName: "kube-aggregator",
Usages: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth},
},
proxyClientKey, proxySigningCert, proxySigningKey,
)
proxyClientCertFile, _ := ioutil.TempFile(aggregatorCertDir, "proxy-client.crt")
proxyClientKeyFile, _ := ioutil.TempFile(aggregatorCertDir, "proxy-client.key")
if err := ioutil.WriteFile(proxyClientCertFile.Name(), cert.EncodeCertPEM(proxyClientCert), 0600); err != nil {
t.Fatal(err)
}
if err := ioutil.WriteFile(proxyClientKeyFile.Name(), cert.EncodePrivateKeyPEM(proxyClientKey), 0644); err != nil {
t.Fatal(err)
}
aggregatorCmd := kubeaggregatorserver.NewCommandStartAggregator(os.Stdout, os.Stderr, stopCh)
aggregatorCmd.SetArgs([]string{
"--bind-address", "127.0.0.1",
"--secure-port", strconv.Itoa(aggregatorPort),
"--requestheader-username-headers", "",
"--proxy-client-cert-file", proxyClientCertFile.Name(),
"--proxy-client-key-file", proxyClientKeyFile.Name(),
"--core-kubeconfig", kubeconfigFile.Name(),
"--authentication-kubeconfig", kubeconfigFile.Name(),
"--authorization-kubeconfig", kubeconfigFile.Name(),
"--etcd-servers", framework.GetEtcdURLFromEnv(),
"--cert-dir", aggregatorCertDir,
})
go func() {
for {
if err := aggregatorCmd.Execute(); err != nil {
t.Log(err)
}
time.Sleep(100 * time.Millisecond)
}
}()
aggregatorClientConfig := rest.AnonymousClientConfig(config.GenericConfig.LoopbackClientConfig)
aggregatorClientConfig.Host = fmt.Sprintf("https://127.0.0.1:%d", aggregatorPort)
aggregatorClientConfig.CAFile = path.Join(aggregatorCertDir, "apiserver.crt")
aggregatorClientConfig.CAData = nil
aggregatorClientConfig.ServerName = ""
aggregatorClientConfig.BearerToken = config.GenericConfig.LoopbackClientConfig.BearerToken
var aggregatorDiscoveryClient client.Interface
err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) {
aggregatorDiscoveryClient, err = client.NewForConfig(aggregatorClientConfig)
if err != nil {
// this happens if we race the API server for writing the cert
return false, nil
}
if _, err := aggregatorDiscoveryClient.Discovery().ServerVersion(); err != nil {
return false, nil
}
return true, nil
})
if err != nil {
t.Fatal(err)
}
// now we're finally ready to test. These are what's run by defautl now
testAPIGroupList(t, wardleClient.Discovery().RESTClient())
testAPIGroup(t, wardleClient.Discovery().RESTClient())
testAPIResourceList(t, wardleClient.Discovery().RESTClient())
wardleCA, err := ioutil.ReadFile(wardleClientConfig.CAFile)
if err != nil {
t.Fatal(err)
}
aggregatorClient := aggregatorclient.NewForConfigOrDie(aggregatorClientConfig)
_, err = aggregatorClient.ApiregistrationV1alpha1().APIServices().Create(&apiregistrationv1alpha1.APIService{
ObjectMeta: metav1.ObjectMeta{Name: "v1alpha1.wardle.k8s.io"},
Spec: apiregistrationv1alpha1.APIServiceSpec{
Service: apiregistrationv1alpha1.ServiceReference{
Namespace: "kube-wardle",
Name: "api",
},
Group: "wardle.k8s.io",
Version: "v1alpha1",
CABundle: wardleCA,
Priority: 200,
},
})
if err != nil {
t.Fatal(err)
}
// this is ugly, but sleep just a little bit so that the watch is probably observed. Since nothing will actually be added to discovery
// (the service is missing), we don't have an external signal.
time.Sleep(100 * time.Millisecond)
if _, err := aggregatorDiscoveryClient.Discovery().ServerResources(); err != nil {
t.Fatal(err)
}
_, err = aggregatorClient.ApiregistrationV1alpha1().APIServices().Create(&apiregistrationv1alpha1.APIService{
ObjectMeta: metav1.ObjectMeta{Name: "v1."},
Spec: apiregistrationv1alpha1.APIServiceSpec{
Service: apiregistrationv1alpha1.ServiceReference{
Namespace: "default",
Name: "kubernetes",
},
Group: "",
Version: "v1",
CABundle: config.GenericConfig.LoopbackClientConfig.CAData,
Priority: 100,
},
})
if err != nil {
t.Fatal(err)
}
// this is ugly, but sleep just a little bit so that the watch is probably observed. Since nothing will actually be added to discovery
// (the service is missing), we don't have an external signal.
time.Sleep(100 * time.Millisecond)
_, err = aggregatorDiscoveryClient.Discovery().ServerResources()
if err != nil && !strings.Contains(err.Error(), "lookup kubernetes.default.svc") {
t.Fatal(err)
}
// TODO figure out how to turn on enough of services and dns to run more
}
func createKubeConfig(clientCfg *rest.Config) *clientcmdapi.Config {
@ -124,46 +368,12 @@ func createKubeConfig(clientCfg *rest.Config) *clientcmdapi.Config {
return config
}
func waitForApiserverUp(serverLocation string) error {
for start := time.Now(); time.Since(start) < 10*time.Second; time.Sleep(5 * time.Second) {
glog.Errorf("Waiting for : %#v", serverLocation)
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
client := &http.Client{Transport: tr}
_, err := client.Get(serverLocation)
if err == nil {
return nil
}
}
return fmt.Errorf("waiting for apiserver timed out")
func readResponse(client rest.Interface, location string) ([]byte, error) {
return client.Get().AbsPath(location).DoRaw()
}
func readResponse(serverURL string) ([]byte, error) {
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
client := &http.Client{Transport: tr}
response, err := client.Get(serverURL)
if err != nil {
glog.Errorf("http get err code : %#v", err)
return nil, fmt.Errorf("Error in fetching %s: %v", serverURL, err)
}
defer response.Body.Close()
glog.Errorf("http get response code : %#v", response.StatusCode)
if response.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status: %d for URL: %s, expected status: %d", response.StatusCode, serverURL, http.StatusOK)
}
contents, err := ioutil.ReadAll(response.Body)
if err != nil {
return nil, fmt.Errorf("Error reading response from %s: %v", serverURL, err)
}
return contents, nil
}
func testAPIGroupList(t *testing.T, serverLocation string) {
serverURL := serverLocation + "/apis"
contents, err := readResponse(serverURL)
func testAPIGroupList(t *testing.T, client rest.Interface) {
contents, err := readResponse(client, "/apis")
if err != nil {
t.Fatalf("%v", err)
}
@ -171,7 +381,7 @@ func testAPIGroupList(t *testing.T, serverLocation string) {
var apiGroupList metav1.APIGroupList
err = json.Unmarshal(contents, &apiGroupList)
if err != nil {
t.Fatalf("Error in unmarshalling response from server %s: %v", serverURL, err)
t.Fatalf("Error in unmarshalling response from server %s: %v", "/apis", err)
}
assert.Equal(t, 1, len(apiGroupList.Groups))
assert.Equal(t, groupVersion.Group, apiGroupList.Groups[0].Name)
@ -180,9 +390,8 @@ func testAPIGroupList(t *testing.T, serverLocation string) {
assert.Equal(t, groupVersionForDiscovery, apiGroupList.Groups[0].PreferredVersion)
}
func testAPIGroup(t *testing.T, serverLocation string) {
serverURL := serverLocation + "/apis/wardle.k8s.io"
contents, err := readResponse(serverURL)
func testAPIGroup(t *testing.T, client rest.Interface) {
contents, err := readResponse(client, "/apis/wardle.k8s.io")
if err != nil {
t.Fatalf("%v", err)
}
@ -190,7 +399,7 @@ func testAPIGroup(t *testing.T, serverLocation string) {
var apiGroup metav1.APIGroup
err = json.Unmarshal(contents, &apiGroup)
if err != nil {
t.Fatalf("Error in unmarshalling response from server %s: %v", serverURL, err)
t.Fatalf("Error in unmarshalling response from server %s: %v", "/apis/wardle.k8s.io", err)
}
assert.Equal(t, groupVersion.Group, apiGroup.Name)
assert.Equal(t, 1, len(apiGroup.Versions))
@ -199,9 +408,8 @@ func testAPIGroup(t *testing.T, serverLocation string) {
assert.Equal(t, apiGroup.PreferredVersion, apiGroup.Versions[0])
}
func testAPIResourceList(t *testing.T, serverLocation string) {
serverURL := serverLocation + "/apis/wardle.k8s.io/v1alpha1"
contents, err := readResponse(serverURL)
func testAPIResourceList(t *testing.T, client rest.Interface) {
contents, err := readResponse(client, "/apis/wardle.k8s.io/v1alpha1")
if err != nil {
t.Fatalf("%v", err)
}
@ -209,10 +417,15 @@ func testAPIResourceList(t *testing.T, serverLocation string) {
var apiResourceList metav1.APIResourceList
err = json.Unmarshal(contents, &apiResourceList)
if err != nil {
t.Fatalf("Error in unmarshalling response from server %s: %v", serverURL, err)
t.Fatalf("Error in unmarshalling response from server %s: %v", "/apis/wardle.k8s.io/v1alpha1", err)
}
assert.Equal(t, groupVersion.String(), apiResourceList.GroupVersion)
assert.Equal(t, 1, len(apiResourceList.APIResources))
assert.Equal(t, "flunders", apiResourceList.APIResources[0].Name)
assert.True(t, apiResourceList.APIResources[0].Namespaced)
}
const (
policyCachePollInterval = 100 * time.Millisecond
policyCachePollTimeout = 5 * time.Second
)

2
vendor/BUILD vendored
View File

@ -15827,6 +15827,7 @@ go_library(
srcs = ["k8s.io/kube-aggregator/main.go"],
tags = ["automanaged"],
deps = [
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apiserver/pkg/util/logs",
"//vendor:k8s.io/kube-aggregator/pkg/apis/apiregistration/install",
"//vendor:k8s.io/kube-aggregator/pkg/apis/apiregistration/validation",
@ -16298,7 +16299,6 @@ go_library(
"//vendor:github.com/spf13/cobra",
"//vendor:github.com/spf13/pflag",
"//vendor:k8s.io/apimachinery/pkg/util/sets",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apiserver/pkg/server",
"//vendor:k8s.io/apiserver/pkg/server/filters",
"//vendor:k8s.io/apiserver/pkg/server/options",