Mirror of https://github.com/k3s-io/kubernetes.git
Merge pull request #82338 from draveness/feature/use-apiserver-in-scheduler-benchmarks
feat(scheduler): use api server to watch scheduled pods
Commit 57d87502ba
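The change replaces the benchmarks' polling of getScheduledPods() with a client-go informer event handler that counts a pod as scheduled when its spec.nodeName goes from empty to non-empty. Below is a minimal, standalone sketch of that watch pattern, not code from this commit; the kubeconfig path and the target count are illustrative assumptions.

    package main

    import (
        "fmt"
        "sync/atomic"

        v1 "k8s.io/api/core/v1"
        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/tools/clientcmd"
    )

    func main() {
        // Illustrative: build a clientset from an assumed kubeconfig path.
        cfg, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
        if err != nil {
            panic(err)
        }
        client, err := kubernetes.NewForConfig(cfg)
        if err != nil {
            panic(err)
        }

        // Shared pod informer with resync disabled, as in the benchmark code.
        factory := informers.NewSharedInformerFactory(client, 0)
        podInformer := factory.Core().V1().Pods()

        target := int32(100) // illustrative number of pods to wait for
        scheduled := int32(0)
        done := make(chan struct{})

        podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            UpdateFunc: func(old, cur interface{}) {
                oldPod := old.(*v1.Pod)
                curPod := cur.(*v1.Pod)
                // A pod counts as newly scheduled when nodeName flips from empty to set.
                if len(oldPod.Spec.NodeName) == 0 && len(curPod.Spec.NodeName) > 0 {
                    if atomic.AddInt32(&scheduled, 1) == target {
                        close(done)
                    }
                }
            },
        })

        stop := make(chan struct{})
        defer close(stop)
        factory.Start(stop)
        cache.WaitForCacheSync(stop, podInformer.Informer().HasSynced)

        <-done
        fmt.Printf("observed %d scheduled pods\n", atomic.LoadInt32(&scheduled))
    }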
@@ -45,6 +45,7 @@ go_test(
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
+        "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
         "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
         "//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library",
         "//test/integration/framework:go_default_library",
@@ -18,6 +18,7 @@ package benchmark

 import (
     "fmt"
+    "sync/atomic"
     "testing"
     "time"

@@ -26,6 +27,7 @@ import (
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     utilfeature "k8s.io/apiserver/pkg/util/feature"
+    "k8s.io/client-go/tools/cache"
     featuregatetesting "k8s.io/component-base/featuregate/testing"
     "k8s.io/csi-translation-lib/plugins"
     csilibplugins "k8s.io/csi-translation-lib/plugins"
@@ -388,26 +390,30 @@ func benchmarkScheduling(numNodes, numExistingPods, minPods int,
         }
         time.Sleep(1 * time.Second)
     }
+
+    scheduled := int32(0)
+    completedCh := make(chan struct{})
+    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+        UpdateFunc: func(old, cur interface{}) {
+            curPod := cur.(*v1.Pod)
+            oldPod := old.(*v1.Pod)
+
+            if len(oldPod.Spec.NodeName) == 0 && len(curPod.Spec.NodeName) > 0 {
+                if atomic.AddInt32(&scheduled, 1) >= int32(b.N) {
+                    completedCh <- struct{}{}
+                }
+            }
+        },
+    })
+
     // start benchmark
     b.ResetTimer()
     config = testutils.NewTestPodCreatorConfig()
     config.AddStrategy("sched-test", b.N, testPodStrategy)
     podCreator = testutils.NewTestPodCreator(clientset, config)
     podCreator.CreatePods()
-    for {
-        // TODO: Setup watch on apiserver and wait until all pods scheduled.
-        scheduled, err := getScheduledPods(podInformer)
-        if err != nil {
-            klog.Fatalf("%v", err)
-        }
-        if len(scheduled) >= numExistingPods+b.N {
-            break
-        }
-
-        // Note: This might introduce slight deviation in accuracy of benchmark results.
-        // Since the total amount of time is relatively large, it might not be a concern.
-        time.Sleep(100 * time.Millisecond)
-    }
+
+    <-completedCh

     // Note: without this line we're taking the overhead of defer() into account.
     b.StopTimer()
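With the handler above registered, benchmarkScheduling simply blocks on completedCh instead of re-listing pods every 100ms. The standalone sketch below shows the same signal-on-threshold pattern outside Kubernetes; the worker and event names are made up, and the buffered channel with a non-blocking send is a defensive variation, not something the commit itself does.

    package main

    import (
        "fmt"
        "sync/atomic"
        "time"
    )

    func main() {
        target := int32(5)
        var count int32
        completed := make(chan struct{}, 1) // buffered so a late extra signal never blocks the sender

        events := make(chan struct{})
        // Worker: emits one event per simulated "scheduled pod".
        go func() {
            for i := int32(0); i < target; i++ {
                time.Sleep(10 * time.Millisecond)
                events <- struct{}{}
            }
            close(events)
        }()

        // Handler: counts events and signals once the target is reached.
        go func() {
            for range events {
                if atomic.AddInt32(&count, 1) >= target {
                    select {
                    case completed <- struct{}{}:
                    default: // already signalled
                    }
                }
            }
        }()

        <-completed // replaces a polling loop with a fixed sleep interval
        fmt.Println("all events observed:", atomic.LoadInt32(&count))
    }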
@@ -17,9 +17,11 @@ limitations under the License.
 package benchmark

 import (
+    "context"
     "fmt"
     "math"
     "strconv"
+    "sync/atomic"
     "testing"
     "time"

@@ -28,6 +30,7 @@ import (
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     coreinformers "k8s.io/client-go/informers/core/v1"
     clientset "k8s.io/client-go/kubernetes"
+    "k8s.io/client-go/tools/cache"
     "k8s.io/kubernetes/pkg/scheduler/factory"
     testutils "k8s.io/kubernetes/test/utils"

@@ -131,7 +134,7 @@ func getBaseConfig(nodes int, pods int) *testConfig {
 // It returns the minimum of throughput over whole run.
 func schedulePods(config *testConfig) int32 {
     defer config.destroyFunc()
-    prev := 0
+    prev := int32(0)
     // On startup there may be a latent period where NO scheduling occurs (qps = 0).
     // We are interested in low scheduling rates (i.e. qps=2),
     minQPS := int32(math.MaxInt32)
@@ -151,39 +154,59 @@ func schedulePods(config *testConfig) int32 {
             break
         }
     }
-    // map minimum QPS entries in a counter, useful for debugging tests.
-    qpsStats := map[int]int{}
-
-    // Now that scheduling has started, lets start taking the pulse on how many pods are happening per second.
-    for {
-        // TODO: Setup watch on apiserver and wait until all pods scheduled.
-        scheduled, err := getScheduledPods(podInformer)
-        if err != nil {
-            klog.Fatalf("%v", err)
-        }
-        // We will be completed when all pods are done being scheduled.
-        // return the worst-case-scenario interval that was seen during this time.
-        // Note this should never be low due to cold-start, so allow bake in sched time if necessary.
-        if len(scheduled) >= config.numPods {
-            consumed := int(time.Since(start) / time.Second)
-            if consumed <= 0 {
-                consumed = 1
-            }
-            fmt.Printf("Scheduled %v Pods in %v seconds (%v per second on average). min QPS was %v\n",
-                config.numPods, consumed, config.numPods/consumed, minQPS)
-            return minQPS
-        }
-
-        // There's no point in printing it for the last iteration, as the value is random
-        qps := len(scheduled) - prev
-        qpsStats[qps]++
-        if int32(qps) < minQPS {
-            minQPS = int32(qps)
-        }
-        fmt.Printf("%ds\trate: %d\ttotal: %d (qps frequency: %v)\n", time.Since(start)/time.Second, qps, len(scheduled), qpsStats)
-        prev = len(scheduled)
-        time.Sleep(1 * time.Second)
-    }
+
+    scheduled := int32(0)
+    ctx, cancel := context.WithCancel(context.Background())
+    config.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+        UpdateFunc: func(old, cur interface{}) {
+            curPod := cur.(*v1.Pod)
+            oldPod := old.(*v1.Pod)
+
+            if len(oldPod.Spec.NodeName) == 0 && len(curPod.Spec.NodeName) > 0 {
+                if atomic.AddInt32(&scheduled, 1) >= int32(config.numPods) {
+                    cancel()
+                }
+            }
+        },
+    })
+
+    // map minimum QPS entries in a counter, useful for debugging tests.
+    qpsStats := map[int32]int{}
+
+    ticker := time.NewTicker(1 * time.Second)
+    go func() {
+        for {
+            select {
+            case <-ticker.C:
+                scheduled := atomic.LoadInt32(&scheduled)
+                qps := scheduled - prev
+                qpsStats[qps]++
+                if qps < minQPS {
+                    minQPS = qps
+                }
+                fmt.Printf("%ds\trate: %d\ttotal: %d (qps frequency: %v)\n", time.Since(start)/time.Second, qps, scheduled, qpsStats)
+                prev = scheduled
+
+            case <-ctx.Done():
+                return
+            }
+        }
+    }()
+
+    <-ctx.Done()
+
+    ticker.Stop()
+
+    // We will be completed when all pods are done being scheduled.
+    // return the worst-case-scenario interval that was seen during this time.
+    // Note this should never be low due to cold-start, so allow bake in sched time if necessary.
+    consumed := int(time.Since(start) / time.Second)
+    if consumed <= 0 {
+        consumed = 1
+    }
+    fmt.Printf("Scheduled %v Pods in %v seconds (%v per second on average). min QPS was %v\n",
+        config.numPods, consumed, config.numPods/consumed, minQPS)
+    return minQPS
 }

 // mutateNodeTemplate returns the modified node needed for creation of nodes.
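In the new schedulePods, per-second throughput is read from an atomic counter by a ticker goroutine, and the run ends when the informer handler cancels the context. The sketch below isolates that ticker-plus-cancellation loop with a simulated workload; the names, counts, and the 200ms interval are illustrative assumptions.

    package main

    import (
        "context"
        "fmt"
        "sync/atomic"
        "time"
    )

    func main() {
        var counter int32
        ctx, cancel := context.WithCancel(context.Background())

        // Simulated workload: bump the counter for a while, then cancel,
        // standing in for the informer handler reaching config.numPods.
        go func() {
            for i := 0; i < 50; i++ {
                atomic.AddInt32(&counter, 1)
                time.Sleep(20 * time.Millisecond)
            }
            cancel()
        }()

        prev := int32(0)
        minRate := int32(1 << 30)
        ticker := time.NewTicker(200 * time.Millisecond)
        defer ticker.Stop()

        for {
            select {
            case <-ticker.C:
                cur := atomic.LoadInt32(&counter)
                rate := cur - prev
                if rate < minRate {
                    minRate = rate
                }
                fmt.Printf("rate: %d\ttotal: %d\n", rate, cur)
                prev = cur
            case <-ctx.Done():
                fmt.Println("min rate observed:", minRate)
                return
            }
        }
    }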