Merge pull request #42278 from marun/fed-api-fixture

Automatic merge from submit-queue (batch tested with PRs 42728, 42278)

[Federation] Create integration test fixture for api

This PR factors a reusable fixture for the federation api server out of the existing integration test.

Targets #40705

cc: @kubernetes/sig-federation-pr-reviews
This commit is contained in:
Kubernetes Submit Queue 2017-03-09 05:45:33 -08:00 committed by GitHub
commit cf732613e3
13 changed files with 319 additions and 99 deletions

View File

@ -17,6 +17,7 @@ limitations under the License.
package main
import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/federation/cmd/federation-apiserver/app"
"k8s.io/kubernetes/federation/cmd/federation-apiserver/app/options"
)
@ -30,7 +31,7 @@ func NewFederationAPIServer() *Server {
SimpleUsage: "federation-apiserver",
Long: "The API entrypoint for the federation control plane",
Run: func(_ *Server, args []string) error {
return app.Run(s)
return app.Run(s, wait.NeverStop)
},
}
s.AddFlags(hks.Flags())

View File

@ -23,6 +23,7 @@ go_library(
"//federation/cmd/federation-apiserver/app/options:go_default_library",
"//pkg/version/verflag:go_default_library",
"//vendor:github.com/spf13/pflag",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apiserver/pkg/util/flag",
"//vendor:k8s.io/apiserver/pkg/util/logs",
],

View File

