Merge pull request #6694 from bprashanth/bench

Add a simple master benchmark and a wrapper to run it.
This commit is contained in:
Daniel Smith 2015-05-18 14:42:24 -07:00
commit 5e056f119d
14 changed files with 714 additions and 64 deletions

48
hack/benchmark-integration.sh Executable file
View File

@ -0,0 +1,48 @@
#!/bin/bash
# Copyright 2014 The Kubernetes Authors All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -o errexit
set -o nounset
set -o pipefail
KUBE_ROOT=$(dirname "${BASH_SOURCE}")/..
BENCHMARK_REGEX=${BENCHMARK_REGEX:-"."}
source "${KUBE_ROOT}/hack/lib/init.sh"
cleanup() {
kube::etcd::cleanup
kube::log::status "Benchmark cleanup complete"
}
runTests() {
kube::etcd::start
kube::log::status "Running benchmarks"
KUBE_GOFLAGS="-tags 'benchmark no-docker' -bench . -benchtime 1s -cpu 4" \
KUBE_RACE="-race" \
KUBE_TEST_API_VERSIONS="v1beta3" \
KUBE_TIMEOUT="-timeout 10m" \
KUBE_TEST_ETCD_PREFIXES="registry"\
ETCD_CUSTOM_PREFIX="None" \
KUBE_TEST_ARGS="-bench-quiet 0 -bench-pods 30 -bench-tasks 1"\
"${KUBE_ROOT}/hack/test-go.sh" test/integration
cleanup
}
# Run cleanup to stop etcd on interrupt or other kill signal.
trap cleanup EXIT
runTests

View File

