From 377e5c533b64b56892a8fdf555206f69824a5d18 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Tue, 6 Oct 2015 18:15:21 -0700 Subject: [PATCH] add scheduler integration benchmark --- test/integration/scheduler_test.go | 147 +++++++++++++++++++++++++++++ 1 file changed, 147 insertions(+) diff --git a/test/integration/scheduler_test.go b/test/integration/scheduler_test.go index 04dbe2e93da..8a617e8e028 100644 --- a/test/integration/scheduler_test.go +++ b/test/integration/scheduler_test.go @@ -24,6 +24,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "sync" "testing" "time" @@ -291,3 +292,149 @@ func DoTestUnschedulableNodes(t *testing.T, restClient *client.Client, nodeStore } } } + +func BenchmarkScheduling(b *testing.B) { + etcdStorage, err := framework.NewEtcdStorage() + if err != nil { + b.Fatalf("Couldn't create etcd storage: %v", err) + } + storageDestinations := master.NewStorageDestinations() + storageDestinations.AddAPIGroup("", etcdStorage) + framework.DeleteAllEtcdKeys() + + var m *master.Master + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + m.Handler.ServeHTTP(w, req) + })) + defer s.Close() + + m = master.New(&master.Config{ + StorageDestinations: storageDestinations, + KubeletClient: client.FakeKubeletClient{}, + EnableCoreControllers: true, + EnableLogsSupport: false, + EnableUISupport: false, + EnableIndex: true, + APIPrefix: "/api", + Authorizer: apiserver.NewAlwaysAllowAuthorizer(), + AdmissionControl: admit.NewAlwaysAdmit(), + StorageVersions: map[string]string{"": testapi.Default.Version()}, + }) + + c := client.NewOrDie(&client.Config{ + Host: s.URL, + Version: testapi.Default.Version(), + QPS: 5000.0, + Burst: 5000, + }) + + schedulerConfigFactory := factory.NewConfigFactory(c, nil) + schedulerConfig, err := schedulerConfigFactory.Create() + if err != nil { + b.Fatalf("Couldn't create scheduler config: %v", err) + } + eventBroadcaster := record.NewBroadcaster() + schedulerConfig.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}) + eventBroadcaster.StartRecordingToSink(c.Events("")) + scheduler.New(schedulerConfig).Run() + + defer close(schedulerConfig.StopEverything) + + makeNNodes(c, 1000) + N := b.N + b.ResetTimer() + makeNPods(c, N) + for { + objs := schedulerConfigFactory.ScheduledPodLister.Store.List() + if len(objs) >= N { + fmt.Printf("%v pods scheduled.\n", len(objs)) + /* // To prove that this actually works: + for _, o := range objs { + fmt.Printf("%s\n", o.(*api.Pod).Spec.NodeName) + } + */ + break + } + time.Sleep(time.Millisecond) + } + b.StopTimer() +} + +func makeNNodes(c client.Interface, N int) { + baseNode := &api.Node{ + ObjectMeta: api.ObjectMeta{ + GenerateName: "scheduler-test-node-", + }, + Spec: api.NodeSpec{ + ExternalID: "foobar", + }, + Status: api.NodeStatus{ + Capacity: api.ResourceList{ + api.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI), + api.ResourceCPU: resource.MustParse("4"), + api.ResourceMemory: resource.MustParse("32Gi"), + }, + Phase: api.NodeRunning, + Conditions: []api.NodeCondition{ + {Type: api.NodeReady, Status: api.ConditionTrue}, + }, + }, + } + for i := 0; i < N; i++ { + if _, err := c.Nodes().Create(baseNode); err != nil { + panic("error creating node: " + err.Error()) + } + } +} + +func makeNPods(c client.Interface, N int) { + basePod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + GenerateName: "scheduler-test-pod-", + }, + Spec: api.PodSpec{ + Containers: []api.Container{{ + Name: "pause", + Image: "gcr.io/google_containers/pause:1.0", + Resources: api.ResourceRequirements{ + Limits: api.ResourceList{ + api.ResourceCPU: resource.MustParse("100m"), + api.ResourceMemory: resource.MustParse("500Mi"), + }, + Requests: api.ResourceList{ + api.ResourceCPU: resource.MustParse("100m"), + api.ResourceMemory: resource.MustParse("500Mi"), + }, + }, + }}, + }, + } + wg := sync.WaitGroup{} + threads := 30 + wg.Add(threads) + remaining := make(chan int, N) + go func() { + for i := 0; i < N; i++ { + remaining <- i + } + close(remaining) + }() + for i := 0; i < threads; i++ { + go func() { + defer wg.Done() + for { + _, ok := <-remaining + if !ok { + return + } + for { + _, err := c.Pods("default").Create(basePod) + if err == nil { + break + } + } + } + }() + } + wg.Wait() +}