diff --git a/hack/benchmark-integration.sh b/hack/benchmark-integration.sh new file mode 100755 index 00000000000..38422c28ee5 --- /dev/null +++ b/hack/benchmark-integration.sh @@ -0,0 +1,48 @@ +#!/bin/bash + +# Copyright 2014 The Kubernetes Authors All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -o errexit +set -o nounset +set -o pipefail + +KUBE_ROOT=$(dirname "${BASH_SOURCE}")/.. +BENCHMARK_REGEX=${BENCHMARK_REGEX:-"."} + +source "${KUBE_ROOT}/hack/lib/init.sh" + +cleanup() { + kube::etcd::cleanup + kube::log::status "Benchmark cleanup complete" +} + +runTests() { + kube::etcd::start + kube::log::status "Running benchmarks" + KUBE_GOFLAGS="-tags 'benchmark no-docker' -bench . -benchtime 1s -cpu 4" \ + KUBE_RACE="-race" \ + KUBE_TEST_API_VERSIONS="v1beta3" \ + KUBE_TIMEOUT="-timeout 10m" \ + KUBE_TEST_ETCD_PREFIXES="registry"\ + ETCD_CUSTOM_PREFIX="None" \ + KUBE_TEST_ARGS="-bench-quiet 0 -bench-pods 30 -bench-tasks 1"\ + "${KUBE_ROOT}/hack/test-go.sh" test/integration + cleanup +} + +# Run cleanup to stop etcd on interrupt or other kill signal. +trap cleanup EXIT + +runTests diff --git a/hack/test-go.sh b/hack/test-go.sh index 872a60b8710..b10043d9d8c 100755 --- a/hack/test-go.sh +++ b/hack/test-go.sh @@ -100,6 +100,7 @@ shift $((OPTIND - 1)) # Use eval to preserve embedded quoted strings. eval "goflags=(${KUBE_GOFLAGS:-})" +eval "testargs=(${KUBE_TEST_ARGS:-})" # Filter out arguments that start with "-" and move them to goflags. testcases=() @@ -133,7 +134,8 @@ runTests() { count=0 for i in $(seq 1 ${iterations}); do if go test "${goflags[@]:+${goflags[@]}}" \ - ${KUBE_RACE} ${KUBE_TIMEOUT} "${pkg}"; then + ${KUBE_RACE} ${KUBE_TIMEOUT} "${pkg}" \ + "${testargs[@]:+${testargs[@]}}"; then pass=$((pass + 1)) else fails=$((fails + 1)) @@ -154,7 +156,8 @@ runTests() { if [[ ! ${KUBE_COVER} =~ ^[yY]$ ]]; then kube::log::status "Running unit tests without code coverage" go test "${goflags[@]:+${goflags[@]}}" \ - ${KUBE_RACE} ${KUBE_TIMEOUT} "${@+${@/#/${KUBE_GO_PACKAGE}/}}" + ${KUBE_RACE} ${KUBE_TIMEOUT} "${@+${@/#/${KUBE_GO_PACKAGE}/}}" \ + "${testargs[@]:+${testargs[@]}}" return 0 fi @@ -176,7 +179,8 @@ runTests() { -cover -covermode="${KUBE_COVERMODE}" \ -coverprofile="${cover_report_dir}/{}/${cover_profile}" \ "${cover_params[@]+${cover_params[@]}}" \ - "${KUBE_GO_PACKAGE}/{}" + "${KUBE_GO_PACKAGE}/{}" \ + "${testargs[@]:+${testargs[@]}}" COMBINED_COVER_PROFILE="${cover_report_dir}/combined-coverage.out" { diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go index b23cd4882a8..840ea1b4955 100644 --- a/pkg/controller/replication_controller.go +++ b/pkg/controller/replication_controller.go @@ -169,6 +169,14 @@ func NewReplicationManager(kubeClient client.Interface, burstReplicas int) *Repl return rm } +// SetEventRecorder replaces the event recorder used by the replication manager +// with the given recorder. Only used for testing. +func (rm *ReplicationManager) SetEventRecorder(recorder record.EventRecorder) { + // TODO: Hack. We can't cleanly shutdown the event recorder, so benchmarks + // need to pass in a fake. + rm.podControl = RealPodControl{rm.kubeClient, recorder} +} + // Run begins watching and syncing. func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) { defer util.HandleCrash() @@ -178,6 +186,7 @@ func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) { go util.Until(rm.worker, time.Second, stopCh) } <-stopCh + glog.Infof("Shutting down RC Manager") rm.queue.ShutDown() } diff --git a/pkg/kubectl/resize.go b/pkg/kubectl/resize.go index 41ebbba896d..efcd8767f92 100644 --- a/pkg/kubectl/resize.go +++ b/pkg/kubectl/resize.go @@ -101,6 +101,7 @@ type ReplicationControllerResizer struct { c ResizerClient } +// RetryParams encapsulates the retry parameters used by kubectl's resizer. type RetryParams struct { Interval, Timeout time.Duration } diff --git a/test/integration/auth_test.go b/test/integration/auth_test.go index 56304d90d6c..ada2c441df7 100644 --- a/test/integration/auth_test.go +++ b/test/integration/auth_test.go @@ -46,15 +46,14 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/auth/user" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/master" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/admission/admit" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/auth/authenticator/token/tokentest" + "github.com/GoogleCloudPlatform/kubernetes/test/integration/framework" ) var nodeResourceName string func init() { - requireEtcd() if api.PreV1Beta3(testapi.Version()) { nodeResourceName = "minions" } else { @@ -375,15 +374,13 @@ func getTestRequests() []struct { // // TODO(etune): write a fuzz test of the REST API. func TestAuthModeAlwaysAllow(t *testing.T) { - deleteAllEtcdKeys() + framework.DeleteAllEtcdKeys() // Set up a master - - helper, err := master.NewEtcdHelper(newEtcdClient(), testapi.Version(), etcdtest.PathPrefix()) + helper, err := framework.NewHelper() if err != nil { t.Fatalf("unexpected error: %v", err) } - var m *master.Master s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { m.Handler.ServeHTTP(w, req) @@ -516,10 +513,10 @@ func getPreviousResourceVersionKey(url, id string) string { } func TestAuthModeAlwaysDeny(t *testing.T) { - deleteAllEtcdKeys() + framework.DeleteAllEtcdKeys() // Set up a master - helper, err := master.NewEtcdHelper(newEtcdClient(), testapi.Version(), etcdtest.PathPrefix()) + helper, err := framework.NewHelper() if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -581,12 +578,12 @@ func (allowAliceAuthorizer) Authorize(a authorizer.Attributes) error { // the authentication system and authorized to do any actions. func TestAliceNotForbiddenOrUnauthorized(t *testing.T) { - deleteAllEtcdKeys() + framework.DeleteAllEtcdKeys() // This file has alice and bob in it. // Set up a master - helper, err := master.NewEtcdHelper(newEtcdClient(), testapi.Version(), etcdtest.PathPrefix()) + helper, err := framework.NewHelper() if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -670,12 +667,10 @@ func TestAliceNotForbiddenOrUnauthorized(t *testing.T) { // the authentication system but not authorized to do any actions // should receive "Forbidden". func TestBobIsForbidden(t *testing.T) { - deleteAllEtcdKeys() + framework.DeleteAllEtcdKeys() // This file has alice and bob in it. - - // Set up a master - helper, err := master.NewEtcdHelper(newEtcdClient(), testapi.Version(), etcdtest.PathPrefix()) + helper, err := framework.NewHelper() if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -731,12 +726,12 @@ func TestBobIsForbidden(t *testing.T) { // An authorization module is installed in this scenario for integration // test purposes, but requests aren't expected to reach it. func TestUnknownUserIsUnauthorized(t *testing.T) { - deleteAllEtcdKeys() + framework.DeleteAllEtcdKeys() // This file has alice and bob in it. // Set up a master - helper, err := master.NewEtcdHelper(newEtcdClient(), testapi.Version(), etcdtest.PathPrefix()) + helper, err := framework.NewHelper() if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -810,10 +805,10 @@ func newAuthorizerWithContents(t *testing.T, contents string) authorizer.Authori // TestNamespaceAuthorization tests that authorization can be controlled // by namespace. func TestNamespaceAuthorization(t *testing.T) { - deleteAllEtcdKeys() + framework.DeleteAllEtcdKeys() // This file has alice and bob in it. - helper, err := master.NewEtcdHelper(newEtcdClient(), testapi.Version(), etcdtest.PathPrefix()) + helper, err := framework.NewHelper() if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -923,12 +918,12 @@ func TestNamespaceAuthorization(t *testing.T) { // TestKindAuthorization tests that authorization can be controlled // by namespace. func TestKindAuthorization(t *testing.T) { - deleteAllEtcdKeys() + framework.DeleteAllEtcdKeys() // This file has alice and bob in it. // Set up a master - helper, err := master.NewEtcdHelper(newEtcdClient(), testapi.Version(), etcdtest.PathPrefix()) + helper, err := framework.NewHelper() if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -1026,12 +1021,12 @@ func TestKindAuthorization(t *testing.T) { // TestReadOnlyAuthorization tests that authorization can be controlled // by namespace. func TestReadOnlyAuthorization(t *testing.T) { - deleteAllEtcdKeys() + framework.DeleteAllEtcdKeys() // This file has alice and bob in it. // Set up a master - helper, err := master.NewEtcdHelper(newEtcdClient(), testapi.Version(), etcdtest.PathPrefix()) + helper, err := framework.NewHelper() if err != nil { t.Fatalf("unexpected error: %v", err) } diff --git a/test/integration/benchmark-controller.json b/test/integration/benchmark-controller.json new file mode 100644 index 00000000000..c4f8ac929ee --- /dev/null +++ b/test/integration/benchmark-controller.json @@ -0,0 +1,25 @@ +{ + "kind": "ReplicationController", + "apiVersion": "v1beta3", + "metadata": { + "name": "test-controller", + "namespace": "test", + "labels": {"name": "test-controller"} + }, + "spec": { + "replicas": 0, + "selector": {"name": "test-pod"}, + "template": { + "metadata": { + "namespace": "test", + "labels": {"name": "test-pod"} + }, + "spec": { + "containers": [{ + "name": "test-container", + "image": "kubernetes/pause" + }] + } + } + } +} diff --git a/test/integration/client_test.go b/test/integration/client_test.go index 1afc4ec77db..9f193c211b6 100644 --- a/test/integration/client_test.go +++ b/test/integration/client_test.go @@ -34,18 +34,15 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/version" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + "github.com/GoogleCloudPlatform/kubernetes/test/integration/framework" ) -func init() { - requireEtcd() -} - func TestClient(t *testing.T) { - _, s := runAMaster(t) + _, s := framework.RunAMaster(t) defer s.Close() ns := api.NamespaceDefault - deleteAllEtcdKeys() + framework.DeleteAllEtcdKeys() client := client.NewOrDie(&client.Config{Host: s.URL, Version: testapi.Version()}) info, err := client.ServerVersion() @@ -194,9 +191,9 @@ func TestMultiWatch(t *testing.T) { const watcherCount = 50 runtime.GOMAXPROCS(watcherCount) - deleteAllEtcdKeys() - defer deleteAllEtcdKeys() - _, s := runAMaster(t) + framework.DeleteAllEtcdKeys() + defer framework.DeleteAllEtcdKeys() + _, s := framework.RunAMaster(t) defer s.Close() ns := api.NamespaceDefault diff --git a/test/integration/etcd_tools_test.go b/test/integration/etcd_tools_test.go index 0b402cf248d..0197968eb56 100644 --- a/test/integration/etcd_tools_test.go +++ b/test/integration/etcd_tools_test.go @@ -28,12 +28,9 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + "github.com/GoogleCloudPlatform/kubernetes/test/integration/framework" ) -func init() { - requireEtcd() -} - type stringCodec struct{} type fakeAPIObject string @@ -56,9 +53,9 @@ func (c stringCodec) DecodeInto(data []byte, obj runtime.Object) error { } func TestSetObj(t *testing.T) { - client := newEtcdClient() + client := framework.NewEtcdClient() helper := tools.EtcdHelper{Client: client, Codec: stringCodec{}} - withEtcdKey(func(key string) { + framework.WithEtcdKey(func(key string) { fakeObject := fakeAPIObject("object") if err := helper.SetObj(key, &fakeObject, nil, 0); err != nil { t.Fatalf("unexpected error: %v", err) @@ -74,9 +71,9 @@ func TestSetObj(t *testing.T) { } func TestExtractObj(t *testing.T) { - client := newEtcdClient() + client := framework.NewEtcdClient() helper := tools.EtcdHelper{Client: client, Codec: stringCodec{}} - withEtcdKey(func(key string) { + framework.WithEtcdKey(func(key string) { _, err := client.Set(key, "object", 0) if err != nil { t.Fatalf("unexpected error: %v", err) @@ -92,9 +89,9 @@ func TestExtractObj(t *testing.T) { } func TestWatch(t *testing.T) { - client := newEtcdClient() + client := framework.NewEtcdClient() helper := tools.NewEtcdHelper(client, testapi.Codec(), etcdtest.PathPrefix()) - withEtcdKey(func(key string) { + framework.WithEtcdKey(func(key string) { key = etcdtest.AddPrefix(key) resp, err := client.Set(key, runtime.EncodeOrDie(testapi.Codec(), &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0) if err != nil { diff --git a/test/integration/framework/etcd_utils.go b/test/integration/framework/etcd_utils.go new file mode 100644 index 00000000000..d56166463c7 --- /dev/null +++ b/test/integration/framework/etcd_utils.go @@ -0,0 +1,77 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// +build integration + +package framework + +import ( + "fmt" + "math/rand" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" + "github.com/GoogleCloudPlatform/kubernetes/pkg/master" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" + "github.com/coreos/go-etcd/etcd" + "github.com/golang/glog" +) + +// If you need to start an etcd instance by hand, you also need to insert a key +// for this check to pass (*any* key will do, eg: +//curl -L http://127.0.0.1:4001/v2/keys/message -XPUT -d value="Hello world"). +func init() { + RequireEtcd() +} + +func NewEtcdClient() *etcd.Client { + return etcd.NewClient([]string{}) +} + +func NewHelper() (tools.EtcdHelper, error) { + return master.NewEtcdHelper(NewEtcdClient(), testapi.Version(), etcdtest.PathPrefix()) +} + +func RequireEtcd() { + if _, err := NewEtcdClient().Get("/", false, false); err != nil { + glog.Fatalf("unable to connect to etcd for testing: %v", err) + } +} + +func WithEtcdKey(f func(string)) { + prefix := fmt.Sprintf("/test-%d", rand.Int63()) + defer NewEtcdClient().Delete(prefix, true) + f(prefix) +} + +// DeleteAllEtcdKeys deletes all keys from etcd. +// TODO: Instead of sprinkling calls to this throughout the code, adjust the +// prefix in etcdtest package; then just delete everything once at the end +// of the test run. +func DeleteAllEtcdKeys() { + glog.Infof("Deleting all etcd keys") + client := NewEtcdClient() + keys, err := client.Get("/", false, false) + if err != nil { + glog.Fatalf("Unable to list root etcd keys: %v", err) + } + for _, node := range keys.Node.Nodes { + if _, err := client.Delete(node.Key, true); err != nil { + glog.Fatalf("Unable delete key: %v", err) + } + } + +} diff --git a/test/integration/framework/master_utils.go b/test/integration/framework/master_utils.go new file mode 100644 index 00000000000..b1ccadb31f8 --- /dev/null +++ b/test/integration/framework/master_utils.go @@ -0,0 +1,313 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "io/ioutil" + "net/http" + "net/http/httptest" + "runtime" + "sync" + "testing" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" + "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" + "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" + "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/master" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" + "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/admission/admit" + "github.com/golang/glog" +) + +const ( + // Timeout used in benchmarks, to eg: resize an rc + DefaultTimeout = 30 * time.Minute + + // Rc manifest used to create pods for benchmarks. + // TODO: Convert this to a full path? + TestRCManifest = "benchmark-controller.json" + + // Test Namspace, for pods and rcs. + TestNS = "test" +) + +// MasterComponents is a control struct for all master components started via NewMasterComponents. +// TODO: Include all master components (scheduler, nodecontroller). +// TODO: Reconcile with integration.go, currently the master used there doesn't understand +// how to restart cleanly, which is required for each iteration of a benchmark. The integration +// tests also don't make it easy to isolate and turn off components at will. +type MasterComponents struct { + // Raw http server in front of the master + ApiServer *httptest.Server + // Kubernetes master, contains an embedded etcd helper + KubeMaster *master.Master + // Restclient used to talk to the kubernetes master + RestClient *client.Client + // Replication controller manager + ControllerManager *controller.ReplicationManager + // Channel for stop signals to rc manager + rcStopCh chan struct{} + // Used to stop master components individually, and via MasterComponents.Stop + once sync.Once + // Kubernetes etcd helper, has embedded etcd client + EtcdHelper *tools.EtcdHelper +} + +// Config is a struct of configuration directives for NewMasterComponents. +type Config struct { + // If nil, a default is used, partially filled configs will not get populated. + MasterConfig *master.Config + StartReplicationManager bool + // If true, all existing etcd keys are purged before starting master components + DeleteEtcdKeys bool + // Client throttling qps + QPS float32 + // Client burst qps, also burst replicas allowed in rc manager + Burst int + // TODO: Add configs for endpoints controller, scheduler etc +} + +// NewMasterComponents creates, initializes and starts master components based on the given config. +func NewMasterComponents(c *Config) *MasterComponents { + m, s, h := startMasterOrDie(c.MasterConfig) + // TODO: Allow callers to pipe through a different master url and create a client/start components using it. + glog.Infof("Master %+v", s.URL) + if c.DeleteEtcdKeys { + DeleteAllEtcdKeys() + } + restClient := client.NewOrDie(&client.Config{Host: s.URL, Version: "v1beta3", QPS: c.QPS, Burst: c.Burst}) + rcStopCh := make(chan struct{}) + controllerManager := controller.NewReplicationManager(restClient, c.Burst) + + // TODO: Support events once we can cleanly shutdown an event recorder. + controllerManager.SetEventRecorder(&record.FakeRecorder{}) + if c.StartReplicationManager { + go controllerManager.Run(runtime.NumCPU(), rcStopCh) + } + var once sync.Once + return &MasterComponents{ + ApiServer: s, + KubeMaster: m, + RestClient: restClient, + ControllerManager: controllerManager, + rcStopCh: rcStopCh, + EtcdHelper: h, + once: once, + } +} + +// startMasterOrDie starts a kubernetes master and an httpserver to handle api requests +func startMasterOrDie(masterConfig *master.Config) (*master.Master, *httptest.Server, *tools.EtcdHelper) { + var m *master.Master + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + m.Handler.ServeHTTP(w, req) + })) + + var helper tools.EtcdHelper + var err error + if masterConfig == nil { + helper, err = master.NewEtcdHelper(NewEtcdClient(), "", etcdtest.PathPrefix()) + if err != nil { + glog.Fatalf("Failed to create etcd helper for master %v", err) + } + masterConfig = &master.Config{ + EtcdHelper: helper, + KubeletClient: client.FakeKubeletClient{}, + EnableLogsSupport: false, + EnableProfiling: true, + EnableUISupport: false, + APIPrefix: "/api", + Authorizer: apiserver.NewAlwaysAllowAuthorizer(), + AdmissionControl: admit.NewAlwaysAdmit(), + } + } else { + helper = masterConfig.EtcdHelper + } + m = master.New(masterConfig) + return m, s, &helper +} + +func (m *MasterComponents) stopRCManager() { + close(m.rcStopCh) +} + +func (m *MasterComponents) Stop(apiServer, rcManager bool) { + glog.Infof("Stopping master components") + if rcManager { + // Ordering matters because the apiServer will only shutdown when pending + // requests are done + m.once.Do(m.stopRCManager) + } + if apiServer { + m.ApiServer.Close() + } +} + +// RCFromManifest reads a .json file and returns the rc in it. +func RCFromManifest(fileName string) *api.ReplicationController { + data, err := ioutil.ReadFile(fileName) + if err != nil { + glog.Fatalf("Unexpected error reading rc manifest %v", err) + } + var controller api.ReplicationController + if err := api.Scheme.DecodeInto(data, &controller); err != nil { + glog.Fatalf("Unexpected error reading rc manifest %v", err) + } + return &controller +} + +// StopRC stops the rc via kubectl's stop library +func StopRC(rc *api.ReplicationController, restClient *client.Client) error { + reaper, err := kubectl.ReaperFor("ReplicationController", restClient) + if err != nil || reaper == nil { + return err + } + _, err = reaper.Stop(rc.Namespace, rc.Name, nil) + if err != nil { + return err + } + return nil +} + +// ResizeRC resizes the given rc to the given replicas. +func ResizeRC(name, ns string, replicas int, restClient *client.Client) (*api.ReplicationController, error) { + resizer, err := kubectl.ResizerFor("ReplicationController", kubectl.NewResizerClient(restClient)) + if err != nil { + return nil, err + } + retry := &kubectl.RetryParams{50 * time.Millisecond, DefaultTimeout} + waitForReplicas := &kubectl.RetryParams{50 * time.Millisecond, DefaultTimeout} + err = resizer.Resize(ns, name, uint(replicas), nil, retry, waitForReplicas) + if err != nil { + return nil, err + } + resized, err := restClient.ReplicationControllers(ns).Get(name) + if err != nil { + return nil, err + } + return resized, nil +} + +// StartRC creates given rc if it doesn't already exist, then updates it via kubectl's resizer. +func StartRC(controller *api.ReplicationController, restClient *client.Client) (*api.ReplicationController, error) { + created, err := restClient.ReplicationControllers(controller.Namespace).Get(controller.Name) + if err != nil { + glog.Infof("Rc %v doesn't exist, creating", controller.Name) + created, err = restClient.ReplicationControllers(controller.Namespace).Create(controller) + if err != nil { + return nil, err + } + } + // If we just created an rc, wait till it creates its replicas. + return ResizeRC(created.Name, created.Namespace, controller.Spec.Replicas, restClient) +} + +// StartPods check for numPods in TestNS. If they exist, it no-ops, otherwise it starts up +// a temp rc, resizes it to match numPods, then deletes the rc leaving behind the pods. +func StartPods(numPods int, host string, restClient *client.Client) error { + start := time.Now() + defer func() { + glog.Infof("StartPods took %v with numPods %d", time.Since(start), numPods) + }() + hostField := fields.OneTermEqualSelector(client.PodHost, host) + pods, err := restClient.Pods(TestNS).List(labels.Everything(), hostField) + if err != nil || len(pods.Items) == numPods { + return err + } + glog.Infof("Found %d pods that match host %v, require %d", len(pods.Items), hostField, numPods) + // For the sake of simplicity, assume all pods in TestNS have selectors matching TestRCManifest. + controller := RCFromManifest(TestRCManifest) + + // Make the rc unique to the given host. + controller.Spec.Replicas = numPods + controller.Spec.Template.Spec.Host = host + controller.Name = controller.Name + host + controller.Spec.Selector["host"] = host + controller.Spec.Template.Labels["host"] = host + + if rc, err := StartRC(controller, restClient); err != nil { + return err + } else { + // Delete the rc, otherwise when we restart master components for the next benchmark + // the rc controller will race with the pods controller in the rc manager. + return restClient.ReplicationControllers(TestNS).Delete(rc.Name) + } +} + +// TODO: Merge this into startMasterOrDie. +func RunAMaster(t *testing.T) (*master.Master, *httptest.Server) { + helper, err := master.NewEtcdHelper(NewEtcdClient(), testapi.Version(), etcdtest.PathPrefix()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + m := master.New(&master.Config{ + EtcdHelper: helper, + KubeletClient: client.FakeKubeletClient{}, + EnableLogsSupport: false, + EnableProfiling: true, + EnableUISupport: false, + APIPrefix: "/api", + Authorizer: apiserver.NewAlwaysAllowAuthorizer(), + AdmissionControl: admit.NewAlwaysAdmit(), + }) + + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + m.Handler.ServeHTTP(w, req) + })) + + return m, s +} + +// Task is a function passed to worker goroutines by RunParallel. +// The function needs to implement its own thread safety. +type Task func(id int) error + +// RunParallel spawns a goroutine per task in the given queue +func RunParallel(task Task, numTasks, numWorkers int) { + start := time.Now() + if numWorkers <= 0 { + numWorkers = numTasks + } + defer func() { + glog.Infof("RunParallel took %v for %d tasks and %d workers", time.Since(start), numTasks, numWorkers) + }() + var wg sync.WaitGroup + semCh := make(chan struct{}, numWorkers) + wg.Add(numTasks) + for id := 0; id < numTasks; id++ { + go func(id int) { + semCh <- struct{}{} + err := task(id) + if err != nil { + glog.Fatalf("Worker failed with %v", err) + } + <-semCh + wg.Done() + }(id) + } + wg.Wait() + close(semCh) +} diff --git a/test/integration/master_benchmark_test.go b/test/integration/master_benchmark_test.go new file mode 100644 index 00000000000..fb2843d55c8 --- /dev/null +++ b/test/integration/master_benchmark_test.go @@ -0,0 +1,195 @@ +// +build benchmark,!no-etcd,!integration + +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package integration + +import ( + "flag" + "fmt" + "os" + "testing" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" + "github.com/GoogleCloudPlatform/kubernetes/test/integration/framework" + "github.com/golang/glog" +) + +// Command line flag globals, parsed in init and used by the benchmarks: +// * pods && !tasks: Start -pods, scale number of parallel tasks with b.N +// * !pods && tasks: Start -tasks, scale pods with b.N +// * pods && tasks: Ignore b.N, benchmark behaves like a test constrained by -benchtime. +// * !pods && !tasks: scale pods and workers with b.N. +// -workers specifies the number of workers to shard tasks across. +// Eg: go test bench . -bench-pods 3000 -bench-tasks 100 -bench-tasks 10: +// Create 100 tasks each listing 3000 pods, and run them 10 at a time. +var ( + Workers int + Pods int + Tasks int +) + +const Glog_fatalf = 3 + +func init() { + q := flag.Int("bench-quiet", 3, "Quietness, don't glog severities <= given level during the benchmark.") + pods := flag.Int("bench-pods", -1, "Number of pods for the benchmark. If unspecified, uses b.N.") + workers := flag.Int("bench-workers", -1, "Number workers for the benchmark. If unspecified, uses tasks.") + tasks := flag.Int("bench-tasks", -1, "Number of tasks for the benchmark. These tasks are sharded across the workers. If unspecified, uses b.N.") + flag.Parse() + + // Unfortunately this v level goes in the opposite direction as stderrthreshold. + flag.Set("v", fmt.Sprintf("%d", *q)) + + // We need quiet logs to parse benchmark results, which includes Errorf. + flag.Set("logtostderr", "false") + flag.Set("stderrthreshold", fmt.Sprintf("%d", Glog_fatalf-*q)) + Pods = *pods + Workers = *workers + Tasks = *tasks + framework.DeleteAllEtcdKeys() +} + +// getPods returns the cmd line -pods or b.N if -pods wasn't specified. +// Benchmarks can call getPods to get the number of pods they need to +// create for a given benchmark. +func getPods(bN int) int { + if Pods < 0 { + return bN + } + return Pods +} + +// getTasks returns the cmd line -workers or b.N if -workers wasn't specified. +// Benchmarks would call getTasks to get the number of workers required to +// perform the benchmark in parallel. +func getTasks(bN int) int { + if Tasks < 0 { + return bN + } + return Tasks +} + +// getIterations returns the number of iterations required by each benchmark for +// go to produce reliable timing results. +func getIterations(bN int) int { + // Anything with constant pods is only linear if we iterate b.N times. + if Pods > 0 { + return bN + } + return 1 +} + +// startPodsOnNodes creates numPods sharded across numNodes +func startPodsOnNodes(numPods, numNodes int, restClient *client.Client) { + podsPerNode := numPods / numNodes + if podsPerNode < 1 { + podsPerNode = 1 + } + framework.RunParallel(func(id int) error { + return framework.StartPods(podsPerNode, fmt.Sprintf("host.%d", id), restClient) + }, numNodes, -1) +} + +// Benchmark pod listing by waiting on `Tasks` listers to list `Pods` pods via `Workers`. +func BenchmarkPodList(b *testing.B) { + b.StopTimer() + m := framework.NewMasterComponents(&framework.Config{nil, true, false, 250.0, 500}) + defer m.Stop(true, true) + + numPods, numTasks, iter := getPods(b.N), getTasks(b.N), getIterations(b.N) + podsPerNode := numPods / numTasks + if podsPerNode < 1 { + podsPerNode = 1 + } + glog.Infof("Starting benchmark: b.N %d, pods %d, workers %d, podsPerNode %d", + b.N, numPods, numTasks, podsPerNode) + + startPodsOnNodes(numPods, numTasks, m.RestClient) + // Stop the rc manager so it doesn't steal resources + m.Stop(false, true) + + b.StartTimer() + for i := 0; i < iter; i++ { + framework.RunParallel(func(id int) error { + host := fmt.Sprintf("host.%d", id) + now := time.Now() + defer func() { + glog.V(3).Infof("Worker %d: Node %v listing pods took %v", id, host, time.Since(now)) + }() + if pods, err := m.RestClient.Pods(framework.TestNS).List( + labels.Everything(), + fields.OneTermEqualSelector(client.PodHost, host)); err != nil { + return err + } else if len(pods.Items) < podsPerNode { + glog.Fatalf("List retrieved %d pods, which is less than %d", len(pods.Items), podsPerNode) + } + return nil + }, numTasks, Workers) + } + b.StopTimer() +} + +// Benchmark pod listing by waiting on `Tasks` listers to list `Pods` pods via `Workers`. +func BenchmarkPodListEtcd(b *testing.B) { + b.StopTimer() + m := framework.NewMasterComponents(&framework.Config{nil, true, false, 250.0, 500}) + defer m.Stop(true, true) + + numPods, numTasks, iter := getPods(b.N), getTasks(b.N), getIterations(b.N) + podsPerNode := numPods / numTasks + if podsPerNode < 1 { + podsPerNode = 1 + } + + startPodsOnNodes(numPods, numTasks, m.RestClient) + // Stop the rc manager so it doesn't steal resources + m.Stop(false, true) + + glog.Infof("Starting benchmark: b.N %d, pods %d, workers %d, podsPerNode %d", + b.N, numPods, numTasks, podsPerNode) + + ctx := api.WithNamespace(api.NewContext(), framework.TestNS) + key := etcdgeneric.NamespaceKeyRootFunc(ctx, fmt.Sprintf("%s/pods", etcdtest.PathPrefix())) + + b.StartTimer() + for i := 0; i < iter; i++ { + framework.RunParallel(func(id int) error { + now := time.Now() + defer func() { + glog.V(3).Infof("Worker %d: listing pods took %v", id, time.Since(now)) + }() + if response, err := m.EtcdHelper.Client.Get(key, true, true); err != nil { + return err + } else if len(response.Node.Nodes) < podsPerNode { + glog.Fatalf("List retrieved %d pods, which is less than %d", len(response.Node.Nodes), podsPerNode) + } + return nil + }, numTasks, Workers) + } + b.StopTimer() +} + +func TestMain(m *testing.M) { + os.Exit(m.Run()) +} diff --git a/test/integration/metrics_test.go b/test/integration/metrics_test.go index 6f04882a30d..d9567e09c24 100644 --- a/test/integration/metrics_test.go +++ b/test/integration/metrics_test.go @@ -30,6 +30,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/test/integration/framework" "github.com/golang/glog" "github.com/golang/protobuf/proto" @@ -38,10 +39,6 @@ import ( const scrapeRequestHeader = "application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=compact-text" -func init() { - requireEtcd() -} - func scrapeMetrics(s *httptest.Server) ([]*prometheuspb.MetricFamily, error) { req, err := http.NewRequest("GET", s.URL+"/metrics", nil) if err != nil { @@ -90,7 +87,7 @@ func checkForExpectedMetrics(t *testing.T, metrics []*prometheuspb.MetricFamily, } func TestMasterProcessMetrics(t *testing.T) { - _, s := runAMaster(t) + _, s := framework.RunAMaster(t) defer s.Close() metrics, err := scrapeMetrics(s) @@ -107,7 +104,7 @@ func TestMasterProcessMetrics(t *testing.T) { } func TestApiserverMetrics(t *testing.T) { - _, s := runAMaster(t) + _, s := framework.RunAMaster(t) defer s.Close() // Make a request to the apiserver to ensure there's at least one data point diff --git a/test/integration/scheduler_test.go b/test/integration/scheduler_test.go index be8681ce4d1..e85479500c5 100644 --- a/test/integration/scheduler_test.go +++ b/test/integration/scheduler_test.go @@ -35,19 +35,15 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/master" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/admission/admit" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler" _ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory" + "github.com/GoogleCloudPlatform/kubernetes/test/integration/framework" ) -func init() { - requireEtcd() -} - type nodeMutationFunc func(t *testing.T, n *api.Node, nodeStore cache.Store, c *client.Client) type nodeStateManager struct { @@ -56,11 +52,11 @@ type nodeStateManager struct { } func TestUnschedulableNodes(t *testing.T) { - helper, err := master.NewEtcdHelper(newEtcdClient(), testapi.Version(), etcdtest.PathPrefix()) + helper, err := framework.NewHelper() if err != nil { t.Fatalf("Couldn't create etcd helper: %v", err) } - deleteAllEtcdKeys() + framework.DeleteAllEtcdKeys() var m *master.Master s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { diff --git a/test/integration/secret_test.go b/test/integration/secret_test.go index 02983a3450a..b6c0ed7d4df 100644 --- a/test/integration/secret_test.go +++ b/test/integration/secret_test.go @@ -30,14 +30,10 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/master" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/admission/admit" + "github.com/GoogleCloudPlatform/kubernetes/test/integration/framework" ) -func init() { - requireEtcd() -} - func deletePodOrErrorf(t *testing.T, c *client.Client, ns, name string) { if err := c.Pods(ns).Delete(name, nil); err != nil { t.Errorf("unable to delete pod %v: %v", name, err) @@ -51,7 +47,7 @@ func deleteSecretOrErrorf(t *testing.T, c *client.Client, ns, name string) { // TestSecrets tests apiserver-side behavior of creation of secret objects and their use by pods. func TestSecrets(t *testing.T) { - helper, err := master.NewEtcdHelper(newEtcdClient(), testapi.Version(), etcdtest.PathPrefix()) + helper, err := framework.NewHelper() if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -74,7 +70,7 @@ func TestSecrets(t *testing.T) { AdmissionControl: admit.NewAlwaysAdmit(), }) - deleteAllEtcdKeys() + framework.DeleteAllEtcdKeys() client := client.NewOrDie(&client.Config{Host: s.URL, Version: testapi.Version()}) DoTestSecrets(t, client, testapi.Version()) }