@ -101,6 +101,7 @@ shift $((OPTIND - 1))
# Use eval to preserve embedded quoted strings.
eval "goflags=(${KUBE_GOFLAGS:-})"
eval "testargs=(${KUBE_TEST_ARGS:-})"
# Filter out arguments that start with "-" and move them to goflags.
testcases=()
@ -134,7 +135,8 @@ runTests() {
count=0
for i in $(seq 1 ${iterations}); do
if go test "${goflags[@]:+${goflags[@]}}" \
${KUBE_RACE} ${KUBE_TIMEOUT} "${pkg}"; then
${KUBE_RACE} ${KUBE_TIMEOUT} "${pkg}" \
"${testargs[@]:+${testargs[@]}}"; then
pass=$((pass + 1))
else
fails=$((fails + 1))
@ -155,7 +157,8 @@ runTests() {
if [[ ! ${KUBE_COVER} =~ ^[yY]$ ]]; then
kube::log::status "Running unit tests without code coverage"
go test "${goflags[@]:+${goflags[@]}}" \
${KUBE_RACE} ${KUBE_TIMEOUT} "${@+${@/#/${KUBE_GO_PACKAGE}/}}"
${KUBE_RACE} ${KUBE_TIMEOUT} "${@+${@/#/${KUBE_GO_PACKAGE}/}}" \
"${testargs[@]:+${testargs[@]}}"
return 0
fi
@ -177,7 +180,8 @@ runTests() {
-cover -covermode="${KUBE_COVERMODE}" \
-coverprofile="${cover_report_dir}/{}/${cover_profile}" \
"${cover_params[@]+${cover_params[@]}}" \
"${KUBE_GO_PACKAGE}/{}"
"${KUBE_GO_PACKAGE}/{}" \
"${testargs[@]:+${testargs[@]}}"
COMBINED_COVER_PROFILE="${cover_report_dir}/combined-coverage.out"
{

View File

@ -169,6 +169,14 @@ func NewReplicationManager(kubeClient client.Interface, burstReplicas int) *Repl
return rm
}
// SetEventRecorder replaces the event recorder used by the replication manager
// with the given recorder. Only used for testing.
func (rm *ReplicationManager) SetEventRecorder(recorder record.EventRecorder) {
// TODO: Hack. We can't cleanly shutdown the event recorder, so benchmarks
// need to pass in a fake.
rm.podControl = RealPodControl{rm.kubeClient, recorder}
}
// Run begins watching and syncing.
func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
defer util.HandleCrash()
@ -178,6 +186,7 @@ func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
go util.Until(rm.worker, time.Second, stopCh)
}
<-stopCh
glog.Infof("Shutting down RC Manager")
rm.queue.ShutDown()
}

View File

@ -101,6 +101,7 @@ type ReplicationControllerResizer struct {
c ResizerClient
}
// RetryParams encapsulates the retry parameters used by kubectl's resizer.
type RetryParams struct {
Interval, Timeout time.Duration
}

View File

@ -46,15 +46,14 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/auth/user"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/admission/admit"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/auth/authenticator/token/tokentest"
"github.com/GoogleCloudPlatform/kubernetes/test/integration/framework"
)
var nodeResourceName string
func init() {
requireEtcd()
if api.PreV1Beta3(testapi.Version()) {
nodeResourceName = "minions"
} else {
@ -375,15 +374,13 @@ func getTestRequests() []struct {
//
// TODO(etune): write a fuzz test of the REST API.
func TestAuthModeAlwaysAllow(t *testing.T) {
deleteAllEtcdKeys()
framework.DeleteAllEtcdKeys()
// Set up a master
helper, err := master.NewEtcdHelper(newEtcdClient(), testapi.Version(), etcdtest.PathPrefix())
helper, err := framework.NewHelper()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
var m *master.Master
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
m.Handler.ServeHTTP(w, req)
@ -516,10 +513,10 @@ func getPreviousResourceVersionKey(url, id string) string {
}
func TestAuthModeAlwaysDeny(t *testing.T) {
deleteAllEtcdKeys()
framework.DeleteAllEtcdKeys()
// Set up a master
helper, err := master.NewEtcdHelper(newEtcdClient(), testapi.Version(), etcdtest.PathPrefix())
helper, err := framework.NewHelper()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -581,12 +578,12 @@ func (allowAliceAuthorizer) Authorize(a authorizer.Attributes) error {
// the authentication system and authorized to do any actions.
func TestAliceNotForbiddenOrUnauthorized(t *testing.T) {
deleteAllEtcdKeys()
framework.DeleteAllEtcdKeys()
// This file has alice and bob in it.
// Set up a master
helper, err := master.NewEtcdHelper(newEtcdClient(), testapi.Version(), etcdtest.PathPrefix())
helper, err := framework.NewHelper()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -670,12 +667,10 @@ func TestAliceNotForbiddenOrUnauthorized(t *testing.T) {
// the authentication system but not authorized to do any actions
// should receive "Forbidden".
func TestBobIsForbidden(t *testing.T) {
deleteAllEtcdKeys()
framework.DeleteAllEtcdKeys()
// This file has alice and bob in it.
// Set up a master
helper, err := master.NewEtcdHelper(newEtcdClient(), testapi.Version(), etcdtest.PathPrefix())
helper, err := framework.NewHelper()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -731,12 +726,12 @@ func TestBobIsForbidden(t *testing.T) {
// An authorization module is installed in this scenario for integration
// test purposes, but requests aren't expected to reach it.
func TestUnknownUserIsUnauthorized(t *testing.T) {
deleteAllEtcdKeys()
framework.DeleteAllEtcdKeys()
// This file has alice and bob in it.
// Set up a master
helper, err := master.NewEtcdHelper(newEtcdClient(), testapi.Version(), etcdtest.PathPrefix())
helper, err := framework.NewHelper()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -810,10 +805,10 @@ func newAuthorizerWithContents(t *testing.T, contents string) authorizer.Authori
// TestNamespaceAuthorization tests that authorization can be controlled
// by namespace.
func TestNamespaceAuthorization(t *testing.T) {
deleteAllEtcdKeys()
framework.DeleteAllEtcdKeys()
// This file has alice and bob in it.
helper, err := master.NewEtcdHelper(newEtcdClient(), testapi.Version(), etcdtest.PathPrefix())
helper, err := framework.NewHelper()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -923,12 +918,12 @@ func TestNamespaceAuthorization(t *testing.T) {
// TestKindAuthorization tests that authorization can be controlled
// by namespace.
func TestKindAuthorization(t *testing.T) {
deleteAllEtcdKeys()
framework.DeleteAllEtcdKeys()
// This file has alice and bob in it.
// Set up a master
helper, err := master.NewEtcdHelper(newEtcdClient(), testapi.Version(), etcdtest.PathPrefix())
helper, err := framework.NewHelper()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -1026,12 +1021,12 @@ func TestKindAuthorization(t *testing.T) {
// TestReadOnlyAuthorization tests that authorization can be controlled
// by namespace.
func TestReadOnlyAuthorization(t *testing.T) {
deleteAllEtcdKeys()
framework.DeleteAllEtcdKeys()
// This file has alice and bob in it.
// Set up a master
helper, err := master.NewEtcdHelper(newEtcdClient(), testapi.Version(), etcdtest.PathPrefix())
helper, err := framework.NewHelper()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

View File

@ -0,0 +1,25 @@
{
"kind": "ReplicationController",
"apiVersion": "v1beta3",
"metadata": {
"name": "test-controller",
"namespace": "test",
"labels": {"name": "test-controller"}
},
"spec": {
"replicas": 0,
"selector": {"name": "test-pod"},
"template": {
"metadata": {
"namespace": "test",
"labels": {"name": "test-pod"}
},
"spec": {
"containers": [{
"name": "test-container",
"image": "kubernetes/pause"
}]
}
}
}
}

View File

@ -34,18 +34,15 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/GoogleCloudPlatform/kubernetes/test/integration/framework"
)
func init() {
requireEtcd()
}
func TestClient(t *testing.T) {
_, s := runAMaster(t)
_, s := framework.RunAMaster(t)
defer s.Close()
ns := api.NamespaceDefault
deleteAllEtcdKeys()
framework.DeleteAllEtcdKeys()
client := client.NewOrDie(&client.Config{Host: s.URL, Version: testapi.Version()})
info, err := client.ServerVersion()
@ -194,9 +191,9 @@ func TestMultiWatch(t *testing.T) {
const watcherCount = 50
runtime.GOMAXPROCS(watcherCount)
deleteAllEtcdKeys()
defer deleteAllEtcdKeys()
_, s := runAMaster(t)
framework.DeleteAllEtcdKeys()
defer framework.DeleteAllEtcdKeys()
_, s := framework.RunAMaster(t)
defer s.Close()
ns := api.NamespaceDefault

View File

@ -28,12 +28,9 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/GoogleCloudPlatform/kubernetes/test/integration/framework"
)
func init() {
requireEtcd()
}
type stringCodec struct{}
type fakeAPIObject string
@ -56,9 +53,9 @@ func (c stringCodec) DecodeInto(data []byte, obj runtime.Object) error {
}
func TestSetObj(t *testing.T) {
client := newEtcdClient()
client := framework.NewEtcdClient()
helper := tools.EtcdHelper{Client: client, Codec: stringCodec{}}
withEtcdKey(func(key string) {
framework.WithEtcdKey(func(key string) {
fakeObject := fakeAPIObject("object")
if err := helper.SetObj(key, &fakeObject, nil, 0); err != nil {
t.Fatalf("unexpected error: %v", err)
@ -74,9 +71,9 @@ func TestSetObj(t *testing.T) {
}
func TestExtractObj(t *testing.T) {
client := newEtcdClient()
client := framework.NewEtcdClient()
helper := tools.EtcdHelper{Client: client, Codec: stringCodec{}}
withEtcdKey(func(key string) {
framework.WithEtcdKey(func(key string) {
_, err := client.Set(key, "object", 0)
if err != nil {
t.Fatalf("unexpected error: %v", err)
@ -92,9 +89,9 @@ func TestExtractObj(t *testing.T) {
}
func TestWatch(t *testing.T) {
client := newEtcdClient()
client := framework.NewEtcdClient()
helper := tools.NewEtcdHelper(client, testapi.Codec(), etcdtest.PathPrefix())
withEtcdKey(func(key string) {
framework.WithEtcdKey(func(key string) {
key = etcdtest.AddPrefix(key)
resp, err := client.Set(key, runtime.EncodeOrDie(testapi.Codec(), &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0)
if err != nil {

View File

@ -0,0 +1,77 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// +build integration
package framework
import (
"fmt"
"math/rand"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest"
"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
)
// If you need to start an etcd instance by hand, you also need to insert a key
// for this check to pass (*any* key will do, eg:
//curl -L http://127.0.0.1:4001/v2/keys/message -XPUT -d value="Hello world").
func init() {
RequireEtcd()
}
func NewEtcdClient() *etcd.Client {
return etcd.NewClient([]string{})
}
func NewHelper() (tools.EtcdHelper, error) {
return master.NewEtcdHelper(NewEtcdClient(), testapi.Version(), etcdtest.PathPrefix())
}
func RequireEtcd() {
if _, err := NewEtcdClient().Get("/", false, false); err != nil {
glog.Fatalf("unable to connect to etcd for testing: %v", err)
}
}
func WithEtcdKey(f func(string)) {
prefix := fmt.Sprintf("/test-%d", rand.Int63())
defer NewEtcdClient().Delete(prefix, true)
f(prefix)
}
// DeleteAllEtcdKeys deletes all keys from etcd.
// TODO: Instead of sprinkling calls to this throughout the code, adjust the
// prefix in etcdtest package; then just delete everything once at the end
// of the test run.
func DeleteAllEtcdKeys() {
glog.Infof("Deleting all etcd keys")
client := NewEtcdClient()
keys, err := client.Get("/", false, false)
if err != nil {
glog.Fatalf("Unable to list root etcd keys: %v", err)
}
for _, node := range keys.Node.Nodes {
if _, err := client.Delete(node.Key, true); err != nil {
glog.Fatalf("Unable delete key: %v", err)
}
}
}

View File

@ -0,0 +1,313 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"io/ioutil"
"net/http"
"net/http/httptest"
"runtime"
"sync"
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/admission/admit"
"github.com/golang/glog"
)
const (
// Timeout used in benchmarks, to eg: resize an rc
DefaultTimeout = 30 * time.Minute
// Rc manifest used to create pods for benchmarks.
// TODO: Convert this to a full path?
TestRCManifest = "benchmark-controller.json"
// Test Namspace, for pods and rcs.
TestNS = "test"
)
// MasterComponents is a control struct for all master components started via NewMasterComponents.
// TODO: Include all master components (scheduler, nodecontroller).
// TODO: Reconcile with integration.go, currently the master used there doesn't understand
// how to restart cleanly, which is required for each iteration of a benchmark. The integration
// tests also don't make it easy to isolate and turn off components at will.
type MasterComponents struct {
// Raw http server in front of the master
ApiServer *httptest.Server
// Kubernetes master, contains an embedded etcd helper
KubeMaster *master.Master
// Restclient used to talk to the kubernetes master
RestClient *client.Client
// Replication controller manager
ControllerManager *controller.ReplicationManager
// Channel for stop signals to rc manager
rcStopCh chan struct{}
// Used to stop master components individually, and via MasterComponents.Stop
once sync.Once
// Kubernetes etcd helper, has embedded etcd client
EtcdHelper *tools.EtcdHelper
}
// Config is a struct of configuration directives for NewMasterComponents.
type Config struct {
// If nil, a default is used, partially filled configs will not get populated.
MasterConfig *master.Config
StartReplicationManager bool
// If true, all existing etcd keys are purged before starting master components
DeleteEtcdKeys bool
// Client throttling qps
QPS float32
// Client burst qps, also burst replicas allowed in rc manager
Burst int
// TODO: Add configs for endpoints controller, scheduler etc
}
// NewMasterComponents creates, initializes and starts master components based on the given config.
func NewMasterComponents(c *Config) *MasterComponents {
m, s, h := startMasterOrDie(c.MasterConfig)
// TODO: Allow callers to pipe through a different master url and create a client/start components using it.
glog.Infof("Master %+v", s.URL)
if c.DeleteEtcdKeys {
DeleteAllEtcdKeys()
}
restClient := client.NewOrDie(&client.Config{Host: s.URL, Version: "v1beta3", QPS: c.QPS, Burst: c.Burst})
rcStopCh := make(chan struct{})
controllerManager := controller.NewReplicationManager(restClient, c.Burst)
// TODO: Support events once we can cleanly shutdown an event recorder.
controllerManager.SetEventRecorder(&record.FakeRecorder{})
if c.StartReplicationManager {
go controllerManager.Run(runtime.NumCPU(), rcStopCh)
}
var once sync.Once
return &MasterComponents{
ApiServer: s,
KubeMaster: m,
RestClient: restClient,
ControllerManager: controllerManager,
rcStopCh: rcStopCh,
EtcdHelper: h,
once: once,
}
}
// startMasterOrDie starts a kubernetes master and an httpserver to handle api requests
func startMasterOrDie(masterConfig *master.Config) (*master.Master, *httptest.Server, *tools.EtcdHelper) {
var m *master.Master
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
m.Handler.ServeHTTP(w, req)
}))
var helper tools.EtcdHelper
var err error
if masterConfig == nil {
helper, err = master.NewEtcdHelper(NewEtcdClient(), "", etcdtest.PathPrefix())
if err != nil {
glog.Fatalf("Failed to create etcd helper for master %v", err)
}
masterConfig = &master.Config{
EtcdHelper: helper,
KubeletClient: client.FakeKubeletClient{},
EnableLogsSupport: false,
EnableProfiling: true,
EnableUISupport: false,
APIPrefix: "/api",
Authorizer: apiserver.NewAlwaysAllowAuthorizer(),
AdmissionControl: admit.NewAlwaysAdmit(),
}
} else {
helper = masterConfig.EtcdHelper
}
m = master.New(masterConfig)
return m, s, &helper
}
func (m *MasterComponents) stopRCManager() {
close(m.rcStopCh)
}
func (m *MasterComponents) Stop(apiServer, rcManager bool) {
glog.Infof("Stopping master components")
if rcManager {
// Ordering matters because the apiServer will only shutdown when pending
// requests are done
m.once.Do(m.stopRCManager)
}
if apiServer {
m.ApiServer.Close()
}
}
// RCFromManifest reads a .json file and returns the rc in it.
func RCFromManifest(fileName string) *api.ReplicationController {
data, err := ioutil.ReadFile(fileName)
if err != nil {
glog.Fatalf("Unexpected error reading rc manifest %v", err)
}
var controller api.ReplicationController
if err := api.Scheme.DecodeInto(data, &controller); err != nil {
glog.Fatalf("Unexpected error reading rc manifest %v", err)
}
return &controller
}
// StopRC stops the rc via kubectl's stop library
func StopRC(rc *api.ReplicationController, restClient *client.Client) error {
reaper, err := kubectl.ReaperFor("ReplicationController", restClient)
if err != nil || reaper == nil {
return err
}
_, err = reaper.Stop(rc.Namespace, rc.Name, nil)
if err != nil {
return err
}
return nil
}
// ResizeRC resizes the given rc to the given replicas.
func ResizeRC(name, ns string, replicas int, restClient *client.Client) (*api.ReplicationController, error) {
resizer, err := kubectl.ResizerFor("ReplicationController", kubectl.NewResizerClient(restClient))
if err != nil {
return nil, err
}
retry := &kubectl.RetryParams{50 * time.Millisecond, DefaultTimeout}
waitForReplicas := &kubectl.RetryParams{50 * time.Millisecond, DefaultTimeout}
err = resizer.Resize(ns, name, uint(replicas), nil, retry, waitForReplicas)
if err != nil {
return nil, err
}
resized, err := restClient.ReplicationControllers(ns).Get(name)
if err != nil {
return nil, err
}
return resized, nil
}
// StartRC creates given rc if it doesn't already exist, then updates it via kubectl's resizer.
func StartRC(controller *api.ReplicationController, restClient *client.Client) (*api.ReplicationController, error) {
created, err := restClient.ReplicationControllers(controller.Namespace).Get(controller.Name)
if err != nil {
glog.Infof("Rc %v doesn't exist, creating", controller.Name)
created, err = restClient.ReplicationControllers(controller.Namespace).Create(controller)
if err != nil {
return nil, err
}
}
// If we just created an rc, wait till it creates its replicas.
return ResizeRC(created.Name, created.Namespace, controller.Spec.Replicas, restClient)
}
// StartPods check for numPods in TestNS. If they exist, it no-ops, otherwise it starts up
// a temp rc, resizes it to match numPods, then deletes the rc leaving behind the pods.
func StartPods(numPods int, host string, restClient *client.Client) error {
start := time.Now()
defer func() {
glog.Infof("StartPods took %v with numPods %d", time.Since(start), numPods)
}()
hostField := fields.OneTermEqualSelector(client.PodHost, host)
pods, err := restClient.Pods(TestNS).List(labels.Everything(), hostField)
if err != nil || len(pods.Items) == numPods {
return err
}
glog.Infof("Found %d pods that match host %v, require %d", len(pods.Items), hostField, numPods)
// For the sake of simplicity, assume all pods in TestNS have selectors matching TestRCManifest.
controller := RCFromManifest(TestRCManifest)
// Make the rc unique to the given host.
controller.Spec.Replicas = numPods
controller.Spec.Template.Spec.Host = host
controller.Name = controller.Name + host
controller.Spec.Selector["host"] = host
controller.Spec.Template.Labels["host"] = host
if rc, err := StartRC(controller, restClient); err != nil {
return err
} else {
// Delete the rc, otherwise when we restart master components for the next benchmark
// the rc controller will race with the pods controller in the rc manager.
return restClient.ReplicationControllers(TestNS).Delete(rc.Name)
}
}
// TODO: Merge this into startMasterOrDie.
func RunAMaster(t *testing.T) (*master.Master, *httptest.Server) {
helper, err := master.NewEtcdHelper(NewEtcdClient(), testapi.Version(), etcdtest.PathPrefix())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
m := master.New(&master.Config{
EtcdHelper: helper,
KubeletClient: client.FakeKubeletClient{},
EnableLogsSupport: false,
EnableProfiling: true,
EnableUISupport: false,
APIPrefix: "/api",
Authorizer: apiserver.NewAlwaysAllowAuthorizer(),
AdmissionControl: admit.NewAlwaysAdmit(),
})
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
m.Handler.ServeHTTP(w, req)
}))
return m, s
}
// Task is a function passed to worker goroutines by RunParallel.
// The function needs to implement its own thread safety.
type Task func(id int) error
// RunParallel spawns a goroutine per task in the given queue
func RunParallel(task Task, numTasks, numWorkers int) {
start := time.Now()
if numWorkers <= 0 {
numWorkers = numTasks
}
defer func() {
glog.Infof("RunParallel took %v for %d tasks and %d workers", time.Since(start), numTasks, numWorkers)
}()
var wg sync.WaitGroup
semCh := make(chan struct{}, numWorkers)
wg.Add(numTasks)
for id := 0; id < numTasks; id++ {
go func(id int) {
semCh <- struct{}{}
err := task(id)
if err != nil {
glog.Fatalf("Worker failed with %v", err)
}
<-semCh
wg.Done()
}(id)
}
wg.Wait()
close(semCh)
}

View File

@ -0,0 +1,195 @@
// +build benchmark,!no-etcd,!integration
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package integration
import (
"flag"
"fmt"
"os"
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest"
"github.com/GoogleCloudPlatform/kubernetes/test/integration/framework"
"github.com/golang/glog"
)
// Command line flag globals, parsed in init and used by the benchmarks:
// * pods && !tasks: Start -pods, scale number of parallel tasks with b.N
// * !pods && tasks: Start -tasks, scale pods with b.N
// * pods && tasks: Ignore b.N, benchmark behaves like a test constrained by -benchtime.
// * !pods && !tasks: scale pods and workers with b.N.
// -workers specifies the number of workers to shard tasks across.
// Eg: go test bench . -bench-pods 3000 -bench-tasks 100 -bench-tasks 10:
// Create 100 tasks each listing 3000 pods, and run them 10 at a time.
var (
Workers int
Pods int
Tasks int
)
const Glog_fatalf = 3
func init() {
q := flag.Int("bench-quiet", 3, "Quietness, don't glog severities <= given level during the benchmark.")
pods := flag.Int("bench-pods", -1, "Number of pods for the benchmark. If unspecified, uses b.N.")
workers := flag.Int("bench-workers", -1, "Number workers for the benchmark. If unspecified, uses tasks.")
tasks := flag.Int("bench-tasks", -1, "Number of tasks for the benchmark. These tasks are sharded across the workers. If unspecified, uses b.N.")
flag.Parse()
// Unfortunately this v level goes in the opposite direction as stderrthreshold.
flag.Set("v", fmt.Sprintf("%d", *q))
// We need quiet logs to parse benchmark results, which includes Errorf.
flag.Set("logtostderr", "false")
flag.Set("stderrthreshold", fmt.Sprintf("%d", Glog_fatalf-*q))
Pods = *pods
Workers = *workers
Tasks = *tasks
framework.DeleteAllEtcdKeys()
}
// getPods returns the cmd line -pods or b.N if -pods wasn't specified.
// Benchmarks can call getPods to get the number of pods they need to
// create for a given benchmark.
func getPods(bN int) int {
if Pods < 0 {
return bN
}
return Pods
}
// getTasks returns the cmd line -workers or b.N if -workers wasn't specified.
// Benchmarks would call getTasks to get the number of workers required to
// perform the benchmark in parallel.
func getTasks(bN int) int {
if Tasks < 0 {
return bN
}
return Tasks
}
// getIterations returns the number of iterations required by each benchmark for
// go to produce reliable timing results.
func getIterations(bN int) int {
// Anything with constant pods is only linear if we iterate b.N times.
if Pods > 0 {
return bN
}
return 1
}
// startPodsOnNodes creates numPods sharded across numNodes
func startPodsOnNodes(numPods, numNodes int, restClient *client.Client) {
podsPerNode := numPods / numNodes
if podsPerNode < 1 {
podsPerNode = 1
}
framework.RunParallel(func(id int) error {
return framework.StartPods(podsPerNode, fmt.Sprintf("host.%d", id), restClient)
}, numNodes, -1)
}
// Benchmark pod listing by waiting on `Tasks` listers to list `Pods` pods via `Workers`.
func BenchmarkPodList(b *testing.B) {
b.StopTimer()
m := framework.NewMasterComponents(&framework.Config{nil, true, false, 250.0, 500})
defer m.Stop(true, true)
numPods, numTasks, iter := getPods(b.N), getTasks(b.N), getIterations(b.N)
podsPerNode := numPods / numTasks
if podsPerNode < 1 {
podsPerNode = 1
}
glog.Infof("Starting benchmark: b.N %d, pods %d, workers %d, podsPerNode %d",
b.N, numPods, numTasks, podsPerNode)
startPodsOnNodes(numPods, numTasks, m.RestClient)
// Stop the rc manager so it doesn't steal resources
m.Stop(false, true)
b.StartTimer()
for i := 0; i < iter; i++ {
framework.RunParallel(func(id int) error {
host := fmt.Sprintf("host.%d", id)
now := time.Now()
defer func() {
glog.V(3).Infof("Worker %d: Node %v listing pods took %v", id, host, time.Since(now))
}()
if pods, err := m.RestClient.Pods(framework.TestNS).List(
labels.Everything(),
fields.OneTermEqualSelector(client.PodHost, host)); err != nil {
return err
} else if len(pods.Items) < podsPerNode {
glog.Fatalf("List retrieved %d pods, which is less than %d", len(pods.Items), podsPerNode)
}
return nil
}, numTasks, Workers)
}
b.StopTimer()
}
// Benchmark pod listing by waiting on `Tasks` listers to list `Pods` pods via `Workers`.
func BenchmarkPodListEtcd(b *testing.B) {
b.StopTimer()
m := framework.NewMasterComponents(&framework.Config{nil, true, false, 250.0, 500})
defer m.Stop(true, true)
numPods, numTasks, iter := getPods(b.N), getTasks(b.N), getIterations(b.N)
podsPerNode := numPods / numTasks
if podsPerNode < 1 {
podsPerNode = 1
}
startPodsOnNodes(numPods, numTasks, m.RestClient)
// Stop the rc manager so it doesn't steal resources
m.Stop(false, true)
glog.Infof("Starting benchmark: b.N %d, pods %d, workers %d, podsPerNode %d",
b.N, numPods, numTasks, podsPerNode)
ctx := api.WithNamespace(api.NewContext(), framework.TestNS)
key := etcdgeneric.NamespaceKeyRootFunc(ctx, fmt.Sprintf("%s/pods", etcdtest.PathPrefix()))
b.StartTimer()
for i := 0; i < iter; i++ {
framework.RunParallel(func(id int) error {
now := time.Now()
defer func() {
glog.V(3).Infof("Worker %d: listing pods took %v", id, time.Since(now))
}()
if response, err := m.EtcdHelper.Client.Get(key, true, true); err != nil {
return err
} else if len(response.Node.Nodes) < podsPerNode {
glog.Fatalf("List retrieved %d pods, which is less than %d", len(response.Node.Nodes), podsPerNode)
}
return nil
}, numTasks, Workers)
}
b.StopTimer()
}
func TestMain(m *testing.M) {
os.Exit(m.Run())
}

View File

@ -30,6 +30,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/test/integration/framework"
"github.com/golang/glog"
"github.com/golang/protobuf/proto"
@ -38,10 +39,6 @@ import (
const scrapeRequestHeader = "application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=compact-text"
func init() {
requireEtcd()
}
func scrapeMetrics(s *httptest.Server) ([]*prometheuspb.MetricFamily, error) {
req, err := http.NewRequest("GET", s.URL+"/metrics", nil)
if err != nil {
@ -90,7 +87,7 @@ func checkForExpectedMetrics(t *testing.T, metrics []*prometheuspb.MetricFamily,
}
func TestMasterProcessMetrics(t *testing.T) {
_, s := runAMaster(t)
_, s := framework.RunAMaster(t)
defer s.Close()
metrics, err := scrapeMetrics(s)
@ -107,7 +104,7 @@ func TestMasterProcessMetrics(t *testing.T) {
}
func TestApiserverMetrics(t *testing.T) {
_, s := runAMaster(t)
_, s := framework.RunAMaster(t)
defer s.Close()
// Make a request to the apiserver to ensure there's at least one data point

View File

@ -36,19 +36,15 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/admission/admit"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler"
_ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory"
"github.com/GoogleCloudPlatform/kubernetes/test/integration/framework"
)
func init() {
requireEtcd()
}
type nodeMutationFunc func(t *testing.T, n *api.Node, nodeStore cache.Store, c *client.Client)
type nodeStateManager struct {
@ -57,11 +53,11 @@ type nodeStateManager struct {
}
func TestUnschedulableNodes(t *testing.T) {
helper, err := master.NewEtcdHelper(newEtcdClient(), testapi.Version(), etcdtest.PathPrefix())
helper, err := framework.NewHelper()
if err != nil {
t.Fatalf("Couldn't create etcd helper: %v", err)
}
deleteAllEtcdKeys()
framework.DeleteAllEtcdKeys()
var m *master.Master
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {

View File

@ -30,14 +30,10 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/admission/admit"
"github.com/GoogleCloudPlatform/kubernetes/test/integration/framework"
)
func init() {
requireEtcd()
}
func deletePodOrErrorf(t *testing.T, c *client.Client, ns, name string) {
if err := c.Pods(ns).Delete(name, nil); err != nil {
t.Errorf("unable to delete pod %v: %v", name, err)
@ -51,7 +47,7 @@ func deleteSecretOrErrorf(t *testing.T, c *client.Client, ns, name string) {
// TestSecrets tests apiserver-side behavior of creation of secret objects and their use by pods.
func TestSecrets(t *testing.T) {
helper, err := master.NewEtcdHelper(newEtcdClient(), testapi.Version(), etcdtest.PathPrefix())
helper, err := framework.NewHelper()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -74,7 +70,7 @@ func TestSecrets(t *testing.T) {
AdmissionControl: admit.NewAlwaysAdmit(),
})
deleteAllEtcdKeys()
framework.DeleteAllEtcdKeys()
client := client.NewOrDie(&client.Config{Host: s.URL, Version: testapi.Version()})
DoTestSecrets(t, client, testapi.Version())
}