@ -24,6 +24,7 @@ import (
"os"
"time"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/util/flag"
"k8s.io/apiserver/pkg/util/logs"
"k8s.io/kubernetes/federation/cmd/federation-apiserver/app"
@ -45,7 +46,7 @@ func main() {
verflag.PrintAndExitIfRequested()
if err := app.Run(s); err != nil {
if err := app.Run(s, wait.NeverStop); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}

View File

@ -67,7 +67,6 @@ go_library(
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
"//vendor:k8s.io/apimachinery/pkg/util/errors",
"//vendor:k8s.io/apimachinery/pkg/util/sets",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apiserver/pkg/admission",
"//vendor:k8s.io/apiserver/pkg/registry/generic",
"//vendor:k8s.io/apiserver/pkg/registry/rest",

View File

@ -34,7 +34,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/admission"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/filters"
@ -67,8 +66,20 @@ cluster's shared state through which all other components interact.`,
return cmd
}
// Run runs the specified APIServer. This should never exit.
func Run(s *options.ServerRunOptions) error {
// Run runs the specified APIServer. It only returns if stopCh is closed
// or one of the ports cannot be listened on initially.
func Run(s *options.ServerRunOptions, stopCh <-chan struct{}) error {
err := NonBlockingRun(s, stopCh)
if err != nil {
return err
}
<-stopCh
return nil
}
// NonBlockingRun runs the specified APIServer and configures it to
// stop with the given channel.
func NonBlockingRun(s *options.ServerRunOptions, stopCh <-chan struct{}) error {
// set defaults
if err := s.GenericServerRunOptions.DefaultAdvertiseAddress(s.SecureServing, s.InsecureServing); err != nil {
return err
@ -223,8 +234,11 @@ func Run(s *options.ServerRunOptions) error {
// installBatchAPIs(m, genericConfig.RESTOptionsGetter)
// installAutoscalingAPIs(m, genericConfig.RESTOptionsGetter)
sharedInformers.Start(wait.NeverStop)
return m.PrepareRun().Run(wait.NeverStop)
err = m.PrepareRun().NonBlockingRun(stopCh)
if err == nil {
sharedInformers.Start(stopCh)
}
return err
}
// PostProcessSpec adds removed definitions for backward compatibility

View File

@ -190,18 +190,43 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
// Run spawns the http servers (secure and insecure). It only returns if stopCh is closed
// or one of the ports cannot be listened on initially.
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
err := s.NonBlockingRun(stopCh)
if err != nil {
return err
}
<-stopCh
return nil
}
// NonBlockingRun spawns the http servers (secure and insecure). An error is
// returned if either of the ports cannot be listened on.
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
// Use an internal stop channel to allow cleanup of the listeners on error.
internalStopCh := make(chan struct{})
if s.SecureServingInfo != nil && s.Handler != nil {
if err := s.serveSecurely(stopCh); err != nil {
if err := s.serveSecurely(internalStopCh); err != nil {
close(internalStopCh)
return err
}
}
if s.InsecureServingInfo != nil && s.InsecureHandler != nil {
if err := s.serveInsecurely(stopCh); err != nil {
if err := s.serveInsecurely(internalStopCh); err != nil {
close(internalStopCh)
return err
}
}
// Now that both listeners have bound successfully, it is the
// responsibility of the caller to close the provided channel to
// ensure cleanup.
go func() {
<-stopCh
close(internalStopCh)
}()
s.RunPostStartHooks()
// err == systemd.SdNotifyNoSocket when not running on a systemd system
@ -209,7 +234,6 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
glog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)
}
<-stopCh
return nil
}

View File

@ -56,20 +56,6 @@ var groupVersionForDiscovery = metav1.GroupVersionForDiscovery{
Version: groupVersion.Version,
}
func localPort() (int, error) {
l, err := net.Listen("tcp", ":0")
if err != nil {
return 0, err
}
defer l.Close()
addr := strings.Split(l.Addr().String(), ":")
port, err := strconv.Atoi(addr[len(addr)-1])
if err != nil {
return 0, err
}
return port, nil
}
func TestAggregatedAPIServer(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
@ -106,7 +92,7 @@ func TestAggregatedAPIServer(t *testing.T) {
go func() {
for {
// always get a fresh port in case something claimed the old one
kubePort, err := localPort()
kubePort, err := framework.FindFreeLocalPort()
if err != nil {
t.Fatal(err)
}
@ -179,7 +165,7 @@ func TestAggregatedAPIServer(t *testing.T) {
go func() {
for {
// always get a fresh port in case something claimed the old one
wardlePortInt, err := localPort()
wardlePortInt, err := framework.FindFreeLocalPort()
if err != nil {
t.Fatal(err)
}
@ -256,7 +242,7 @@ func TestAggregatedAPIServer(t *testing.T) {
go func() {
for {
// always get a fresh port in case something claimed the old one
aggregatorPortInt, err := localPort()
aggregatorPortInt, err := framework.FindFreeLocalPort()
if err != nil {
t.Fatal(err)
}

View File

@ -9,16 +9,15 @@ load(
go_test(
name = "go_default_test",
srcs = ["server_test.go"],
srcs = ["api_test.go"],
tags = ["automanaged"],
deps = [
"//federation/apis/federation/v1beta1:go_default_library",
"//federation/cmd/federation-apiserver/app:go_default_library",
"//federation/cmd/federation-apiserver/app/options:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/apis/autoscaling/v1:go_default_library",
"//pkg/apis/batch/v1:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//test/integration/federation/framework:go_default_library",
"//vendor:github.com/stretchr/testify/assert",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
@ -34,6 +33,9 @@ filegroup(
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
srcs = [
":package-srcs",
"//test/integration/federation/framework:all-srcs",
],
tags = ["automanaged"],
)

View File

@ -14,33 +14,27 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package app
package federation
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
fed_v1b1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
"k8s.io/kubernetes/federation/cmd/federation-apiserver/app"
"k8s.io/kubernetes/federation/cmd/federation-apiserver/app/options"
"k8s.io/kubernetes/pkg/api/v1"
autoscaling_v1 "k8s.io/kubernetes/pkg/apis/autoscaling/v1"
batch_v1 "k8s.io/kubernetes/pkg/apis/batch/v1"
ext_v1b1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/test/integration/federation/framework"
)
var securePort = 6443 + 2
var insecurePort = 8080 + 2
var serverIP = fmt.Sprintf("http://localhost:%v", insecurePort)
var groupVersions = []schema.GroupVersion{
fed_v1b1.SchemeGroupVersion,
ext_v1b1.SchemeGroupVersion,
@ -48,42 +42,25 @@ var groupVersions = []schema.GroupVersion{
// autoscaling_v1.SchemeGroupVersion,
}
func TestRun(t *testing.T) {
certDir, err := ioutil.TempDir("", "")
if err != nil {
t.Fatalf("Failed to create temporary certificate directory: %v", err)
}
defer os.RemoveAll(certDir)
type apiTestFunc func(t *testing.T, host string)
s := options.NewServerRunOptions()
s.SecureServing.ServingOptions.BindPort = securePort
s.InsecureServing.BindPort = insecurePort
s.Etcd.StorageConfig.ServerList = []string{"http://localhost:2379"}
s.SecureServing.ServerCert.CertDirectory = certDir
func TestFederationAPI(t *testing.T) {
f := &framework.FederationAPIFixture{}
f.Setup(t)
defer f.Teardown(t)
go func() {
if err := app.Run(s); err != nil {
t.Fatalf("Error in bringing up the server: %v", err)
}
}()
if err := waitForApiserverUp(); err != nil {
t.Fatalf("%v", err)
testCases := map[string]apiTestFunc{
"swaggerSpec": testSwaggerSpec,
"support": testSupport,
"apiGroupList": testAPIGroupList,
"apiGroup": testAPIGroup,
"apiResourceList": testAPIResourceList,
}
testSwaggerSpec(t)
testSupport(t)
testAPIGroupList(t)
testAPIGroup(t)
testAPIResourceList(t)
}
func waitForApiserverUp() error {
for start := time.Now(); time.Since(start) < time.Minute; time.Sleep(5 * time.Second) {
_, err := http.Get(serverIP)
if err == nil {
return nil
}
for testName, testFunc := range testCases {
t.Run(testName, func(t *testing.T) {
testFunc(t, f.Host)
})
}
return fmt.Errorf("waiting for apiserver timed out")
}
func readResponse(serverURL string) ([]byte, error) {
@ -102,16 +79,16 @@ func readResponse(serverURL string) ([]byte, error) {
return contents, nil
}
func testSwaggerSpec(t *testing.T) {
serverURL := serverIP + "/swaggerapi"
func testSwaggerSpec(t *testing.T, host string) {
serverURL := host + "/swaggerapi"
_, err := readResponse(serverURL)
if err != nil {
t.Fatalf("%v", err)
}
}
func testSupport(t *testing.T) {
serverURL := serverIP + "/version"
func testSupport(t *testing.T, host string) {
serverURL := host + "/version"
_, err := readResponse(serverURL)
if err != nil {
t.Fatalf("%v", err)
@ -127,7 +104,7 @@ func findGroup(groups []metav1.APIGroup, groupName string) *metav1.APIGroup {
return nil
}
func testAPIGroupList(t *testing.T) {
func testAPIGroupList(t *testing.T, host string) {
groupVersionForDiscoveryMap := make(map[string]metav1.GroupVersionForDiscovery)
for _, groupVersion := range groupVersions {
groupVersionForDiscoveryMap[groupVersion.Group] = metav1.GroupVersionForDiscovery{
@ -136,7 +113,7 @@ func testAPIGroupList(t *testing.T) {
}
}
serverURL := serverIP + "/apis"
serverURL := host + "/apis"
contents, err := readResponse(serverURL)
if err != nil {
t.Fatalf("%v", err)
@ -158,9 +135,9 @@ func testAPIGroupList(t *testing.T) {
}
}
func testAPIGroup(t *testing.T) {
func testAPIGroup(t *testing.T, host string) {
for _, groupVersion := range groupVersions {
serverURL := serverIP + "/apis/" + groupVersion.Group
serverURL := host + "/apis/" + groupVersion.Group
contents, err := readResponse(serverURL)
if err != nil {
t.Fatalf("%v", err)
@ -183,11 +160,11 @@ func testAPIGroup(t *testing.T) {
assert.Equal(t, apiGroup.PreferredVersion, apiGroup.Versions[0])
}
testCoreAPIGroup(t)
testCoreAPIGroup(t, host)
}
func testCoreAPIGroup(t *testing.T) {
serverURL := serverIP + "/api"
func testCoreAPIGroup(t *testing.T, host string) {
serverURL := host + "/api"
contents, err := readResponse(serverURL)
if err != nil {
t.Fatalf("%v", err)
@ -211,16 +188,16 @@ func findResource(resources []metav1.APIResource, resourceName string) *metav1.A
return nil
}
func testAPIResourceList(t *testing.T) {
testFederationResourceList(t)
testCoreResourceList(t)
testExtensionsResourceList(t)
// testBatchResourceList(t)
// testAutoscalingResourceList(t)
func testAPIResourceList(t *testing.T, host string) {
testFederationResourceList(t, host)
testCoreResourceList(t, host)
testExtensionsResourceList(t, host)
// testBatchResourceList(t, host)
// testAutoscalingResourceList(t, host)
}
func testFederationResourceList(t *testing.T) {
serverURL := serverIP + "/apis/" + fed_v1b1.SchemeGroupVersion.String()
func testFederationResourceList(t *testing.T, host string) {
serverURL := host + "/apis/" + fed_v1b1.SchemeGroupVersion.String()
contents, err := readResponse(serverURL)
if err != nil {
t.Fatalf("%v", err)
@ -243,8 +220,8 @@ func testFederationResourceList(t *testing.T) {
assert.False(t, found.Namespaced)
}
func testCoreResourceList(t *testing.T) {
serverURL := serverIP + "/api/" + v1.SchemeGroupVersion.String()
func testCoreResourceList(t *testing.T, host string) {
serverURL := host + "/api/" + v1.SchemeGroupVersion.String()
contents, err := readResponse(serverURL)
if err != nil {
t.Fatalf("%v", err)
@ -294,8 +271,8 @@ func testCoreResourceList(t *testing.T) {
assert.True(t, found.Namespaced)
}
func testExtensionsResourceList(t *testing.T) {
serverURL := serverIP + "/apis/" + ext_v1b1.SchemeGroupVersion.String()
func testExtensionsResourceList(t *testing.T, host string) {
serverURL := host + "/apis/" + ext_v1b1.SchemeGroupVersion.String()
contents, err := readResponse(serverURL)
if err != nil {
t.Fatalf("%v", err)
@ -351,8 +328,8 @@ func testExtensionsResourceList(t *testing.T) {
found = findResource(apiResourceList.APIResources, "deployments/rollback")
}
func testBatchResourceList(t *testing.T) {
serverURL := serverIP + "/apis/" + batch_v1.SchemeGroupVersion.String()
func testBatchResourceList(t *testing.T, host string) {
serverURL := host + "/apis/" + batch_v1.SchemeGroupVersion.String()
contents, err := readResponse(serverURL)
if err != nil {
t.Fatalf("%v", err)
@ -377,8 +354,8 @@ func testBatchResourceList(t *testing.T) {
assert.True(t, found.Namespaced)
}
func testAutoscalingResourceList(t *testing.T) {
serverURL := serverIP + "/apis/" + autoscaling_v1.SchemeGroupVersion.String()
func testAutoscalingResourceList(t *testing.T, host string) {
serverURL := host + "/apis/" + autoscaling_v1.SchemeGroupVersion.String()
contents, err := readResponse(serverURL)
if err != nil {
t.Fatalf("%v", err)

View File

@ -0,0 +1,37 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = [
"api.go",
"util.go",
],
tags = ["automanaged"],
deps = [
"//federation/cmd/federation-apiserver/app:go_default_library",
"//federation/cmd/federation-apiserver/app/options:go_default_library",
"//test/integration/framework:go_default_library",
"//vendor:github.com/pborman/uuid",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -0,0 +1,120 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"fmt"
"net/http"
"testing"
"time"
"github.com/pborman/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/federation/cmd/federation-apiserver/app"
"k8s.io/kubernetes/federation/cmd/federation-apiserver/app/options"
"k8s.io/kubernetes/test/integration/framework"
)
const (
apiNoun = "federation apiserver"
waitInterval = 50 * time.Millisecond
)
func getRunOptions() *options.ServerRunOptions {
r := options.NewServerRunOptions()
r.Etcd.StorageConfig.ServerList = []string{framework.GetEtcdURLFromEnv()}
// Use a unique prefix to ensure isolation from other tests using the same etcd instance
r.Etcd.StorageConfig.Prefix = uuid.New()
// Disable secure serving
r.SecureServing.ServingOptions.BindPort = 0
return r
}
// FederationAPIFixture manages a federation api server
type FederationAPIFixture struct {
Host string
stopChan chan struct{}
}
func (f *FederationAPIFixture) Setup(t *testing.T) {
if f.stopChan != nil {
t.Fatal("Setup() already called")
}
defer TeardownOnPanic(t, f)
f.stopChan = make(chan struct{})
runOptions := getRunOptions()
err := startServer(t, runOptions, f.stopChan)
if err != nil {
t.Fatal(err)
}
f.Host = fmt.Sprintf("http://%s:%d", runOptions.InsecureServing.BindAddress, runOptions.InsecureServing.BindPort)
err = waitForServer(t, f.Host)
if err != nil {
t.Fatal(err)
}
}
func (f *FederationAPIFixture) Teardown(t *testing.T) {
if f.stopChan != nil {
close(f.stopChan)
f.stopChan = nil
}
}
func startServer(t *testing.T, runOptions *options.ServerRunOptions, stopChan <-chan struct{}) error {
err := wait.PollImmediate(waitInterval, wait.ForeverTestTimeout, func() (bool, error) {
port, err := framework.FindFreeLocalPort()
if err != nil {
t.Logf("Error allocating an ephemeral port: %v", err)
return false, nil
}
runOptions.InsecureServing.BindPort = port
err = app.NonBlockingRun(runOptions, stopChan)
if err != nil {
t.Logf("Error starting the %s: %v", apiNoun, err)
return false, nil
}
return true, nil
})
if err != nil {
return fmt.Errorf("Timed out waiting for the %s: %v", apiNoun, err)
}
return nil
}
func waitForServer(t *testing.T, host string) error {
err := wait.PollImmediate(waitInterval, wait.ForeverTestTimeout, func() (bool, error) {
_, err := http.Get(host)
if err != nil {
t.Logf("Error when trying to contact the API: %v", err)
return false, nil
}
return true, nil
})
if err != nil {
return fmt.Errorf("Timed out waiting for the %s: %v", apiNoun, err)
}
return nil
}

View File

@ -0,0 +1,35 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"testing"
)
// Setup is likely to be fixture-specific, but Teardown needs to be
// consistent to enable TeardownOnPanic.
type TestFixture interface {
Teardown(t *testing.T)
}
// TeardownOnPanic can be used to ensure cleanup on setup failure.
func TeardownOnPanic(t *testing.T, f TestFixture) {
if r := recover(); r != nil {
f.Teardown(t)
panic(r)
}
}

View File

@ -22,6 +22,7 @@ import (
"net/http"
"net/http/httptest"
goruntime "runtime"
"strconv"
"sync"
"testing"
"time"
@ -509,3 +510,25 @@ func RunParallel(task Task, numTasks, numWorkers int) {
wg.Wait()
close(semCh)
}
// FindFreeLocalPort returns the number of an available port number on
// the loopback interface. Useful for determining the port to launch
// a server on. Error handling required - there is a non-zero chance
// that the returned port number will be bound by another process
// after this function returns.
func FindFreeLocalPort() (int, error) {
l, err := net.Listen("tcp", ":0")
if err != nil {
return 0, err
}
defer l.Close()
_, portStr, err := net.SplitHostPort(l.Addr().String())
if err != nil {
return 0, err
}
port, err := strconv.Atoi(portStr)
if err != nil {
return 0, err
}
return port, nil
}