Merge pull request #98900 from Huang-Wei/churn-cluster-op

Introduce a churnOp to scheduler perf testing framework
Commit 823fa75643, authored by Kubernetes Prow Robot on 2021-03-11 02:00:24 -08:00, committed by GitHub.
9 changed files with 276 additions and 13 deletions

test/integration/scheduler_perf/config/churn/node-default.yaml

@@ -0,0 +1,11 @@
+apiVersion: v1
+kind: Node
+metadata:
+  generateName: node-churn-
+status:
+  capacity:
+    pods: "0"
+  conditions:
+  - status: "True"
+    type: Ready
+  phase: Running

test/integration/scheduler_perf/config/churn/pod-default.yaml

@@ -0,0 +1,8 @@
+apiVersion: v1
+kind: Pod
+metadata:
+  generateName: pod-churn-
+spec:
+  containers:
+  - image: k8s.gcr.io/pause:3.4.1
+    name: pause

test/integration/scheduler_perf/config/churn/service-default.yaml

@@ -0,0 +1,11 @@
+apiVersion: v1
+kind: Service
+metadata:
+  generateName: service-churn-
+spec:
+  selector:
+    app: foo
+  ports:
+  - protocol: TCP
+    port: 8080
+    targetPort: 8080

test/integration/scheduler_perf/config/performance-config.yaml

@@ -449,3 +449,29 @@
       initNodes: 5000
       initPods: 2000
       measurePods: 5000
+- name: SchedulingWithMixedChurn
+  workloadTemplate:
+  - opcode: createNodes
+    countParam: $initNodes
+  - opcode: churn
+    mode: recreate
+    number: 1
+    templatePaths:
+    - config/churn/node-default.yaml
+    - config/pod-high-priority-large-cpu.yaml
+    - config/churn/service-default.yaml
+    intervalMilliseconds: 1000
+  - opcode: createPods
+    countParam: $measurePods
+    podTemplatePath: config/pod-default.yaml
+    collectMetrics: true
+  workloads:
+  - name: 1000Nodes
+    params:
+      initNodes: 1000
+      measurePods: 1000
+  - name: 5000Nodes
+    params:
+      initNodes: 5000
+      measurePods: 2000

test/integration/scheduler_perf/config/pod-high-priority-large-cpu.yaml

@@ -0,0 +1,15 @@
+apiVersion: v1
+kind: Pod
+metadata:
+  generateName: pod-h-
+spec:
+  priority: 10
+  containers:
+  - image: k8s.gcr.io/pause:3.2
+    name: pause
+    ports:
+    - containerPort: 80
+    resources:
+      requests:
+        cpu: 9
+        memory: 500Mi

test/integration/scheduler_perf/scheduler_bench_test.go

@@ -442,7 +442,7 @@ func benchmarkScheduling(numExistingPods, minPods int,
 		//lint:ignore SA3001 Set a minimum for b.N to get more meaningful results
 		b.N = minPods
 	}
-	finalFunc, podInformer, clientset := mustSetupScheduler()
+	finalFunc, podInformer, clientset, _ := mustSetupScheduler()
 	defer finalFunc()
 	nodePreparer := framework.NewIntegrationTestNodePreparer(

test/integration/scheduler_perf/scheduler_perf_test.go

@@ -21,16 +21,25 @@ import (
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
+	"math"
 	"strings"
 	"sync"
 	"testing"
 	"time"

 	v1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/util/wait"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
+	cacheddiscovery "k8s.io/client-go/discovery/cached"
+	"k8s.io/client-go/dynamic"
 	coreinformers "k8s.io/client-go/informers/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/restmapper"
 	"k8s.io/component-base/featuregate"
 	featuregatetesting "k8s.io/component-base/featuregate/testing"
 	"k8s.io/klog/v2"
@@ -43,7 +52,15 @@ const (
 	configFile        = "config/performance-config.yaml"
 	createNodesOpcode = "createNodes"
 	createPodsOpcode  = "createPods"
+	churnOpcode       = "churn"
 	barrierOpcode     = "barrier"
+
+	// Two modes supported in the "churn" operator.
+
+	// Recreate creates a number of API objects and then deletes them, and repeats the cycle.
+	Recreate = "recreate"
+	// Create continuously creates API objects without deleting them.
+	Create = "create"
 )

 var (
@@ -90,7 +107,7 @@ func (tc *testCase) collectsMetrics() bool {

 // workload is a subtest under a testCase that tests the scheduler performance
 // for a certain ordering of ops. The set of nodes created and pods scheduled
-// in a workload may be heterogenous.
+// in a workload may be heterogeneous.
 type workload struct {
 	// Name of the workload.
 	Name string
@@ -109,6 +126,7 @@ func (op *op) UnmarshalJSON(b []byte) error {
 	possibleOps := []realOp{
 		&createNodesOp{},
 		&createPodsOp{},
+		&churnOp{},
 		&barrierOp{},
 		// TODO(#93793): add a sleep timer op to simulate waiting?
 		// TODO(#94601): add a delete nodes op to simulate scaling behaviour?
@@ -252,6 +270,55 @@ func (cpo createPodsOp) patchParams(w *workload) (realOp, error) {
 	return &cpo, (&cpo).isValid(false)
 }

+// churnOp defines an op where API objects are created and deleted in churn as part of a workload.
+type churnOp struct {
+	// Must be "churn".
+	Opcode string
+	// Value must be one of the following:
+	// - recreate. In this mode, API objects will be created for N cycles, and then
+	//   deleted in the next N cycles. N is specified by the "Number" field.
+	// - create. In this mode, API objects will be created (without deletion) until
+	//   reaching a threshold - which is specified by the "Number" field.
+	Mode string
+	// Maximum number of API objects to be created.
+	// Defaults to 0, which means unlimited.
+	Number int
+	// Interval of churning. Defaults to 500 milliseconds.
+	IntervalMilliseconds int64
+	// Namespace the churning objects should be created in. Optional, defaults to a unique
+	// namespace of the format "namespace-<number>".
+	Namespace *string
+	// Paths of API spec files.
+	TemplatePaths []string
+}
+
+func (co *churnOp) isValid(_ bool) error {
+	if co.Opcode != churnOpcode {
+		return fmt.Errorf("invalid opcode")
+	}
+	if co.Mode != Recreate && co.Mode != Create {
+		return fmt.Errorf("invalid mode: %v. must be one of %v", co.Mode, []string{Recreate, Create})
+	}
+	if co.Number < 0 {
+		return fmt.Errorf("number (%v) cannot be negative", co.Number)
+	}
+	if co.Mode == Recreate && co.Number == 0 {
+		return fmt.Errorf("number cannot be 0 when mode is %v", Recreate)
+	}
+	if len(co.TemplatePaths) == 0 {
+		return fmt.Errorf("at least one template spec file needs to be specified")
+	}
+	return nil
+}
+
+func (*churnOp) collectsMetrics() bool {
+	return false
+}
+
+func (co churnOp) patchParams(w *workload) (realOp, error) {
+	return &co, nil
+}
+
 // barrierOp defines an op that can be used to wait until all scheduled pods of
 // one or many namespaces have been bound to nodes. This is useful when pods
 // were scheduled with SkipWaitToCompletion set to true.
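
As an aside, this is how the "churn" op from the SchedulingWithMixedChurn workload earlier in this PR maps onto the churnOp struct above. The test function below is hypothetical (not part of the PR), but it uses only names defined in this file and passes the isValid check:

// Hypothetical test in this package, for illustration only.
func TestChurnOpFromConfig(t *testing.T) {
	op := churnOp{
		Opcode:               "churn",  // must equal churnOpcode
		Mode:                 Recreate, // create objects on N ticks, delete them on the next N
		Number:               1,        // at most one live object per template at a time
		IntervalMilliseconds: 1000,     // one churn action per second
		TemplatePaths: []string{
			"config/churn/node-default.yaml",
			"config/pod-high-priority-large-cpu.yaml",
			"config/churn/service-default.yaml",
		},
	}
	if err := op.isValid(false); err != nil {
		t.Fatalf("expected a valid churnOp: %v", err)
	}
}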
@@ -309,7 +376,7 @@ func runWorkload(b *testing.B, tc *testCase, w *workload) []DataItem {
 	// 30 minutes should be plenty enough even for the 5000-node tests.
 	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
 	defer cancel()
-	finalFunc, podInformer, clientset := mustSetupScheduler()
+	finalFunc, podInformer, client, dynClient := mustSetupScheduler()
 	b.Cleanup(finalFunc)

 	var mu sync.Mutex
@@ -329,7 +396,7 @@ func runWorkload(b *testing.B, tc *testCase, w *workload) []DataItem {
 		}
 		switch concreteOp := realOp.(type) {
 		case *createNodesOp:
-			nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, clientset)
+			nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, client)
 			if err != nil {
 				b.Fatalf("op %d: %v", opIndex, err)
 			}
@@ -359,7 +426,7 @@ func runWorkload(b *testing.B, tc *testCase, w *workload) []DataItem {
 					go collector.run(collectorCtx)
 				}
 			}
-			if err := createPods(namespace, concreteOp, clientset); err != nil {
+			if err := createPods(namespace, concreteOp, client); err != nil {
 				b.Fatalf("op %d: %v", opIndex, err)
 			}
 			if concreteOp.SkipWaitToCompletion {
@@ -387,6 +454,103 @@ func runWorkload(b *testing.B, tc *testCase, w *workload) []DataItem {
 				mu.Unlock()
 			}
+		case *churnOp:
+			var namespace string
+			if concreteOp.Namespace != nil {
+				namespace = *concreteOp.Namespace
+			} else {
+				namespace = fmt.Sprintf("namespace-%d", opIndex)
+			}
+			restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(client.Discovery()))
+			// Ensure the namespace exists.
+			nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
+			if _, err := client.CoreV1().Namespaces().Create(ctx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
+				b.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err)
+			}
+
+			var churnFns []func(name string) string
+
+			for i, path := range concreteOp.TemplatePaths {
+				unstructuredObj, gvk, err := getUnstructuredFromFile(path)
+				if err != nil {
+					b.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err)
+				}
+				// Obtain GVR.
+				mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
+				if err != nil {
+					b.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err)
+				}
+				gvr := mapping.Resource
+				// Distinguish cluster-scoped from namespaced API objects.
+				var dynRes dynamic.ResourceInterface
+				if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
+					dynRes = dynClient.Resource(gvr).Namespace(namespace)
+				} else {
+					dynRes = dynClient.Resource(gvr)
+				}
+
+				churnFns = append(churnFns, func(name string) string {
+					if name != "" {
+						dynRes.Delete(ctx, name, metav1.DeleteOptions{})
+						return ""
+					}
+
+					live, err := dynRes.Create(ctx, unstructuredObj, metav1.CreateOptions{})
+					if err != nil {
+						return ""
+					}
+					return live.GetName()
+				})
+			}
+
+			var interval int64 = 500
+			if concreteOp.IntervalMilliseconds != 0 {
+				interval = concreteOp.IntervalMilliseconds
+			}
+			ticker := time.NewTicker(time.Duration(interval) * time.Millisecond)
+			defer ticker.Stop()
+
+			if concreteOp.Mode == Recreate {
+				go func() {
+					retVals := make([][]string, len(churnFns))
+					// For each churn function, instantiate a slice of strings with length "concreteOp.Number".
+					for i := range retVals {
+						retVals[i] = make([]string, concreteOp.Number)
+					}
+
+					count := 0
+					for {
+						select {
+						case <-ticker.C:
+							for i := range churnFns {
+								retVals[i][count%concreteOp.Number] = churnFns[i](retVals[i][count%concreteOp.Number])
+							}
+							count++
+						case <-ctx.Done():
+							return
+						}
+					}
+				}()
+			} else if concreteOp.Mode == Create {
+				go func() {
+					count, threshold := 0, concreteOp.Number
+					if threshold == 0 {
+						threshold = math.MaxInt32
+					}
+					for count < threshold {
+						select {
+						case <-ticker.C:
+							for i := range churnFns {
+								churnFns[i]("")
+							}
+							count++
+						case <-ctx.Done():
+							return
+						}
+					}
+				}()
+			}
 		case *barrierOp:
 			for _, namespace := range concreteOp.Namespaces {
 				if _, ok := numPodsScheduledPerNamespace[namespace]; !ok {
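
The Recreate branch above is dense: retVals[i] is a ring of Number name slots per template, and each tick advances count so that a given slot "creates" on one visit and "deletes" on the next. A standalone, runnable sketch of just that pattern, with the API calls stubbed out by prints (names here are illustrative, not from the PR):

package main

import "fmt"

// churn mimics one entry of churnFns: given "" it "creates" an object and
// returns its name; given a name it "deletes" that object and returns "".
func churn(name string) string {
	if name == "" {
		fmt.Println("create object")
		return "obj"
	}
	fmt.Println("delete", name)
	return ""
}

func main() {
	const number = 2 // churnOp.Number: objects are created for 2 ticks, then deleted for 2 ticks
	slots := make([]string, number)
	for count := 0; count < 8; count++ { // stands in for 8 ticker fires
		i := count % number
		slots[i] = churn(slots[i])
	}
	// Output alternates in blocks of number: create, create, delete, delete, ...
}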
@@ -525,6 +689,28 @@ func getSpecFromFile(path *string, spec interface{}) error {
 	return yaml.UnmarshalStrict(bytes, spec)
 }

+func getUnstructuredFromFile(path string) (*unstructured.Unstructured, *schema.GroupVersionKind, error) {
+	bytes, err := ioutil.ReadFile(path)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	bytes, err = yaml.YAMLToJSONStrict(bytes)
+	if err != nil {
+		return nil, nil, fmt.Errorf("cannot convert YAML to JSON: %v", err)
+	}
+
+	obj, gvk, err := unstructured.UnstructuredJSONScheme.Decode(bytes, nil, nil)
+	if err != nil {
+		return nil, nil, err
+	}
+	unstructuredObj, ok := obj.(*unstructured.Unstructured)
+	if !ok {
+		return nil, nil, fmt.Errorf("cannot convert spec file in %v to an unstructured obj", path)
+	}
+	return unstructuredObj, gvk, nil
+}
+
 func getTestCases(path string) ([]*testCase, error) {
 	testCases := make([]*testCase, 0)
 	if err := getSpecFromFile(&path, &testCases); err != nil {
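
A usage sketch for getUnstructuredFromFile (hypothetical snippet within this package; error handling elided). UnstructuredJSONScheme derives the GroupVersionKind from the apiVersion/kind fields of the template, which is what the churn op later feeds into the RESTMapper:

obj, gvk, err := getUnstructuredFromFile("config/churn/service-default.yaml")
if err != nil {
	// handle error
}
// For the service template above this prints:
//   kind=Service gvk=/v1, Kind=Service generateName=service-churn-
fmt.Printf("kind=%s gvk=%s generateName=%s\n", obj.GetKind(), gvk, obj.GetGenerateName())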

test/integration/scheduler_perf/scheduler_test.go

@@ -116,7 +116,7 @@ type testConfig struct {

 // getBaseConfig returns baseConfig after initializing number of nodes and pods.
 func getBaseConfig(nodes int, pods int) *testConfig {
-	destroyFunc, podInformer, clientset := mustSetupScheduler()
+	destroyFunc, podInformer, clientset, _ := mustSetupScheduler()
 	return &testConfig{
 		clientset:   clientset,
 		destroyFunc: destroyFunc,

test/integration/scheduler_perf/util.go

@@ -33,6 +33,7 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/client-go/dynamic"
 	coreinformers "k8s.io/client-go/informers/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
 	restclient "k8s.io/client-go/rest"
@@ -55,20 +56,25 @@ var dataItemsDir = flag.String("data-items-dir", "", "destination directory for

 // mustSetupScheduler starts the following components:
 // - k8s api server (a.k.a. master)
 // - scheduler
-// It returns clientset and destroyFunc which should be used to
+// It returns regular and dynamic clients, and destroyFunc which should be used to
 // remove resources after finished.
 // Notes on rate limiter:
 // - client rate limit is set to 5000.
-func mustSetupScheduler() (util.ShutdownFunc, coreinformers.PodInformer, clientset.Interface) {
+func mustSetupScheduler() (util.ShutdownFunc, coreinformers.PodInformer, clientset.Interface, dynamic.Interface) {
 	apiURL, apiShutdown := util.StartApiserver()
-	clientSet := clientset.NewForConfigOrDie(&restclient.Config{
+
+	cfg := &restclient.Config{
 		Host:          apiURL,
 		ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}},
 		QPS:           5000.0,
 		Burst:         5000,
-	})
-	_, podInformer, schedulerShutdown := util.StartScheduler(clientSet)
-	fakePVControllerShutdown := util.StartFakePVController(clientSet)
+	}
+
+	client := clientset.NewForConfigOrDie(cfg)
+	dynClient := dynamic.NewForConfigOrDie(cfg)
+
+	_, podInformer, schedulerShutdown := util.StartScheduler(client)
+	fakePVControllerShutdown := util.StartFakePVController(client)

 	shutdownFunc := func() {
 		fakePVControllerShutdown()
@@ -76,7 +82,7 @@ func mustSetupScheduler() (util.ShutdownFunc, coreinformers.PodInformer, clientset.Interface) {
 		apiShutdown()
 	}

-	return shutdownFunc, podInformer, clientSet
+	return shutdownFunc, podInformer, client, dynClient
 }

 // Returns the list of scheduled pods in the specified namespaces.
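
Both clients returned by mustSetupScheduler share the same rate-limited rest config but serve different needs: the typed clientset gives compile-time APIs for the kinds the framework already knows (pods, nodes, namespaces), while the dynamic client works on any GroupVersionResource with unstructured payloads, which is what lets the churn op create whatever kind a template file declares. A hypothetical side-by-side (ctx, client, and dynClient as set up above; object names are illustrative):

// Typed client: the API and object types are fixed at compile time.
pod, err := client.CoreV1().Pods("default").Get(ctx, "some-pod", metav1.GetOptions{})

// Dynamic client: the resource is chosen at runtime, and objects travel as
// *unstructured.Unstructured.
gvr := schema.GroupVersionResource{Version: "v1", Resource: "services"}
svc, err2 := dynClient.Resource(gvr).Namespace("default").Get(ctx, "some-service", metav1.GetOptions{})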